共享变量是 Spark 中进阶特性之一,一共有两种:
这两种变量可以认为是在用算子定义的数据管道外的两个全局变量 ,供所有计算任务使用。在 Spark 作业中,用户编写的高阶函数会在集群中的 Executor 里执行,这些 Executor 可能会用到相同的变量,这些变量被复制到每个 Executor 中,而 Executor 对变量的更新不会传回 Driver。 在计算任务中支持通用的可读写变量一般是低效的,即便如此,Spark 还是提供了两类共享变量:广播变量(broadcast variable)与累加器(accumulator)。当然,对于分布式变量,如果不加限制会出现一致性的问题,所以共享变量是两种非常特殊的变量。广播变量:只读; 累加器:只能增加。 **
广播变量 广播变量类似于 MapReduce 中的 DistributeFile,通常来说是一份不大的数据集,一旦广播变量在 Driver 中被创建,整个数据集就会在集群中进行广播,能让所有正在运行的计算任务以只读方式访问。广播变量支持一些简单的数据类型,如整型、集合类型等,也支持很多复杂数据类型,如一些自定义的数据类型。 广播变量为了保证数据被广播到所有节点,使用了很多办法。这其实是一个很重要的问题,我们不能期望 100 个或者 1000 个 Executor 去连接 Driver,并拉取数据,这会让 Driver 不堪重负。Executor 采用的是通过 HTTP 连接去拉取数据,类似于 BitTorrent 点对点传输。 这样的方式更具扩展性,避免了所有 Executor 都去向 Driver 请求数据而造成 Driver 故障。 Spark 广播机制运作方式是这样的:Driver 将已序列化的数据切分成小块,然后将其存储在自己的块管理器 BlockManager 中。
BlockManager是spark自己的存储系统,RDD-Cache、 Shuffle-output、broadcast 等的实现都是基于BlockManager来实现的,BlockManager也是分布式结构,在driver和所有executor上都会有blockmanager节点,每个节点上存储的block信息都会汇报给driver端的blockManagerMaster作统一管理,BlockManager对外提供get和set数据接口,可将数据存储在memory, disk, off-heap
当Executor 开始运行时,每个 Executor 首先从自己的内部块管理器中试图获取广播变量,如果以前广播过,那么直接使用;如果没有,Executor 就会从 Driver 或者其他可用的 Executor 去拉取数据块。一旦拿到数据块,就会放到自己的块管理器中。供自己和其他需要拉取的 Executor 使用。这就很好地防止了 Driver 单点的性能瓶颈,如下图所示。 在 Spark 作业中创建、使用广播变量。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 scala> val rdd_one = sc.parallelize(Seq(1 ,2 ,3 )) rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101 ] at parallelize at <console>:25 scala> val i = 5 i: Int = 5 scala> val bi = sc.broadcast(i) bi: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(147 ) scala> bi.value res166: Int = 5 scala> rdd_one.take(5 ) res164: Array[Int] = Array(1 , 2 , 3 ) scala> rdd_one.map(j => j + bi.value).take(5) res165: Array[Int] = Array(6 , 7 , 8 )
在用户定义的高阶函数中,可以直接使用广播变量的引用。下面看一个集合类型的广播变量:
1 2 3 4 5 6 7 8 9 10 11 scala> val rdd_one = sc.parallelize(Seq(1 ,2 ,3 )) rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[109 ] at parallelize at <console>:25 scala> val m = scala.collection.mutable.HashMap(1 -> 2, 2 -> 3, 3 -> 4) m: scala.collection.mutable.HashMap[Int,Int] = Map(2 -> 3, 1 -> 2, 3 -> 4) scala> val bm = sc.broadcast(m) bm: org.apache.spark.broadcast.Broadcast[scala.collection.mutable.HashMap[Int,I nt]] = Broadcast(178 ) scala> rdd_one.map(j => j * bm.value(j)).take(5) res191: Array[Int] = Array(2 , 6 , 12 )
该例中,元素乘以元素对应值得到最后结果。广播变量会持续占用内存,当我们不需要的时候,可以用 unpersist 算子将其移除,这时,如果计算任务又用到广播变量,那么就会重新拉取数据,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 ... scala> val rdd_one = sc.parallelize(Seq(1 ,2 ,3 )) rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101 ] at parallelize at <console>:25 scala> val k = 5 k: Int = 5 scala> val bk = sc.broadcast(k) bk: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(163 ) scala> rdd_one.map(j => j + bk.value).take(5) res184: Array[Int] = Array(6 , 7 , 8 ) scala> bk.unpersist scala> rdd_one.map(j => j + bk.value).take(5) res186: Array[Int] = Array(6 , 7 , 8 )
你还可以使用 destroy 方法彻底销毁广播变量 ,调用该方法后,如果计算任务中又用到广播变量,则会抛出异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 scala> val rdd_one = sc.parallelize(Seq(1 ,2 ,3 )) rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101 ] at parallelize at <console>:25 scala> val k = 5 k: Int = 5 scala> val bk = sc.broadcast(k) bk: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(163 ) scala> rdd_one.map(j => j + bk.value).take(5) res184: Array[Int] = Array(6 , 7 , 8 ) scala> bk.destroy scala> rdd_one.map(j => j + bk.value).take(5) 17 /05 /27 14 :07 :28 ERROR Utils: Exception encounteredorg.apache.spark.SparkException: Attempted to use Broadcast(163 ) after it was destroyed (destroy at <console>:30 ) at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144 ) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeObject$1. apply$mc V$sp(TorrentBroadcast.scala:202 ) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$wri
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import org.apache.spark.sql.SparkSessionobject BigRDDJoinSmallRDD { def main (args: Array[String]) : Unit = { val sparkSession = SparkSession.builder().master("local[3]" ).appName("BigRDD Join SmallRDD" ).getOrCreate() val sc = sparkSession.sparkContext val list1 = List(("jame" ,23 ), ("wade" ,3 ), ("kobe" ,24 )) val list2 = List(("jame" , 13 ), ("wade" ,6 ), ("kobe" ,16 )) val bigRDD = sc.makeRDD(list1) val smallRDD = sc.makeRDD(list2) println(bigRDD.getNumPartitions) println(smallRDD.getNumPartitions) // driver端rdd不broadcast广播smallRDD到各executor,RDD不能被broadcast,需要转换成数组array val smallRDDB= sc.broadcast(smallRDD.collect()) val joinedRDD = bigRDD.mapPartitions(partition => { val smallRDDBV = smallRDDB.value // 各个executor端的task读取广播value partition.map(element => { //println(joinUtil(element, smallRDDBV)) joinUtil(element, smallRDDBV) }) }) joinedRDD.foreach(x => println(x)) } /** * join操作:对两个rdd中的相同key的value1和value2进行聚合,即(key,value1).join(key,value2)得到(key,(value1, vlaue2)) * 如果bigRDDEle的key和smallRDD的某个key一致,那么返回(key,(value1, vlaue2)) * 该方法会在各executor的task上执行 * */ def joinUtil (bigRDDEle:(String,Int) , smallRDD: Array[(String, Int) ]) : (String, (Int,Int)) = { var joinEle:(String, (Int, Int)) = null // 遍历数组smallRDD smallRDD.foreach(smallRDDEle => { if (smallRDDEle._1.equals(bigRDDEle._1)){ // 如果bigRDD中某个元素的key和数组smallRDD的key一致,返回join结果 joinEle = (bigRDDEle._1, (bigRDDEle._2, smallRDDEle._2)) } }) joinEle } }
这样,相当于先将小表进行广播,广播到每个 Executor 的内存中,供 map 函数使用,这就避免了 Shuffle,虽然语义上还是 join(小表放内存),但无论是资源消耗还是执行时间,都要远优于前面两种方式。
累加器 与广播变量只读不同,累加器是一种只能进行增加操作的共享变量。如果你想知道记录中有多少错误数据,一种方法是针对这种错误数据编写额外逻辑,另一种方式是使用累加器。用法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ... scala> val acc1 = sc.longAccumulator("acc1" ) acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355 , name: Some(acc1), value: 0 ) scala> val someRDD = tableRDD.map(x => {acc1.add(1); x}) someRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[99 ] at map at <console>:29 scala> acc1.value res156: Long = 0 /*there has been no action on the RDD so accumulator did not get incremented*/scala> someRDD.count res157: Long = 351 scala> acc1.value res158: Long = 351 scala> acc1 res145: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355 , name: Some(acc1), value: 351 )
上面这个例子用 SparkContext 初始化了一个长整型的累加器。LongAccumulator 方法会将累加器变量置为 0。行动算子 count 触发计算后,累加器在 map 函数中被调用,其值会一直增加,最后定格为 351。Spark 内置的累加器有如下几种。
LongAccumulator:长整型累加器,用于求和、计数、求均值的 64 位整数。 DoubleAccumulator:双精度型累加器,用于求和、计数、求均值的双精度浮点数。 CollectionAccumulator[T]:集合型累加器,可以用来收集所需信息的集合。 所有这些累加器都是继承自 AccumulatorV2,如果这些累加器还是不能满足用户的需求,Spark 允许自定义累加器。如果需要某两列进行汇总,无疑自定义累加器比直接编写逻辑要方便很多,例如: 这个表只有两列,需要统计 A 列与 B 列的汇总值。下面来看看根据上面的逻辑如何实现一个自定义累加器。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import org.apache.spark.util.AccumulatorV2import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.SparkConf // 构造一个保存累加结果的类 case class SumAandB (A: Long, B: Long) class FieldAccumulator extends AccumulatorV2[SumAandB,SumAandB] { private var A:Long = 0L private var B:Long = 0L // 如果A和B同时为0 ,则累加器值为0 override def isZero : Boolean = A == 0 && B == 0L // 复制一个累加器 override def copy () : FieldAccumulator = { val newAcc = new FieldAccumulator newAcc.A = this.A newAcc.B = this.B newAcc } // 重置累加器为0 override def reset () : Unit = { A = 0 ; B = 0L } // 用累加器记录汇总结果 override def add (v: SumAandB) : Unit = { A += v.A B += v.B } // 合并两个累加器 override def merge (other: AccumulatorV2[SumAandB, SumAandB]) : Unit = { other match { case o: FieldAccumulator => { A += o.A B += o.B} case _ => } } // 当Spark调用时返回结果 override def value : SumAandB = SumAandB(A,B) }
凡是有关键字 override 的方法,均是重载实现自己逻辑的方法。累加器调用方式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.spark.examples.rdd import org.apache.spark.SparkConfimport org.apache.spark.SparkContext class Driver extends App{ val conf = new SparkConf val sc = new SparkContext(conf) val filedAcc = new FieldAccumulator sc.register(filedAcc, " filedAcc " ) // 过滤掉表头 val tableRDD = sc.textFile("table.csv" ).filter(_.split("," )(0 ) != "A" ) tableRDD.map(x => { val fields = x.split("," ) val a = fields(1 ).toInt val b = fields(2 ).toLong filedAcc.add(SumAandB (a, b)) x }).count }
总结 本课时主要介绍了 Spark 的两种共享变量,注意体会广播变量最后介绍的 map 端 join 的场景,这在实际使用中非常普遍。另外广播变量的大小,按照我的经验,要根据 Executor 和 Worker 资源来确定,几十兆、一个 G 的广播变量在大多数情况不会有什么问题,如果资源充足,那么1G~10G 以内问题也不大。