Spark Streaming 讲解

简述

Spark Streaming是核心Spark API的扩展,近实时计算框架。特点可伸缩,高吞吐量,容错流处理。而我们之前讲的Spark SQL是负责处理离线数据。


既然是计算框架,那么实战的过程中还是三个步骤,读取数据源、计算数据、数据存储:


数据源:可以从Kafka, Flume, Kinesis, or TCP sockets等数据源读入
数据计算:同样可以使用同样的map,reduce,join等算子进行数据计算,还有Spark Streaming 特有的window窗口函数。
数据存储:可以将处理后的数据推送到文件系统HDFS,数据库的话比较常用的是MySQL,Mongo DB和HBase等,对于实时数据的展示和监控方面,我们比较常用的是将数据写入Elasticsearch中,并且使用Kibana或者Grafana来进行展示。
streaming-arch.png
Spark Streaming 提供一个对于流数据的抽象 DStream。DStream 可以由来自 Apache Kafka、Flume 或者 HDFS 的流数据生成,也可以由别的 DStream 经过各种转换操作得来,底层 DStream 也是由很多个序列化的 RDD 构成,按时间片(比如一秒)切分成的每个数据单位都是一个 RDD。


然后,Spark 核心引擎将对 DStream 的 Transformation 操作变为针对 Spark 中对 RDD 的 Transformation 操作,将 RDD 经过操作变成中间结果保存在内存中。之前的 DataFrame 和 DataSet 也是同样基于 RDD,所以说 RDD 是 Spark 最基本的数据抽象。就像 Java 里的基本数据类型(Primitive Type)一样,所有的数据都可以用基本数据类型描述。也正是因为这样,无论是 DataFrame,还是 DStream,都具有 RDD 的不可变性、分区性和容错性等特质。


所以,Spark 是一个高度统一的平台,所有的高级 API 都有相同的性质,它们之间可以很容易地相互转化。Spark 的野心就是用这一套工具统一所有数据处理的场景。由于 Spark Streaming 将底层的细节封装起来了,所以对于开发者来说,只需要操作 DStream 就行。接下来,让我们一起学习 DStream 的结构以及它支持的转换操作

对比

一般在讲述Spark Streaming的时候,其他博主都会列一个表格,将Storm,Spark Streaming,Flink三个实时计算框架进行对比,这里我就不进行三者之间的比较了,直接根据我这几年使用的经验来分享一下直接的结果,

Storm是一个早期的流式计算框架,可以做到毫秒级别的计算,但是编程语言使用的是Java,代码量很多,编程复杂度会高一些,并且目前很少有公司使用Storm,我实习期后,在第一家公司的第一个任务就是改写现有的Storm任务,迁移到Spark Streaming。

Spark Streaming的流式计算,准确的来讲,可以说是近实时或者微批计算,所以在一般情况下,对于时间要求不是太严格的情况下,Spark Streaming可以满足大部分的场景了。支持Java,Scala, Python三种语言。但是Spark Streaming有一个缺点,就是某些需求中,我们的计算结果应该是基于event time数据产生时间,但是在Spark Streaming中,有时会因为延迟的原因,计算结果会基于process time 数据到达的时间。针对这个问题,我在后面的Structured Streaming也会讲到。


Flink可以说是流式计算的后起之秀,并且就是为了流计算而诞生,它采用了基于操作符(Operator)的连续流模型,可以做到微秒级别的延迟。同样支持Java,Scala,Python三种语言,在我还认认真真写Spark代码的时候,每天都能看到铺天盖地的Flink新闻,Flink 用流处理去模拟批处理的思想,比 Spark 用批处理去模拟流处理的思想扩展性更好,所以我相信将来 Flink 会发展的越来越好,生态和社区各方面追上 Spark。比如,阿里巴巴就基于 Flink 构建了公司范围内全平台使用的数据处理平台 Blink,美团、饿了么等公司也都接受 Flink 作为数据处理解决方案。

实例


这里用一个实例来展示一下实际工作中,Spark Streaming的大概流程:


数据的采集或者网站数据收集—>Kafka—>Spark Streaming—>show data or save data


下面这端python代码是数据采集或者收集,打到Kafka的阶段。我本想写个爬虫来做实时数据的,但是我想起来我的Elasticsearch集群中正在实时的爬取某博数据,所以就直接间隔调用ES,来打入Kafka中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# -*- coding: utf-8 -*-
import time
from kafka import KafkaProducer
import json
from elasticsearch import Elasticsearch

producer = KafkaProducer(
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=['localhost:9092']
)

def getEsArticle():
"""动态链接"""
es = Elasticsearch(
['xxx.xxx.xxx.xxx:9205'],
# 在做任何操作之前,先进行嗅探
sniff_on_start=True,
# 节点没有响应时,进行刷新,重新连接
sniff_on_connection_fail=True,
# 每 60 秒刷新一次
sniffer_timeout=10
)

# 模糊查询
query = {
"query": {
"match_all": {}
},
"sort": {
"datetime": {
"order": "desc" # 降序
}
},
"size": 10
}

esResult = es.search(index="article_warehouse*", body=query)

