Structured Streaming 重温

总览

_
Structured Streaming 则是在 Spark 2.0 加入的经过重新设计的全新流式引擎。它的模型十分简洁,易于理解。一个流的数据源从逻辑上来说就是一个不断增长的动态表格,随着时间的推移,新数据被持续不断地添加到表格的末尾。用户可以使用 Dataset/DataFrame 或者 SQL 来对这个动态数据源进行实时查询。每次查询在逻辑上就是对当前的表格内容执行一次 SQL 查询。如何执行查询则是由用户通过触发器(Trigger)来设定。用户既可以设定定期执行,也可以让查询尽可能快地执行,从而达到实时的效果。最后,系统通过 checkpointing 和 Write-Ahead Logs来确保端到端的一次容错保证。一个流的输出有多种模式,既可以是基于整个输入执行查询后的完整结果,也可以选择只输出与上次查询相比的差异,或者就是简单地追加最新的结果。这个模型对于熟悉 SQL 的用户来说很容易掌握,对流的查询跟查询一个表格几乎完全一样。


在内部,默认情况下,结构化流查询是使用_微批量处理_引擎_处理的_,该引擎将数据流作为一系列小批量作业进行处理,从而实现了低至100毫秒的端到端延迟以及 exactly-once的容错保证。但是,自Spark 2.3起,我们引入了一种称为“ 连续处理”的新低延迟处理模式,该模式可以实现一次最少保证的低至1毫秒的端到端延迟。在不更改查询中的Dataset / DataFrame操作的情况下,您将能够根据应用程序需求选择模式


Structured Streaming 是对 Spark Streaming 的改进么?


Structured Streaming 并不是对 Spark Streaming 的简单改进,而是我们吸取了过去几年在开发 Spark SQL 和 Spark Streaming 过程中的经验教训,以及 Spark 社区和 Databricks 众多客户的反馈,重新开发的全新流式引擎,致力于为批处理和流处理提供统一的高性能 API。同时,在这个新的引擎中,我们也很容易实现之前在 Spark Streaming 中很难实现的一些功能,比如 Event Time 的支持,Stream-Stream Join,毫秒级延迟


类似于 Dataset/DataFrame 代替 Spark Core 的 RDD 成为为 Spark 用户编写批处理程序的首选,Dataset/DataFrame 也将替代 Spark Streaming 的 DStream,成为编写流处理程序的首选。

Structured Streaming 的 Spark 有什么优劣势吗?

  • 简洁的模型。Structured Streaming 的模型很简洁,易于理解。用户可以直接把一个流想象成是无限增长的表格。
  • 一致的 API。由于和 Spark SQL 共用大部分 API,对 Spaprk SQL 熟悉的用户很容易上手,代码也十分简洁。同时批处理和流处理程序还可以共用代码,不需要开发两套不同的代码,显著提高了开发效率。
  • 卓越的性能。Structured Streaming 在与 Spark SQL 共用 API 的同时,也直接使用了 Spark SQL 的 Catalyst 优化器和 Tungsten,数据处理性能十分出色。此外,Structured Streaming 还可以直接从未来 Spark SQL 的各种性能优化中受益。
  • 多语言支持。Structured Streaming 直接支持目前 Spark SQL 支持的语言,包括 Scala,Java,Python,R 和 SQL。用户可以选择自己喜欢的语言进行开发。


呃~~ 关于Structured Streaming的介绍就说到这里,如果想看更详细更准确的介绍呢,还是乖乖的去官网吧。在2017年10月份的时候,新立项的一个app用户行为实时项目,我有意使用Structured Streaming,所以就调研了一下,自己写了一个demo,我记忆里那时写Structured很别扭,就像一个模版一样,输入源和输出源都被规定好了函数和参数,并且在那时候测试后的时候,不怎么稳定,而且官方并没有给出成熟的版本,当时所测试的功能还都是 alpha 版本,所以当时就还是使用了Spark Streaming

不过现在来看,Structured Streaming 越来越成熟,Spark Streaming感觉似乎停止了更新。Structured streaming应该是Spark流处理的未来,但是很难替代Flink。Flink在流处理上的天然优势很难被Spark超越。


在读完了Structured Streaming官网后,还是亲手的写一些实例感受一下,以后做架构的时候,如果适合的话,还可以加进来。

实例


### complete, append, update
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.log4j.{Level, Logger}

