在之前的博文中,我们讲到了Flink WaterMark机制来针对乱序数据的处理,并且设置了一个数值较小的延迟时间maxOutOfOrderness,通常这个允许的延迟时间是我们根据具体业务数据观察而来的,例如通过观察,我司的游戏数据大概会延迟2~3秒到达,所以针对我司的业务,maxOutOfOrderness可以设置为2秒。
但是针对一些特殊的情况,游戏网关会有阻塞,发送出来的数据会有较大的延迟情况,例如某些数据会迟到20分钟到达,并且数据不能被抛弃,如果这时我们把maxOutOfOrderness设置为20分钟,那显然是不合适的,每个窗口的周期会变得超长,并且在内存中缓存了太多的数据。针对这种情况,Flink给出的解决方案是Allowed Lateness
在allowedLateness()中会设置一个Time值,主要是为了等待迟到的数据,在一定时间范围内,如果属于该窗口的数据到来,仍会进行计算,后面会对计算方式仔细说明
注意:该方法只针对于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值则会抛出异常
默认情况下,允许的延迟设置为 0
。也就是说,到达水印后的元素将被丢弃。
获取最新数据作为侧面输出
使用Flink的侧面输出流功能,您可以获取最近被丢弃的数据流。
首先,您需要指定要sideOutputLateData(OutputTag)
在窗口流上使用的较晚数据。然后,您可以根据窗口化操作的结果获取侧面输出流,获取到的侧输出流,你可以对历史数据进行更新操作:
1 | val lateOutputTag = OutputTag[T]("late-data") |
下面给出一个完整的实例,需求背景,每5秒统计一次近10秒时间内,用户在各个游戏的得分列表,乱序数据允许2秒的延迟,对于晚期数据到达,allowedLateness允许20秒的延迟,并且对晚期数据进行收集,后续和直接对接数据库更新。
1 | package data_stream.watermark |
输入测试数据:
1 | user_001,g1,1591016001000,100 |
输出结果:
1 | 1591015995000=====1591016005000 |