Spark Streaming Offset Management


More and more real-time requirements keep coming in; it seems every business line and every product wants its data to "move" and to be paired with a live data-visualization dashboard. When you use Spark Streaming, one problem is unavoidable: your real-time job runs 24 hours a day, and at some point it will stop for one reason or another. Maybe you discover the streaming job has already been down for an hour because of a failure, maybe the people in product or operations who consume the data call to tell you the last hour of data is missing (awkward 😅), or maybe the job needs a temporary upgrade and has to be redeployed with new business logic.


At that point, if you have not set up offset management, what are you going to do? Set "auto.offset.reset" -> "latest" and start reading from the newest offsets, giving up the hour of data you never read? Or set "auto.offset.reset" -> "earliest" and read from the earliest offsets, reprocessing every message in the topic? Think about it: neither is a good option. So, for jobs with this requirement, we need to manage offsets ourselves to avoid both data loss and duplicate processing.

  • Store offsets in an external data store
    • Checkpoints
    • HBase
    • ZooKeeper
    • Kafka
    • Redis
  • Do not manage offsets

[Figure: general flow of offset management in a Spark Streaming application]

The figure above shows the general flow for managing offsets in a Spark Streaming application. Offsets can be managed in several ways, but they all generally follow this pattern:

  1. When the Direct DStream is initialized, a map of offsets for each topic partition can be specified, telling the Direct DStream which offset to start reading from in each partition.
    1. The offsets specified here are the same ones written in step 4 below.
  2. The batch of messages can then be read and processed.
  3. After processing, both the results and the offsets can be stored.
    1. The dashed line around the _store results_ and _commit offsets_ actions simply highlights a sequence of steps that users may want to review further if stricter delivery semantics are required. This may include reviewing whether the operations are idempotent, or storing the results and their offsets in a single atomic operation.
  4. Finally, any external durable data store (for example HBase, Kafka, HDFS, or ZooKeeper) can be used to keep track of which messages have already been processed.

Different schemes can be combined into the steps above according to your business requirements. Spark's programming flexibility gives the user fine-grained control over when offsets are stored, before or after a given phase of processing. Consider an application in which a Spark Streaming job reads messages from Kafka, performs transformations that look up data in HBase, and then publishes the transformed messages to another topic or a separate system (another messaging system, back to HBase, Solr, a DBMS, and so on). In this case, we only consider a message processed once it has been successfully published to that secondary system. The general pattern is sketched below.
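As a hedged sketch of this pattern (assuming a Kafka direct stream named inputDStream created with KafkaUtils.createDirectStream; processRecord, publishToSecondarySystem, and saveOffsets are hypothetical placeholders for your own transformation, downstream write, and offset store):

import org.apache.spark.streaming.kafka010.HasOffsetRanges

inputDStream.foreachRDD { (rdd, batchTime) =>
  // 1. Capture the offset ranges this batch covers
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 2. Process the messages (hypothetical transformation)
  val results = rdd.map(record => processRecord(record))
  // 3. Persist the results to the secondary system (hypothetical sink)
  publishToSecondarySystem(results)
  // 4. Only after the results are stored, persist the offsets to the external store
  saveOffsets(offsetRanges, batchTime)
}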

Storing offsets externally


In this section we explore the different options for persisting offsets externally in a durable data store. For the approaches described here, if you are using the spark-streaming-kafka-0-10_2.** library, we recommend setting enable.auto.commit to false. This configuration only applies to this version; setting enable.auto.commit to true means offsets are committed automatically at the frequency controlled by auto.commit.interval.ms. In Spark Streaming, setting this to true commits offsets back to Kafka automatically as messages are read from Kafka, which does not necessarily mean that Spark has finished processing those messages. To get exact control over offsets, set the Kafka parameter enable.auto.commit to false, as in the snippet below.
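For example, a minimal consumer-configuration sketch with automatic commits disabled (the broker addresses and group id below are placeholder values):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092",  // placeholder brokers
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-consumer-group",              // placeholder group id
  "enable.auto.commit" -> (false: java.lang.Boolean)   // commit offsets explicitly, not automatically
)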

Spark Streaming checkpoints

Enabling Spark Streaming checkpointing is the simplest way to store offsets, because it is readily available within Spark's framework. Checkpoints are designed to save the state of the application (usually to HDFS) so that it can be recovered after a failure.

Checkpointing a Kafka stream causes the offset ranges to be stored in the checkpoint. If a failure occurs, the Spark Streaming application can resume reading messages from the checkpointed offset ranges. However, Spark Streaming checkpoints cannot be recovered across an application upgrade, so they are not very reliable, especially if you rely on this mechanism for a critical production application. We do not recommend managing offsets via Spark checkpoints. A minimal checkpointing setup is sketched below.
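A minimal sketch of checkpoint-based recovery, assuming a writable HDFS checkpoint directory (the path, application name, and batch interval below are placeholder values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///user/spark/streaming-checkpoints"  // placeholder path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("checkpoint-example"), Seconds(10))
  ssc.checkpoint(checkpointDir)  // Kafka offset ranges are saved as part of the checkpoint
  // ... create the Kafka direct stream and define the processing here ...
  ssc
}

// Restores the context (and its offset ranges) from the checkpoint if one exists,
// otherwise builds a fresh context with createContext()
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()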

Storing offsets in HBase


HBase can be used as an external data store to preserve offset ranges in a reliable way. By storing offset ranges externally, it allows the Spark Streaming application to restart and replay messages from any point in time, as long as the messages are still available in Kafka.

Thanks to HBase's generic design, the application can use the rowkey and column family to store offset ranges for multiple Spark Streaming applications and Kafka topics within the same table. In this example, each entry written to the table is distinguished by a row key that combines the topic name, the consumer group id, and the Spark Streaming batchTime.milliSeconds. New records accumulate in the table, which in the design below is configured to automatically expire after 30 days. Below are the DDL and structure of the HBase table.


DDL

create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}


RowKey design

row:           <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family: offsets
qualifier:     <PARTITION_ID>
value:         <OFFSET_ID>


The complete code for this HBase-based offset management flow is given below:

import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Scan, Put, ConnectionFactory}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by gmedasani on 6/10/17.
 */
object KafkaOffsetsBlogStreamingDriver {

  def main(args: Array[String]) {

    if (args.length < 6) {
      System.err.println("Usage: KafkaDirectStreamTest <batch-duration-in-seconds> <kafka-bootstrap-servers> " +
        "<kafka-topics> <kafka-consumer-group-id> <hbase-table-name> <kafka-zookeeper-quorum>")
      System.exit(1)
    }

    val batchDuration = args(0)
    val bootstrapServers = args(1).toString
    val topicsSet = args(2).toString.split(",").toSet
    val consumerGroupID = args(3)
    val hbaseTableName = args(4)
    val zkQuorum = args(5)
    val zkKafkaRootDir = "kafka"
    val zkSessionTimeOut = 10000
    val zkConnectionTimeOut = 10000

    val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
      .setMaster("local[4]") // Use this line only while developing on a workstation; remove it on a cluster
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
    val topics = topicsSet.toArray
    val topic = topics(0)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> consumerGroupID,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    /*
     Create a dummy process that simply returns the message as is.
     */
    def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {
      message
    }

    /*
     Save offsets into HBase.
     */
    def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange], hbaseTableName: String,
                    batchTime: org.apache.spark.streaming.Time) = {
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.addResource("src/main/resources/hbase-site.xml")
      val conn = ConnectionFactory.createConnection(hbaseConf)
      val table = conn.getTable(TableName.valueOf(hbaseTableName))
      val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
      val put = new Put(rowKey.getBytes)
      for (offset <- offsetRanges) {
        put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
          Bytes.toBytes(offset.untilOffset.toString))
      }
      table.put(put)
      conn.close()
    }

    /*
     Returns last committed offsets for all the partitions of a given topic from HBase in the following cases.
     - CASE 1: SparkStreaming job is started for the first time. This function gets the number of topic partitions
       from Zookeeper and for each partition returns the last committed offset as 0.
     - CASE 2: SparkStreaming is restarted and there are no changes to the number of partitions in a topic. Last
       committed offsets for each topic-partition are returned from HBase as is.
     - CASE 3: SparkStreaming is restarted and the number of partitions in a topic increased. For old partitions, last
       committed offsets for each topic-partition are returned from HBase as is. For newly added partitions, the
       function returns last committed offsets as 0.
     */
    def getLastCommittedOffsets(TOPIC_NAME: String, GROUP_ID: String, hbaseTableName: String, zkQuorum: String,
                                zkRootDir: String, sessionTimeout: Int, connectionTimeOut: Int): Map[TopicPartition, Long] = {

      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.addResource("src/main/resources/hbase-site.xml")
      val zkUrl = zkQuorum + "/" + zkRootDir
      val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeOut)
      val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
      val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size

      // Connect to HBase to retrieve last committed offsets
      val conn = ConnectionFactory.createConnection(hbaseConf)
      val table = conn.getTable(TableName.valueOf(hbaseTableName))
      val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
      val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
      val scan = new Scan()
      val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
      val result = scanner.next()

      var hbaseNumberOfPartitionsForTopic = 0 // Set the number of partitions discovered for a topic in HBase to 0
      if (result != null) {
        // If the result from the HBase scanner is not null, set the number of partitions to the number of cells
        hbaseNumberOfPartitionsForTopic = result.listCells().size()
      }

      val fromOffsets = collection.mutable.Map[TopicPartition, Long]()

      if (hbaseNumberOfPartitionsForTopic == 0) {
        // Initialize fromOffsets to the beginning
        for (partition <- 0 to zKNumberOfPartitionsForTopic - 1) {
          fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
        }
      } else if (zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic) {
        // Handle the scenario where new partitions have been added to an existing Kafka topic
        for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
          val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
          fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
        }
        for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic - 1) {
          fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> 0)
        }
      } else {
        // Initialize fromOffsets from the last run
        for (partition <- 0 to hbaseNumberOfPartitionsForTopic - 1) {
          val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(partition.toString)))
          fromOffsets += (new TopicPartition(TOPIC_NAME, partition) -> fromOffset.toLong)
        }
      }
      scanner.close()
      conn.close()
      fromOffsets.toMap
    }

    val fromOffsets = getLastCommittedOffsets(topic, consumerGroupID, hbaseTableName, zkQuorum, zkKafkaRootDir,
      zkSessionTimeOut, zkConnectionTimeOut)
    val inputDStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Assign[String, String](
      fromOffsets.keys, kafkaParams, fromOffsets))

    /*
     For each RDD in the DStream apply a map transformation that processes the messages.
     */
    inputDStream.foreachRDD((rdd, batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(offset => println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
      val newRDD = rdd.map(message => processMessage(message))
      newRDD.count()
      saveOffsets(topic, consumerGroupID, offsetRanges, hbaseTableName, batchTime) // Save the offsets to HBase
    })

    println("Number of messages processed " + inputDStream.count())
    ssc.start()
    ssc.awaitTermination()
  }
}

Storing offsets in ZooKeeper


Users can store offset ranges in ZooKeeper, which similarly provides a reliable way to resume stream processing from wherever the Kafka stream last stopped.

In this scenario, on startup the Spark Streaming job retrieves the latest processed offset for each topic partition from ZooKeeper. If it finds a new partition that was not previously managed in ZooKeeper, the latest processed offset for that partition defaults to the beginning. After processing each batch, the user can store either the first or the last processed offset. In addition, the znode location used to store offsets in ZooKeeper follows the same format as the old Kafka consumer API, so any tool that tracks or monitors Kafka offsets stored in ZooKeeper continues to work.

Initialize the ZooKeeper connection used to fetch offsets from and store offsets to ZooKeeper:

val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

Method to retrieve the last offsets stored in ZooKeeper for the consumer group and list of topics:

def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {

  val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
  val partitionMap = zkUtils.getPartitionsForTopics(topics)

  // /consumers/<groupId>/offsets/<topic>/
  partitionMap.foreach(topicPartitions => {
    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
    topicPartitions._2.foreach(partition => {
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition

      try {
        val offsetStatTuple = zkUtils.readData(offsetPath)
        if (offsetStatTuple != null) {
          LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}",
            Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)

          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
            offsetStatTuple._1.toLong)
        }
      } catch {
        case e: Exception =>
          LOGGER.warn("retrieving offset details - no previous node exists:" +
            " {}, topic: {}, partition: {}, node path: {}",
            Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)

          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
      }
    })
  })

  topicPartOffsetMap.toMap
}

Initialize the Kafka Direct DStream with the specific offsets to start processing from:

val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))

Method to persist a recoverable set of offsets to ZooKeeper.
Note: _offsetPath_ is a ZooKeeper location, of the form /consumers/[groupId]/offsets/topic/[partitionId], where the offset value is stored.

def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
  offsets.foreach(or => {
    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)

    val acls = new ListBuffer[ACL]()
    val acl = new ACL
    acl.setId(ANYONE_ID_UNSAFE)
    acl.setPerms(PERMISSIONS_ALL)
    acls += acl

    val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
    val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
    zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition,
      offsetVal + "", JavaConversions.bufferAsJavaList(acls))

    LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}",
      Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
  })
}

Managing offsets in Kafka itself


In the Cloudera distribution of Apache Spark 2.1.x, spark-streaming-kafka-0-10 uses the new consumer API, which exposes a commitAsync API. With commitAsync, the consumer commits offsets to Kafka only after it knows its output has been stored. The new consumer API commits offsets back to Kafka uniquely based on the consumer's _group.id_.

Persist Offsets in Kafka

def createKafkaRDD(ssc: StreamingContext, config: Source) = {
  // `config` is an application-specific settings object providing servers, group, offset and topic
  var SparkDStream: InputDStream[ConsumerRecord[String, String]] = null
  try {
    SparkDStream = {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> config.servers,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> config.group,
        "auto.offset.reset" -> config.offset
      )
      /*
      "enable.auto.commit" -> config.getString("kafkaSource.enable.auto.commit"))*/
      // val subscribeTopics = config.getStringList("kafkaSource.topics").toIterable
      import scala.collection.JavaConversions._
      val kafkaStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](config.topic.toList, kafkaParams)
      )
      kafkaStream
    }
  } catch {
    case e: Throwable =>
      throw new Exception("Couldn't init Spark stream processing", e)
  }
  SparkDStream
}

// ssc and config are assumed to be defined elsewhere in the application
var inputDStream: InputDStream[ConsumerRecord[String, String]] = createKafkaRDD(ssc, config)
inputDStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Commit the offsets back to Kafka once the batch output has been stored
  inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

For more details, see http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
Note: commitAsync() is part of the kafka-0-10 version of the Spark Streaming and Kafka integration. As the Spark documentation notes, this integration is still experimental and the API may change.
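As a side note, commitAsync also accepts a callback, which lets you observe whether a commit succeeded. A minimal sketch, reusing the inputDStream from the snippet above (the println logging is purely illustrative):

import java.util.{Map => JMap}

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

inputDStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... store the results of processing first ...
  inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
    override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      if (exception != null) println(s"Offset commit failed: ${exception.getMessage}") // illustrative logging
      else println(s"Committed offsets: $offsets")
    }
  })
}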

Storing offsets in Redis

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._

import scala.collection.JavaConverters._
import scala.util.Try

object KafkaOffsetsBlogStreamingDriver {

  /**
   * Save the offsets of a batch to Redis, keyed by consumer group.
   * @param ranges  offset ranges of the current batch
   * @param groupId consumer group id
   */
  def storeOffset(ranges: Array[OffsetRange], groupId: String): Unit = {
    for (o <- ranges) {
      val key = s"bi_kafka_offset_${groupId}_${o.topic}_${o.partition}"
      val value = o.untilOffset
      JedisUtil.set(key, value.toString)
    }
  }

  /**
   * Read the stored offsets for the given topics and consumer group from Redis.
   * @param topics  topics to look up
   * @param groupId consumer group id
   * @return the offsets found, plus a flag (1 if any offsets were found, 0 otherwise)
   */
  def getOffset(topics: Array[String], groupId: String): (Map[TopicPartition, Long], Int) = {
    val fromOffSets = scala.collection.mutable.Map[TopicPartition, Long]()

    topics.foreach(topic => {
      val keys = JedisUtil.getKeys(s"bi_kafka_offset_${groupId}_${topic}*")
      if (!keys.isEmpty) {
        keys.asScala.foreach(key => {
          val offset = JedisUtil.get(key)
          val partition = Try(key.split(s"bi_kafka_offset_${groupId}_${topic}_").apply(1)).getOrElse("0")
          fromOffSets.put(new TopicPartition(topic, partition.toInt), offset.toLong)
        })
      }
    })
    if (fromOffSets.isEmpty) {
      (fromOffSets.toMap, 0)
    } else {
      (fromOffSets.toMap, 1)
    }
  }

  /**
   * Create the InputDStream; if auto.offset.reset is "latest" and offsets exist in Redis, start from them.
   * @param ssc         streaming context
   * @param topic       topics to subscribe to
   * @param kafkaParams Kafka consumer parameters
   * @return the Kafka direct stream
   */
  def createStreamingContextRedis(ssc: StreamingContext, topic: Array[String],
                                  kafkaParams: Map[String, Object]): InputDStream[ConsumerRecord[String, String]] = {
    var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null
    val groupId = kafkaParams.get("group.id").get
    val (fromOffSet, flag) = getOffset(topic, groupId.toString)
    val offsetReset = kafkaParams.get("auto.offset.reset").get
    if (flag == 1 && offsetReset.equals("latest")) {
      kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffSet))
    } else {
      kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams))
    }
    kafkaStreams
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("offSet Redis").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(60))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "binlog.test.rpt_test_1min",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "20000",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )
    val topic = Array("weibo_keyword")
    // Use the same group id for storing offsets as for reading them, otherwise stored offsets are never found again
    val groupId = kafkaParams("group.id").toString
    val lines = createStreamingContextRedis(ssc, topic, kafkaParams)
    lines.foreachRDD(rdds => {
      if (!rdds.isEmpty()) {
        println("##################:" + rdds.count())
      }
      storeOffset(rdds.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
import java.util

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPool, JedisPoolConfig}

object JedisUtil {
  private val config = ConfigFactory.load("realtime-etl.conf")

  private val redisHosts: String = config.getString("redis.server")
  private val port: Int = config.getInt("redis.port")

  private val hostAndPortsSet: java.util.Set[HostAndPort] = new util.HashSet[HostAndPort]()
  redisHosts.split(",").foreach(host => {
    hostAndPortsSet.add(new HostAndPort(host, port))
  })

  private val jedisConf: JedisPoolConfig = new JedisPoolConfig()
  jedisConf.setMaxTotal(5000)
  jedisConf.setMaxWaitMillis(50000)
  jedisConf.setMaxIdle(300)
  jedisConf.setTestOnBorrow(true)
  jedisConf.setTestOnReturn(true)
  jedisConf.setTestWhileIdle(true)
  jedisConf.setMinEvictableIdleTimeMillis(60000L)
  jedisConf.setTimeBetweenEvictionRunsMillis(3000L)
  jedisConf.setNumTestsPerEvictionRun(-1)

  lazy val redis = new JedisCluster(hostAndPortsSet, jedisConf)

  def get(key: String): String = {
    try {
      redis.get(key)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        null
    }
  }

  def set(key: String, value: String) = {
    try {
      redis.set(key, value)
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }

  def hmset(key: String, map: java.util.Map[String, String]): Unit = {
    // val redis = pool.getResource
    try {
      redis.hmset(key, map)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  def hset(key: String, field: String, value: String): Unit = {
    // val redis = pool.getResource
    try {
      redis.hset(key, field, value)
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }

  def hget(key: String, field: String): String = {
    try {
      redis.hget(key, field)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        null
    }
  }

  def hgetAll(key: String): java.util.Map[String, String] = {
    try {
      redis.hgetAll(key)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        null
    }
  }
}

Other approaches

It is worth mentioning that you can also store offsets in a storage system such as HDFS. Compared with the options above, storing offsets in HDFS is a less popular approach because HDFS has higher latency than systems such as ZooKeeper and HBase. In addition, writing offsetRanges for every batch to HDFS can cause a small-files problem if not managed carefully. A minimal sketch of this approach follows.
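For illustration only, a minimal sketch of writing each batch's offset ranges to HDFS as one small text file per batch (the directory layout and file naming below are assumptions made for this sketch, not part of the original article):

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.kafka010.OffsetRange

// Writes one file per batch under a per-group directory, e.g.
// /user/spark/offsets/<groupId>/<batchTimeMs>, with one "topic partition untilOffset" line per partition.
def saveOffsetsToHdfs(offsetRanges: Array[OffsetRange], groupId: String, batchTimeMs: Long): Unit = {
  val fs = FileSystem.get(new Configuration())                       // picks up core-site.xml / hdfs-site.xml
  val path = new Path(s"/user/spark/offsets/$groupId/$batchTimeMs")  // hypothetical layout
  val out = fs.create(path, true)
  try {
    offsetRanges.foreach { o =>
      out.write(s"${o.topic} ${o.partition} ${o.untilOffset}\n".getBytes(StandardCharsets.UTF_8))
    }
  } finally {
    out.close()
  }
}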

Not managing offsets


Of course, a Spark Streaming application is not required to manage offsets at all. Think through your current business requirements and decide whether offsets really need to be persisted.

This article is based on the following reference:
Offset Management For Apache Kafka With Apache Spark Streaming