1 为什么需要RDD

分布式计算需要:

  • 分区控制
  • Shuffle控制
  • 数据存储\序列化\发送
  • 数据计算API
  • 等一系列功能

这些功能, 不能简单的通过Python内置的本地集合对象(如 List\ 字典等)去完成。我们在分布式框架中, 需要有一个统一的数据抽象对象, 来实现上述分布式计算所需功能。这个抽象对象, 就是RDD。

2 什么是RDD

在Spark开山之作Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing这篇paper中,Matei等人提出了RDD这种数据结构。

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

  • Dataset:一个数据集合,用于存放数据的。
  • Distributed:RDD中的数据是分布式存储的,可用于分布式计算。
  • Resilient:RDD中的数据可以存储在内存中或者磁盘中。
  • RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。
  • 所有的运算以及操作都建立在 RDD 数据结构的基础之上。
  • 可以认为RDD是分布式的列表List或数组Array,抽象的数据结构,RDD是一个抽象类Abstract Class和泛型Generic Type

3 RDD的五大特性

RDD 数据结构内部有五个特性,前三个特征每个RDD都具备的,后两个特征可选的。

  1. RDD是有分区的
  2. 计算方法都会作用到每一个分片(分区)之上
  3. RDD之间是有互相依赖的关系的
  4. KV型RDD可以有分区器
  5. RDD分区数据的读取会靠近数据所在地

3.1 RDD是有分区的

RDD的分区是RDD数据存储的最小单位,一份RDD的数据,本质上是分隔成了多个分区。

>>> sc.parallelize([1,2,3,4,5,6,7,8,9], 3).glom().collect()
[[1,2,3],[4,5,6],[7,8,9]]
>>> sc.parallelize([1,2,3,4,5,6,7,8,9], 6).glom().collect()
[[1],[2,3],[4],[5,6],[7],[8,9]]

3.2 RDD的方法会作用在其所有的分区上

>>> sc.parallelize([1,2,3,4,5,6,7,8,9], 3).glom().collect()
[[1,2,3],[4,5,6],[7,8,9]]
>>> sc.parallelize([1,2,3,4,5,6,7,8,9], 3).map(lambda x: x*10).glom().collect()
[[10,20,30],[40,50,60],[70,80,90]]

如上述代码,RDD有三个分区,在执行了map操作将数据都乘以十后,可以看到,三个分区的数据都乘以十了,说明.map是作用在了每一个分区之上。

3.3 RDD之间是有依赖关系的(RDD有血缘关系)

sc = SparkContext(conf = conf)
rdd1 = sc.textFile("../t.txt")
rdd2 = rdd1.flatMap(lambda x:x.split())
rdd3 = rdd2.map(lambda x:(x,1))
rdd4 = rdd3.reduceByKey(lambda a,b:a+b)
print(rdd4.collect())

如上代码,RDD之间是有依赖的。如RDD2会产生RDD3,但是RDD2依赖于RDD1;同样,RDD3会产生RDD4,但RDD3依赖于RDD2;……;会形成一个依赖链条,这个链条称之为RDD的血缘关系。

3.4 Key-Value型的RDD可以有分区器

默认分区器:默认为Hash分区规则,也可以手动设置一个分区器( rdd.partitionBy 的方法来设置)

Key-Value RDD:RDD中存储的是二元元组,这就是Key-Value型,如("Hadoop", 3)

这个特征不是所有RDD都拥有的,因为不是所有的RDD都是Key-Value型数据。

如上图所示,若存在以上五组数据,且设置三个分区,则遵从默认的Hash分区规则,Key相同的数据会被分至同一个区。

3.5 RDD分区数据的读取会靠近数据所在地

在初始RDD(读取数据的时候)规划时,分区会尽量规划到存储数据所在的服务器,因为这样可以走本地读取,避免网络读取所带来的时间损耗。

总之,Spark会在确保并行计算能力的前提下,尽量确保本地读取。

4 RDD编程入门

