总览 _ Structured Streaming 则是在 Spark 2.0 加入的经过重新设计的全新流式引擎。它的模型十分简洁,易于理解。一个流的数据源从逻辑上来说就是一个不断增长的动态表格,随着时间的推移,新数据被持续不断地添加到表格的末尾。用户可以使用 Dataset/DataFrame 或者 SQL 来对这个动态数据源进行实时查询。每次查询在逻辑上就是对当前的表格内容执行一次 SQL 查询。如何执行查询则是由用户通过触发器(Trigger)来设定。用户既可以设定定期执行,也可以让查询尽可能快地执行,从而达到实时的效果。最后,系统通过 checkpointing 和 Write-Ahead Logs来确保端到端的一次容错保证。一个流的输出有多种模式,既可以是基于整个输入执行查询后的完整结果,也可以选择只输出与上次查询相比的差异,或者就是简单地追加最新的结果。这个模型对于熟悉 SQL 的用户来说很容易掌握,对流的查询跟查询一个表格几乎完全一样。 在内部,默认情况下,结构化流查询是使用_微批量处理_引擎_处理的_,该引擎将数据流作为一系列小批量作业进行处理,从而实现了低至100毫秒的端到端延迟以及 exactly-once的容错保证。但是,自Spark 2.3起,我们引入了一种称为“ 连续处理” 的新低延迟处理模式,该模式可以实现一次最少保证的低至1毫秒的端到端延迟。在不更改查询中的Dataset / DataFrame操作的情况下,您将能够根据应用程序需求选择模式
Structured Streaming 是对 Spark Streaming 的改进么? Structured Streaming 并不是对 Spark Streaming 的简单改进,而是我们吸取了过去几年在开发 Spark SQL 和 Spark Streaming 过程中的经验教训,以及 Spark 社区和 Databricks 众多客户的反馈,重新开发的全新流式引擎,致力于为批处理和流处理提供统一的高性能 API。同时,在这个新的引擎中,我们也很容易实现之前在 Spark Streaming 中很难实现的一些功能,比如 Event Time 的支持 ,Stream-Stream Join,毫秒级延迟 类似于 Dataset/DataFrame 代替 Spark Core 的 RDD 成为为 Spark 用户编写批处理程序的首选,Dataset/DataFrame 也将替代 Spark Streaming 的 DStream,成为编写流处理程序的首选。
Structured Streaming 的 Spark 有什么优劣势吗? 简洁的模型。Structured Streaming 的模型很简洁,易于理解。用户可以直接把一个流想象成是无限增长的表格。 一致的 API。由于和 Spark SQL 共用大部分 API,对 Spaprk SQL 熟悉的用户很容易上手,代码也十分简洁。同时批处理和流处理程序还可以共用代码,不需要开发两套不同的代码,显著提高了开发效率。 卓越的性能。Structured Streaming 在与 Spark SQL 共用 API 的同时,也直接使用了 Spark SQL 的 Catalyst 优化器和 Tungsten,数据处理性能十分出色。此外,Structured Streaming 还可以直接从未来 Spark SQL 的各种性能优化中受益。 多语言支持。Structured Streaming 直接支持目前 Spark SQL 支持的语言,包括 Scala,Java,Python,R 和 SQL。用户可以选择自己喜欢的语言进行开发。 呃~~ 关于Structured Streaming的介绍就说到这里,如果想看更详细更准确的介绍呢,还是乖乖的去官网吧。在2017年10月份的时候,新立项的一个app用户行为实时项目,我有意使用Structured Streaming,所以就调研了一下,自己写了一个demo,我记忆里那时写Structured很别扭,就像一个模版一样,输入源和输出源都被规定好了函数和参数,并且在那时候测试后的时候,不怎么稳定,而且官方并没有给出成熟的版本,当时所测试的功能还都是 alpha 版本,所以当时就还是使用了Spark Streaming
不过现在来看,Structured Streaming 越来越成熟,Spark Streaming感觉似乎停止了更新。Structured streaming应该是Spark流处理的未来,但是很难替代Flink。Flink在流处理上的天然优势很难被Spark超越。 在读完了Structured Streaming官网后,还是亲手的写一些实例感受一下,以后做架构的时候,如果适合的话,还可以加进来。
实例 ### complete, append, update
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame , Dataset , SparkSession }import com.alibaba.fastjson.{JSON , JSONObject }import org.apache.log4j.{Level , Logger }object structured_kafka { val logger:Logger = Logger .getRootLogger Logger .getLogger("org" ).setLevel(Level .ERROR ) def main (args: Array [String ]): Unit = { case class kafka_format (date_time: String , keyword_list: String ) val spark : SparkSession = SparkSession .builder() .appName("Structrued-Streaming" ) .master("local[2]" ) .getOrCreate() import spark.implicits._ val kafka_df: DataFrame = spark .readStream .format("kafka" ) .option("kafka.bootstrap.servers" , "localhost:9092" ) .option("subscribe" , "weibo_keyword" ) .option("startingOffsets" , "earliest" ) .option("includeTimestamp" , value = true ) .load() val keyvalue_df: DataFrame = kafka_df .selectExpr("CAST(value AS STRING)" ) .as[String ] .map((x: String ) => { val date_time: String = JSON .parseObject(x).getString("datetime" ) val keyword_list: String = JSON .parseObject(x).getString("keywordList" ) (date_time, keyword_list) }) .flatMap((x: (String , String )) =>{ x._2.split("," ).map((word: String ) =>(x._1,word)) }) .toDF("date_time" , "keyword" ) .groupBy("keyword" ).count() .orderBy($"count" .desc) val query: StreamingQuery = keyvalue_df.writeStream .outputMode("complete" ) .format("console" ) .start() query.awaitTermination() } }
其中需要注意:
_
_Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;_
_
Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;
未进行aggregate的stream不能sort
_
_
### window窗口 _
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import com.alibaba.fastjson.JSON import org.apache.log4j.{Level , Logger }import org.apache.spark.sql.streaming.{StreamingQuery , Trigger }import org.apache.spark.sql.{DataFrame , SparkSession }import org.apache.spark.sql.functions._object structured_kafka_window { val logger:Logger = Logger .getRootLogger Logger .getLogger("org" ).setLevel(Level .ERROR ) def main (args: Array [String ]): Unit = { case class kafka_format (date_time: String , keyword_list: String ) val spark : SparkSession = SparkSession .builder() .appName("Structrued-Streaming" ) .master("local[2]" ) .getOrCreate() import spark.implicits._ val kafka_df: DataFrame = spark .readStream .format("kafka" ) .option("kafka.bootstrap.servers" , "localhost:9092" ) .option("subscribe" , "weibo_keyword" ) .option("startingOffsets" , "latest" ) .option("includeTimestamp" , value = true ) .load() val keyvalue_df: DataFrame = kafka_df .selectExpr("CAST(value AS STRING)" ) .as[String ] .map((x: String ) => { val date_time: String = JSON .parseObject(x).getString("datetime" ) val keyword_list: String = JSON .parseObject(x).getString("keywordList" ) (date_time, keyword_list) }) .flatMap((x: (String , String )) =>{ x._2.split("," ).map((word: String ) =>(x._1,word)) }) .toDF("date_time" , "keyword" ) .groupBy(window($"date_time" , "5 minutes" , "1 minutes" ),$"keyword" ) .count() .orderBy("window" ) val query: StreamingQuery = keyvalue_df.writeStream .outputMode("complete" ) .format("console" ) .option("truncate" , "false" ) .trigger(Trigger .ProcessingTime ("5 seconds" )) .start() query.awaitTermination() } }
这里关于window窗口的划分,我建议大家好好的研读一下源码:
**位置:package **org.apache.spark.sql.catalyst.analysis
![11.29.50.png](https://cdn.nlark.com/yuque/0/2020/png/1072113/1588952005543-007c1291-12fd-4148-a625-587b1a6149f3.png#align=left&display=inline&height=1694&margin=%5Bobject%20Object%5D&name=%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7%202020-05-08%20%E4%B8%8B%E5%8D%8811.29.50.png&originHeight=1694&originWidth=1936&size=488433&status=done&style=none&width=1936)
### Watermark
1 2 3 4 5 6 7 8 9 10 11 12 13 word .withWatermark("timestamp" , "30 minutes" ) .dropDuplicates("User" , "timestamp" ) .groupBy(window(col("timestamp" ), "10 minutes" ),col("User" )) .count() .writeStream .queryName("events_per_window" ) .format("memory" ) .outputMode("complete" ) .start() spark.sql("SELECT * FROM events_per_window" )
watermark = max event time seen by the engine - late threshold,相当于Flink的BoundedOutOfOrdernessTximestampExtractor。
在window计算被触发时,Spark会删除结束时间低于当前wm的window的中间结果,属于该window的迟到数据“可能”会被忽略,越迟越可能被忽略,删除完后才更新wm,所以即便下一批没有数据加入,Spark所依据的wm也是新的,下下一批wm不变。
上面是update mode,如果是append模式,那么结果要等到trigger后发现window的结束时间低于更新后的水位线时才会出来。另外,max event time seen by the engine - late threshold机制意味着如果下一批计算没有更晚的数据加入,那么wm就不会前进,那么数据的append就会被延后。
Conditions for watermarking to clean aggregation state(as of Spark 2.1.1, subject to change in the future)
不支持complete模式。 groupBy必须包含timestamp列或者window(col(timestamp)),withWatermark中的列要和前面的timestamp列相同 顺序必须是先withWatermark再到groupBy 参考:结构化流编程指南 Spark之Structured Streaming