Spark - RDD讲解

What is RDD


在讲RDD之前,先和大家说一下在Spark中,我们分析数据的过程,主要会碰到RDD,DataFrame,DataSet概念,这三种都是我们计算过程中的数据单元。在Spark 2.0之前,主要的数据操作都是操作RDD,而在Spark 2.0以后,官方开始呼吁大家迁移到基于DataSet,即使你是从Spark 2.0以后开始接触的spark,但是我也很负责人的告诉你,RDD你必须要懂~~

rdd-1024x595.png

定义


弹性分布式数据集(Resilient Distributed Datasets : RDD),表示已被分区、不可变的,并能够被并行操作,可同错的数据集合。


针对上面的定义,还说描述的很抽象,接下来根据每个RDD的属性,进行逐点说明


分区

分区这个概念在分布式计算中我们经常会看到,例如MapReduce中,数据在Map端写入到环形缓冲区,数据进行分区,reduce端读取相应的分区文件,还有在Kafka中,topic中的分区概念。


在RDD中,本质上是一个只读的分区记录集合。也就是我们要处理的源数据的抽象。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同节点上,从而可以在集群中的不同节点上进行并行计算。这也是它可以被并行处理的前提。反向来思考的话,如果RDD不可分区,只是一个单独不可拆分的数据块,那么集群中的节点怎么对这个源数据进行分布式并行计算呢?


逻辑上,我们可以认为 RDD 是一个大的数组。数组中的每个元素可以代表一个分区(Partition)。在物理存储中,每个分区指向一个存放在堆内内存和堆外内存或者磁盘中的数据块(Block),而这些数据块是独立的,它们可以被存放在系统中的不同节点。所以,RDD 只是抽象意义的数据集合,分区内部并不会存储具体的数据,
仅保存了元数据信息。下图很好地展示了 RDD 的分区逻辑结构:


1_gDz_AuuB-q0ux9Pl9CrvHA.png




RDD 中的每个分区存有它在该 RDD 中的 index。通过 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号,从而通过底层存储层的接口中提取到数据进行处理。


在集群中,各个节点上的数据块会尽可能地存放在内存中,只有当内存没有空间时才会存入硬盘。这样可以最大化地减少硬盘读写的开销。虽然 RDD 内部存储的数据是只读的,但是,我们可以去修改(例如通过 repartition 转换操作)并行计算单元的划分结构,也就是分区的数量。




不可变性

不可变性代表每一个 RDD 都是只读的,它所包含的分区信息不可以被改变。既然已有的 RDD 不可以被改变,我们只可以对现有的 RDD 进行转换(Transformation)操作,得到新的 RDD 作为中间计算的结果。
屏幕快照 2020-04-10 下午11.36.45.png


上图也就是刚刚提到的针对RDD的Transformation操作中,包括的map算子,flatMap算子,filter算子,这些算子我们接下来的文章在一一讲解,这里我想给大家看的是,在举例的这三个算子实现方法中,我们可以看到都new MapPartitionsRDD(),也就是说,在对一个已有的RDD进行转换操作的过程中,并不是对这个RDD进行直接的修改,变换,而是读取父RDD,创建了一个新的RDD进行转换并且最后返回。




那么这样会带来什么好处呢?显然,对于代表中间结果的 RDD,我们需要记录它是通过哪个 RDD 进行哪些转换操作得来,即依赖关系,而不用立刻去具体存储计算出的数据本身。这样做有助于提升 Spark 的计算效率,并且使错误恢复更加容易。


试想,在一个有 N 步的计算模型中,如果记载第 N 步输出 RDD 的节点发生故障,数据丢失,我们可以从第 N-1 步的 RDD 出发,再次计算,而无需重复整个 N 步计算过程。这样的容错特性也是 RDD 为什么是一个“弹性”的数据集的原因之一。后边我们会提到 RDD 如何存储这样的依赖关系。


并行操作

由于单个 RDD 的分区特性,使得它天然支持并行操作,即不同节点上的数据可以被分别处理,然后产生一个新的 RDD。


容错性

为了保证RDD 中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。 相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的 特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时 ,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制 了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

另外,在RDD计算中,也通过checkpoint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是 logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通 过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算 生成丢失的分区数据。

RDD 五大特性

  • A list of partitions

RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的list;将数据加载为RDD时,一般会遵循数据的本地性(一般一个hdfs里的block会加载为一个partition)。

  • A function for computing each split

