在Spark和Flink的流式处理方面,有很多相似之处,例如map(),flatmap()等算子使用方法都是相似的。那这里我举出一个在Spark Streaming中没有的一个功能,侧出流SideOutput。
在Flink以前的版本中,是使用split()算子来实现这个功能的,但是目前1.4官方不推荐使用,在编译器中也被标示过时方法,推荐使用sideoutput。具体能实现的功能是将一个流数据,按照你自定义的规则,在流数据内部来分成一个或者多个流,并且同时输出给你。
在Spark Streaming中,如果我们想要对一个流数据进行分割,我们需要对一个流数据分别做两次filter算子,这样就会进行数据复制,相对来说耗用资源更高一些。flink中的侧输出就是将数据流进行分割,而不对流进行复制的一种分流机制。flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据。
实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| package data_stream
import java.util.Properties
import org.apache.flink.streaming.api.scala._ import com.alibaba.fastjson.{JSON, JSONObject} import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase import org.apache.flink.util.Collector import utils.KafkaUtil
object sideoutput_datastream {
case class Raw(date_time: String, keywordList: String)
private val KAFKA_TOPIC: String = "weibo_keyword"
def main(args: Array[String]) {
val properties: Properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "kafka_consumer")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.enableCheckpointing(5000) env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))
val topic: String = "weibo_keyword" val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)
val word_stream: DataStream[Raw] = env.addSource(kafkaSource) .map((x: String) => { val date_time: String = get_value(x)._1 val keywordList: String = get_value(x)._2 Raw(date_time, keywordList) })
val processStream: DataStream[Raw] = word_stream.process(new SideOutput())
processStream.print("original data") processStream.getSideOutput(new OutputTag[String]("dirty_data")).print("side output data")
env.execute("side output test")
}
def get_value(string_data: String): (String, String) = { val json_data: JSONObject = JSON.parseObject(string_data) val date_time: String = json_data.get("datetime").toString val keywordList: String = json_data.get("keywordList").toString (date_time, keywordList) }
class SideOutput() extends ProcessFunction[Raw, Raw] { lazy val dirty_data: OutputTag[String] = new OutputTag[String]("dirty_data")
override def processElement(value: Raw, ctx: ProcessFunction[Raw, Raw]#Context, out: Collector[Raw]): Unit = { if (value.keywordList == "dirty_data") { ctx.output(dirty_data, s"${value.date_time}+${value.keywordList} is dirty data") } else { out.collect(value) } } }
}
|
结果输出