Spark Streaming 进阶

初始化

要初始化Spark Streaming程序,必须创建StreamingContext对象,该对象是所有Spark Streaming功能的主要入口点。

1
2
3
4
val conf: SparkConf = new SparkConf()
.setAppName("your application name")
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

maven依赖:

1
2
3
4
5
6
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<!--<scope>provided</scope>-->
</dependency>


该appName参数是您的应用程序在集群UI上显示的名称。 master是Spark,Mesos,Kubernetes或YARN群集URL或特殊的“ local []”字符串,以本地模式运行。实际工作中,程序部署、运行在集群上,所以并不希望master在程序中进行硬编码,而是在提交程序的spark-submit –master * 命令中来指定。如果只是本地IDEA运行,则可指定 local。


在初始化的代码中,我们要设置每个批处理的时间间隔,上面代码中 Seconds(5),也就是5秒划分一个批次。我们打开源码,可以看到,StreamingContext()第二个参数还有其他选择,最终都是将时间转换成毫秒。
屏幕快照 2020-05-05 上午12.27.17.png
**

Dstream

unnamed.jpg


DStream由一系列连续的RDD表示,这是Spark对不可变的分布式数据集的抽象。DStream中的每个RDD都包含来自特定间隔的数据。


这里,我们来看一下下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package streaming

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}


object streaming_case {
// 设置日志级别
Logger.getLogger("org").setLevel(Level.ERROR)

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
.setAppName("Kafka Streaming")
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/Users/cpeixin/IdeaProjects/code_warehouse/spark_streaming/src/main/scala/streaming/")

val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kafka_spark_streaming",
"auto.offset.reset" -> "earliest", // earliest
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("weibo_keyword")
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils
.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))


kafkaStream.foreachRDD((x: RDD[ConsumerRecord[String, String]]) =>println(x))

ssc.start()
ssc.awaitTermination()
}


def change_data(string_data: String): String = {
val json_data: JSONObject = JSON.parseObject(string_data)
val date_time: String = json_data.get("datetime").toString
val keywordList: String = json_data.get("keywordList").toString
keywordList
}

}

结果:

1
KafkaRDD[0] at createDirectStream at streaming_case.scala:39

上面的代码,在42行foreachRDD的中,我们打印DStream中的RDD,结果中我们看到,第一个批次中,只有一个RDD,这里我想说的是,在上面的这种读取数据代码中,一个 batch Interval中,DStream 只有一个RDD,当一个新的时间窗口(batchInterval)开始时,此时产生一个空的block,此后在这个窗口内接受到的数据都会累加到这个block上,当这个时间窗口结束时,停止累加,这个block对应的数据就是这个时间窗口对应的RDD包含的数据

屏幕快照 2020-05-05 上午1.32.05.png
这里我们还可以深入 slideDuration:Duration来看,和后面要讲的窗口函数windiw()中,RDD的区别。

batch interval


关于Spark Streaming的批处理时间设置是非常重要的,Spark Streaming在不断接收数据的同时,需要处理数据的时间,所以如果设置过段的批处理时间,会造成数据堆积,即未完成的batch数据越来越多,从而发生阻塞。
另外值得注意的是,batchDuration本身也不能设置为小于500ms,这会导致Spark Streaming进行频繁地提交作业,造成额外的开销,减少整个系统的吞吐量;相反如果将batchDuration时间设置得过长,又会影响整个系统的吞吐量。


如何设置一个合理的批处理时间,需要根据应用本身、集群资源情况,以及关注和监控Spark Streaming系统的运行情况来调整,重点关注Spark Web UI监控界面中的Total Delay,来进行调整。
0_zE-5D45Z7yImkO7f.png

CheckPoint


