1 SparkSQL概述
1.1 SparkSQL简介
SparkSQL 是Spark的一个模块, 用于处理海量结构化数据。
SparkSQL是非常成熟的 海量结构化数据处理框架,学习SparkSQL主要在2个点:
- SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等
企业大面积在使用SparkSQL处理业务数据
- 离线开发
- 数仓搭建
- 科学计算
- 数据分析
特点:
- 融合性:SQL可以无缝集成在代码中, 随时用SQL处理数据
- 统一数据访问:一套标准API可读写不同数据源
- Hive兼容:可以使用SparkSQL直接计算并生成Hive数据表
- 标准化连接:支持标准化JDBC\ODBC连接, 方便和各种数据库进行数据交互.
1.2 SparkSQL和Hive的异同
Hive和Spark 均是:“分布式SQL计算引擎”
均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。
目前,企业中使用Hive仍旧居多,但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级
1.3 Spark数据抽象
Pandas -- DataFrame
- 二维表数据结构
- 单机(本地)集合
SparkCore -- RDD
- 无标准数据结构,存储什么数据均可
- 分布式集合(分区)
SparkSQL -- DataFrame
- 二维表数据结构
- 分布式集合(分区)
SparkSQL For JVM -- DataSet
- 可用于Java、Scala语言
SparkSQL For Python\R -- DataSet
- 可用于Java、Scala、Python、R语言
发展历程:
- 14年最早的数据抽象是:SchemaRDD(内部存储二维表数据结构的RDD),SchemaRDD就是魔改的RDD,将RDD支持的存储数据,限定为二维表数据结构用以支持SQL查询。由于是魔改RDD,只是一个过渡产品,现已废弃。
- 15年发布DataFrame对象,基于Pandas的DataFrame(模仿)独立于RDD进行实现,将数据以二维表结构进行存储并支持分布式运行
- 16年发布DataSet对象,在DataFrame之上添加了泛型的支持,用以更好的支持Java和Scala这两个支持泛型的编程语言
- 16年,Spark2.0版本,将DataFrame和DataSet进行合并。其底层均是DataSet对象,但在Python和R语言到用时,显示为DataFrame对象。和老的DataFrame对象没有区别
1.4 DataFrame概述
假定有如下数据集:
DataFrame按二维表格存储
RDD按数组对象存储
1.5 SparkSession对象
在RDD阶段,程序的执行入口对象是: SparkContext,在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
SparkSession对象可以:
- 用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建SparkSession对象,这个对象是 构建者模式 ,通过builder方法来构建
spark = SparkSession.builder.\
appName('local[*]').\
config('spark.sql.shuffle.partitions', '4').\
getOrCreate()
# appName 设置程序名称,config设置一些常用属性
# 最后通过getOrCreate()方法,创建SparkSession对象
# coding:utf8
# SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建SparkSession对象, 这个对象是 构建器模式 通过builder方法来构建
spark = SparkSession.builder.\
appName("local[*]").\
config("spark.sql.shuffle.partitions", "4").\
getOrCreate()
# appName 设置程序名称, config设置一些常用属性
# 最后通过getOrCreate()方法 创建SparkSession对象
df = spark.read.csv('../data/sql/stu_score.txt', sep=',', header=False)
df2 = df.toDF('id', 'name', 'score')
df2.printSchema()
df2.show()
df2.createTempView("score")
# SQL 风格
spark.sql("""
SELECT * FROM score WHERE name='语文' LIMIT 5
""").show()
# DSL 风格
df2.where("name='语文'").limit(5).show()
2 DataFrame入门
2.1 DataFrame的组成
DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:
- 行
- 列
- 表结构描述
比如,在MySQL中的一张表:
- 由许多行组成
- 数据也被分成多个列
- 表也有表结构信息(列、列名、列类型、列约束等)
基于这个前提,DataFrame的组成如下:
在结构层面:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息
在数据层面
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息
2.2 DataFrame的构建
2.2.1 基于RDD方式1 createDataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
'''
filePath:
zhangsan,21
lisi,23
wangwu,20
'''
# 构建rdd,得到[[zhangsan,21], [lisi,23], [wangwu,20]]这种数据形式
rdd = sc.textFile("filePath").map(lambda x:x.split(',')).map(lambda x:int(x(1)))
# 将rdd转化为DataFrame
# 参数1:被转换的RDD
# 参数2:指定列名,通过list的形式指定,按照顺序依次提供字符串名字即可
df = spark.createDataFrame(rdd, schema=['name', 'age'])
# 打印DataFrame的表结构
df.peintSchema()
# 打印df中的数据
# 参数1:表示展示出多少条数据,默认不传的话是20
# 参数2:表示是否对列进行截取,如果列的数据长度超过20个字符串长度,后续的内容不显示以...代替
# 如果给False表示不截断、全部显示,默认是True
df.show(20, False)
# 将DF对象转换成临时视图表,可供sql语句查询
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age<30").show()
2.2.2 基于RDD方式2 加入schema
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
'''
filePath:
zhangsan,21
lisi,23
wangwu,20
'''
# 构建rdd,得到[[zhangsan,21], [lisi,23], [wangwu,20]]这种数据形式
rdd = sc.textFile("filePath").map(lambda x:x.split(',')).map(lambda x:int(x(1)))
# 构建表结构的描述对象:StructType对象
# 三个参数分别为列名,类型,是否为空
schema = StructType().add("name", StringType(), nullable=True).add("age",IntergerType(), nullable=False)
# 将rdd转化为DataFrame
# 参数1:被转换的RDD
# 参数2:将创建的StructType对象传进去
df = spark.createDataFrame(rdd, schema=schema)
2.2.3 基于RDD方式3 RDD的toDF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
'''
filePath:
zhangsan,21
lisi,23
wangwu,20
'''
# 构建rdd,得到[[zhangsan,21], [lisi,23], [wangwu,20]]这种数据形式
rdd = sc.textFile("filePath").map(lambda x:x.split(',')).map(lambda x:int(x(1)))
# toDF的方式构建DataFrame
df1 = rdd.toDF(["name", "age"])
df1.printSchema()
df1.show()
# toDF的方式2 通过StructType来构建
schema = StructType().add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)
df2 = rdd.toDF(schema=schema)
df2.printSchema()
df2.show()
2.2.4 基于Pandas的DataFrame构建
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
pdf = pd.DataFrame(
{
"id": [1, 2, 3],
"name": ["张大仙", "王晓晓", "吕不为"],
"age": [11, 21, 11]
}
)
df = spark.createDataFrame(pdf)
df.printSchema()
df.show()
2.3 读取外部数据
通过SparkSQL的统一API进行数据读取构建DataFrame。统一API示例代码:
sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
.option("K", "V") # option可选
.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
.load("被读取文件的路径, 支持本地文件系统和HDFS")
text:
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# 构建StructType, text数据源, 读取数据的特点是, 将一整行只作为`一个列`读取, 默认列名是value 类型是String
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text").\
schema(schema=schema).\
load("../data/input/sql/people.txt")
json:
# JSON 类型 一般不用写.schema, json自带, json带有列名 和列类型(字符串和数字)
df = spark.read.format("json").load("../data/input/sql/people.json")
df.printSchema()
df.show()
csv:
df = spark.read.format("csv")\
.option("sep", ";")\ # 列分隔符
.option("header", False)\ # 是否有CSV标头
.option("encoding", "utf-8")\ # 编码
.schema("name STRING, age INT, job STRING")\ # 指定列名和类型
.load("../data/sql/people.csv") # 路径
df.printSchema()
df.show()
parquet:
parquet是Spark中常用的一种列式存储文件格式和Hive中的ORC差不多, 他俩都是列存储格式。parquet对比普通的文本文件的区别:
- parquet 内置schema (列名\ 列类型\ 是否为空)
- 存储是以列作为存储格式
- 存储是序列化存储在文件中的(有压缩属性体积小)
Parquet文件不能直接打开查看,如果想要查看内容可以在PyCharm中安装如下插件来查看:
2.4 DSL风格入门API
DSL称之为:领域特定语言。
其实就是指DataFrame的特有API
DSL风格意思就是以调用API的方式来处理Data
比如:df.where().limit()
2.5 SQL风格入门API
SQL风格就是使用SQL语句处理DataFrame的数据
比如:spark.sql("SELECT * FROM xxx")
2.6 wordcount案例
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F
if __name__ == '__main__':
# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# TODO 1: SQL 风格进行处理
rdd = sc.textFile("../data/input/words.txt").\
flatMap(lambda x: x.split(" ")).\
map(lambda x: [x])
df = rdd.toDF(["word"])
# 注册DF为表格
df.createTempView("words")
spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()
# TODO 2: DSL 风格处理
df = spark.read.format("text").load("../data/input/words.txt")
# withColumn方法
# 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在
df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
df2.groupBy("value").\
count().\
withColumnRenamed("value", "word").\
withColumnRenamed("count", "cnt").\
orderBy("cnt", ascending=False).\
show()
3 UDF
4 SparkSQL的运行流程
4.1 RDD执行流程回顾
代码 -> DAG调度器逻辑任务 -> Task调度器任务分配和管理监控 -> Worker干活
4.2 SparkSQL的自动优化
RDD的运行会完全按照开发者的代码执行, 如果开发者水平有限,RDD的执行效率也会受到影响。而SparkSQL会对写完的代码,执行“自动优化”, 以提升代码运行效率,避免开发者水平影响到代码执行效率。
这是因为RDD内含数据类型不限格式和结构,而DataFrame是二维表结构,可以被针对,SparkSQL的自动优化,依赖于Catalyst优化器。
4.3 Catalyst优化器
为了解决过多依赖Hive的问题,SparkSQL使用了一个新的SQL优化器替代Hive中的优化器,这个优化器就是Catalyst,整个SparkSQL的框架大致如下:
- 简单的说,API层即Spark会通过一些API接受SQL语句
- 收到SQL语句后,将其交给Catalyst,Catalyst负责解析SQL,生成执行计划等
- Catalyst的输出应该是RDD的执行计划
- 最终交由集群运行
具体优化流程:
step1:解析SQL,并生成AST(抽象语法树)
step2:在AST中加入元数据信息,做这一步主要是为了一些优化,例如col = col 这样的条件,下图是一个简略图,便于理解
score.id -> id#1#L
为score.id生成id为1,类型是Long
score.math_score -> math_score#2#L
为score.math_score生成id为2,类型为Long
people.id -> id#3#L
为people.id生成id为3,类型为Long
people.age -> age#4#L
为people.age生成id为4,类型为Long
set3:对已经加入元数据的AST,输入优化器,进行优化,从两种常见的优化开始,简单介绍:
断言下推 Predicate Pushdown,将Filter 这种可以减小数据集的操作下推,放在Sacn的位置,这样可以减少操作时候的数据量。(又称谓词下推)
SELECT sum(v) FROM( SELECT score_id, 100+80+score.math_score AS v FROM people JOIN score WHERE people.id = score.id AND people.age > 10 )tmp -- 如这个代码,正常流程是先JOIN然后WHERE,断言下推后,会先过滤age,然后JOIN,减少JOIN的数据量以提高性能
- 列值裁剪 Column Pruning,在断言下推后执行裁剪,由于people表之上的操作中用到了id列,所以可以把其他列裁剪掉,这样可以减少处理的数据量,从而优化处理速度。如下图,在scan前又加入了Filter,作为列裁剪用
step4:上面的过程生成的AST其实最终还没办法直接运行,这个AST叫做逻辑计划,结束后,需要生成物理计划,从而生成RDD来运行,在生成物理计划的时候,会经过成本模型对整个树再次执行优化,选择一个更好的计划;在生成物理计划之后,因为考虑到性能,所以会使用代码生成,在机器中运行。
可以使用queryExecution方法查看逻辑执行计划,使用explain方法查看物理执行计划。
spark.sql("SELECT name,age FROM people WHERE age>19").explain(True)
总结:
Catalyst的各种优化细节非常多,大方面的优化点有两个:
- 谓词下推(Predicate Pushdown)\断言下推:将逻辑判断提前,以减少shuffle阶段的数据量【行过滤,提前执行where】;
- 列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的宽度【列过滤,提前规划select的组字段数量】。
4.4 SparkSQL的执行流程
- 提交SparkSQL代码
Catalyst优化
- 生成原始AST语法树
- 标记AST元数据
- 进行断言下推和列值裁剪有以及其他方面的优化作用在AST上
- 得到最终AST,生成执行计划
- 将执行计划翻译为RDD代码
- Driver执行环境入口构建(SparkSession)
- DAG调度器规划逻辑任务
- Task调度区分配逻辑任务到具体Executor上工作并监控管理任务
- Worker干活