Flink DataStream - Event Time

Event Time / Processing Time / Ingestion Time

Flink 在流式传输程序中支持不同的_时间_概念。

  • 处理时间(Processing Time):处理时间是指执行相应操作的机器的系统时间。当流式程序按处理时间运行时,所有基于时间的操作(如时间窗口)都将使用运行相应操作员的计算机的系统时钟。每小时处理时间窗口将包括系统时钟指示整小时的时间之间到达特定操作员的所有记录。例如,如果应用程序在9:15 am开始运行,则第一个每小时处理时间窗口将包括在9:15 am和10:00 am之间处理的事件,下一个窗口将包括在10:00 am和11:00 am之间处理的事件,依此类推。处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能提供确定性,因为它容易受到记录到达系统(例如从消息队列)到达系统的速度,记录在系统内部操作员之间流动的速度的影响。 以及中断(计划的或其他方式)。
  • 事件时间(Event time):事件时间是每个事件在其生产设备上发生的时间。该时间通常在它们进入Flink之前嵌入到记录中,并且 可以从每个记录中提取_事件时间戳_。在事件时间中,时间的进度取决于数据,而不取决于任何挂钟。事件时间程序必须指定如何生成“ _事件时间水印”_,这是一种表示事件时间进展的机制。在理想情况下,事件时间处理将产生完全一致且确定的结果,而不管事件何时到达或它们的顺序如何。但是,除非已知事件是按时间戳(按时间戳)到达的,否则事件时间处理会在等待无序事件时产生一定的延迟。由于只能等待有限的时间,因此这限制了确定性事件时间应用程序的可用性。
    假设所有数据都已到达,事件时间操作将按预期方式运行,即使在处理无序或迟到的事件或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含所有带有落入该小时事件时间戳的记录,无论它们到达的顺序或处理的时间。
    请注意,有时当事件时间程序实时处理实时数据时,它们将使用一些_处理时间_操作,以确保它们及时进行。
  • 摄取时间(Ingestion Time):摄取时间是事件进入Flink的时间。在源操作员处,每个记录都将源的当前时间作为时间戳记,并且基于时间的操作(如时间窗口)引用该时间戳记。
    _摄取时间_从概念上讲介于_事件时间处理时间之间_。与_处理时间_相比 ,它稍微贵一些,但结果却更可预测。由于 _摄取时间_使用稳定的时间戳(在源处分配了一次),因此对记录的不同窗口操作将引用相同的时间戳,而在_处理时间中,_每个窗口操作员都可以将记录分配给不同的窗口(基于本地系统时钟和任何运输延误)。
    与_事件时间_相比,_提取时间_程序无法处理任何乱序事件或迟到的数据,但是程序不必指定如何生成_水印_。
    在内部,_摄取时间事件时间_非常相似,但是具有自动时间戳分配和自动水印生成功能。




设置时间特征(Setting a Time Characteristic)

Flink DataStream程序的第一部分通常设置基准_时间特征_。该设置定义了数据流源的行为方式(例如,是否分配时间戳),以及诸如的窗口操作应使用什么时间概念KeyedStream.timeWindow(Time.seconds(30))。
以下示例显示了一个Flink程序,该程序在每小时的时间窗口中汇总事件。窗口的行为与时间特征相适应。

1
2
3
4
5
6
7
8
9
10
11
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
stream
.keyBy( _.getUser )
.timeWindow(Time.hours(1))
.reduce( (a, b) => a.add(b) )
.addSink(...)

请注意,为了在_事件时间中_运行此示例,程序需要使用直接为数据定义事件时间并自己发出水印的源,或者程序必须在源之后注入_Timestamp Assigner&Watermark Generator_。这些功能描述了如何访问事件时间戳,以及事件流呈现出何种程度的乱序。


以下部分描述了_时间戳水印_背后的一般机制。有关如何在Flink DataStream API中使用时间戳分配和水印生成的指南,请参阅 生成时间戳/水印