我们所编写的实时计算程序大多数都是24小时全天候生产环境运行的,因此必须对与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为此,Spark Streaming需要将足够的信息检查点指向容错存储系统,以便可以从故障中恢复。检查点有两种类型的数据。

  • 元数据检查点-将定义流计算的信息保存到HDFS等容错存储中。这用于从运行流应用程序的驱动程序的节点的故障中恢复。元数据包括:
    • 配置 用于创建流应用程序的配置。
    • DStream操作 -定义流应用程序的DStream操作集。
    • 不完整的批次 -作业排队但尚未完成的批次。
  • 数据检查点 将生成的RDD保存到可靠的存储中。在一些有状态转换中,这需要跨多个批次合并数据,这是必需的。在此类转换中,生成的RDD依赖于先前批次的RDD,这导致依赖项链的长度随时间不断增加。为了避免恢复时间的这种无限制的增加(与依赖关系链成比例),有状态转换的中间RDD定期 检查点到可靠的存储(例如HDFS)以切断依赖关系链。


总而言之,metadata checkpointing主要还是从drvier失败中恢复,而Data Checkpoing用于对有状态的transformation操作进行checkpointing

Checkpoint和persist从根本上是不一样的:
1、Cache or persist:
Cache or persist保存了RDD的血统关系,假如有部分cache的数据丢失可以根据血缘关系重新生成。
2、Checkpoint
会将RDD数据写到hdfs这种安全的文件系统里面,并且抛弃了RDD血缘关系的记录。即使persist存储到了磁盘里面,在driver停掉之后会被删除,而checkpoint可以被下次启动使用。


何时启用检查点
必须为具有以下任一要求的应用程序启用检查点:

  • 有状态转换的用法 -如果在应用程序中使用updateStateByKeyreduceByKeyAndWindow(带有反函数),则必须提供检查点目录以允许定期进行RDD检查点。
  • 从运行应用程序的驱动程序故障中恢复 -元数据检查点用于恢复进度信息。

注意,没有前述状态转换的简单流应用程序可以在不启用检查点的情况下运行。在这种情况下,从驱动程序故障中恢复也将是部分的(某些已接收但未处理的数据可能会丢失)。这通常是可以接受的,并且许多都以这种方式运行Spark Streaming应用程序。预计将来会改善对非Hadoop环境的支持。

如何配置检查点**
可以通过在容错,可靠的文件系统(例如,HDFS,S3等)中设置目录来启用检查点,将检查点信息保存到该目录中。这是通过使用完成的streamingContext.checkpoint(checkpointDirectory)。这将允许您使用前面提到的有状态转换。此外,如果要使应用程序从驱动程序故障中恢复,则应重写流应用程序以具有以下行为。

  • 程序首次启动时,它将创建一个新的StreamingContext,设置所有流,然后调用start()。
  • 失败后重新启动程序时,它将根据检查点目录中的检查点数据重新创建StreamingContext。


代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

请注意,RDD的检查点会导致保存到可靠存储的成本。这可能会导致RDD获得检查点的那些批次的处理时间增加。因此,需要仔细设置检查点的间隔。在小批量(例如1秒)时,每批检查点可能会大大降低操作吞吐量。相反,检查点太不频繁会导致沿袭和任务规模增加,这可能会产生不利影响。对于需要RDD检查点的有状态转换,默认间隔为批处理间隔的倍数,至少应为10秒。可以使用设置 dstream.checkpoint(checkpointInterval)。通常,DStream的5-10个滑动间隔的检查点间隔是一个很好的尝试设置。


checkpoint时机
在spark Streaming中,JobGenerator用于生成每个batch对应的jobs,它有一个定时器,定时器 的周期即初始化StreamingContext时设置batchDuration。这个周期一到,JobGenerator将调用generateJobs方法来生成并提交jobs,这之后调用doCheckpoint方法来进行checkpoint。doCheckpoint方法中,会判断 当前时间与streaming application start的时间只差是否是 checkpoint duration的倍数,只有在是的情况下才进行checkpoint。


具体应用实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

