Spark Streaming Exactly once 语义

Exactly-Once


Exactly-Once不是指对输入的数据只处理一次,指的是, 在流计算引擎中, 算子给下游的结果是Exactly-Once的(即:给下游的结果有且仅有一个,且不重复、不少算)。


如在Spark Streaming处理过程中,从一个算子(Operator)到另一个算子(Operator),可能会因为各种不可抗力如机器挂掉等原因,导致某些Task处理失败,Spark内部会基于Lineage或Checkpoint启动重试Task去重新处理同样的数据。因不可抗力的存在,流处理引擎内部不可能做到一条数据仅被处理一次。所以,当流处理引擎声称提供Exactly-Once语义时,指的是从一个Operator到另一个Operator,同样的数据,无论重复处理多少次,最终的结果状态是Exactly-Once。

Spark Streaming保证Exactly-Once语义


一个Spark Streaming流处理程序,从广义上讲,包含三个步骤。

  • 接收数据:从Source中接收数据。
  • 转换数据:用DStream和RDD算子转换。
  • 储存数据:将结果保存至外部系统。

如果流处理程序需要实现Exactly-Once语义,那么每一个步骤都要保证Exactly-Once。

接收数据Exactly-Once

Spark Streaming上游对接kafka时如何保证Exactly Once?

简要说一下,Spark Streaming使用Direct模式对接上游kafka。无论kafka有多少个partition, 使用Direct模式总能保证Spark Streaming中有相同数量的partition与之相对, 也就是说Spark Streaming中的KafkaRDD的并发数量在Direct模式下是由上游kafka决定的。


在这个模式下,kafka的offset是作为KafkaRDD的一部分存在,会存储在checkpoints中, 由于checkpoints只存储offset内容,而不存储数据,这就使得checkpoints是相对轻的操作。 这就使得Spark Streaming在遇到故障时,可以从checkpoint中恢复上游kafka的offset,从而保证exactly once。


这里有个需要注意的地方这里有个需要注意的地方,那就是checkpoint方法在spark中主要有两块应用:


一块是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;


另外一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息,以便在Driver崩溃重启的时候能够接着之前进度继续进行处理 (如之前waiting batch的job会在重启后继续处理)。

1.1 checkpoint 机制能准确保证不重复消费吗?

要解决这个问题,需要明确checkpoint这个功能是如何实现的。

1.4 正确的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
object feed_ops {
def main(args: Array[String]): Unit = {
val checkpointDirectory = "./check"
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setMaster("local[2]").setAppName("feed:alg:test")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
val kafkaParams = Map[String, Object] (
"bootstrap.servers" -> "xxx:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "feed:alg:test",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
stream.map(record => (record.key, record.value)).
filter(x=>x._2.contains("kankan_v2")).filter(x=>x._2.contains("NewsList") || x._2.contains("NewsPage") ).
map(x =>(x._1, x._2.split("\u0001",0).filter(x=>x!="")) ).map(x=>(x._1, x._2.mkString("^"))).
print()
stream.foreachRDD {
rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o:OffsetRange = offsetRanges(TaskContext.get.partitionId)
println("data: ", iter.map(x=>(x.key, x.value)).take(2).foreach(println))
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
ssc.start()
ssc.awaitTermination()
}
}

1.5 streaming中checkpoint写流程

同样,针对streaming中checkpoint的写流程,主要有以下三个问题,并对此做相关解释。
Q1:streaming中checkpoint是在何时做的?
A1:在spark streaming中,jobGenerator会定期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统做checkpoint。
此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。


Q2:在streaming checkpoint过程中,具体都写入了哪些数据到checkpoint目录?
A2: 做checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。在该方法中,首先更新当前时间段需要做checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。
其次,通过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,我们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。


Checkpoint主要包含的信息如下:

1
2
3
4
5
6
7
8
val master = ssc.sc.master
val framework = ssc.sc.appName
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val sparkConfPairs = ssc.conf.getAll

具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而我们可以看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。


Q3: streaming checkpoint都有哪些坑?
A3:从A2中可以看到,应用定义的计算函数也被序列化到checkpoint目录,当应用代码发生改变时,此时就没法从checkpoint恢复。个人感觉这是checkpoint在生产环境使用中碰到的最大障碍。


另外,当从checkpoint目录恢复streamingContext时,配置信息也都是从checkpoint读取的(只有很少的一部分配置是reload的,具体见读流程),当重启任务时,新改变的配置就可能不生效,导致很奇怪的问题。
此外,broadcast变量在checkpoint中使用也受到限制(SPARK-5206)。

输出数据Exactly-Once

首先输出操作是具有At-least Once语义的,也就是说Spark Streaming可以保证需要输出的数据一定会输出出去,只不过由于失败等原因可能会输出多次。 那么如何保证Exactly once?

  • 第一种幂等输出,就是期望下游(数据)具有幂等特性。

将kafka参数enable.auto.commit设置为false。

生产中可用Kafka、Zookeeper、HBase等保存offset。

  • 第二种使用事务更新
    1
    2
    3
    4
    5
    6
    7
    dstream.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
    }
    }
    这样保证同一个partition要么一起更新成功,要么一起失败,通过uniqueId来标识这一次的更新,这就要求下游支持事务机制。

转换数据Exactly-Once

Spark Streaming内部的实现机制是基于RDD模型的,RDD为保证计算过程中数据不丢失使用了checkpoint机制, 也就是说其计算逻辑是RDD的变换过程,也就是DAG,可以在计算过程中的任何一个阶段(也就是这个阶段的RDD) 上使用checkpoint方法,就可以保证当后续计算失败,可以从这个checkpoint重新算起,使得计算延续下去。


当Spark Streaming场景下,其天然会进行batch操作,也就是说kafka过来的数据, 每秒(一个固定batch的时间周期)会对当前kafka中的数据产生一个RDD, 那么后续计算就是在这个RDD上进行的。只需要在kafkaRDD这个位置合理使用了checkpoint (这一点在前面已经讲过,可以保证)就能保证Spark Streaming内部的Exactly once。


注意一点:Spark Streaming中没有Tuple级别的ACK,其操作必然是在RDD的某个partition上的, 要么全做,要么不做,要么失败,要么成功,都是基于RDD的partition的。