现实世界中的时间是不一致的,在 flink 中被划分为事件时间,提取时间,处理时间三种。如果以 EventTime 为基准来定义时间窗口那将形成 EventTimeWindow,要求消息本身就应该携带 EventTime 如果以 IngesingtTime 为基准来定义时间窗口那将形成 IngestingTimeWindow,以 source 的 systemTime 为准。 如果以 ProcessingTime 基准来定义时间窗口那将形成 ProcessingTimeWindow,以 operator 的 systemTime 为准。


以上三种选择是需要根据不同业务需求进行选择的,例如,在数据计算的过程中,需要考虑到网络延迟的原因,那么就应该选择setStreamTimeCharacteristic(TimeCharacteristic.EventTime),相反,不需要考虑延迟原因的话,只考虑处理数据的时间,则可以选择另两个。

Event Time and Watermarks

支持事件时间的_流处理器需要一种测量事件时间进度的方法。例如,当事件时间超过一个小时结束时,需要通知构建每小时窗口的窗口操作员,以便该操作员可以关闭正在进行的窗口。
_事件时间_可以独立于_处理时间_(由挂钟测量)进行。例如,在一个程序中,操作员的当前_事件时间_可能会稍微落后于_处理时间
(考虑到事件接收的延迟),而两者均以相同的速度进行。另一方面,另一个流媒体程序可以通过快速转发已经在Kafka主题(或另一个消息队列)中缓存的一些历史数据来在数周的事件时间内进行处理,而处理时间仅为几秒钟。


Flink中衡量事件时间进度的机制是水印。水印作为数据流的一部分流动,并带有时间戳t。甲_水印(T)_宣布事件时间达到时间 该流,这意味着应该有从该流没有更多的元素与时间戳_T” <= T_(即,具有时间戳的事件较旧的或等于水印)。
下图显示了带有(逻辑)时间戳记的事件流,以及串联的水印。在此示例中,事件是按顺序排列的(相对于其时间戳),这意味着水印只是流中的周期性标记。

水印对于_乱序_流至关重要,如下图所示,其中事件不是按其时间戳排序的。通常,水印是一种声明,即到流中的那个点,直到某个时间戳的所有事件都应该到达。一旦水印到达操作员,操作员就可以将其内部_事件时钟_提前到水印的值。

请注意,事件时间是由新创建的一个(或多个)流元素从产生它们的事件或触发了创建这些元素的水印中继承的。

并行流中的水印(Watermarks in Parallel Streams)

水印在源函数处或源函数之后直接生成。源函数的每个并行子任务通常独立生成其水印。这些水印定义了该特定并行源处的事件时间。
随着水印在流式传输程序中的流动,它们会提前到达其到达的运营商的事件时间。每当操作员提前其事件时间时,都会为其后续操作员在下游生成新的水印。
一些运算符消耗多个输入流;例如,并集,或遵循_keyBy(…)partition(…)_函数的运算符。该操作员的当前事件时间是其输入流的事件时间中的最小值。随着其输入流更新其事件时间,操作员也将更新。
下图显示了流过并行流的事件和水印的示例,操作员跟踪事件时间。

请注意,Kafka源支持按分区添加水印,您可以在此处阅读更多信息。

后期元素

某些元素可能会违反水印条件,这意味着即使在发生_水印(t)_之后,也会出现更多时间戳为_t’<= t的_元素。实际上,在许多现实世界的设置中,某些元素可以任意延迟,从而无法指定某个事件时间戳记的所有元素都将发生的时间。此外,即使可以限制延迟,通常也不希望将水印延迟太多,因为这会导致事件时间窗的评估延迟过多。
由于这个原因,流式传输程序可能会明确期望某些_后期_元素。延迟元素是在系统的事件时间时钟(由水印指示)已经经过延迟元素时间戳的时间之后到达的元素。有关如何在事件时间窗口中使用延迟元素的更多信息,请参见允许延迟。

闲置来源

当前,使用纯事件时间水印生成器,如果没有要处理的元素,则水印将无法进行。这意味着在输入数据存在间隙的情况下,事件时间将不会继续进行,例如不会触发窗口操作符,因此现有窗口将无法生成任何输出数据。
为了避免这种情况,可以使用周期性的水印分配器,这些分配器不仅基于元素时间戳进行分配。一个示例解决方案可能是一个分配器,该分配器在一段时间内未观察到新事件之后切换为使用当前处理时间作为时间基础。
可以使用将源标记为空闲SourceFunction.SourceContext#markAsTemporarilyIdle。有关详细信息,请参考此方法的Javadoc以及StreamStatus