package org.apache.spark.examples.streaming

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.util.{IntParam, LongAccumulator}

/**
* Use this singleton to get or register a Broadcast variable.
*/
object WordBlacklist {

@volatile private var instance: Broadcast[Seq[String]] = null

def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}

/**
* Use this singleton to get or register an Accumulator.
*/
object DroppedWordsCounter {

@volatile private var instance: LongAccumulator = null

def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}

object RecoverableNetworkWordCount {

def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
: StreamingContext = {
// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
println("Creating new context")
val outputFile = new File(outputPath)
if (outputFile.exists()) outputFile.delete()
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(checkpointDirectory)

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = s"Counts at time $time $counts"
println(output)
println(s"Dropped ${droppedWordsCounter.value} word(s) totally")
println(s"Appending to ${outputFile.getAbsolutePath}")
Files.append(output + "\n", outputFile, Charset.defaultCharset())
}
ssc
}

def main(args: Array[String]): Unit = {
if (args.length != 4) {
System.err.println(s"Your arguments were ${args.mkString("[", ", ", "]")}")
System.err.println(
"""
|Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
| <output-file>. <hostname> and <port> describe the TCP server that Spark
| Streaming would connect to receive data. <checkpoint-directory> directory to
| HDFS-compatible file system which checkpoint data <output-file> file to which the
| word counts will be appended
|
|In local mode, <master> should be 'local[n]' with n > 1
|Both <checkpoint-directory> and <output-file> must be absolute paths
""".stripMargin
)
System.exit(1)
}
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => createContext(ip, port, outputPath, checkpointDirectory))
ssc.start()
ssc.awaitTermination()
}
}

UpdateStateByKey

流处理主要有3种应用场景:无状态操作、window操作、状态操作。updateStateByKey就是典型的状态操作。

下面是针对updateStateByKey举的实例,主要功能是针对流数据中的关键词进行统计,并且是根据历史状态持续统计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package streaming

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object streaming_updatastatebykey {
Logger.getLogger("org").setLevel(Level.ERROR)

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
.setAppName("Kafka Streaming")
.setMaster("local[2]")

val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("/Users/cpeixin/IdeaProjects/code_warehouse/spark_streaming/src/main/scala/streaming/")

val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kafka_spark_streaming",
"auto.offset.reset" -> "latest", // earliest
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("weibo_keyword")
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils
.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

val wordcount_dstream: DStream[(String, Int)] = kafkaStream
.map((x: ConsumerRecord[String, String]) => {
change_data(x.value())
})
.flatMap((_: String).split(","))
.map((x: String) => (x, 1))

val sum_dstream: DStream[(String, Int)] = wordcount_dstream.updateStateByKey((seq: Seq[Int], state: Option[Int]) => {
var sum: Int = state.getOrElse(0)+seq.sum
Option(sum)
})

sum_dstream.foreachRDD((keywordFormat_rdd: RDD[(String, Int)]) => {
val sort_rdd: Array[(Int, String)] = keywordFormat_rdd.map((x: (String, Int)) => {(x._2, x._1)}).sortByKey().top(10)
sort_rdd.foreach(println)
println("====================")
})

ssc.start()
ssc.awaitTermination()
}

def change_data(string_data: String): String = {
val json_data: JSONObject = JSON.parseObject(string_data)
val date_time: String = json_data.get("datetime").toString
val keywordList: String = json_data.get("keywordList").toString
keywordList
}
}

打印统计信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
====================
(8,剪头)
(5,黑人抬棺队长称将环游世界)
(5,黑人)
(5,队长)
(5,环游世界)
(4,野餐)
(4,这野餐也太实在了吧)
(3,郑钧低空飞行)
(3,郑钧)
(3,森林)
====================
(8,剪头)
(7,森林)
(7,挪威)
(7,伍佰 挪威的森林)
(7,伍佰)
(6,肤色)
(6,状态)
(6,今年夏天的肤色状态)
(6,今年夏天)
(5,黑人抬棺队长称将环游世界)

