Spark 广播变量和累加器

共享变量是 Spark 中进阶特性之一,一共有两种:

  • 广播变量
  • 累加器


这两种变量可以认为是在用算子定义的数据管道外的两个全局变量,供所有计算任务使用。在 Spark 作业中,用户编写的高阶函数会在集群中的 Executor 里执行,这些 Executor 可能会用到相同的变量,这些变量被复制到每个 Executor 中,而 Executor 对变量的更新不会传回 Driver。


在计算任务中支持通用的可读写变量一般是低效的,即便如此,Spark 还是提供了两类共享变量:广播变量(broadcast variable)与累加器(accumulator)。当然,对于分布式变量,如果不加限制会出现一致性的问题,所以共享变量是两种非常特殊的变量。


广播变量:只读;
累加器:只能增加。
**

广播变量

广播变量类似于 MapReduce 中的 DistributeFile,通常来说是一份不大的数据集,一旦广播变量在 Driver 中被创建,整个数据集就会在集群中进行广播,能让所有正在运行的计算任务以只读方式访问。广播变量支持一些简单的数据类型,如整型、集合类型等,也支持很多复杂数据类型,如一些自定义的数据类型。


广播变量为了保证数据被广播到所有节点,使用了很多办法。这其实是一个很重要的问题,我们不能期望 100 个或者 1000 个 Executor 去连接 Driver,并拉取数据,这会让 Driver 不堪重负。Executor 采用的是通过 HTTP 连接去拉取数据,类似于 BitTorrent 点对点传输。这样的方式更具扩展性,避免了所有 Executor 都去向 Driver 请求数据而造成 Driver 故障。


Spark 广播机制运作方式是这样的:Driver 将已序列化的数据切分成小块,然后将其存储在自己的块管理器 BlockManager 中。

BlockManager是spark自己的存储系统,RDD-Cache、 Shuffle-output、broadcast 等的实现都是基于BlockManager来实现的,BlockManager也是分布式结构,在driver和所有executor上都会有blockmanager节点,每个节点上存储的block信息都会汇报给driver端的blockManagerMaster作统一管理,BlockManager对外提供get和set数据接口,可将数据存储在memory, disk, off-heap

当Executor 开始运行时,每个 Executor 首先从自己的内部块管理器中试图获取广播变量,如果以前广播过,那么直接使用;如果没有,Executor 就会从 Driver 或者其他可用的 Executor 去拉取数据块。一旦拿到数据块,就会放到自己的块管理器中。供自己和其他需要拉取的 Executor 使用。这就很好地防止了 Driver 单点的性能瓶颈,如下图所示。
image.png
在 Spark 作业中创建、使用广播变量。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at
parallelize at <console>:25
scala> val i = 5
i: Int = 5
scala> val bi = sc.broadcast(i)
bi: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(147)
scala> bi.value
res166: Int = 5
scala> rdd_one.take(5)
res164: Array[Int] = Array(1, 2, 3)
scala> rdd_one.map(j => j + bi.value).take(5)
res165: Array[Int] = Array(6, 7, 8)

在用户定义的高阶函数中,可以直接使用广播变量的引用。下面看一个集合类型的广播变量:

1
2
3
4
5
6
7
8
9
10
11
scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[109] at
parallelize at <console>:25
scala> val m = scala.collection.mutable.HashMap(1 -> 2, 2 -> 3, 3 -> 4)
m: scala.collection.mutable.HashMap[Int,Int] = Map(2 -> 3, 1 -> 2, 3 -> 4)
scala> val bm = sc.broadcast(m)
bm:
org.apache.spark.broadcast.Broadcast[scala.collection.mutable.HashMap[Int,I
nt]] = Broadcast(178)
scala> rdd_one.map(j => j * bm.value(j)).take(5)
res191: Array[Int] = Array(2, 6, 12)

该例中,元素乘以元素对应值得到最后结果。广播变量会持续占用内存,当我们不需要的时候,可以用 unpersist 算子将其移除,这时,如果计算任务又用到广播变量,那么就会重新拉取数据,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
    ...
scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at
parallelize at <console>:25
scala> val k = 5
k: Int = 5
scala> val bk = sc.broadcast(k)
bk: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(163)
scala> rdd_one.map(j => j + bk.value).take(5)
res184: Array[Int] = Array(6, 7, 8)
scala> bk.unpersist
scala> rdd_one.map(j => j + bk.value).take(5)
res186: Array[Int] = Array(6, 7, 8)

你还可以使用 destroy 方法彻底销毁广播变量,调用该方法后,如果计算任务中又用到广播变量,则会抛出异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at
parallelize at <console>:25
scala> val k = 5
k: Int = 5
scala> val bk = sc.broadcast(k)
bk: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(163)
scala> rdd_one.map(j => j + bk.value).take(5)
res184: Array[Int] = Array(6, 7, 8)
scala> bk.destroy
scala> rdd_one.map(j => j + bk.value).take(5)
17/05/27 14:07:28 ERROR Utils: Exception encountered
org.apache.spark.SparkException: Attempted to use Broadcast(163) after it
was destroyed (destroy at <console>:30)
at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeObject$1.apply$mc
V$sp(TorrentBroadcast.scala:202)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$wri

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.spark.sql.SparkSession

