Python数据科学速查表 - Spark SQL 基础.pdf
Python 数据科学 速查表 速查表PySpark-SQL 基础天善智能天善智能 商业智能与大数据社区 DataCampLearn Python for Data Science Interactively from pyspark.sql import SparkSession spark=SparkSession .builder .appName(Python Spark SQL basic example).config(spark.some.config.option,some-value).getOrCreate()PySpark 与 Spark SQL spark.stop()终止 SparkSession df.select(firstName,city).write .save(nameAndCity.parquet)df.select(firstName,age).write .save(namesAndAges.json,format=json)创建数据框从 Spark 数据源创建 查询 from pyspark.sql import functions as F Select df.select(firstName).show()显示 firstName 列的所有条目 df.select(firstName,lastName).show()df.select(firstName,显示 firstName、age 的所有条目和类型 age,explode(phoneNumber).alias(contactInfo).select(contactInfo.type,firstName,age).show()df.select(dffirstName,dfage+1).show()df.select(dfage 24).show()When df.select(firstName,F.when(df.age 30,1).otherwise(0).show()dfdf.firstName.isin(Jane,Boris).collect()Like df.select(firstName,df.lastName.like(Smith).show()Startswith-Endswith df.select(firstName,df.lastName.startswith(Sm).show()df.select(df.lastName.endswith(th).show()Substring df.select(df.firstName.substr(1,3).alias(name).collect()Between df.select(df.age.between(22,24).show()显示 firstName 和 age 列的所有记录,并对 age 记录添加1显示所有小于24岁的记录显示 firstName,且大于30岁显示1,小于30岁显示0显示符合指定条件的 firstName 列的记录显示 lastName 列中包含 Smith 的 firstName 列的记录显示 lastName 列中以 Sm 开头的 firstName 列的记录显示以 th 结尾的 lastName返回 firstName 的子字符串显示介于22岁至24岁之间的 age 列的记录运行 SQL 查询 df5=spark.sql(SELECT*FROM customer).show()peopledf2=spark.sql(SELECT*FROM global_temp.people).show()添加、修改、删除列 df=df.withColumn(city,df.address.city).withColumn(postalCode,df.address.postalCode).withColumn(state,df.address.state).withColumn(streetAddress,df.address.streetAddress).withColumn(telePhoneNumber,explode(df.phoneNumber.number).withColumn(telePhoneType,explode(df.phoneNumber.type)df=df.drop(address,phoneNumber)df=df.drop(df.address).drop(df.phoneNumber)df=df.dropDuplicates()df=df.withColumnRenamed(telePhoneNumber,phoneNumber)重复值添加列修改列删除列 JSON df=spark.read.json(customer.json)df.show()+-+-+-+-+-+|address|age|firstName|lastName|phoneNumber|+-+-+-+-+-+|New York,10021,N.|25|John|Smith|212 555-1234,ho.|New York,10021,N.|21|Jane|Doe|322 888-1234,ho.|+-+-+-+-+-+df2=spark.read.load(people.json,format=json)Parquet 文件 df3=spark.read.load(users.parquet)文本文件 df4=spark.read.text(people.txt)初始化 SparkSession from pyspark.sql.types import*推断 Schema sc=spark.sparkContext lines=sc.textFile(people.txt)parts=lines.map(lambda l:l.split(,)people=parts.map(lambda p:Row(name=p0,age=int(p1)peopledf=spark.createDataFrame(people)指定 Schema people=parts.map(lambda p:Row(name=p0,age=int(p1.strip()schemaString=name age fields=StructField(field_name,StringType(),True)for field_name in schemaString.split()schema=StructType(fields)spark.createDataFrame(people,schema).show()+-+-+|name|age|+-+-+|Mine|28|Filip|29|Jonathan|30|+-+-+查阅数据信息 排序 peopledf.sort(peopledf.age.desc().collect()df.sort(age,ascending=False).collect()df.orderBy(age,city,ascending=0,1).collect()替换缺失值 peopledf.createGlobalTempView(people)df.createTempView(customer)df.createOrReplaceTempView(customer)将数据框注册为视图查询视图分组 df.na.fill(50).show()df.na.drop().show()df.na .replace(10,20)用一个值替换空值去除 df 中为空值的行用一个值替换另一个值 .show()df.groupBy(age).count()按 age 列分组,统计每组人数 .show()df.describe().show()df.columns df.count()df.distinct().count()df.printSchema()df.explain()汇总统计数据返回 df 的列名返回 df 的行数返回 df 中不重复的行数返回 df的 Schema返回逻辑与实体方案 df.dtypes df.show()df.head()df.first()df.take(2)df.schema返回 df 的列名与数据类型显示 df 的内容返回前 n 行数据返回第 1 行数据返回前 n 行数据返回 df 的 Schema 筛选 df.filter(dfage24).show()按 age 列筛选,保留年龄大于24岁的 输出数据结构保存至文件 rdd1=df.rdd df.toJSON().first()df.toPandas()将 df 转换为 RDD将 df 转换为 RDD 字符串将 df 的内容转为 Pandas 的数据框重分区 df.repartition(10)将 df 拆分为10个分区 .rdd .getNumPartitions()df.coalesce(1).rdd.getNumPartitions()将 df 合并为1个分区Spark SQLSpark SQL 是 Apache Spark 处理结构化数据的模块。呆鸟译呆鸟译从 RDDRDD 创建原文作者原文作者SparkSession 用于创建数据框,将数据框注册为表,执行 SQL 查询,缓存表及读取 Parquet 文件。