一个函数计算每一个分片,RDD的每个partition上面都会有function,也就是函数应用

  • A list of dependencies on other RDDs

RDD会记录它的依赖 ,依赖还具体分为宽依赖和窄依赖,RDD在计算的过程中,不断的转换,在内存中,不落地磁盘,如果某一环节出错,可以根据依赖来找回上一状态的RDD,为了容错(重算,cache,checkpoint),也就是说在内存中的RDD操作时出错或丢失会进行重算。

  • Optionally,a Partitioner for Key-value RDDs

    可选项,如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区,例如这里自定义的Partitioner是基于key进行分区,那则会将不同RDD里面的相同key的数据放到同一个partition里面

  • Optionally, a list of preferred locations to compute each split on

我们的原则是移动计算,不移动数据,默认的是,磁盘中的数据是作为RDD加载到本机的内存中,但是,Spark这里给出了一个可选项,可以选择加载到指定的机器内存中,就是可以选择将数据放在那几台性能好的节点上


RDD 结构


通过上述讲解,我们了解了 RDD 的基本特性。而且,我们还提到每一个 RDD 里都会包括分区信息、所依赖的父 RDD 以及通过怎样的转换操作才能由父 RDD 得来等信息。实际上 RDD 的结构远比你想象的要复杂,让我们来看一个 RDD 的简易结构示意图:


image.png


SparkContext 是所有 Spark 功能的入口,它代表了与 Spark 节点的连接,可以用来创建 RDD 对象以及在节点中的广播变量等。一个线程只有一个 SparkContext。SparkConf 则是一些参数配置信息。感兴趣的同学可以去阅读官方的技术文档,一些相对不重要的概念我就不再赘述了。Partitions 前文中我已经提到过,它代表 RDD 中数据的逻辑结构,每个 Partition 会映射到某个节点内存或硬盘的一个数据块。Partitioner 决定了 RDD 的分区方式,目前有两种主流的分区方式:Hash partitioner 和 Range partitioner。Hash,顾名思义就是对数据的 Key 进行散列分区,Range 则是按照 Key 的排序进行均匀分区。此外我们还可以创建自定义的 Partitioner。

依赖关系


Dependencies 是 RDD 中最重要的组件之一。如前文所说,Spark 不需要将每个中间计算结果进行数据复制以防数据丢失,因为每一步产生的 RDD 里都会存储它的依赖关系,即它是通过哪个 RDD 经过哪个转换操作得到的。细心的读者会问这样一个问题,父 RDD 的分区和子 RDD 的分区之间是否是一对一的对应关系呢?Spark 支持两种依赖关系:窄依赖(Narrow Dependency)和宽依赖(Wide Dependency)。


image.jpeg


窄依赖就是父 RDD 的分区可以一一对应到子 RDD 的分区,宽依赖就是父 RDD 的每个分区可以被多个子 RDD 的分区使用。
image.jpeg


显然,窄依赖允许子 RDD 的每个分区可以被并行处理产生,而宽依赖则必须等父 RDD 的所有分区都被计算好之后才能开始处理。宽依赖本质就是shuffle,计算代价大,经过大量shuffle生成的RDD,建议进行缓存。这样避免失败后重新计算带来的开销


如上图所示,一些转换操作如 map、filter 会产生窄依赖关系,而 Join、groupBy 则会生成宽依赖关系。这很容易理解,因为 map 是将分区里的每一个元素通过计算转化为另一个元素,一个分区里的数据不会跑到两个不同的分区。而 groupBy 则要将拥有所有分区里有相同 Key 的元素放到同一个目标分区,而每一个父分区都可能包含各种 Key 的元素,所以它可能被任意一个子分区所依赖。


Spark 之所以要区分宽依赖和窄依赖是出于以下两点考虑:

  1. 窄依赖可以支持在同一个节点上链式执行多条命令,例如在执行了 map 后,紧接着执行 filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递。
  2. 从失败恢复的角度考虑,窄依赖的失败恢复更有效,因为它只需要重新计算丢失的父分区即可,而宽依赖牵涉到 RDD 各级的多个父分区。

DAG


结合上篇spark运行原理和这篇RDD的讲述,我们来讲一下关于任务运行中,Job,Stage,Task的划分
map-reduce-vs-spark-16-638.jpg