object BigRDDJoinSmallRDD {

def main(args: Array[String]): Unit = {

val sparkSession = SparkSession.builder().master("local[3]").appName("BigRDD Join SmallRDD").getOrCreate()
val sc = sparkSession.sparkContext
val list1 = List(("jame",23), ("wade",3), ("kobe",24))
val list2 = List(("jame", 13), ("wade",6), ("kobe",16))
val bigRDD = sc.makeRDD(list1)
val smallRDD = sc.makeRDD(list2)

println(bigRDD.getNumPartitions)
println(smallRDD.getNumPartitions)

// driver端rdd不broadcast广播smallRDD到各executor,RDD不能被broadcast,需要转换成数组array
val smallRDDB= sc.broadcast(smallRDD.collect())

val joinedRDD = bigRDD.mapPartitions(partition => {
val smallRDDBV = smallRDDB.value // 各个executor端的task读取广播value
partition.map(element => {
//println(joinUtil(element, smallRDDBV))
joinUtil(element, smallRDDBV)
})
})
joinedRDD.foreach(x => println(x))
}
/**
* join操作:对两个rdd中的相同key的value1和value2进行聚合,即(key,value1).join(key,value2)得到(key,(value1, vlaue2))
* 如果bigRDDEle的key和smallRDD的某个key一致,那么返回(key,(value1, vlaue2))
* 该方法会在各executor的task上执行
* */
def joinUtil(bigRDDEle:(String,Int), smallRDD: Array[(String, Int)]): (String, (Int,Int)) = {
var joinEle:(String, (Int, Int)) = null
// 遍历数组smallRDD
smallRDD.foreach(smallRDDEle => {
if(smallRDDEle._1.equals(bigRDDEle._1)){
// 如果bigRDD中某个元素的key和数组smallRDD的key一致,返回join结果
joinEle = (bigRDDEle._1, (bigRDDEle._2, smallRDDEle._2))
}
})
joinEle
}

}

这样,相当于先将小表进行广播,广播到每个 Executor 的内存中,供 map 函数使用,这就避免了 Shuffle,虽然语义上还是 join(小表放内存),但无论是资源消耗还是执行时间,都要远优于前面两种方式。

累加器

与广播变量只读不同,累加器是一种只能进行增加操作的共享变量。如果你想知道记录中有多少错误数据,一种方法是针对这种错误数据编写额外逻辑,另一种方式是使用累加器。用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    ...
scala> val acc1 = sc.longAccumulator("acc1")
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355,
name: Some(acc1), value: 0)
scala> val someRDD = tableRDD.map(x => {acc1.add(1); x})
someRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[99] at map at
<console>:29
scala> acc1.value
res156: Long = 0 /*there has been no action on the RDD so accumulator did
not get incremented*/
scala> someRDD.count
res157: Long = 351
scala> acc1.value
res158: Long = 351
scala> acc1
res145: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355,
name: Some(acc1), value: 351)

上面这个例子用 SparkContext 初始化了一个长整型的累加器。LongAccumulator 方法会将累加器变量置为 0。行动算子 count 触发计算后,累加器在 map 函数中被调用,其值会一直增加,最后定格为 351。Spark 内置的累加器有如下几种。

  • LongAccumulator:长整型累加器,用于求和、计数、求均值的 64 位整数。
  • DoubleAccumulator:双精度型累加器,用于求和、计数、求均值的双精度浮点数。
  • CollectionAccumulator[T]:集合型累加器,可以用来收集所需信息的集合。


所有这些累加器都是继承自 AccumulatorV2,如果这些累加器还是不能满足用户的需求,Spark 允许自定义累加器。如果需要某两列进行汇总,无疑自定义累加器比直接编写逻辑要方便很多,例如:
image.png
这个表只有两列,需要统计 A 列与 B 列的汇总值。下面来看看根据上面的逻辑如何实现一个自定义累加器。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

// 构造一个保存累加结果的类
case class SumAandB(A: Long, B: Long)

class FieldAccumulator extends AccumulatorV2[SumAandB,SumAandB] {

private var A:Long = 0L
private var B:Long = 0L
// 如果A和B同时为0,则累加器值为0
override def isZero: Boolean = A == 0 && B == 0L
// 复制一个累加器
override def copy(): FieldAccumulator = {
val newAcc = new FieldAccumulator
newAcc.A = this.A
newAcc.B = this.B
newAcc
}
// 重置累加器为0
override def reset(): Unit = { A = 0 ; B = 0L }
// 用累加器记录汇总结果
override def add(v: SumAandB): Unit = {
A += v.A
B += v.B
}
// 合并两个累加器
override def merge(other: AccumulatorV2[SumAandB, SumAandB]): Unit = {
other match {
case o: FieldAccumulator => {
A += o.A
B += o.B}
case _ =>
}
}
// 当Spark调用时返回结果
override def value: SumAandB = SumAandB(A,B)
}

凡是有关键字 override 的方法,均是重载实现自己逻辑的方法。累加器调用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.spark.examples.rdd

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

class Driver extends App{

val conf = new SparkConf
val sc = new SparkContext(conf)
val filedAcc = new FieldAccumulator
sc.register(filedAcc, " filedAcc ")
// 过滤掉表头
val tableRDD = sc.textFile("table.csv").filter(_.split(",")(0) != "A")
tableRDD.map(x => {
val fields = x.split(",")
val a = fields(1).toInt
val b = fields(2).toLong
filedAcc.add(SumAandB (a, b))
x
}).count
}

总结

本课时主要介绍了 Spark 的两种共享变量,注意体会广播变量最后介绍的 map 端 join 的场景,这在实际使用中非常普遍。另外广播变量的大小,按照我的经验,要根据 Executor 和 Worker 资源来确定,几十兆、一个 G 的广播变量在大多数情况不会有什么问题,如果资源充足,那么1G~10G 以内问题也不大。