Flink Watermark


如果您正在构建实时流媒体应用程序,则事件时间处理是您必须迟早使用的功能之一。由于在大多数现实世界的用例中,消息到达无序,应该有一些方法,您建立的系统了解消息可能迟到并且相应地处理的事实。在这篇博文中,我们将看到为什么我们需要事件时间处理,以及我们如何在ApacheFlink中启用它。


EventTime是事件在现实世界中发生的时间,ProcessingTime是Flink系统处理该事件的时间。要了解事件时间处理的重要性,我们首先要建立一个基于Process Time的系统,看看它的缺点。


我们将创建一个大小为10秒的SlidingWindow,每5秒滑动一次,在窗口结束时,系统将发出在此期间收到的消息数。一旦了解EventTime处理如何与SlidingWindow相关的工作,那么了解如何在TumblingWindow中工作也不难。所以让我们开始吧。

基于处理时间的系统


对于这个例子,我们的消息格式是 value,timestamp,其中value是消息,timestamp是在源生成此消息的时间。由于我们正在构建基于Process Time的系统,因此以下代码忽略了时间戳部分。


了解消息应包含生成时间的信息是一个重要的方面。Flink或任何其他系统不是一个魔术盒,可以以某种方式自己形成这个。稍后我们将看到,事件时间处理提取此时间戳信息以处理较晚的消息。




支持事件时间的流处理器需要一种测量事件时间进度的方法。例如,当事件时间超过一个小时结束时,需要通知构建每小时窗口的窗口算子,以便该窗口算子可以关闭正在进行的窗口。


Event Time可以独立于Process Time进行。例如,在一个程序中,窗口算子的当前事件时间可能会稍微落后于处理时间 (考虑到事件接收的延迟),而两者均以相同的速度进行。另一方面,另一个流媒体程序可以通过快速转发已经在Kafka主题(或另一个消息队列)中缓存的一些历史数据来在数周的事件时间内进行处理,而处理时间仅为几秒钟。

情况1

消息到达不间断

假设源分别在时间13秒生产两条消息和第16秒产生一条,类型为a的三个消息。(小时和分钟不重要,因为窗口大小只有10秒)。

这些消息将落入Windows中,如下所示。在第13秒产生的前两个消息将落入window1 [5s-15s]和window2 [10s-20s],第16个时间生成的第三个消息将落入window2 [ 10s-20s]和window3 [15s-25s] ]。每个window发出的最终计数分别为(a,2),(a,3)和(a,1)。

该输出可以被认为是预期的行为。现在我们将看看当一个消息到达系统的时候会发生什么。

情况2


消息到达延迟

现在假设其中一条消息(在第13秒生成)到达延迟6秒(即第19秒才到达),可能是由于某些网络拥塞。你能猜测这个消息会落入哪个window?

延迟的消息落入window2和window3,因为19秒在10-20和15-25之间。在window2中计算没有任何问题(因为消息应该落入该窗口),但是它影响了window1和window3的结果。我们现在将尝试使用EventTime处理来解决这个问题。

基于EventTime的系统

要启用EventTime处理,我们需要一个时间戳提取器,从输入的数据中提取Event Time信息。请记住,数据的格式是Value,Timestamp。该extractTimestamp方法获取Timestamp部分,并以Long型进行返回。现在忽略getCurrentWatermark方法,我们稍后再回来。

1
2
3
4
5
6
7
8
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
e.split(",")(1).toLong
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis)
}
}

我们现在需要设置这个时间戳提取器,并将TimeCharactersistic设置为EventTime。其余的代码与ProcessingTime的情况保持一致。

1
2
3
4
5
6
7
8
9
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val text = senv.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new TimestampExtractor)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("EventTime processing example")


运行上述代码的结果如下图所示。


结果看起来好一些,window 2和window 3现在得到了正确的结果,但是window 1仍然是错误的。Flink没有将延迟的消息分配给window 3,因为它现在检查了消息的事件时间,并且理解它不在该窗口中。但是为什么没有将消息分配给window 1 呢?原因是在延迟的信息到达系统时(第19秒),window 1的窗口计算已经完成了(第15秒)。现在让我们尝试通过使用watermark来解决这个问题。


ps:请注意,在window 2中,延迟的消息仍然位于第19秒,而不是第13秒(事件时间)。该图中的描述是故意表示窗口中的消息不会根据事件时间进行排序。(这可能会在将来改变)

WaterMark


watermark是一个非常重要和有趣的想法,watermark本质上是一个时间戳。当Flink中的运算符接收到watermark时,它明白(假设)它不会看到比该时间戳更早的消息。


