scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at parallelize at <console>:25

scala> val k = 5
k: Int = 5

scala> val bk = sc.broadcast(k)
bk: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(163)

scala> rdd_one.map(j => j + bk.value).take(5)
res184: Array[Int] = Array(6, 7, 8)

scala> bk.destroy

scala> rdd_one.map(j => j + bk.value).take(5)
17/05/27 14:07:28 ERROR Utils: Exception encountered
org.apache.spark.SparkException: Attempted to use Broadcast(163) after it was destroyed (destroy at <console>:30)
  at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeObject$1.apply$mcV$sp(TorrentBroadcast.scala:202)
  at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$wri
...

scala> val acc1 = sc.longAccumulator("acc1")
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355, name: Some(acc1), value: 0)

scala> val someRDD = tableRDD.map(x => {acc1.add(1); x})
someRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[99] at map at <console>:29

scala> acc1.value
res156: Long = 0  /* there has been no action on the RDD, so the accumulator did not get incremented */

scala> someRDD.count
res157: Long = 351

scala> acc1.value
res158: Long = 351

scala> acc1
res145: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355, name: Some(acc1), value: 351)
All of these accumulators inherit from AccumulatorV2. If the built-in accumulators still cannot meet your needs, Spark allows you to define a custom accumulator. For example, if you need to total two columns at once, a custom accumulator is far more convenient than writing that aggregation logic by hand: suppose the table has only two columns, A and B, and we need the sum of each. Let's see how to implement a custom accumulator following this logic. The code is as follows:
package com.spark.examples.rdd

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Driver extends App {

  val conf = new SparkConf
  val sc = new SparkContext(conf)
  val fieldAcc = new FieldAccumulator
  sc.register(fieldAcc, "fieldAcc")

  // Filter out the header row.
  val tableRDD = sc.textFile("table.csv").filter(_.split(",")(0) != "A")

  tableRDD.map(x => {
    val fields = x.split(",")
    val a = fields(0).toInt
    val b = fields(1).toLong
    fieldAcc.add(SumAandB(a, b))
    x
  }).count
}
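The driver above uses a FieldAccumulator class and a SumAandB value type that are not shown in this excerpt. As a reference point, here is a minimal sketch of how they might be written against the AccumulatorV2[IN, OUT] contract (isZero, copy, reset, add, merge, value); the names SumAandB and FieldAccumulator come from the driver code, while the internal fields sumA and sumB and the method bodies are illustrative assumptions rather than the exact implementation from the original text.

package com.spark.examples.rdd

import org.apache.spark.util.AccumulatorV2

// One input record for the accumulator: the values of columns A and B in a row.
// (Illustrative definition; assumed from the driver code above.)
case class SumAandB(a: Int, b: Long)

// Accumulates the running totals of columns A and B across all tasks.
class FieldAccumulator extends AccumulatorV2[SumAandB, SumAandB] {

  private var sumA: Int  = 0
  private var sumB: Long = 0L

  // The accumulator is "zero" when nothing has been added yet.
  override def isZero: Boolean = sumA == 0 && sumB == 0L

  // Return an independent copy carrying the current totals.
  override def copy(): FieldAccumulator = {
    val acc = new FieldAccumulator
    acc.sumA = this.sumA
    acc.sumB = this.sumB
    acc
  }

  // Reset both totals back to zero.
  override def reset(): Unit = {
    sumA = 0
    sumB = 0L
  }

  // Add one row's A and B values to the running totals.
  override def add(v: SumAandB): Unit = {
    sumA += v.a
    sumB += v.b
  }

  // Merge in the totals computed by another task's copy of this accumulator.
  override def merge(other: AccumulatorV2[SumAandB, SumAandB]): Unit = other match {
    case o: FieldAccumulator =>
      sumA += o.sumA
      sumB += o.sumB
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // Expose the totals of both columns as a single SumAandB value.
  override def value: SumAandB = SumAandB(sumA, sumB)
}

With a definition along these lines, once the count action in the driver has run, fieldAcc.value would return a SumAandB holding the totals of columns A and B.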