4.1 程序执行入口 :SparkContext对象

Spark RDD 编程的程序入口对象是SparkContext对象(不论何种编程语言),只有构建出SparkContext, 基于它才能执行后续的API调用和计算,本质上, SparkContext对编程来说, 主要功能就是创建第一个RDD出来。

from pyspark import SparkConf, SparkContext
if __name__ == "__main__":
    # 构建SparkConf对象
    conf = SparkConf().setAppName('helloworld').setMaster('local[*]')
    # 构建SparkContext执行环境入口对象
    sc = SparkContext(conf = conf)

4.2 RDD的创建

RDD的创建主要有2种方式:

  • 通过并行化集合创建 ( 本地对象 转 分布式RDD )
  • 读取外部数据源 ( 读取文件 )

4.2.1 并行化创建:parallelize

使用parallelize API并行化创建是指将本地集合 -> 分布式RDD。这一步是分布式的开端(本地转分布式)

# 参数1:集合对象,如list等
# 参数2:分区数
rdd = sparkcontext.parallelize([1,2,3,4,5,6,7], 3)

collect方法是将RDD(分布式对象)中每个分区的数据,都发送到Driver中,形成一个python List对象。

collect实现将RDD分布式 -> 本地集合

4.2.2 读取文件创建:textFile

使用textFile API可以读取本地数据,也可以读取hdfs数据

# 参数1:必填,文件路径,支持本地文件或HDFS文件,也支持一些其他协议如亚马逊的S3协议(路径为文件夹则将文件夹内所有文件都读取出来)
# 参数2:可选,表示最小分区数量
# 注意,参数2话语权不足,Spark有自己的判断,在它允许的范围内,参数2有效果,超过Spark允许的范围,参数2失效
sparkcontext.textFile(参数1, 参数2)

4.2.3 一堆小文件的读取:wholeTextFiles

使用wholeTextFiles API,适合读取一堆小文件

# 参数1:必填,文件路径,支持本地文件或HDFS文件,也支持一些其他协议如亚马逊的S3协议(路径为文件夹则将文件夹内所有文件都读取出来)
# 参数2:可选,表示最小分区数量
# 注意,参数2话语权不足,分区数最多只能开到文件数量
sparkcontext.wholeTextFiles(参数1, 参数2)

这个API偏向于少量分区读取数据,因为此API表明了自己是小文件读取专用,那么文件的数据很小、分区很多,导致shuffle的几率更高。所以尽量少分区读取数据。

4.2.4 获取分区数量:getNumPartitions

getNumPartitions API获取分区数量,返回值是int数字

>>> rdd.getNumPartitions()
3

4.3 RDD算子的概念与分类

算子:分布式集合对象上的API称之为算子

方法\函数:本地对象的API,叫做方法\函数

RDD的算子分为两类:

  • Transformation:转换算子
  • Action:动作(行动)算子

Transformation算子

  • 定义:RDD的算子,返回值仍旧是一个RDD的,称之为转换算子

  • 特性:这类算子是lazy(懒加载)的,如果没有action算子,Transformation算子是不工作的。

Action算子

  • 定义:返回值不是RDD的就是action算子。

对于这两类算子来说,Transformation算子相当于在构建执行计划,action是让这个执行计划开始工作的指令。

如果没有action,transformation算子之间的迭代关系就是一个没有通电的流水线。只有action到来,这个数据处理的流水线才开始工作。

4.4 RDD常用的转换算子

4.4.1 map算子

功能:将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD。

# func:  f:(T) -> U
rdd.map(func)

4.4.2 flatMap算子

功能:对RDD执行map操作,然后进行解除嵌套操作(展平,即多维变一维)

# 解除嵌套
# 嵌套的list
l = [[1,2,3], [4,5,6], [7,8,9]]