注意:类似updateStateByKey和mapWithState等有状态转换算子,程序中必须要指定checkpoint检查点。

窗口函数

Spark Streaming还提供了_窗口计算_,可让您在数据的滑动窗口上应用转换。下图说明了此滑动窗口。

如该图所示,每当窗口_滑动_在源DSTREAM,落入窗口内的源RDDS被组合及操作以产生RDDS的窗DSTREAM。在这种特定情况下,该操作将应用于数据的最后3个时间单位,并以2个时间单位滑动。这表明任何窗口操作都需要指定两个参数。

  • 窗口长度 - _窗口_的持续时间。
  • 滑动间隔 -进行窗口操作的间隔。

这两个参数必须是源DStream的批处理间隔的倍数


下面给出实例代码,描述的场景是每10秒统计一次过去30秒期间,关键词出现次数的top 5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package streaming

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object streaming_window {
Logger.getLogger("org").setLevel(Level.ERROR)

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("spark streaming window")
.setMaster("local[2]")

val sc = new StreamingContext(conf, Seconds(5))

val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kafka_spark_streaming",
"auto.offset.reset" -> "latest", // earliest
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("weibo_keyword")
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils
.createDirectStream[String, String](sc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

val wordcount_dstream: DStream[(String, Int)] = kafkaStream
.map((x: ConsumerRecord[String, String]) => {
get_data(x.value())
})
.flatMap((_: String).split(","))
.map((x: String) => (x, 1))

val window_dstream: DStream[(String, Int)] = wordcount_dstream.reduceByKeyAndWindow((x: Int,y: Int)=>x+y,Seconds(30), Seconds(10))

val result: DStream[(String, Int)] = window_dstream.transform((rdd: RDD[(String, Int)]) =>{
val top3: Array[(String, Int)] = rdd.map((x: (String, Int)) =>(x._2,x._1)).sortByKey(ascending = false).map((x: (Int, String)) =>(x._2,x._1)).take(5)
sc.sparkContext.makeRDD(top3)
})

result.print()

sc.start()
sc.awaitTermination()

}

def get_data(string_data: String): String = {
val json_data: JSONObject = JSON.parseObject(string_data)
val date_time: String = json_data.get("datetime").toString
val keywordList: String = json_data.get("keywordList").toString
keywordList
}
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-------------------------------------------
Time: 1588694330000 ms
-------------------------------------------
(台版,6)
(黑人,6)
(台版101模仿黑人抬棺,6)
(训练,2)
(听起来很厉害的专业术语,2)

-------------------------------------------
Time: 1588694340000 ms
-------------------------------------------
(野餐,10)
(野餐还没拍好照就被牛吃了,10)
(台版,6)
(黑人,6)
(台版101模仿黑人抬棺,6)

-------------------------------------------
Time: 1588694350000 ms
-------------------------------------------
(姐姐,10)
(野餐,10)
(野餐还没拍好照就被牛吃了,10)
(乘风破浪的姐姐们,10)
(台版,6)

-------------------------------------------
Time: 1588694360000 ms
-------------------------------------------
(姐姐,10)
(野餐,10)
(野餐还没拍好照就被牛吃了,10)
(乘风破浪的姐姐们,10)
(叶冲太难了,9)


transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作;
它可以用于实现,DStream API中所没有提供的操作;比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。


这里看 reduceByKeyAndWindow()函数的第二个参数和第三个参数,分别代表的意义就是,窗口的长度和窗口滑动间隔。


这里还需要知道一点,Dstream中的RDD也可以调用persist()方法保存在内存当中,但是基于window和state的操作,reduceByWindow,reduceByKeyAndWindow,updateStateByKey等它们已经在源码中默认persist了
屏幕快照 2020-05-06 上午12.23.11.png

**