1 SparkSQL概述

1.1 SparkSQL简介

SparkSQL 是Spark的一个模块, 用于处理海量结构化数据。

SparkSQL是非常成熟的 海量结构化数据处理框架,学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等
  • 企业大面积在使用SparkSQL处理业务数据
    • 离线开发
    • 数仓搭建
    • 科学计算
    • 数据分析

特点:

  1. 融合性:SQL可以无缝集成在代码中, 随时用SQL处理数据
  2. 统一数据访问:一套标准API可读写不同数据源
  3. Hive兼容:可以使用SparkSQL直接计算并生成Hive数据表
  4. 标准化连接:支持标准化JDBC\ODBC连接, 方便和各种数据库进行数据交互.

1.2 SparkSQL和Hive的异同

Hive和Spark 均是:“分布式SQL计算引擎”

均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。

目前,企业中使用Hive仍旧居多,但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级

1.3 Spark数据抽象

Pandas -- DataFrame

  • 二维表数据结构
  • 单机(本地)集合

SparkCore -- RDD

  • 无标准数据结构,存储什么数据均可
  • 分布式集合(分区)

SparkSQL -- DataFrame

  • 二维表数据结构
  • 分布式集合(分区)

SparkSQL For JVM -- DataSet

  • 可用于Java、Scala语言

SparkSQL For Python\R -- DataSet

  • 可用于Java、Scala、Python、R语言

发展历程:

  • 14年最早的数据抽象是:SchemaRDD(内部存储二维表数据结构的RDD),SchemaRDD就是魔改的RDD,将RDD支持的存储数据,限定为二维表数据结构用以支持SQL查询。由于是魔改RDD,只是一个过渡产品,现已废弃。
  • 15年发布DataFrame对象,基于Pandas的DataFrame(模仿)独立于RDD进行实现,将数据以二维表结构进行存储并支持分布式运行
  • 16年发布DataSet对象,在DataFrame之上添加了泛型的支持,用以更好的支持Java和Scala这两个支持泛型的编程语言
  • 16年,Spark2.0版本,将DataFrame和DataSet进行合并。其底层均是DataSet对象,但在Python和R语言到用时,显示为DataFrame对象。和老的DataFrame对象没有区别

1.4 DataFrame概述

假定有如下数据集:

DataFrame按二维表格存储

RDD按数组对象存储

1.5 SparkSession对象

在RDD阶段,程序的执行入口对象是: SparkContext,在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。

SparkSession对象可以:

  • 用于SparkSQL编程作为入口对象
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 构建SparkSession对象,这个对象是 构建者模式 ,通过builder方法来构建
    spark = SparkSession.builder.\
           appName('local[*]').\
           config('spark.sql.shuffle.partitions', '4').\
           getOrCreate()
    # appName 设置程序名称,config设置一些常用属性
    # 最后通过getOrCreate()方法,创建SparkSession对象
# coding:utf8
# SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
    if __name__ == '__main__':
        # 构建SparkSession对象, 这个对象是 构建器模式 通过builder方法来构建
        spark = SparkSession.builder.\
            appName("local[*]").\
            config("spark.sql.shuffle.partitions", "4").\
            getOrCreate()
        # appName 设置程序名称, config设置一些常用属性
        # 最后通过getOrCreate()方法 创建SparkSession对象
        df = spark.read.csv('../data/sql/stu_score.txt', sep=',', header=False)
        df2 = df.toDF('id', 'name', 'score')
        df2.printSchema()
        df2.show()
        df2.createTempView("score")
        # SQL 风格
        spark.sql("""
            SELECT * FROM score WHERE name='语文' LIMIT 5
        """).show()
        # DSL 风格
        df2.where("name='语文'").limit(5).show()

2 DataFrame入门

2.1 DataFrame的组成

DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:

  • 表结构描述

比如,在MySQL中的一张表:

  • 由许多行组成
  • 数据也被分成多个列
  • 表也有表结构信息(列、列名、列类型、列约束等)

基于这个前提,DataFrame的组成如下:

  • 在结构层面:
    • StructType对象描述整个DataFrame的表结构
    • StructField对象描述一个列的信息
  • 在数据层面
    • Row对象记录一行数据
    • Column对象记录一列数据并包含列的信息

2.2 DataFrame的构建

2.2.1 基于RDD方式1 createDataFrame

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

'''
filePath:
zhangsan,21
lisi,23
wangwu,20
'''
# 构建rdd,得到[[zhangsan,21], [lisi,23], [wangwu,20]]这种数据形式
rdd = sc.textFile("filePath").map(lambda x:x.split(',')).map(lambda x:int(x(1)))