( 原文:A Watermark is essentially a timestamp. When an Operator in Flink receives a watermark, it understands(assumes) that it is not going to see any message older than that timestamp.我这里理解是翻译为更早的信息或者更晚的信息都可以,因为从时间上来说,1990年比1994年要老,但是1990年比1994年也是可以说是早的,那么就是它看不到更早/老的信息)


因此,在“EventTime”中,watermark也可以被认为是一种告诉Flink它有多远的一种方式。


Flink中衡量Event Time进度的机制是watermark水印。watermark简单的来说,就是一个延迟触发机制,watermark伴随着每一条数据,作为数据流的一部分流动,并带有时间戳_t,_其值为当前已流入数据中最大的Event Time-最大延迟时间 maxEventTime(程序中自定义),当watermark的值大于等于某个窗口的结束时间,则那个窗口立即触发执行。


这个例子的目的,把它看作是一种告诉Flink一个消息会延迟多少的方式。在上一次尝试中,我们将watermark设置为当前系统时间。因此,不要指望任何延迟的消息。我们现在将水印设置为当前时间-5秒,这告诉Flink希望消息最多有5s的延迟,这是因为每个窗口仅在水印通过时被评估。


由于我们的水印是当前时间-5秒,所以第一个窗口[5s-15s]将仅在第20秒被评估。类似地,窗口[10s-20s]将在第25秒进行评估,依此类推。

1
2
3
override def getCurrentWatermark(): Watermark = { 
new Watermark(System.currentTimeMillis - 5000)
}


通常最好保持接收到的最大时间戳,并创建具有最大预期延迟的水印,而不是从当前系统时间减去。


进行上述更改后运行代码的结果是:


最后我们得到了正确的结果,所有这三个窗口现在都按照预期的方式发射计数,这是(a,2),(a,3)和(a,1)。


注意:我们也可以使用AllowedLateness功能设置消息的最大允许时间来解决这个问题,这个后面我们再讲


Watermarks in Parallel Streams


并行流中的水印


watermark在源函数处或源函数之后直接生成。源函数的每个并行子任务通常独立生成其水印。这些水印定义了该并行源处的event time。


随着watermark在流式传输程序中的流动,它们可能会比operators处的事件时间提前到达。每当一个operators提前它的事件时间,它为它的后继操作符产生一个新的下游水印。


一些算子将消耗多个输入流;例如,并集,keyBy()或partition()函数的运算符。该operator的当前事件时间是其输入流的事件时间中的最小值。随着其输入流更新其事件时间,operator也将更新。也就是operator会在各个到达的event time中,选择最小的event time进行“对齐”。

下图显示了流过并行流的事件和水印的示例,操作员跟踪事件时间。



请注意,Kafka源支持按分区添加水印,您可以在此处阅读更多信息。

Late Elements


迟到元素**


某些元素可能会违反水印条件,这意味着在_水印(t)_之后发生,也会出现更多时间戳为_t’<= t的_元素。实际上,在许多现实世界的设置中,某些元素可以任意延迟,这使得不可能指定某个事件时间戳的所有元素发生的时间。


此外,即使可以限制延迟,通常也不希望将水印延迟太多,因为这会导致事件时间窗的评估延迟过多。
由于这个原因,流式传输程序可能会明确期望某些_后期_元素。延迟元素是在系统的事件时间时钟(由水印指示)已经经过延迟元素时间戳的时间之后到达的元素。有关如何在事件时间窗口中使用延迟元素的更多信息


请参见允许延迟。

生成时间戳/水印


本节与在event time运行的程序有关。有关事件时间, 处理时间和摄取时间的简介,请参阅事件时间的简介
为了处理**事件时间,流式传输程序需要相应地设置时间特征**。

1
2
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

分配时间戳


为了使用事件时间,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过从元素的某个字段访问/提取时间戳来完成的。


时间戳分配与生成水印齐头并进,水印告诉系统事件时间的进展。
有两种分配时间戳和生成水印的方法:

  1. 直接在数据流源中
  2. 通过时间戳分配器/水印生成器:在Flink中,时间戳分配器还定义要发送的水印


注意**自1970年1月1日T00:00:00Z的Java时代以来,时间戳和水印都指定为毫秒。

带有时间戳和水印的源函数


流源可以将时间戳直接分配给它们产生的元素,并且它们还可以发出水印。完成此操作后,无需时间戳分配器。请注意,如果使用时间戳分配器,则源所提供的任何时间戳和水印都将被覆盖。


要将时间戳直接分配给源中的元素,源必须使用上的collectWithTimestamp() 方法SourceContext。要生成水印,源必须调用该emitWatermark(Watermark)函数。