调试水印

请参阅“ 调试Windows和事件时间”部分以在运行时调试水印。

运营商如何处理水印

通常,要求操作员在将给定水印转发到下游之前对其进行完全处理。例如, WindowOperator将首先评估应触发哪个窗口,只有在产生了所有由水印触发的输出之后,水印本身才会被发送到下游。换句话说,由于水印的出现而产生的所有元素将在水印之前发出。
相同的规则适用于TwoInputStreamOperator。但是,在这种情况下,操作员的当前水印被定义为其两个输入的最小值。

生成时间戳/水印

本节与在事件时间运行的程序有关。有关事件时间_, _处理时间摄取时间的简介_,请参阅事件时间的简介
为了处理**_事件时间
,流式传输程序需要相应地设置时间特征**。

1
2
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

分配时间戳

为了使用_事件时间_,Flink需要知道事件的_时间戳_,这意味着流中的每个元素都需要_分配_其事件时间戳。这通常是通过从元素的某个字段访问/提取时间戳来完成的。
时间戳分配与生成水印齐头并进,水印告诉系统事件时间的进展。
有两种分配时间戳和生成水印的方法:

  1. 直接在数据流源中
  2. 通过时间戳分配器/水印生成器:在Flink中,时间戳分配器还定义要发送的水印

注意自1970年1月1日T00:00:00Z的Java时代以来,时间戳和水印都指定为毫秒。

带有时间戳和水印的源函数

流源可以将时间戳直接分配给它们产生的元素,并且它们还可以发出水印。完成此操作后,无需时间戳分配器。请注意,如果使用时间戳分配器,则源所提供的任何时间戳和水印都将被覆盖。
要将时间戳直接分配给源中的元素,源必须使用上的collectWithTimestamp(...) 方法SourceContext。要生成水印,源必须调用该emitWatermark(Watermark)函数。
下面是一个简单的示例_(非检查点)_源,该源分配时间戳并生成水印:

1
2
3
4
5
6
7
8
9
10
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)

if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}

时间戳记分配器/水印生成器

时间戳记分配器获取流并产生带有时间戳记的元素和水印的新流。如果原始流已经具有时间戳和/或水印,则时间戳分配器将覆盖它们。


时间戳记分配器通常在数据源之后立即指定,但并非严格要求这样做。例如,一种常见的模式是在时间戳分配器之前解析(_MapFunction_)和筛选器(_FilterFunction_)。无论如何,都需要在事件时间的第一个操作(例如第一个窗口操作)之前指定时间戳分配器。作为一种特殊情况,当使用Kafka作为流作业的源时,Flink允许在源(或使用者)本身内部指定时间戳分配器/水印发射器。有关如何执行此操作的更多信息,请参见 Kafka Connector文档
注意:本节的其余部分介绍了程序员为了创建自己的时间戳提取器/水印发射器而必须实现的主要接口。要查看Flink附带的预实现提取器,请参阅“ 预定义时间戳提取器/水印发射器”页面。

  • 爪哇
  • 斯卡拉
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val stream: DataStream[MyEvent] = env.readFile(
    myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
    FilePathFilter.createDefaultFilter())

    val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
    .filter( _.severity == WARNING )
    .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

    withTimestampsAndWatermarks
    .keyBy( _.getGroup )
    .timeWindow(Time.seconds(10))
    .reduce( (a, b) => a.add(b) )
    .addSink(...)

    **

    带有定期水印

    AssignerWithPeriodicWatermarks 分配时间戳并定期生成水印(可能取决于流元素,或仅基于处理时间)。
    通过定义生成水印的间隔(每n毫秒) ExecutionConfig.setAutoWatermarkInterval(...)。分配器的getCurrentWatermark()方法每次都会被调用,如果返回的水印非空且大于前一个水印,则将发出新的水印。
    在这里,我们显示了两个使用定期水印生成的时间戳分配器的简单示例。请注意,Flink随附与以下所示BoundedOutOfOrdernessTimestampExtractor类似的内容BoundedOutOfOrdernessGenerator,您可以在此处阅读有关内容。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