spark任务运行中,会存在一个或者多个Job,action算子的触发会生成一个Job, Job会提交给DAGScheduler,分解成Stage。


image.jpeg
上图是一个job被切割成三个Stage,每个stage中有包含不用个数的partition,每个partition在计算的时候对应一个task,影响程序的并行度。

job分割stage的规则是从G端向前开始分割,遇到宽依赖,就分割一个stage.。
F–>G 切割 stage2 和 stage3
A–>B stage1


上图,程序的运行最小单元是Task,就拿stage2来举例:
stage2有4个Task: C端有2个partition,E端有两个partition。每个partition为开始,最终到F端,作为一个Task。 其中B阶段呢,属于一个程序级别的优化操作。一般分布式程序中,为了让程序能平稳的执行,就要做一些优化操作。


在以后的Spark开发中,我们在Web UI中会经常看到类似于下图的工作流程,这里展示了一个Job的划分和对应操作的细节。
Screen-Shot-2015-06-19-at-2.04.05-PM-1024x879.png




How use RDD


上面讲了很长篇幅的RDD概念和属性,那么我们该如何开始实操RDD呢?我们的第一个RDD来自哪里?上面我说过了,其实RDD就是我们要进行处理的源数据集合。在实际的业务场景中,对于离线数据分析,大多数的场景下,源数据可以是Spark从Hive、HBase中读取,转化成RDD,再细小一点的场景,源数据可以是一个csv报表数据,读取目标CSV进行RDD的转换。那么在下面的讲解中,我们无需对接上游生产环境的数据源,我们可以在IDEA中直接进行RDD的创建,随后再进行各种转换算子和行动算子的操作演示

创建RDD


RDD的创建有三种方法

  • 利用内存中集合中创建
  • 利用外部文件创建
  • 由其他RDD创建新的RDD


从集合中创建可以使用parallelize() 和 makeRDD()方法创建
屏幕快照 2020-04-11 下午5.42.55.png屏幕快照 2020-04-11 下午5.43.10.png


我们可以看到,这两个方法需要我们传入的参数是 Seq[T] 集合,返回的都是RDD[T]。注意,makeRDD给了两种方法,这里先记为第一种makeRDD和第二种makeRDD


这里,先把源码po出来


屏幕快照 2020-04-11 下午5.47.53.png屏幕快照 2020-04-11 下午5.48.07.png


可以清晰的看到,第一种makeRDD的源码实现中,实际上是调用了parallelize(),并且都可以指定分区数量,而且在注释中清晰的写明了,第一种makeRDD和parallelize是identical(完全相同的)。再来看第二种makeRDD,第二种实现可以为数据提供位置信息,并且不能指定RDD的分区数量,除此之外的实现和parallelize函数也是一致的


创建示例:

1
2
3
4
5
6
7
8
9
val make_rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
val make_rdd_1: RDD[String] = sc.parallelize(Array("Brent","HayLee","Henry"))
val make_rdd_2: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
val make_rdd_3: RDD[String] = sc.parallelize(Array("Brent","HayLee","Henry"),4)

println(make_rdd.partitions.size)
println(make_rdd_1.partitions.size)
println(make_rdd_2.partitions.size)
println(make_rdd_3.partitions.size)


结果:

1
2
3
4
1
1
2
4


**外部文件创建**
**
1
2
3
4
sc.textFile("hdfs://hadoop000:9000/xxx/data.txt") // 读取hdfs文件
sc.textFile("data.txt") // 这里纯粹的本地文件是不推荐的,
因为这个文件访问是针对每一个Worker都要是能访问的
换言之,如果是本地文件,则必须保证每一个Worker的本地都有一份这个文件



RDD 算子操作


RDD 的数据操作分为两种:转换(Transformation)和动作(Action)。顾名思义,转换是用来把一个 RDD 转换成另一个 RDD,而动作则是通过计算返回一个结果。