下面是一个简单的示例(非检查点)源,该源分配时间戳并生成水印:

1
2
3
4
5
6
7
8
9
10
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)

if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}

**

带有定期水印


AssignerWithPeriodicWatermarks 分配时间戳并定期生成水印。通过定义生成水印的间隔(每n毫秒) ExecutionConfig.setAutoWatermarkInterval()。分配器的getCurrentWatermark()方法每次都会被调用,如果返回的水印非空且大于前一个水印,则将发出新的水印。


Flink提供了抽象,允许程序员分配自己的时间戳并发出自己的水印。更具体地说,根据使用情况,可以通过实现AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口之一来实现。简而言之,第一个将定期发出水印,而第二个则根据传入记录的某些属性发出水印,例如,每当流中遇到特殊元素时。


为了进一步简化此类任务的编程工作,Flink附带了一些预先实现的时间戳分配器。本节提供了它们的列表。除了开箱即用的功能外,它们的实现还可以作为自定义实现的示例。

**

时间戳递增的分配者

_
定期生成水印的最简单的特殊情况是给定源任务看到的时间戳以升序出现的情况。在这种情况下,当前时间戳始终可以充当水印,因为没有更早的时间戳会到达。


请注意,每个并行数据源任务只需要增加时间戳记即可。例如,如果在一个特定的设置中一个并行数据源实例读取一个Kafka分区,则仅在每个Kafka分区内将时间戳记递增是必要的。每当对并行流进行混洗,合并,连接或合并时,Flink的水印合并机制都会生成正确的水印。

1
2
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )

**

分配器允许固定的延迟时间


周期性水印生成的另一个示例是水印在流中看到的最大(事件时间)时间戳落后固定时间量的情况。这种情况涵盖了预先知道流中可能遇到的最大延迟的场景,例如,当创建包含时间戳的元素的自定义源时,该时间戳在固定时间段内传播以进行测试。对于这些情况,Flink提供了BoundedOutOfOrdernessTimestampExtractor,将用作参数maxOutOfOrderness,即在计算给定窗口的最终结果时允许元素延迟到被忽略之前的最长时间。


延迟对应于的结果t - t_w,其中t元素的(事件时间)时间戳和t_w前一个水印的时间戳。如果lateness > 0那么该元素将被认为是较晚的元素,默认情况下,在为其相应窗口计算作业结果时将其忽略。有关 使用延迟元素的更多信息,请参见有关允许延迟的文档。

1
2
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))

**

带标点的水印

**
要在特定事件表明可能会生成新的水印时生成水印,请使用 AssignerWithPunctuatedWatermarks。对于此类,Flink将首先调用该extractTimestamp()方法为元素分配时间戳,然后立即checkAndGetNextWatermark()在该元素上调用该 方法。


该checkAndGetNextWatermark()方法会传递该方法中分配的时间戳extractTimestamp() ,并可以决定是否要生成水印。每当该checkAndGetNextWatermark() 方法返回一个非空水印,并且该水印大于最新的先前水印时,就会发出新的水印。

1
2
3
4
5
6
7
8
9
10
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}

override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}

_注意:_可以在每个事件上生成水印。但是,由于每个水印都会在下游引起一些计算,因此过多的水印会降低性能。

每个Kafka分区的时间戳


当使用Apache Kafka作为数据源时,每个Kafka分区可能都有一个简单的事件时间模式(时间戳升序或有界乱序)。但是,在使用来自Kafka的流时,通常会并行使用多个分区,从而交错插入分区中的事件并破坏每个分区的模式(这是Kafka的客户客户端工作方式所固有的)。


在这种情况下,您可以使用Flink的Kafka分区感知水印生成。使用该功能,将在Kafka使用者内部针对每个Kafka分区生成水印,并且按与合并水印在流shuffle上相同的方式合并每个分区的水印。


例如,如果事件时间戳严格按照每个Kafka分区递增,则使用递增时间戳水印生成器生成按分区的水印 将产生完美的整体水印。


下图显示了如何使用按kafka分区的水印生成,以及在这种情况下水印如何通过流数据流传播。

1
2
3
4
5
6
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)





预定义的时间戳提取器/水印发射器


时间戳和水印处理中所述,Flink提供了抽象,允许程序员分配自己的时间戳并发出自己的水印。更具体地说,根据使用情况,可以通过实现AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口之一来实现。


简而言之,第一个将定期发出水印,而第二个则根据传入记录的某些属性发出水印,例如,每当流中遇到特殊元素时。为了进一步简化此类任务的编程工作,Flink附带了一些预先实现的时间戳分配器。本节提供了它们的列表。除了开箱即用的功能外,它们的实现还可以作为自定义实现的示例。