# 将rdd转化为DataFrame
# 参数1:被转换的RDD
# 参数2:指定列名,通过list的形式指定,按照顺序依次提供字符串名字即可
df = spark.createDataFrame(rdd, schema=['name', 'age'])

# 打印DataFrame的表结构
df.peintSchema()

# 打印df中的数据
# 参数1:表示展示出多少条数据,默认不传的话是20
# 参数2:表示是否对列进行截取,如果列的数据长度超过20个字符串长度,后续的内容不显示以...代替
# 如果给False表示不截断、全部显示,默认是True
df.show(20, False)

# 将DF对象转换成临时视图表,可供sql语句查询
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age<30").show()

2.2.2 基于RDD方式2 加入schema

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

'''
filePath:
zhangsan,21
lisi,23
wangwu,20
'''
# 构建rdd,得到[[zhangsan,21], [lisi,23], [wangwu,20]]这种数据形式
rdd = sc.textFile("filePath").map(lambda x:x.split(',')).map(lambda x:int(x(1)))

# 构建表结构的描述对象:StructType对象
# 三个参数分别为列名,类型,是否为空
schema = StructType().add("name", StringType(), nullable=True).add("age",IntergerType(), nullable=False)

# 将rdd转化为DataFrame
# 参数1:被转换的RDD
# 参数2:将创建的StructType对象传进去
df = spark.createDataFrame(rdd, schema=schema)

2.2.3 基于RDD方式3 RDD的toDF

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

'''
filePath:
zhangsan,21
lisi,23
wangwu,20
'''
# 构建rdd,得到[[zhangsan,21], [lisi,23], [wangwu,20]]这种数据形式
rdd = sc.textFile("filePath").map(lambda x:x.split(',')).map(lambda x:int(x(1)))

# toDF的方式构建DataFrame
df1 = rdd.toDF(["name", "age"])
df1.printSchema()
df1.show()

# toDF的方式2 通过StructType来构建
schema = StructType().add("name", StringType(), nullable=True).\
    add("age", IntegerType(), nullable=False)

df2 = rdd.toDF(schema=schema)
df2.printSchema()
df2.show()

2.2.4 基于Pandas的DataFrame构建

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
    appName("test").\
    master("local[*]").\
    getOrCreate()
sc = spark.sparkContext

# 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
pdf = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "name": ["张大仙", "王晓晓", "吕不为"],
        "age": [11, 21, 11]
    }
)

df = spark.createDataFrame(pdf)

df.printSchema()
df.show()

2.3 读取外部数据

通过SparkSQL的统一API进行数据读取构建DataFrame。统一API示例代码:

sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
    .option("K", "V") # option可选
    .schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
    .load("被读取文件的路径, 支持本地文件系统和HDFS")

text:

spark = SparkSession.builder.\
    appName("test").\
    master("local[*]").\
    getOrCreate()
sc = spark.sparkContext

# 构建StructType, text数据源, 读取数据的特点是, 将一整行只作为`一个列`读取, 默认列名是value 类型是String
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text").\
    schema(schema=schema).\
    load("../data/input/sql/people.txt")

json:

# JSON 类型 一般不用写.schema, json自带, json带有列名 和列类型(字符串和数字)
df = spark.read.format("json").load("../data/input/sql/people.json")
df.printSchema()
df.show()

csv:

df = spark.read.format("csv")\
    .option("sep", ";")\ # 列分隔符
    .option("header", False)\ # 是否有CSV标头
    .option("encoding", "utf-8")\ # 编码
    .schema("name STRING, age INT, job STRING")\ # 指定列名和类型
    .load("../data/sql/people.csv") # 路径
df.printSchema()
df.show()

parquet:

parquet是Spark中常用的一种列式存储文件格式和Hive中的ORC差不多, 他俩都是列存储格式。parquet对比普通的文本文件的区别:

  • parquet 内置schema (列名\ 列类型\ 是否为空)
  • 存储是以列作为存储格式
  • 存储是序列化存储在文件中的(有压缩属性体积小)

Parquet文件不能直接打开查看,如果想要查看内容可以在PyCharm中安装如下插件来查看:

2.4 DSL风格入门API

DSL称之为:领域特定语言。

其实就是指DataFrame的特有API

DSL风格意思就是以调用API的方式来处理Data

比如:df.where().limit()

2.5 SQL风格入门API

SQL风格就是使用SQL语句处理DataFrame的数据

比如:spark.sql("SELECT * FROM xxx")