下表列出了一些 Spark 常用的 transformations(转换)。详情请参考 RDD API 文档(ScalaJavaPythonR)和 pair RDD 函数文档(ScalaJava

Transformation(转换)Meaning(含义)
map(func)返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数 func 来生成。
filter(func)返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func 且返回值为 true 的元素来生成。
flatMap(func)与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq 而不是一个单独的 item)。
mapPartitions(func)与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator=> Iterator 类型。
mapPartitionsWithIndex(func)与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的 interger value(整型值)作为参数的 func_,所以在一个类型为 T 的 RDD 上运行时 _func 必须是 (Int, Iterator) => Iterator 类型。
sample(withReplacement, fraction, seed)样本数据,设置是否放回(withReplacement),采样的百分比(_fraction_)、使用指定的随机数生成器的种子(seed)。
union(otherDataset)返回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集。
intersection(otherDataset)返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集。
distinct([_numTasks_]))返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素。
groupByKey([_numTasks_])在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable) .
Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKeyaggregateByKey 来计算性能会更好.
Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数。
reduceByKey(func, [_numTasks_])在 (K, V) pairs 的 dataset 上调用时,返回 dataset of (K, V) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的函数 func 来进行聚合的,它必须是 type (V,V) => V 的类型。像 groupByKey 一样,reduce tasks 的数量是可以通过第二个可选的参数来配置的。
aggregateByKey(zeroValue)(seqOp, combOp, [_numTasks_])在 (K, V) pairs 的 dataset 上调用时,返回 (K, U) pairs 的 dataset,其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral “0” 值来进行聚合的。允许聚合值的类型与输入值的类型不一样,同时避免不必要的配置。像 groupByKey 一样,reduce tasks 的数量是可以通过第二个可选的参数来配置的。
sortByKey([_ascending_], [_numTasks_])在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys 升序或降序的 (K, V) pairs 的 dataset,由 boolean 类型的 ascending 参数来指定。
join(otherDataset, [_numTasks_])在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin, rightOuterJoinfullOuterJoin 来实现。
cogroup(otherDataset, [_numTasks_])在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable, Iterable)) tuples 的 dataset。这个操作也调用了 groupWith
cartesian(otherDataset)在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。
pipe(command, [envVars])通过使用 shell 命令来将每个 RDD 的分区给 Pipe。例如,一个 Perl 或 bash 脚本。RDD 的元素会被写入进程的标准输入(stdin),并且 lines(行)输出到它的标准输出(stdout)被作为一个字符串型 RDD 的 string 返回。
coalesce(numPartitions)Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的。
repartition(numPartitions)Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀。该操作总是通过网络来 shuffles 所有的数据。
repartitionAndSortWithinPartitions(partitioner)根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行。


下表列出了一些 Spark 常用的 actions 操作。详细请参考 RDD API 文档(ScalaJavaPythonR
和 pair RDD 函数文档(ScalaJava)。

Action(动作)Meaning(含义)
reduce(func)使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative)和关联(associative)的,这样才能保证它可以被并行地正确计算。
collect()在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素。这在过滤器(filter)或其他操作(other operation)之后返回足够小(sufficiently small)的数据子集通常是有用的。
count()返回 dataset 中元素的个数。
first()返回 dataset 中的第一个元素(类似于 take(1)。
take(n)将数据集中的前 n 个元素作为一个 array 数组返回。
takeSample(withReplacement, num, [_seed_])对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。
takeOrdered(n, [ordering])返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。
saveAsTextFile(path)将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。
saveAsSequenceFile(path)
(Java and Scala)将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 Int,Double,String 等等)。
saveAsObjectFile(path)
(Java and Scala)使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。
countByKey()仅适用于(K,V)类型的 RDD。返回具有每个 key 的计数的(K , Int)pairs 的 hashmap。
foreach(func)对 dataset 中每个元素运行函数 _func_。这通常用于副作用(side effects),例如更新一个 Accumulator(累加器)或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach()之外的累加器以外的变量(variables)可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 Understanding closures(理解闭包) 部分。

该 Spark RDD API 还暴露了一些 actions(操作)的异步版本,例如针对 foreachforeachAsync,它们会立即返回一个FutureAction 到调用者,而不是在完成 action 时阻塞。这可以用于管理或等待 action 的异步执行。