在 Flink watermark这一模块,相对于Spark Streaming来说,确实有些晦涩难懂,在前面的一些章节中,Flink程序举例我都是直接对接kafka读取数据进行处理,但是在watermark模块中,还是使用socket发送数据,可以更清晰的看出Flink watermark机制下窗口的触发。下面就是针对watermark进行的程序实例。


基于有序数据的watermark,以下代码需求场景,我根据公司游戏监控项目的最基本的原理,使用简洁的数据进行演示,每5秒计算近10秒中,网关传递数据延迟时间最大的步骤。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package data_stream.watermark
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object watermark_orderness {

case class game_gateway_data(user_id: String, step_num: String, gateway_time: Long, gateway_name: String, delay_time: Int)

def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val socketStream: DataStream[String] = env.socketTextStream("localhost",8888)

val gameGatewayStream: DataStream[game_gateway_data] = socketStream.map((line: String) => {
val array_data: Array[String] = line.split(",")
game_gateway_data(array_data(0), array_data(1), array_data(2).toLong, array_data(3), array_data(4).toInt)
})
//设置时间语义
.assignAscendingTimestamps((_: game_gateway_data).gateway_time)

gameGatewayStream
.keyBy((_: game_gateway_data).user_id)
.timeWindow(Time.seconds(10), Time.seconds(5))
.reduce(new MyReduceFunction, new stepTimeWindowFunction)
.print("====>")
env.execute("orderness case")

}

class MyReduceFunction extends ReduceFunction[game_gateway_data]{
override def reduce(t: game_gateway_data, t1: game_gateway_data): game_gateway_data = {
if (t.delay_time >= t1.delay_time) t else t1
}
}

class stepTimeWindowFunction extends WindowFunction[game_gateway_data, game_gateway_data, String, TimeWindow]{
// 在窗口结束时调用
override def apply(key: String, window: TimeWindow, input: Iterable[game_gateway_data], out: Collector[game_gateway_data]): Unit = {
// input中只有一条数据
println(s"${window.getStart}===${window.getEnd}")
val result: game_gateway_data = input.iterator.next()
out.collect(result)
}
}
}

下面需要特别注意,也可以像我一样使用nc来调试窗口打印,可以清楚的了解窗口之间的关系,首先,我依次在控制台输入以下数据,一共五个批次的数据,每条数据最后一个字段是“delay_time 延迟时间”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
user_001,1,1591016077123,visitor_gateway,2
user_001,2,1591016078123,login_gateway,3
user_001,3,1591016080123,userCenter_gateway,3


user_001,4,1591016081123,gameCenter_gateway,4
user_001,5,1591016085123,payCenter_gateway,5


user_001,6,1591016087123,exitCenter_gateway,6
user_001,10,1591016090123,exitCenter_gateway,10


user_001,11,1591016095123,exitCenter_gateway,11


user_001,12,1591016100123,exitCenter_gateway,12

依次打印以下五个窗口结果,仔细观察各个窗口的划分,其中我们来看第三个批次数据,当我输入延迟时间分别为6,10的时候,1591016080000===1591016090000窗口触发,返回当前窗口延迟时间最大的记录为6,为什么不是10呢?? 这是因为窗口计算中,每个窗口的数据,左闭右开,也就是延迟时间为10的这条数据不加入1591016080000===1591016090000窗口的计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1591016070000===1591016080000
====>> game_gateway_data(user_001,2,1591016078123,login_gateway,3)

1591016075000===1591016085000
====>> game_gateway_data(user_001,4,1591016081123,gameCenter_gateway,4)

1591016080000===1591016090000
====>> game_gateway_data(user_001,6,1591016087123,exitCenter_gateway,6)

1591016085000===1591016095000
====>> game_gateway_data(user_001,10,1591016090123,exitCenter_gateway,10)

1591016090000===1591016100000
====>> game_gateway_data(user_001,11,1591016095123,exitCenter_gateway,11)

以下是窗口划分的源码:
屏幕快照 2020-06-01 下午11.16.01.png


