Flink 合并流 connect & union

Flink 合并流 connect & union

* connect & union(合并流)**


connect之后生成ConnectedStreams,会对两个流的数据应用不同的处理方法,并且双流 之间可以共享状态(比如计数)。这在第一个流的输入会影响第二个流时, 会非常有用;


union 合并多个流,新的流包含所有流的数据。


union是DataStream
→ DataStream。


connect是DataStream* → ConnectedStreams。


connect只能连接两个流,而union可以连接多于两个流 。


connect连接的两个流类型可以不一致,但是合并数据的数据类型要一致,而union连接的流的类型必须一致。


connect算子后面跟的map 和 flatMap和普通的map, flatmap类似,只不过作用在ConnectedStreams上
会改变流的类型,由ConnectedStreams → DataStream


下面的实例,是以sideoutput那篇文章的基础上,对主数据流和侧出流进行connect,代码中也给出了union合并流方法,unino方法使用起来则很简单,并且和spark streaming中union的用法是一样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package data_stream

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.flink.util.Collector
import utils.KafkaUtil

object connect_datastream {

case class Raw(date_time: String, keywordList: String)

private val KAFKA_TOPIC: String = "weibo_keyword"

def main(args: Array[String]) {

val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "kafka_consumer")

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// exactly-once 语义保证整个应用内端到端的数据一致性
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 开启检查点并指定检查点时间间隔为5s
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
// 设置StateBackend,并指定状态数据存储位置
env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))


val topic: String = "weibo_keyword"
val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)

val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
.map((x: String) => {
val date_time: String = get_value(x)._1
val keywordList: String = get_value(x)._2
Raw(date_time, keywordList)
})

val processStream: DataStream[Raw] = word_stream.process(new SideOutput())
val dirty_stream: DataStream[Raw] = processStream.getSideOutput(new OutputTag[Raw]("dirty_data"))

// val connect_datastream: DataStream[String] = processStream.connect(dirty_stream)
// .map(
// (originalRaw: Raw) => originalRaw.keywordList,
// (dirtyRaw: Raw) => dirtyRaw.keywordList
// )
//
// connect_datastream.print("ALL ")

val union_datastream: DataStream[Raw] = processStream.union(dirty_stream)
union_datastream.print("union_datastream ")

env.execute("connect stream test")

}

def get_value(string_data: String): (String, String) = {
val json_data: JSONObject = JSON.parseObject(string_data)
val date_time: String = json_data.get("datetime").toString
val keywordList: String = json_data.get("keywordList").toString
(date_time, keywordList)
}

class SideOutput() extends ProcessFunction[Raw, Raw] {
//定义一个侧输出流标签
lazy val dirty_data: OutputTag[Raw] = new OutputTag[Raw]("dirty_data")

override def processElement(value: Raw,
ctx: ProcessFunction[Raw, Raw]#Context,
out: Collector[Raw]): Unit = {
if (value.keywordList == "dirty_data") {
ctx.output(dirty_data, value)
} else {
out.collect(value)
}
}
}

}


打印结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ALL :8> dirty_data
ALL :7> 网购翻车的经历,经历
ALL :1> 彭昱畅的自拍和他拍,彭昱畅
ALL :7> dirty_data
ALL :1> 彭昱畅的自拍和他拍,彭昱畅
ALL :7> dirty_data
ALL :1> 网购翻车的经历,经历
ALL :7> dirty_data
ALL :1> 网购翻车的经历,经历
ALL :8> dirty_data
ALL :7> 痛仰演唱会,演唱会
ALL :1> 痛仰演唱会,演唱会
ALL :8> dirty_data
ALL :1> dirty_data
ALL :7> dirty_data
ALL :1> 痛仰演唱会,演唱会
ALL :1> dirty_data
ALL :8> dirty_data
ALL :7> 痛仰演唱会,演唱会
ALL :8> dirty_data
ALL :1> 痛仰演唱会,演唱会
ALL :8> dirty_data
ALL :1> dirty_data
ALL :7> dirty_data
ALL :1> 幸福触手可及片花,片花
ALL :7> 幸福触手可及片花,片花