Spark Streaming + ELK + HBase

在Spark Streaming的业务场景中,大多数的业务需求是针对实时数据做数据统计,网站数据,App数据的监控分析,或者是对抓取的实时数据做ETL,再进行展示,以及实时的推荐系统等。


目前公司正在做的项目,针对公司内部运营部门的需求,一方面统计时尚,教育,科技等行业的最新动态,二则是对公司编辑人员提供目前行业热点素材,以热点图的方式展示,点击相应的关键词,并提取出相应的素材。


那么在整体的架构上:
数据源来自Python的定向爬虫
消息队列采用Kafka
数据处理采用Spark Streaming,对搜集的素材做ETL以及关键词提取等
数据存储采用HBase,Elasticsearch, HBase存储全量数据,做历史数据的备份,方便于后续的数据分析,Elasticsearch搭配HBase做二级索引方案,同时存储部分数据,对戒Kibana做数据可视化。


下面对上面的业务场景做一个简单的代码实例,其中没有写入项目的数据逻辑,那自己电脑中的数据进行了模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package write

import java.security.MessageDigest

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.elasticsearch.spark.streaming.EsSparkStreaming

import utils.HBaseUtil

import scala.util.Try

object streaming_to_hbase_1 {
val logger:Logger = Logger.getRootLogger
Logger.getLogger("org").setLevel(Level.ERROR)
def main(args: Array[String]): Unit = {

val conf: SparkConf = new SparkConf()
.setAppName("spark streaming window")
.setMaster("local[2]")
.set("spark.es.nodes", "localhost")
.set("spark.es.port", "9200")
.set("es.index.auto.create", "true")


val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams: Map[String, Object] = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kafka_spark_streaming",
"auto.offset.reset" -> "earliest", // earliest,latest
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("weibo_keyword")

val kafkaStream: DStream[(String, String)] = KafkaUtils
.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
.map((x: ConsumerRecord[String, String]) => {
val json_data: JSONObject = JSON.parseObject(x.value())
val date_time: String = json_data.get("datetime").toString
val keywordList: String = json_data.get("keywordList").toString
(date_time, keywordList)
})

// ES 数据写入部分
val es_dstream: DStream[String] = kafkaStream.map((x: (String, String)) => {
val date_time: String = x._1
val keywordList: String = x._2
val data_es_json: JSONObject = new JSONObject()
data_es_json.put("date_time", date_time)
data_es_json.put("keyword_list", keywordList)
data_es_json.put("rowkey", MD5Encode(date_time))
data_es_json.toJSONString
})

EsSparkStreaming.saveJsonToEs(es_dstream,"weibo_keyword-2017-04-25/default")

//HBase 数据写入部分
kafkaStream.foreachRDD((rdd: RDD[(String, String)]) => {
rdd.foreachPartition((partitionRecords: Iterator[(String, String)]) => {//循环分区
try {
val connection: Connection = HBaseUtil.getHBaseConn //获取HBase连接,分区创建一个连接,分区不跨节点,不需要序列化
partitionRecords.foreach((s: (String, String)) => {

val tableName: TableName = TableName.valueOf("t_weibo_keyword")
val table: Table = connection.getTable(tableName)//获取表连接

var date_time: String = s._1
val keywordList: String = s._2
val put = new Put(Bytes.toBytes(MD5Encode(date_time)))
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("keywordLisr"), Bytes.toBytes(keywordList))

Try(table.put(put)).getOrElse(table.close())//将数据写入HBase,若出错关闭table
// table.close()//分区数据写入HBase后关闭连接

})
} catch {
case e: Exception =>
logger.info(e)
logger.info("写入HBase失败")
}
})

})

ssc.start()
ssc.awaitTermination()
}

def MD5Encode(input: String): String = {
// 指定MD5加密算法
val md5: MessageDigest = MessageDigest.getInstance("MD5")
// 对输入数据进行加密,过程是先将字符串中转换成byte数组,然后进行随机哈希
val encoded: Array[Byte] = md5.digest(input.getBytes)
// 将加密后的每个字节转化成十六进制,一个字节8位,相当于2个16进制,不足2位的前面补0
encoded.map("%02x".format(_: Byte)).mkString
}

}


es在maven中引入的依赖:

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>6.7.0</version>
</dependency>


kibana中的结果展示
屏幕快照 2020-05-07 上午12.49.35.png