Flink 侧输出流SideOutput

在Spark和Flink的流式处理方面,有很多相似之处,例如map(),flatmap()等算子使用方法都是相似的。那这里我举出一个在Spark Streaming中没有的一个功能,侧出流SideOutput。


在Flink以前的版本中,是使用split()算子来实现这个功能的,但是目前1.4官方不推荐使用,在编译器中也被标示过时方法,推荐使用sideoutput。具体能实现的功能是将一个流数据,按照你自定义的规则,在流数据内部来分成一个或者多个流,并且同时输出给你。


在Spark Streaming中,如果我们想要对一个流数据进行分割,我们需要对一个流数据分别做两次filter算子,这样就会进行数据复制,相对来说耗用资源更高一些。flink中的侧输出就是将数据流进行分割,而不对流进行复制的一种分流机制。flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据。


实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package data_stream

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.flink.util.Collector
import utils.KafkaUtil

object sideoutput_datastream {

case class Raw(date_time: String, keywordList: String)

private val KAFKA_TOPIC: String = "weibo_keyword"

def main(args: Array[String]) {

val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "kafka_consumer")

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// exactly-once 语义保证整个应用内端到端的数据一致性
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 开启检查点并指定检查点时间间隔为5s
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
// 设置StateBackend,并指定状态数据存储位置
env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))


val topic: String = "weibo_keyword"
val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)

val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
.map((x: String) => {
val date_time: String = get_value(x)._1
val keywordList: String = get_value(x)._2
Raw(date_time, keywordList)
})

val processStream: DataStream[Raw] = word_stream.process(new SideOutput())

processStream.print("original data")
//通过getSideOutput获取侧输出流,并打印输出
processStream.getSideOutput(new OutputTag[String]("dirty_data")).print("side output data")

env.execute("side output test")

}

def get_value(string_data: String): (String, String) = {
val json_data: JSONObject = JSON.parseObject(string_data)
val date_time: String = json_data.get("datetime").toString
val keywordList: String = json_data.get("keywordList").toString
(date_time, keywordList)
}

class SideOutput() extends ProcessFunction[Raw, Raw] {
//定义一个侧输出流标签
lazy val dirty_data: OutputTag[String] = new OutputTag[String]("dirty_data")

override def processElement(value: Raw,
ctx: ProcessFunction[Raw, Raw]#Context,
out: Collector[Raw]): Unit = {
if (value.keywordList == "dirty_data") {
ctx.output(dirty_data, s"${value.date_time}+${value.keywordList} is dirty data")
} else {
out.collect(value)
}
}
}

}


结果输出