部分Transformation算子操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object transformation_func {
def main(args: Array[String]): Unit ={
val sparkConf: SparkConf = new SparkConf()
.setAppName("transformation_func")
.setMaster("local")

val sc = new SparkContext(sparkConf)

var original_rdd: RDD[(String, Int)] = sc.parallelize(Array(("a", 1), ("b", 1), ("a", 2),("c",4),("c",4)),2)


var map_rdd: RDD[(String, Int)] = original_rdd.map(x =>(x._1,x._2+1))

println("map操作:对original_rdd每个数据的第二个元素+1")
map_rdd.foreach(println)

println("filter操作:过滤掉original_rdd中,第一个元素不为a的数据")
var filter_rdd: RDD[(String, Int)] = original_rdd.filter(x => x._1 == "a")
filter_rdd.foreach(println)

println("flatmap操作:对original_rdd做映射扁平化操作")
val flatmap_rdd: RDD[Char] = original_rdd.flatMap(x=> x._1 + x._2)
flatmap_rdd.foreach(println)

println("mapPartitions操作:对original_rdd每个分区做相应操作")
// 效率要好于map 减少了发送到执行器的交互次数,但是可能会出现内存溢出
val mapPartitions_rdd: RDD[(String, Int)] = original_rdd.mapPartitions(x=>{x.map(item=>(item._1,item._2+1))})
mapPartitions_rdd.foreach(println)


println("sample操作:提取样本")
val sample_rdd: RDD[(String, Int)] = original_rdd.sample(true, 0.25)
sample_rdd.foreach(println)


println("distinct操作:去重")
val distinct_rdd: RDD[(String, Int)] = original_rdd.distinct()
distinct_rdd.foreach(println)


println("groupbykey操作:分组聚合")
val groupByKey_rdd: RDD[(String, Iterable[Int])] = original_rdd.groupByKey()
groupByKey_rdd.foreach(println)


println("reduceByKey操作:聚合")
val reduceByKey_rdd: RDD[(String, Int)] = original_rdd.reduceByKey(_+_)
reduceByKey_rdd.foreach(println)

println("sortByKey操作:排序")
val sortByKey_rdd: RDD[(String, Int)] = original_rdd.sortByKey()
sortByKey_rdd.foreach(println)

}
}

RDD 的持久化(缓存)


每当我们对 RDD 调用一个新的 action 操作时,整个 RDD 都会从头开始运算。因此,如果某个 RDD 会被反复重用的话,每次都从头计算非常低效,我们应该对多次使用的 RDD 进行一个持久化操作。


Spark 的 persist() 和 cache() 方法支持将 RDD 的数据缓存至内存或硬盘中,这样当下次对同一 RDD 进行 Action 操作时,可以直接读取 RDD 的结果,大幅提高了 Spark 的计算效率。

1
2
3
4
5
6
7
8

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd1 = rdd.map(lambda x: x+5)
rdd2 = rdd1.filter(lambda x: x % 2 == 0)
rdd2.persist()
count = rdd2.count() // 3
first = rdd2.first() // 6
rdd2.unpersist()


在文中的代码例子中你可以看到,我们对 RDD2 进行了多个不同的 action 操作。由于在第四行我把 RDD2 的结果缓存在内存中,所以 Spark 无需从一开始的 rdd 开始算起了(持久化处理过的 RDD 只有第一次有 action 操作时才会从源头计算,之后就把结果存储下来,所以在这个例子中,count 需要从源头开始计算,而 first 不需要)。


在缓存 RDD 的时候,它所有的依赖关系也会被一并存下来。所以持久化的 RDD 有自动的容错机制。如果 RDD 的任一分区丢失了,通过使用原先创建它的转换操作,它将会被自动重算。持久化可以选择不同的存储级别。正如我们讲 RDD 的结构时提到的一样,有 MEMORY_ONLY,MEMORY_AND_DISK,DISK_ONLY 等。cache() 方法会默认取 MEMORY_ONLY 这一级别。

RDD Checkpoint


Checkpoint 的产生就是为了相对而言更加可靠的持久化数据,在 Checkpoint 可以指定把数据放在本地并且是多副本的方式,但是在正常生产环境下放在 HDFS 上,这就天然的借助HDFS 高可靠的特征来完成最大化的可靠的持久化数据的方式

在进行 RDD 的 Checkpoint 的时候,其所依赖的所有 RDD 都会清空掉;官方建议如果要进行 checkpoint 时,必需先缓存在内存中。但实际可以考虑缓存在本地磁盘上或者是第三方组件,e.g. Taychon 上。在进行 checkpoint 之前需要通过 SparkConetxt 设置 checkpoint 的文件夹