for hit in esResult['hits']['hits']:
data = {'datetime': hit["_source"]["datetime"], 'keywordList': hit["_source"]["keywordList"]}
print(data)
producer.send('weibo_keyword', data)


def main():
while True:
getEsArticle()
time.sleep(10)


if __name__ == '__main__':
main()

打入数据的格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
{'datetime': '2017-04-10 18:29:42', 'keywordList': '特朗普,得志,领头羊,治国,爱心,美国,本领'}
{'datetime': '2017-04-10 18:29:56', 'keywordList': '车型,整体,长安,手动挡,自动,便利性,新手,平顺,发动机,版本,市区,积炭,编想,小编,驻车,速手,次顶,小伙伴,车师,省心'}
{'datetime': '2017-04-10 18:29:53', 'keywordList': '同桌,一个男孩,高三,人才,感觉'}
{'datetime': '2017-04-10 18:29:53', 'keywordList': '玩家,段位,落地,钢枪,游戏,新人,机器人,成盒,开局,军事基地,低端,小学生,精英,高水平,惩罚,谢谢,字数,高端,优质,战场'}
{'datetime': '2017-04-10 18:29:52', 'keywordList': '公话,情侣,青春校园,放学,校园,同学,印象,家人'}
{'datetime': '2017-04-10 18:29:52', 'keywordList': '阿萨德,表哥,经济命脉,皇亲国戚,叙利亚,动手,国家'}
{'datetime': '2017-04-10 18:29:51', 'keywordList': '集训,高带,体工队,大牌,劲旅,球员,南亚,浪费,印度,时代,国家'}
{'datetime': '2017-04-10 18:29:51', 'keywordList': '图片,老师,全校,同桌,爱慕,姨妈,手册,男孩,青春,窗户,主权,我会,毕业'}
{'datetime': '2017-04-10 18:29:51', 'keywordList': '段位,玩家,分会,分数,战场'}
{'datetime': '2017-04-10 18:29:50', 'keywordList': '杯子,盒子,自习室,专四,保送生,作文,单词,礼物,伤心,考试,耳朵,距离,姑娘'}
{'datetime': '2017-04-10 18:29:50', 'keywordList': '边路传,有球,曼联,全员,出力'}
{'datetime': '2017-04-10 18:30:07', 'keywordList': 'T恤,女生,身材,时尚,女装,印花,袖口,条纹,小编,板型,潘领,蓬蓬裙,踝款,百褶裙,碎花,拜拜,黑白相间,牛仔,牛仔裤,深色'}
{'datetime': '2017-04-10 18:30:06', 'keywordList': '周梅森,经济'}

下面的spark streaming代码比较简单,只是读取数据,从json格式中解析出来,随后打印,这里我没有直接写入的数据库或者es中,主要是给出一个spark streaming的程序框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import org.apache.log4j.{Level, Logger}


object streaming_weibo {
// 设置日志级别
Logger.getLogger("org").setLevel(Level.ERROR)

def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
.setAppName("Kafka Streaming")
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/Users/cpeixin/IdeaProjects/code_warehouse/spark_streaming/src/main/scala/streaming/")

val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kafka_spark_streaming",
"auto.offset.reset" -> "earliest", // earliest
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("weibo_keyword")
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils
.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

kafkaStream.map((x: ConsumerRecord[String, String]) => {
get_data(x.value())
}).foreachRDD((x: RDD[String]) => x.foreach(println))

ssc.start()
ssc.awaitTermination()
}


def get_data(string_data: String): String = {
val json_data: JSONObject = JSON.parseObject(string_data)
val date_time: String = json_data.get("datetime").toString
val keywordList: String = json_data.get("keywordList").toString
keywordList
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
手机,安卓,像素,摄像头,国产品牌,品牌,华为,小米,旗舰,英寸,屏幕,电池容量,对焦,逆光,音质,系统,后置,机身,光线,光学
电场线,本子,样子,铃响,心形,卷子,猫儿,异性,男朋友,板凳,电荷,我会,桌子,小学
范畴,心情,能力,东西
车型,整体,长安,手动挡,自动,便利性,新手,平顺,发动机,版本,市区,积炭,编想,小编,驻车,速手,次顶,小伙伴,车师,省心
高尔夫,大众,领速,高嘉,技术实力,试车,性价比,舒适性,内饰,亮点,亲戚,重庆,消费者,动力,空间,人士,原因,产品,情况
同桌,一个男孩,高三,人才,感觉
方向机,漆面,大叔,差点,图片
公话,情侣,青春校园,放学,校园,同学,印象,家人
棒棒,东忘西,太久,方言,老汉,老头,老师,时间
图片,老师,全校,同桌,爱慕,姨妈,手册,男孩,青春,窗户,主权,我会,毕业
刹车,啊啊啊,篮球场,转圈,脸红,张开,手臂,汉子,口气,朋友
段位,玩家,分会,分数,战场
法学专家谈拒绝加班被判赔1.8万,被判赔,法学,专家
杯子,盒子,自习室,专四,保送生,作文,单词,礼物,伤心,考试,耳朵,距离,姑娘
思域,品味
周梅森,经济