2.6 wordcount案例

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # TODO 1: SQL 风格进行处理
    rdd = sc.textFile("../data/input/words.txt").\
        flatMap(lambda x: x.split(" ")).\
        map(lambda x: [x])

    df = rdd.toDF(["word"])

    # 注册DF为表格
    df.createTempView("words")

    spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()


    # TODO 2: DSL 风格处理
    df = spark.read.format("text").load("../data/input/words.txt")

    # withColumn方法
    # 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在
    df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
    df2.groupBy("value").\
        count().\
        withColumnRenamed("value", "word").\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        show()

3 UDF

4 SparkSQL的运行流程

4.1 RDD执行流程回顾

代码 -> DAG调度器逻辑任务 -> Task调度器任务分配和管理监控 -> Worker干活

4.2 SparkSQL的自动优化

RDD的运行会完全按照开发者的代码执行, 如果开发者水平有限,RDD的执行效率也会受到影响。而SparkSQL会对写完的代码,执行“自动优化”, 以提升代码运行效率,避免开发者水平影响到代码执行效率。

这是因为RDD内含数据类型不限格式和结构,而DataFrame是二维表结构,可以被针对,SparkSQL的自动优化,依赖于Catalyst优化器。

4.3 Catalyst优化器

为了解决过多依赖Hive的问题,SparkSQL使用了一个新的SQL优化器替代Hive中的优化器,这个优化器就是Catalyst,整个SparkSQL的框架大致如下:

  1. 简单的说,API层即Spark会通过一些API接受SQL语句
  2. 收到SQL语句后,将其交给Catalyst,Catalyst负责解析SQL,生成执行计划等
  3. Catalyst的输出应该是RDD的执行计划
  4. 最终交由集群运行

具体优化流程:

step1:解析SQL,并生成AST(抽象语法树)

step2:在AST中加入元数据信息,做这一步主要是为了一些优化,例如col = col 这样的条件,下图是一个简略图,便于理解

  • score.id -> id#1#L

    为score.id生成id为1,类型是Long

  • score.math_score -> math_score#2#L

    为score.math_score生成id为2,类型为Long

  • people.id -> id#3#L

    为people.id生成id为3,类型为Long

  • people.age -> age#4#L

    为people.age生成id为4,类型为Long

set3:对已经加入元数据的AST,输入优化器,进行优化,从两种常见的优化开始,简单介绍:

  • 断言下推 Predicate Pushdown,将Filter 这种可以减小数据集的操作下推,放在Sacn的位置,这样可以减少操作时候的数据量。(又称谓词下推)

    SELECT sum(v)
    FROM(
      SELECT
          score_id,
          100+80+score.math_score AS v
      FROM people JOIN score
      WHERE people.id = score.id AND people.age > 10
    )tmp
    -- 如这个代码,正常流程是先JOIN然后WHERE,断言下推后,会先过滤age,然后JOIN,减少JOIN的数据量以提高性能
  • 列值裁剪 Column Pruning,在断言下推后执行裁剪,由于people表之上的操作中用到了id列,所以可以把其他列裁剪掉,这样可以减少处理的数据量,从而优化处理速度。如下图,在scan前又加入了Filter,作为列裁剪用

step4:上面的过程生成的AST其实最终还没办法直接运行,这个AST叫做逻辑计划,结束后,需要生成物理计划,从而生成RDD来运行,在生成物理计划的时候,会经过成本模型对整个树再次执行优化,选择一个更好的计划;在生成物理计划之后,因为考虑到性能,所以会使用代码生成,在机器中运行。

可以使用queryExecution方法查看逻辑执行计划,使用explain方法查看物理执行计划。

spark.sql("SELECT name,age FROM people WHERE age>19").explain(True)

总结:

Catalyst的各种优化细节非常多,大方面的优化点有两个:

  • 谓词下推(Predicate Pushdown)\断言下推:将逻辑判断提前,以减少shuffle阶段的数据量【行过滤,提前执行where】;
  • 列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的宽度【列过滤,提前规划select的组字段数量】。

4.4 SparkSQL的执行流程

  1. 提交SparkSQL代码
  2. Catalyst优化
    1. 生成原始AST语法树
    2. 标记AST元数据
    3. 进行断言下推和列值裁剪有以及其他方面的优化作用在AST上
    4. 得到最终AST,生成执行计划
    5. 将执行计划翻译为RDD代码
  3. Driver执行环境入口构建(SparkSession)
  4. DAG调度器规划逻辑任务
  5. Task调度区分配逻辑任务到具体Executor上工作并监控管理任务
  6. Worker干活
最后修改:2022 年 08 月 22 日
如果觉得我的文章对你有用,请随意赞赏