object structured_kafka {
val logger:Logger = Logger.getRootLogger
Logger.getLogger("org").setLevel(Level.ERROR)
def main(args: Array[String]): Unit = {

case class kafka_format(date_time: String, keyword_list: String)

val spark: SparkSession = SparkSession
.builder()
.appName("Structrued-Streaming")
.master("local[2]")
.getOrCreate()

import spark.implicits._

val kafka_df: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "weibo_keyword")
.option("startingOffsets", "earliest")
.option("includeTimestamp", value = true)
// .option("endingOffsets", "latest")
// .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
// .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()

val keyvalue_df: DataFrame = kafka_df
.selectExpr("CAST(value AS STRING)")
.as[String]
.map((x: String) => {
val date_time: String = JSON.parseObject(x).getString("datetime")
val keyword_list: String = JSON.parseObject(x).getString("keywordList")
(date_time, keyword_list)
})
.flatMap((x: (String, String)) =>{
x._2.split(",").map((word: String) =>(x._1,word))
})
.toDF("date_time", "keyword")
.groupBy("keyword").count()
.orderBy($"count".desc)


val query: StreamingQuery = keyvalue_df.writeStream
.outputMode("complete") //append
.format("console")
.start()

query.awaitTermination()

}
}
其中需要注意:
_
_Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;_
_
Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;

未进行aggregate的stream不能sort
_
_ ### window窗口 _
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import com.alibaba.fastjson.JSON
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object structured_kafka_window {
val logger:Logger = Logger.getRootLogger
Logger.getLogger("org").setLevel(Level.ERROR)
def main(args: Array[String]): Unit = {

case class kafka_format(date_time: String, keyword_list: String)

val spark: SparkSession = SparkSession
.builder()
.appName("Structrued-Streaming")
.master("local[2]")
.getOrCreate()

import spark.implicits._

val kafka_df: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "weibo_keyword")
.option("startingOffsets", "latest")
.option("includeTimestamp", value = true)
// .option("endingOffsets", "latest")
// .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
// .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()

val keyvalue_df: DataFrame = kafka_df
.selectExpr("CAST(value AS STRING)")
.as[String]
.map((x: String) => {
val date_time: String = JSON.parseObject(x).getString("datetime")
val keyword_list: String = JSON.parseObject(x).getString("keywordList")
(date_time, keyword_list)
})
.flatMap((x: (String, String)) =>{
x._2.split(",").map((word: String) =>(x._1,word))
})
.toDF("date_time", "keyword")
.groupBy(window($"date_time", "5 minutes", "1 minutes"),$"keyword")
.count()
.orderBy("window")

val query: StreamingQuery = keyvalue_df.writeStream
.outputMode("complete") //append
.format("console")
.option("truncate", "false")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
query.awaitTermination()

}
}
这里关于window窗口的划分,我建议大家好好的研读一下源码:
**位置:package **org.apache.spark.sql.catalyst.analysis
![11.29.50.png](https://cdn.nlark.com/yuque/0/2020/png/1072113/1588952005543-007c1291-12fd-4148-a625-587b1a6149f3.png#align=left&display=inline&height=1694&margin=%5Bobject%20Object%5D&name=%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7%202020-05-08%20%E4%B8%8B%E5%8D%8811.29.50.png&originHeight=1694&originWidth=1936&size=488433&status=done&style=none&width=1936) ### Watermark
1
2
3
4
5
6
7
8
9
10
11
12
13
// 基于event-time的window,words包含timestamp和word两列
word
.withWatermark("timestamp", "30 minutes")//某窗口结果为x,但是部分数据在这个窗口的最后一个timestamp过后还没到达,Spark在这会等30min,过后就不再更新x了。
.dropDuplicates("User", "timestamp")
.groupBy(window(col("timestamp"), "10 minutes"),col("User"))// 10min后再加一个参数变为Sliding windows,表示每隔多久计算一次。
.count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()

spark.sql("SELECT * FROM events_per_window")
watermark = max event time seen by the engine - late threshold,相当于Flink的BoundedOutOfOrdernessTximestampExtractor。

在window计算被触发时,Spark会删除结束时间低于当前wm的window的中间结果,属于该window的迟到数据“可能”会被忽略,越迟越可能被忽略,删除完后才更新wm,所以即便下一批没有数据加入,Spark所依据的wm也是新的,下下一批wm不变。

上面是update mode,如果是append模式,那么结果要等到trigger后发现window的结束时间低于更新后的水位线时才会出来。另外,max event time seen by the engine - late threshold机制意味着如果下一批计算没有更晚的数据加入,那么wm就不会前进,那么数据的append就会被延后。

Conditions for watermarking to clean aggregation state(as of Spark 2.1.1, subject to change in the future)
  • 不支持complete模式。
  • groupBy必须包含timestamp列或者window(col(timestamp)),withWatermark中的列要和前面的timestamp列相同
  • 顺序必须是先withWatermark再到groupBy


参考:
结构化流编程指南
Spark之Structured Streaming