# 接触嵌套后:
l = [1,2,3,4,5,6,7,8,9]
>>> rdd = sc.parallelize(["hadoop hive spark", "hadoop hadoop flink", "spark hadoop hive"])
>>> rdd1 = rdd.map(lambda x: x.split())
>>> rdd1.collect()
[['hadoop','hive','spark'], ['hadoop','hadoop','flink'], ['spark','hadoop','hive']]
>>> rdd2 = rdd.flatMap(lambda x: x.split())
>>> rdd2.collect()
['hadoop','hive','spark','hadoop','hadoop','flink','spark','hadoop','hive']

4.4.3 reduceByKey算子

功能:针对KV型RDD,自动按照key分组,然后根据所提供的聚合逻辑,完成组内数据(value)的聚合操作。(按key分组,按value聚合)

# func:(V,V) -> V
# 接受两个传入参数(类型需一致),返回一个返回值,类型和传入要求一致。
rdd.reduceByKey(func)

reduceByKey中的聚合逻辑(以[1,2,3,4,5],lambda a,b:a+b为例):

>>> rdd = sc.aprallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
>>> result = rdd.reduceByKey(lambda a,b: a+b)
>>> print(result.collect())
[('b',3),('a',2)]

reduceByKey中接收的函数,只负责聚合,不理会分组

分组时自动by key来分组的

4.4.4 mapValues算子

功能:针对二元元组RDD,对其内部的二元元组的Value执行map操作。

# func: (V) -> U
# 注意传入的参数是二元元组的value值
# 这个算子支队value处理
rdd.mapValues(func)
>>> rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
>>> print(rdd.mapValues(lambda x:x*10).collect())
[('a', 10), ('a', 10), ('b', 10), ('b', 10), ('b', 10)]

4.4.5 groupBy算子

功能:将rdd的数据进行分组

rdd.groupBy(func)
# func (T) -> K
# 函数要求传入一个参数,返回一个返回值,类型无所谓
# 这个算子是拿到参数函数的返回值后,将所有相同返回值的放入一个组中
# 分组完成后,每一个数据都是一个二元元组,key就是返回值,所有同组的数据放入一个迭代器对象中作为value
>>> rdd = sc.parallelize([1,2,3,4,5])
>>> rdd1 = rdd.groupBy(lambda num: 'even' if num%2==0 else 'odd')
>>> print(rdd1.map(lambda x:(x[0], list(x[1]))).collect())
[('even',[2,4]),('odd',[1,3,5])]

4.4.6 Filter算子

功能:过滤想要的数据进行保留

rdd.filter(func)
# func: (T) -> bool,传入一个任意类型的参数进来,返回一个bool类型
>>> rdd = sc.parallelize([1,2,3,4,5,6])
>>> print(rdd.filter(lambda x:x%2==1).collect())
[1, 3, 5]

4.4.7 distinct算子

功能:对RDD数据进行去重,返回新的RDD

rdd.distinct(参数1)
# 参数1:驱虫分区数量,一般不用传
>>> rdd = sc.parallelize([1,1,1,2,2,5,5,5,5,6,6,6,6])
>>> rdd.distinct().collect()
[1, 5, 2, 6]
>>> rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
>>> rdd.distinct().collect()
[('a', 1), ('b', 1)]

4.4.8 union算子

功能:2个rdd合并成1个rdd返回(只合并,不去重,不同类型RDD依旧可以混合)

rdd.union(other_rdd)
>>> rdd1 = sc.parallelize([1,1,2,2])
>>> rdd2 = sc.parallelize([1,1,2,2,3,3,4,4])
>>> union_rdd = rdd1.union(rdd2)
>>> union_rdd.collect()
[1, 1, 2, 2, 1, 1, 2, 2, 3, 3, 4, 4]

4.4.9 join算子

功能:对两个RDD执行JOIN操作(可实现SQL的内\外连接)(只能用于二元元组)