作为最佳实践,一般在进行 checkpoint 方法调用前都要进行 persists 来把当前 RDD 的数据持久化到内存或者是磁盘上,这是因为 checkpoint 是 lazy 级别,必需有 Job 的执行且在Job 执行完成后才会从后往前回溯哪个 RDD 进行了Checkpoint 标记,然后对该标记了要进行 Checkpoint 的 RDD 新启动一个Job 执行具体 Checkpoint 的过程

RDD Shuffle

Spark 里的某些操作会触发 shuffle。shuffle 是spark 重新分配数据的一种机制,使得这些数据可以跨不同的区域进行分组。这通常涉及在 executors 和 机器之间拷贝数据,这使得 shuffle 成为一个复杂的、代价高的操作。

[
Background](http://spark.apachecn.org/#/docs/4?id=background%ef%bc%88%e5%b9%95%e5%90%8e%ef%bc%89)

为了明白 reduceByKey 操作的过程,我们以 reduceByKey 为例。reduceBykey 操作产生一个新的 RDD,其中 key 所有相同的的值组合成为一个 tuple - key 以及与 key 相关联的所有值在 reduce 函数上的执行结果。面临的挑战是,一个 key 的所有值不一定都在一个同一个 paritition 分区里,甚至是不一定在同一台机器里,但是它们必须共同被计算。


在 spark 里,特定的操作需要数据不跨分区分布。在计算期间,一个任务在一个分区上执行,为了所有数据都在单个 reduceByKey 的 reduce 任务上运行,我们需要执行一个 all-to-all 操作。它必须从所有分区读取所有的 key 和 key对应的所有的值,并且跨分区聚集去计算每个 key 的结果 - 这个过程就叫做 shuffle


尽管每个分区新 shuffle 的数据集将是确定的,分区本身的顺序也是这样,但是这些数据的顺序是不确定的。如果希望 shuffle 后的数据是有序的,可以使用:

  • mapPartitions 对每个 partition 分区进行排序,例如,.sorted
  • repartitionAndSortWithinPartitions 在分区的同时对分区进行高效的排序.
  • sortBy 对 RDD 进行全局的排序

触发的 shuffle 操作包括 repartition 操作,如 repartitioncoalesce‘ByKey 操作(除了 count 之外)像 groupByKeyreduceByKey,和 join 操作,像 cogroupjoin.


性能影响

该 **Shuffle
是一个代价比较高的操作,它涉及磁盘 I/O、数据序列化、网络 I/O。为了准备 shuffle 操作的数据,Spark 启动了一系列的任务,map 任务组织数据,reduce 完成数据的聚合。这些术语来自 MapReduce,跟 Spark 的 map 操作和 reduce 操作没有关系。


在内部,一个 map 任务的所有结果数据会保存在内存,直到内存不能全部存储为止。然后,这些数据将基于目标分区进行排序并写入一个单独的文件中。在 reduce 时,任务将读取相关的已排序的数据块。


某些 shuffle 操作会大量消耗堆内存空间,因为 shuffle 操作在数据转换前后,需要在使用内存中的数据结构对数据进行组织。需要特别说明的是,reduceByKeyaggregateByKey 在 map 时会创建这些数据结构,'ByKey 操作在 reduce 时创建这些数据结构。当内存满的时候,Spark 会把溢出的数据存到磁盘上,这将导致额外的磁盘 I/O 开销和垃圾回收开销的增加。


shuffle 操作还会在磁盘上生成大量的中间文件。在 Spark 1.3 中,这些文件将会保留至对应的 RDD 不在使用并被垃圾回收为止。这么做的好处是,如果在 Spark 重新计算 RDD 的血统关系(lineage)时,shuffle 操作产生的这些中间文件不需要重新创建。如果 Spark 应用长期保持对 RDD 的引用,或者垃圾回收不频繁,这将导致垃圾回收的周期比较长。这意味着,长期运行 Spark 任务可能会消耗大量的磁盘空间。临时数据存储路径可以通过 SparkContext 中设置参数 spark.local.dir 进行配置。
shuffle 操作的行为可以通过调节多个参数进行设置。详细的说明请看 Spark 配置指南 中的 “Shuffle 行为” 部分。

Why use RDD


首先,它的数据可以尽可能地存在内存中,从而大大提高的数据处理的效率;其次它是分区存储,所以天然支持并行处理;而且它还存储了每一步骤计算结果之间的依赖关系,从而大大提升了数据容错性和错误恢复的正确率,使 Spark 更加可靠。