基于乱序数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package data_stream.watermark

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object watermark_outoforderness {

case class game_gateway_data(user_id: String, step_num: String, gateway_time: Long, gateway_name: String, delay_time: Int)

def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.getConfig.setAutoWatermarkInterval(100L) //watermark周期

val socketStream: DataStream[String] = env.socketTextStream("localhost", 8888)

val gameGatewayStream: DataStream[game_gateway_data] = socketStream.map((line: String) => {
val array_data: Array[String] = line.split(",")
game_gateway_data(array_data(0), array_data(1), array_data(2).toLong, array_data(3), array_data(4).toInt)
})
//乱序数据处理,采用周期性watermark,设置延迟时间为5秒
//第一种写法,采用AssignerWithPeriodicWatermarks
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[game_gateway_data](Time.seconds(3)) {
"""抽取时间戳,设置event time"""

override def extractTimestamp(element: game_gateway_data): Long = {
element.gateway_time
}
})


//第二种写法,自己定义实现类
// .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[game_gateway_data] {
// var maxEventTime: Long = _
//
// //周期性生成watermark
// override def getCurrentWatermark = {
// new Watermark(maxEventTime - 3000L)
// }
//
// //设定event time
// override def extractTimestamp(element: game_gateway_data, previousElementTimestamp: Long) = {
// //设置maxEventTime
// maxEventTime = maxEventTime.max(element.gateway_time)
// element.gateway_time
// }
// })

gameGatewayStream
.keyBy((_: game_gateway_data).user_id)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.reduce(new MyReduceFunction, new stepTimeWindowFunction)
.print("====>")

env.execute("orderness case")

}

class MyReduceFunction extends ReduceFunction[game_gateway_data] {
override def reduce(t: game_gateway_data, t1: game_gateway_data): game_gateway_data = {
if (t.delay_time >= t1.delay_time) t else t1
}
}

class stepTimeWindowFunction extends WindowFunction[game_gateway_data, game_gateway_data, String, TimeWindow] {
// 在窗口结束时调用
override def apply(key: String, window: TimeWindow, input: Iterable[game_gateway_data], out: Collector[game_gateway_data]): Unit = {
// input中只有一条数据
println(s"${window.getStart}===${window.getEnd}")
val result: game_gateway_data = input.iterator.next()
out.collect(result)
}
}
}

我依次在控制台输入以下数据,请注意,窗口对于延迟数据,是怎样处理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
70-80(窗口范围,此条无需输入)
user_001,1,1591016077123,visitor_gateway,2
user_001,2,1591016078123,login_gateway,3
user_001,3,1591016080123,userCenter_gateway,3
user_001,4,1591016083123,gameCenter_gateway,4


75-85(窗口范围,此条无需输入)

user_001,4,1591016081123,gameCenter_gateway,4
user_001,5,1591016085123,payCenter_gateway,5
user_001,6,1591016088123,exitCenter_gateway,6


80-90(窗口范围,此条无需输入)

user_001,6,1591016079123,exitCenter_gateway,20
user_001,6,1591016080123,exitCenter_gateway,40
user_001,10,1591016093123,exitCenter_gateway,10

85-95(窗口范围,此条无需输入)
user_001,6,1591016084123,exitCenter_gateway,84
user_001,10,1591016098123,exitCenter_gateway,10

90-100(窗口范围,此条无需输入)

user_001,10,1591016090123,exitCenter_gateway,90
user_001,10,1591016094123,exitCenter_gateway,94
user_001,10,1591016091123,exitCenter_gateway,91
user_001,10,1591016092123,exitCenter_gateway,92
user_001,10,1591016098123,exitCenter_gateway,98
user_001,10,1591016099123,exitCenter_gateway,99
user_001,10,1591016095123,exitCenter_gateway,95
user_001,10,1591016096123,exitCenter_gateway,96
user_001,10,15910160100123,exitCenter_gateway,100

结果如下,根据下面的结果,我们可以看出,基于延迟机制watermark并且晚于窗口触发到达的数据,将会被抛弃

1
2
3
4
5
6
7
8
9
10
11
12
1591016070000===1591016080000
====>> game_gateway_data(user_001,2,1591016078123,login_gateway,3)
1591016075000===1591016085000
====>> game_gateway_data(user_001,4,1591016083123,gameCenter_gateway,4)
1591016080000===1591016090000
====>> game_gateway_data(user_001,6,1591016080123,exitCenter_gateway,40)
1591016085000===1591016095000
====>> game_gateway_data(user_001,10,1591016093123,exitCenter_gateway,10)
1591016090000===1591016100000
====>> game_gateway_data(user_001,10,1591016099123,exitCenter_gateway,99)
1591016095000===1591016105000
====>> game_gateway_data(user_001,10,1591016099123,exitCenter_gateway,99)


实时流处理系统的重要性日益增长,必须处理延迟的消息是您构建的任何此类系统的一部分。在这篇博文中,我们看到到达的消息迟到会影响系统的结果,以及如何使用ApacheFlink的事件时间处理功能来解决它们。