rdd.join(other_rdd)             # 内连接
rdd.leftOuterJoin(other_rdd)     # 左外连接
rdd.rightOuterJoin(other_rdd)    # 右外连接
>>> x = sc.parallelize([(1001,'zhangsan'), (1002,'lisi'), (1003,'wangwu'), (1004, 'maliu')])
>>> y = sc.parallelize([(1001,'sales'), (1002,'tech')])
>>> print(x.join(y).collect())
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
>>> print(x.leftOuterJoin(y).collect())
[(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech')), (1003, ('wangwu', None)), (1004, ('maliu', None))]

4.4.10 intersection算子

功能:求2个rdd的交集,返回一个新rdd

rdd.intersection(other_rdd)
>>> rdd = sc.parallelize([1,3,5,7])
>>> rdd1 = sc.parallelize([1,3,8,9])
>>> rdd.intersection(rdd1).collect()
[1, 3]  

4.4.11 glom算子

功能:将RDD的数据加上嵌套,这个嵌套按照分区来进行

# 无需参数
rdd.glom()
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)
>>> rdd.glom().collect()
[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

4.4.12 groupByKey算子

功能:针对KV型RDD,自动按照key分组

# 无需参数
rdd.groupByKey()
>>> rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])
>>> rdd.groupByKey().collect()
[('b', <pyspark.resultiterable.ResultIterable object at 0x7f4b817541d0>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7f4b817540d0>)]
>>> rdd.groupByKey().map(lambda x:list(x[1])).collect()
[[1, 1, 1], [1, 1]]
>>> rdd.groupByKey().map(lambda x:(x[0],list(x[1]))).collect()
[('b', [1, 1, 1]), ('a', [1, 1])]

4.4.13 sortBy算子

功能:基于指定的排序依据对RDD数据进行排序。

rdd.sortBy(func, ascending=False, numPartitions=1)
# func: (T) -> U: 告知按照rdd中的哪个数据进行排序,比如lambda x:x[1] 表示rdd中的第二列元素进行排序
# ascending True升序,False降序
# numPartitions:用多少分区排序
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,1,2,3,4,5,6,7,8,9,1,2,3,4,5,6,7,8,9])
>>> rdd.sortBy(lambda x:x).collect()
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9]
>>> rdd = sc.parallelize([('a', 1), ('a', 3), ('b', 8), ('b', 3), ('b', 1)])
>>> rdd.sortBy(lambda x:x[1]).collect()
[('a', 1), ('b', 1), ('a', 3), ('b', 3), ('b', 8)]
>>> rdd.sortBy(lambda x:(x[0],x[1])).collect()
[('a', 1), ('a', 3), ('b', 1), ('b', 3), ('b', 8)]

4.4.14 sortByKey算子

功能:针对KV型RDD,按照key进行排序

sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)
# ascending: 升序or降序,True升,False降,默认升
# numPartitions: 按照几个分区进行排序,如果全局有序,设置为1
# keyfunc: 在排序前对key进行处理,语法是(K) -> U,一个参数传入,返回一个值
>>> rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),
                          ('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),
                          ('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)
>>> print(rdd.sortByKey(ascending=True, numPartitions=1).collect())
[('C', 1), ('D', 1), ('E', 1), ('a', 1), ('b', 1), ('f', 1), ('g', 1), ('i', 1), ('j', 1), ('k', 1), ('l', 1), ('m', 1), ('n', 1), ('o', 1), ('p', 1), ('u', 1), ('y', 1)]
>>> print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
[('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1), ('i', 1), ('j', 1), ('k', 1), ('l', 1), ('m', 1), ('n', 1), ('o', 1), ('p', 1), ('u', 1), ('y', 1)]

keyfunc并不会改变数据的值,只会改变排序时依据的值,如上,虽然lower()变小写了,但是输出时仍是大写。

4.5 常用动作算子

4.5.1 countByKey算子

功能:一般适用于KV型的RDD,用于统计key出现的次数

>>> rdd = sc.parallelize(['hadoop','spark','hadoop'])
>>> rdd1 = rdd.map(lambda x:(x,1))
>>> result = rdd1.countByKey()
>>> print(result)
defaultdict(<class 'int'>, {'hadoop': 2, 'spark': 1})
>>> type(result)
<class 'collections.defaultdict'>

注意:countByKey操作后的result已经不是一个RDD了。

4.5.2 collect算子

功能:将RDD各个分区内的数据统一收集到Driver中,形成一个List对象

rdd.collect()

这个算子是将RDD各个分区数据都拉取到Driver中,需要注意的是,RDD是分布式对象,其数据量可以很大,所以用这个算子前要确保结果数据及不会太大,防止把Driver内存撑爆。

4.5.3 reduce算子

功能:对RDD数据集按照所传入的逻辑进行聚合

rdd.reduce(func)
# func: (T, T) -> T
# 两个参数传入,一个返回值,返回值和参数要求类型一致

reduce算子执行流程:

>>> rdd = sc.parallelize(range(10))
>>> print(rdd.reduce(lambda a,b:a+b))
45

4.5.6 fold算子

功能:和reduce一样,接受传入逻辑进行聚合,聚合是带有初始值的。这个初始值会作用在分区内聚合和分区间聚合时。

如:[[1,2,3], [4,5,6], [7,8,9]]

数据分布在三个分区

分区1:123聚合的时候带上10作为初始值得到16

分区1:456聚合的时候带上10作为初始值得到25

分区1:789聚合的时候带上10作为初始值得到34

rdd.fold(参数, func)
# 参数:分区内及分区间聚合时带有的初始值
# func:func: (T, T) -> T
>>> rdd = sc.parallelize(range(1,10),3)
>>> print(rdd.fold(10, lambda a,b: a+b))
85

有3次分区内聚合,一次分区间聚合,因此额外多了40

4..5.5 first算子

功能:取RDD的第一个元素。

>>> rdd = sc.parallelize(range(1,10),2)
>>> rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> rdd.glom().collect()
[[1, 2, 3, 4], [5, 6, 7, 8, 9]]
>>> rdd.first()
1

4.5.7 take算子

功能:取RDD中的前n个元素,组合成List返回。

>>> rdd = sc.parallelize(range(1,10),2)
>>> rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> rdd.glom().collect()
[[1, 2, 3, 4], [5, 6, 7, 8, 9]]
>>> rdd.take(5)
[1, 2, 3, 4, 5]

4.5.8 top算子

功能:对RDD数据集进行降序排序,取前n个,组合成List返回

>>> rdd = sc.parallelize([1,5,3,9,5,2,3,0,1,2,9,4,5,3])
>>> rdd.top(5)
[9, 9, 5, 5, 5]

4.5.9 count算子

功能:计算RDD有多少条数据,返回值是一个数字

>>> sc.parallelize([1,2,3,4]).count()
6

4.5.10 takeSample算子

功能:随机抽样RDD的数据

takeSample(参数1:True or False, 参数2:采样数, 参数3:随机数种子)
# 参数1:True表示运行去同一个数字,False表示不允许取同一个数字,和数据内容无关,重复指的是同一个位置上的数据
# 参数2:抽样要几个
>>> rdd = sc.parallelize([1,5,3,9,5,2,3,0,1,2,9,4,5,3])
>>> rdd.takeSample(True, 5)
[3, 3, 1, 3, 2]
>>> rdd.takeSample(True, 20)
[3, 1, 5, 5, 3, 5, 2, 1, 3, 2, 1, 0, 5, 1, 3, 5, 2, 3, 9, 0]
>>> rdd.takeSample(False, 20)
[0, 3, 3, 1, 4, 1, 5, 3, 2, 5, 9, 5, 9, 2]

4.5.11 takeOrdered算子

功能:对RDD进行排序取前N个

rdd.takeOrdered(参数1, 参数2)
# 参数1:要几个数据
# 参数2:堆排序的数据进行更改(不会更改数据本身,只是在排序时换个样子)
>>> rdd = sc.parallelize([1,3,2,4,7,9,6],1)
>>> rdd.takeOrdered(3)
[1, 2, 3]
>>> rdd.takeOrdered(3,lambda x:-x)
[9, 7, 6]

4.5.12 foreach算子

功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个算子没有返回值。

rdd.foreach(func)
# func: (T) -> None
>>> rdd = sc.parallelize([1,2,3,4,5])
>>> r = rdd.foreach(lambda x:print(x*10))
40
50
10
30
20
>>> print(r)
None

foreach中函数不能给返回值,所以直接在里面打印了。foreach不需要提交给Driver,直接在分区下运行完毕。(可以用来高效写入SQL、打印等操作)

4.5.13 saveAsTextFile算子

功能:将RDD的数据写入文本文件中,支持本地写出和HDFS等文件系统。

rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
rdd.saveAsTextFile("hdfs://node1:8020/output/test")

参数为文件夹路径,有几个分区产生多少个文件,且亦无需向Driver汇报

4.6 分区操作算子

4.6.1 mapPartitions算子

功能:一次传递一整个分区,然后按接收到的函数逻辑处理数据。

>>> rdd = sc.parallelize([1,2,3,4,5],2)
>>> rdd.glom().collect()
[[1, 2], [3, 4, 5]]
>>> rdd1 = rdd.mapPartitions(lambda x: [i*10 for i in x])
>>> rdd1.glom().collect()
[[10, 20], [30, 40, 50]]

4.6.2 foreachPartition算子

功能:和普通的foreach一致,但一次处理的是一整个分区数据。

4.6.3 partitionBy算子

功能:对RDD进行自定义分区操作

rdd.partitionBy(参数1, 参数2)
# 参数1 重新分区后有几个分区
# 参数2 自定义分区规则,函数传入
# 参数2:(K) -> int  一个传入参数进来,类型无所谓,但是返回值一定是int类型,将key传给这个函数,根据所写函数逻辑返回一个分区编号,分区编号从0开始,不要超过分区数-1
def partition_self(key):
    if key[0] == 'h':
        return 0
    elif key[0] == 's':
        return 1
    else:
        return 2
rdd = sc.parallelize()

>>> rdd = sc.parallelize(["hadoop hive spark", "hadoop hadoop flink", "spark hadoop hive"])
>>> rdd1 = rdd.flatMap(lambda x:x.split())
>>> rdd2 = rdd1.map(lambda x:(x,1))
>>> rdd2.collect()
[('hadoop', 1), ('hive', 1), ('spark', 1), ('hadoop', 1), ('hadoop', 1), ('flink', 1), ('spark', 1), ('hadoop', 1), ('hive', 1)]
>>> rdd2.partitionBy(3,partition_self).glom().collect()
[[('hadoop', 1), ('hive', 1), ('hadoop', 1), ('hadoop', 1), ('hadoop', 1), ('hive', 1)], [('spark', 1), ('spark', 1)], [('flink', 1)]]

4.6.4 repartition算子

功能:对RDD的分区执行重新分区(仅数量)

rdd.repartition(N)
# 传入N决定新的分区数
>>> rdd = sc.parallelize(range(10),2)
>>> rdd.glom().collect()
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
>>> rdd1 = rdd.repartition(5)
>>> rdd1.glom().collect()
[[], [0, 1, 2, 3, 4], [], [5, 6, 7, 8, 9], []]
>>> rdd2 = rdd1.repartition(1)
>>> rdd2.glom().collect()
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]

注意:对分区的数量进行操作,一定要慎重。

一般情况下,我们写Spark代码,除了要求全局排序设置为1个分区外,多数时候,所有API中关于分区相关的代码都不必太多例会。因为如果更改分区了,可能会影响并行计算(内存迭代的并行管道数量),且分区如果增加,极大可能导致shuffle。

4.6.5 coalesce算子

功能:修改分区数量

    rdd.coalesce(num, shuffle=False)
# 相当于安全阀,shuffle必须为True才能增加分区数,而repatition就是调用的coalesce,且shuffle始终为True
>>> rdd = sc.parallelize(range(10),3)
>>> rdd.glom().collect()
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
>>> rdd.coalesce(2).glom().collect()
[[0, 1, 2], [3, 4, 5, 6, 7, 8, 9]]
>>> rdd.coalesce(4).glom().collect()
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
>>> rdd.coalesce(4,shuffle=True).glom().collect()
[[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]

4.7 groupByKey与reduceByKey的区别

在功能上:

  • groupByKey仅仅有分组功能而已
  • reduceBykey除了有ByKey的分组功能外,还有reduce聚合功能。

如果对数据执行分组+聚合,那么使用这两个算子的性能差别是很大的。

对于groupByKey,reduceByKey最大的提升在于,分组前进行了预聚合,那么再shuffle分组节点,被shuffle的数据可以极大地减少。这就极大的提升了性能。

分组+聚合,首选reduceByKey,数据越大,对groupByKey的优势就越高。

5 RDD的持久化

5.1 RDD的数据是过程数据

RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消失。RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。

这个特性可以最大化的利用资源,老旧RDD没用了就从内存中清理,给后续的计算腾出内存空间。

如上图,rdd3被两次使用,第一次使用之后,其实rdd3就不存在了。第二次用的时候,只能基于rdd的血缘关系,从rdd1重新执行,构建出来rdd3,供rdd5使用。

5.2 RDD的缓存

在上一章节,若rdd3不消失,那么rdd1->rdd2->rdd3这个链条就不会执行两次,或者更多次,因此上述场景一定要执行优化。

Spark提供了缓存API,可以让我们通过调用API,将指定的RDD数据保留在内存或磁盘上。

'''加入缓存的API'''
# 缓存到内存中
rdd.cache()
# 仅内存缓存
rdd.persist(StorageLevel.MEMORY_ONLY)
# 仅内存缓存,2个副本
rdd.persist(StorageLevel.MEMORY_ONLY_2)
# 仅缓存硬盘上
rdd.persist(StorageLevel.DISK_ONLY)
# 仅缓存硬盘上,2个副本
rdd.persist(StorageLevel.DISK_ONLY_2)
# 仅缓存硬盘上,3个副本
rdd.persist(StorageLevel.DISK_ONLY_3)
# 先放内存,不够放硬盘
rdd.persist(StorageLevel.MEMORY_AND_DISK)
# 先放内存,不够放硬盘,2个副本
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
# 堆外内存(系统内存)
rdd.persist(StorageLevel.OFF_HEAP)

# 一般建议使用rdd.persist(StorageLevel.MEMORY_AND_DISK)
# 如果集群内存较小,建议使用rdd.persist(StorageLevel.DISK_ONLY),或者用CheckPoint

'''主动清理缓存的API'''
rdd.unpersist()

特点:

  • 保留血缘关系
  • 分散存储

5.3 RDD的CheckPoint

CheckPoint存储RDD数据是集中收集各个分区数据进行存储,而缓存是分散存储。

缓存和CheckPoint的对比:

  • CheckPoint不管分区数量多少,风险是一样的, 缓存分区越多,风险越高
  • CheckPoint支持写入HDFS,缓存不行,HDFS是高可靠性存储,CheckPoint被认为是安全的
  • CheckPoint不支持内存,缓存可以,缓存如果写内存,性能比CheckPoint要好一些
  • CheckPoint应为设计认为是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全(只要跟内存存储有关一律认为不安全),所以保留。
# 设置CheckPoint第一件事:选择CP的保存路径
# 如果是Local模式,可以支持本地文件系统,如果在集群运行,千万要用HDFS
sc.setCheckPointDir("hdfs://node1:8020/output")
# 用的时候,直接调用CheckPoint算子即可
rdd.CheckPoint()
最后修改:2022 年 08 月 22 日
如果觉得我的文章对你有用,请随意赞赏