越来越多的实时项目需求,感觉好像各个业务线,产品都想让自己的数据动起来,并且配合上数据可视化展示出来。那么在使用Spark Streaming过程中,肯定不能避免一个问题,那就是,你全天24小时运行的实时程序,如果在某一时刻因为各种原因,停掉。当你发现实时程序已经因为故障停止运行了1个小时,或者产品运营中数据的使用者打电话给你,通知你最近一个小时的数据没有显示(尴尬😅),或者实时程序要临时升级,需要添加新的业务逻辑重新部署。
那么此时,在你还没有做offset管理的时候,你准备怎么办呢?是”auto.offset.reset” -> “latest” 从最新的offset位移数据读起,放弃那1个小时未读取到的数据,还是”auto.offset.reset” -> “earliest”, 从最早的位移数据读取,重新处理一遍所有topic数据。思考一下,这两种情况都不是一个好的办法,所以,我们要在有这个需求的程序中,来进行offset的管理,避免数据的丢失以及重复计算的问题。
- 将offset存储在外部数据存储中
- Checkpoints
- HBase
- ZooKeeper
- Kafka
- redis
- 不管理offset
上图描述了在Spark Streaming应用程序中管理offset的一般流程。offset可以通过几种方式进行管理,但通常遵循以下通用步骤。
- 在Direct DStream初始化后,可以指定每个主题分区的offset映射,以了解Direct DStream应该从哪个分区开始读取。
- 指定的偏移量与下面的第4步写入的位置相同。
- 然后可以读取和处理这批消息。
- 处理后,结果和offset都可以存储。
- _存储结果和提交偏移量_动作周围的虚线只是突出显示了一系列步骤,如果需要特殊的交付语义更严格的情况,用户可能需要进一步检查。这可能包括检查幂等运算或将结果及其偏移量存储在原子运算中。
- 最后,任何外部持久数据存储(例如HBase,Kafka,HDFS和ZooKeeper)都可以用来跟踪已处理的消息。
根据业务需求,可以将不同的方案合并到上述步骤中。Spark的编程灵活性允许用户进行细粒度的控制,以在处理的周期性阶段之前或之后存储offset。考虑发生以下情况的应用程序:Spark Streaming应用程序正在从Kafka读取消息,针对HBase数据执行转换操作,然后将操作后的消息发布到另一个topic中或单独的系统(例如,其他消息传递系统,到HBase,Solr,DBMS等)。在这种情况下,只有将消息成功发布到辅助系统后,我们才将其视为已处理。
外部存储offset
在本节中,我们探索用于在持久数据存储区中将offset持久保存在外部的不同选项。对于本节中提到的方法,如果使用spark-streaming-kafka-0-10_2.**库,建议用户将enable.auto.commit 设置为false。此配置仅适用于此版本,将enable.auto.commit 设置为true意味着offset将以config auto.commit.interval.ms控制的频率自动提交。在Spark Streaming中,将此值设置为true会在从Kafka读取消息时自动向Kafka提交偏移量,这不一定意味着Spark已完成对这些消息的处理。要启用精确的偏移量控制,请将Kafka参数enable.auto.commit 设置为 false。
Spark Streaming checkpoints
启用Spark Streaming的是存储checkpoints的最简单方法,因为它在Spark的框架中很容易获得。checkpoint是专门为保存应用程序的状态(一般情况下保存在HDFS)而设计的,以便可以在出现故障时将其恢复。
对Kafka流进行检查点将导致偏移范围存储在检查点中。如果出现故障,Spark Streaming应用程序可以开始从检查点偏移范围读取消息。但是,Spark Streaming检查点无法在Spark应用程序升级之后恢复,因此不是很可靠,尤其是当您将这种机制用于关键的生产应用程序时。我们不建议通过Spark检查点管理偏移量。
在HBase中存储offset
HBase可用作外部数据存储,以可靠的方式保留偏移范围。通过在外部存储偏移量范围,它使Spark Streaming应用程序能够从任意时间点重新启动和重播消息,只要消息在Kafka中仍然有效。
借助HBase的通用设计,该应用程序能够利用rowkey和column family 来处理跨同一表中的多个Spark Streaming应用程序和Kafka topic 存储偏移范围。在此示例中,是使用包含topic,group id和Spark Streaming 的 batchTime.milliSeconds 组合作为行键来区分写入表的每个条目。新记录将累积在我们在以下设计中配置的表格中,以在30天后自动过期。下面是HBase表的DDL和结构。
ddl
1 | create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000} |
RowKey 设计
1 | row: <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS> |
下面直接给出在HBase中,offset的管理设计流程代码:
1 | import kafka.utils.ZkUtils |
在ZooKeeper中存储offset
用户可以将偏移范围存储在ZooKeeper中,这可以类似地提供一种可靠的方法,以在最后停止的Kafka流上开始流处理。
在这种情况下,启动时,Spark Streaming作业将从ZooKeeper中检索每个主题分区的最新处理过的偏移量。如果找到了一个以前在ZooKeeper中未管理过的新分区,则默认将其最新处理的偏移量从头开始。处理完每批后,用户可以存储第一个或最后一个处理过的偏移量。此外,在ZooKeeper中存储偏移量的znode位置使用与旧Kafka使用者API相同的格式。因此,用于跟踪或监视存储在ZooKeeper中的Kafka偏移量的任何工具仍然可以使用。
初始化ZooKeeper连接,以获取和存储到ZooKeeper的偏移量:
1 | val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout) |
检索存储在使用者组和主题列表的ZooKeeper中的最后偏移量的方法:
1 | def readOffsets(topics: Seq[String], groupId:String): |
使用特定的偏移量初始化Kafka Direct Dstream以开始处理:
1 | val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets)) |
将一组可恢复的偏移量持久保存到ZooKeeper的方法。
注意:_offsetPath_是一个ZooKeeper位置,表示为/ consumers / [groupId] / offsets / topic / [partitionId],用于存储偏移值
1 | def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = { |
在kafka中管理offset
在Apache Spark 2.1.x的Cloudera发行版中,spark-streaming-kafka-0-10使用了新的Consumer api,它公开了commitAsync API。使用commitAsync API,使用方可以在知道输出已存储后将偏移量提交给Kafka。新的使用者api根据使用者的_group.id_唯一地将偏移提交回Kafka 。
Persist Offsets in Kafka
1 | def createKafkaRDD(ssc: StreamingContext, config: Source) = { |
有关详细信息,请访问– http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
注意:commitAsync()是Spark Streaming和Kafka Integration的kafka-0-10版本的一部分。如Spark文档所述,此集成仍处于试验阶段,API可能会发生变化。
在Redis中存储offset
1 | import org.apache.kafka.clients.consumer.ConsumerRecord |
#### 其他方法 值得一提的是,您还可以将偏移量存储在HDFS之类的存储系统中。与上述选项相比,在HDFS中存储偏移量不太受欢迎,因为与其他系统(如ZooKeeper和HBase)相比,HDFS具有更高的延迟。此外,如果管理不当,则在HDFS中为每个批次编写offsetRanges可能会导致文件较小的问题。
不管理offset
当然,Spark Streaming应用程序并不是必须的去管理offset。对当前业务考虑好是否需要对offset进行保存。
本文参考如下:
Offset Management For Apache Kafka With Apache Spark Streaming