Flink Allowed Lateness

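In the previous post we covered how Flink's watermark mechanism handles out-of-order data, and we set a small delay, maxOutOfOrderness. This allowed delay is usually derived from observing the actual business data. For example, we observed that our company's game data arrives roughly 2 to 3 seconds late, so for our workload maxOutOfOrderness can be set to 2 seconds.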


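In some special cases, however, the game gateway gets blocked and the data it sends out arrives much later. Some records may arrive 20 minutes late, and they must not be discarded. Setting maxOutOfOrderness to 20 minutes would clearly be inappropriate: every window would take far longer to fire, and far too much data would be buffered in memory. Flink's answer to this situation is Allowed Lateness.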
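To put a number on that cost, here is a small back-of-the-envelope sketch in plain Scala with no Flink dependency. It assumes the bounded-out-of-orderness rule (watermark = largest timestamp seen so far minus maxOutOfOrderness) and that an event-time window fires once the watermark reaches the window end minus 1 ms. The object and function names are made up for illustration.

```scala
object FiringDelaySketch {
  // Assumption: watermark = maxSeenTimestamp - maxOutOfOrderness, and a window
  // [start, end) fires once watermark >= end - 1.
  // Returns the smallest event timestamp that must be seen before the window can fire.
  def timestampNeededToFire(windowEndMs: Long, maxOutOfOrdernessMs: Long): Long =
    (windowEndMs - 1) + maxOutOfOrdernessMs

  def main(args: Array[String]): Unit = {
    val windowEnd = 10000L // a window covering [0 s, 10 s)
    val withTwoSeconds = timestampNeededToFire(windowEnd, 2000L)
    val withTwentyMinutes = timestampNeededToFire(windowEnd, 20 * 60 * 1000L)
    println(s"2 s bound: fires once an event with timestamp >= $withTwoSeconds ms arrives")
    println(s"20 min bound: fires once an event with timestamp >= $withTwentyMinutes ms arrives")
  }
}
```

Under these assumptions the 10-second window needs an event at about 12 seconds with the 2-second bound, but an event at about 20 minutes 10 seconds with the 20-minute bound, and its elements stay buffered the whole time.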


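allowedLateness() takes a Time value that says how long to keep waiting for late data. If data belonging to a window arrives within that period, even after the window has already fired, it is still included in the computation. How that computation works is described in detail below.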


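Note: allowed lateness only applies to event-time windows. On a processing-time window a non-zero value is not meaningful: depending on the Flink version it is either rejected with an exception or, as far as we know in more recent releases, simply has no effect.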

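By default, allowed lateness is 0. In other words, an element that arrives after the watermark has passed the end of its window is dropped.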
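The rule can be written down in a few lines of plain Scala. This is only a simplified model of the decision, not Flink's own code: it assumes a window [start, end) fires when the watermark reaches end - 1 and is cleaned up when the watermark reaches end - 1 + allowedLateness. The names Decision, OnTime, LateFiring and Dropped are ours.

```scala
object AllowedLatenessRule {
  sealed trait Decision
  case object OnTime extends Decision     // window has not fired yet; the element is simply added
  case object LateFiring extends Decision // window already fired; the element makes it fire again
  case object Dropped extends Decision    // window already cleaned up; the element is not added

  // Simplified model of what happens to an element for ONE of its windows,
  // given the watermark at the moment the element arrives.
  def decide(windowEndMs: Long, allowedLatenessMs: Long, watermarkMs: Long): Decision = {
    val maxTimestamp = windowEndMs - 1
    if (maxTimestamp + allowedLatenessMs <= watermarkMs) Dropped
    else if (maxTimestamp <= watermarkMs) LateFiring
    else OnTime
  }

  def main(args: Array[String]): Unit = {
    val windowEnd = 10000L // window [0 s, 10 s)
    println(decide(windowEnd, 0L, 9000L))      // OnTime
    println(decide(windowEnd, 0L, 12000L))     // Dropped (default allowed lateness of 0)
    println(decide(windowEnd, 20000L, 12000L)) // LateFiring
    println(decide(windowEnd, 20000L, 30000L)) // Dropped
  }
}
```

Note that lateness is measured by the watermark relative to the window end, not by how long the element spent in transit.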


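Getting late data as a side output

With Flink's side output feature you can get a stream of the data that was discarded as late.
First, call sideOutputLateData(OutputTag) on the windowed stream to say that you want the late data. Then you can get the side output stream from the result of the windowed operation and use it, for example, to update historical data: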

val lateOutputTag = OutputTag[T]("late-data")

val input: DataStream[T] = ...

val result = input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .allowedLateness(<time>)
  .sideOutputLateData(lateOutputTag)
  .<windowed transformation>(<window function>)

val lateStream = result.getSideOutput(lateOutputTag)


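Below is a complete example. Requirement: every 5 seconds, compute the list of each user's scores across games over the last 10 seconds. Out-of-order data is tolerated up to 2 seconds, allowedLateness accepts late data for a further 20 seconds, and anything later than that is collected so that it can later be used to update the database directly.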

package data_stream.watermark

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object watermark_allowedLateness {

  case class GameData(user_id: String, game_id: String, game_time: Long, game_score: Int)

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.getConfig.setAutoWatermarkInterval(100L) // emit a watermark every 100 ms

    val socketStream: DataStream[String] = env.socketTextStream("localhost", 8888)

    // Each input line looks like: user_id,game_id,game_time,game_score
    val gameStream: DataStream[GameData] = socketStream.map((line: String) => {
      val array_data: Array[String] = line.split(",")
      GameData(array_data(0), array_data(1), array_data(2).toLong, array_data(3).toInt)
    })
      // watermark = largest game_time seen so far minus 2 seconds
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GameData](Time.seconds(2)) {
        override def extractTimestamp(element: GameData): Long = {
          element.game_time
        }
      })

    val gameLateData = new OutputTag[GameData]("late")

    val windowStream: DataStream[(String, List[Int])] = gameStream
      .keyBy((_: GameData).user_id)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      // Data more than 2 seconds late is handled by allowedLateness
      .allowedLateness(Time.seconds(20))
      .sideOutputLateData(gameLateData)
      // Three cases follow:
      // 1. Data on time or less than 2 seconds late: the watermark triggers the window and the data is computed normally
      // 2. Data between 2 and 20 seconds late: watermark + allowedLateness; the window fires again while watermark < window_end_time + allowedLateness_time
      // 3. Data more than 20 seconds late (watermark >= window_end_time + allowedLateness_time): sent to the side output via sideOutputLateData

      // Option 1: ProcessWindowFunction
      // .process(new ProcessWindowFunction[GameData, (String, List[Int]), String, TimeWindow] {
      //   override def process(key: String, context: Context, elements: Iterable[GameData], out: Collector[(String, List[Int])]): Unit = {
      //     val scoreList: ListBuffer[Int] = ListBuffer[Int]()
      //     val scoreiterator: Iterator[GameData] = elements.iterator
      //     while (scoreiterator.hasNext) {
      //       val data: GameData = scoreiterator.next()
      //       scoreList += data.game_score
      //     }
      //     out.collect((key, scoreList.toList))
      //   }
      // })

      // Option 2: WindowFunction
      .apply(new WindowFunction[GameData, (String, List[Int]), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[GameData], out: Collector[(String, List[Int])]): Unit = {
          // Collect every score currently held in the window
          val scoreList: ListBuffer[Int] = ListBuffer[Int]()
          val scoreiterator: Iterator[GameData] = input.iterator
          while (scoreiterator.hasNext) {
            val data: GameData = scoreiterator.next()
            scoreList += data.game_score
          }
          // Print the window bounds so each firing can be identified in the output
          println(s"${window.getStart}=====${window.getEnd}")
          out.collect((key, scoreList.toList))
        }
      })

    windowStream.print("window data")

    // Elements whose windows have all expired arrive here
    val late: DataStream[GameData] = windowStream.getSideOutput(gameLateData)
    late.print("迟到的数据:") // the label reads "late data:"

    env.execute(this.getClass.getName)

  }
}
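Because the window size (10 seconds) is twice the slide (5 seconds), every element belongs to two overlapping windows. The sketch below is plain Scala with no Flink dependency. It assumes windows aligned to multiples of the slide (offset 0) and non-negative timestamps, and the helper name windowsFor is ours, not a Flink API.

```scala
object SlidingWindowsSketch {
  // Returns the [start, end) windows that contain timestampMs, newest window first.
  // Assumes offset 0 and a non-negative timestamp.
  def windowsFor(timestampMs: Long, sizeMs: Long, slideMs: Long): List[(Long, Long)] = {
    val lastStart = timestampMs - (timestampMs % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(_ > timestampMs - sizeMs)
      .map(start => (start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // An element with game_time 1591016008000, size 10 s, slide 5 s
    windowsFor(1591016008000L, 10000L, 5000L).foreach { case (start, end) =>
      println(s"$start=====$end")
    }
    // prints:
    // 1591016005000=====1591016015000
    // 1591016000000=====1591016010000
  }
}
```

This is why a single late element can make two different windows fire again.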


Input test data:

user_001,g1,1591016001000,100
user_001,g2,1591016002000,200
user_001,g4,1591016004000,400
user_001,g1,1591016005000,500
user_001,g2,1591016006000,600
user_001,g2,1591016007000,700


user_001,g4,1591016009000,400
user_001,g1,1591016010000,100


user_001,g2,1591016011000,1100
user_001,g4,1591016014000,1400

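user_001,g8,1591016008000,800 (late, but within the 20-second allowed lateness: the window fires again)
user_001,g8,1591016003000,300 (late, but within the 20-second allowed lateness: the window fires again)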


user_001,g17,1591016017000,1700


user_001,g43,1591016043000,4300

user_001,g22,1591016022000,2200

user_001,g21,1591016021000,2100

user_001,g21,1591016019000,1900

user_001,g1,1591016001000,100 (beyond the 20-second allowed lateness: no window fires)


Output:

1591015995000=====1591016005000
window data> (user_001,List(100, 200, 400))
1591016000000=====1591016010000
window data> (user_001,List(100, 200, 400, 500, 600, 700, 400))
1591016000000=====1591016010000
window data> (user_001,List(100, 200, 400, 500, 600, 700, 400, 800))
1591016000000=====1591016010000
window data> (user_001,List(100, 200, 400, 500, 600, 700, 400, 800, 300))
1591015995000=====1591016005000
window data> (user_001,List(100, 200, 400, 300))
1591016005000=====1591016015000
window data> (user_001,List(500, 600, 700, 400, 100, 1100, 1400, 800))
1591016010000=====1591016020000
window data> (user_001,List(100, 1100, 1400, 1700))
1591016015000=====1591016025000
window data> (user_001,List(1700))
1591016020000=====1591016030000
window data> (user_001,List(2200))
1591016015000=====1591016025000
window data> (user_001,List(1700, 2200))
1591016020000=====1591016030000
window data> (user_001,List(2200, 2100))
1591016015000=====1591016025000
window data> (user_001,List(1700, 2200, 2100))
1591016015000=====1591016025000
window data> (user_001,List(1700, 2200, 2100, 1900))
迟到的数据:> GameData(user_001,g1,1591016001000,100)
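
One detail in this output is easy to miss. The element with score 1900 (game_time 1591016019000) shows up in the window ending at 1591016025000, but the window ending at 1591016020000 never fires again, and the element does not appear on the line labelled 迟到的数据 (late data) either. The plain-Scala sketch below replays that decision under a simplified model: the constants mirror the job above, the watermark is the largest timestamp seen minus 2 seconds, and an element goes to the side output only when every one of its windows has expired. The names Result and onElement are ours, not Flink APIs.

```scala
object LateDataReplay {
  val sizeMs = 10000L
  val slideMs = 5000L
  val maxOutOfOrdernessMs = 2000L
  val allowedLatenessMs = 20000L

  // End timestamps of the sliding windows containing ts (offset 0 assumed)
  def windowEndsFor(ts: Long): List[Long] = {
    val lastStart = ts - (ts % slideMs)
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(_ > ts - sizeMs)
      .map(_ + sizeMs)
      .toList
  }

  // refired: windows that already fired and fire again with the new element
  // expired: windows past end - 1 + allowedLateness, which no longer accept it
  final case class Result(refired: List[Long], expired: List[Long], toSideOutput: Boolean)

  def onElement(ts: Long, watermarkMs: Long): Result = {
    val (expired, alive) =
      windowEndsFor(ts).partition(end => end - 1 + allowedLatenessMs <= watermarkMs)
    val refired = alive.filter(end => end - 1 <= watermarkMs)
    // Simplified rule: side output only if no window accepted the element
    Result(refired, expired, toSideOutput = alive.isEmpty)
  }

  def main(args: Array[String]): Unit = {
    // Watermark after the element with game_time 1591016043000 has been seen
    val watermark = 1591016043000L - maxOutOfOrdernessMs
    println(onElement(1591016019000L, watermark)) // one window refires, one expired, no side output
    println(onElement(1591016001000L, watermark)) // both windows expired, goes to the side output
  }
}
```

So if every late element must be accounted for, the side output alone is not enough with sliding windows: an element can be missing from one expired window while still being accepted by a newer one.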