val maxOutOfOrderness = 3500L // 3.5 seconds

var currentMaxTimestamp: Long = _

override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}

override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}

/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

val maxTimeLag = 5000L // 5 seconds

override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}

override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}

**

带标点的水印

要在特定事件表明可能会生成新的水印时生成水印,请使用 AssignerWithPunctuatedWatermarks。对于此类,Flink将首先调用该extractTimestamp(...)方法为元素分配时间戳,然后立即checkAndGetNextWatermark(...)在该元素上调用该 方法。
checkAndGetNextWatermark(...)方法会传递该方法中分配的时间戳extractTimestamp(...) ,并可以决定是否要生成水印。每当该checkAndGetNextWatermark(...) 方法返回一个非空水印,并且该水印大于最新的先前水印时,就会发出新的水印。

  • 爪哇
  • 斯卡拉
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
    if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
    }
    _注意:_可以在每个事件上生成水印。但是,由于每个水印都会在下游引起一些计算,因此过多的水印会降低性能。

    每个Kafka分区的时间戳

    当使用Apache Kafka作为数据源时,每个Kafka分区可能都有一个简单的事件时间模式(时间戳升序或有界乱序)。但是,在使用来自Kafka的流时,通常会并行使用多个分区,从而交错插入分区中的事件并破坏每个分区的模式(这是Kafka的客户客户端工作方式所固有的)。


    在这种情况下,您可以使用Flink的Kafka分区感知水印生成。使用该功能,将在Kafka使用者内部针对每个Kafka分区生成水印,并且按与合并水印在流shuffle上相同的方式合并每个分区的水印。
    例如,如果事件时间戳严格按照每个Kafka分区递增,则使用递增时间戳水印生成器生成按分区的水印 将产生完美的整体水印。
    下图显示了如何使用按kafka分区的水印生成,以及在这种情况下水印如何通过流数据流传播。
1
2
3
4
5
6
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)




预定义的时间戳提取器/水印发射器

时间戳和水印处理中所述,Flink提供了抽象,允许程序员分配自己的时间戳并发出自己的水印。更具体地说,根据使用情况,可以通过实现AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks接口之一来实现。


简而言之,第一个将定期发出水印,而第二个则根据传入记录的某些属性发出水印,例如,每当流中遇到特殊元素时。为了进一步简化此类任务的编程工作,Flink附带了一些预先实现的时间戳分配器。本节提供了它们的列表。除了开箱即用的功能外,它们的实现还可以作为自定义实现的示例。

时间戳递增的分配者

_定期_生成水印的最简单的特殊情况是给定源任务看到的时间戳以升序出现的情况。在这种情况下,当前时间戳始终可以充当水印,因为没有更早的时间戳会到达。
请注意,_每个并行数据源任务_只需要增加时间戳记即可。例如,如果在一个特定的设置中,一个并行数据源实例读取一个Kafka分区,则只需要在每个Kafka分区内将时间戳记递增。每当对并行流进行混洗,合并,连接或合并时,Flink的水印合并机制都会生成正确的水印。

1
2
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )

**

分配器允许固定的延迟时间

周期性水印生成的另一个示例是水印在流中看到的最大(事件时间)时间戳落后固定时间量的情况。这种情况包括预先知道流中可能遇到的最大延迟的场景,例如,当创建包含时间戳的元素的自定义源时,该时间戳在固定的时间段内传播以进行测试。


对于这些情况,Flink提供了BoundedOutOfOrdernessTimestampExtractor,将用作参数maxOutOfOrderness,即在计算给定窗口的最终结果时允许元素延迟到被忽略之前的最长时间。延迟对应于的结果t - t_w,其中t元素的(事件时间)时间戳和t_w前一个水印的时间戳。如果lateness > 0那么该元素将被认为是较晚的元素,默认情况下,在为其相应窗口计算作业结果时将其忽略。有关 使用延迟元素的更多信息,请参见有关允许延迟的文档。

1
2
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))