前言
Flink中的window窗口概念和Spark Streaming中的window是一样的。对一直流动的数据划定一个固定的窗口。
以交通传感器为例,该传感器每15秒统计通过某个位置的车辆数量。结果流看起来像:
如果您想知道有多少辆车经过该位置,您只需将各个计数相加即可。但是,传感器流的本质是它连续产生数据。这样的流永远不会结束,并且不可能计算可以返回的最终和。相反,可以计算滚动总和,即为每个输入事件返回更新的总和记录。这将产生新的部分和流。
但是,部分求和流可能不是我们想要的,因为它会不断更新计数,更重要的是,某些信息(例如随时间变化)会丢失。因此,我们可能想改一下我们的问题,并询问每分钟通过该位置的汽车数量。这要求我们将流的元素分组为有限的集合,每个集合对应于60秒。此操作称为_滚动Windows_操作。(不重叠)
滚动窗口将流离散化为不重叠的窗口。对于某些应用程序,重要的是窗口不可分离,因为应用程序可能需要平滑的聚合。例如,我们可以每30秒计算最后一分钟通过的汽车数量。这种窗户称为_滑动窗户_。(重叠)
如前所述,在数据流上定义窗口是非并行操作。这是因为流的每个元素必须由决定该元素应添加到哪个窗口的同一窗口运算符处理。完整流中的Windows 在Flink 中称为_AllWindows_。对于许多应用程序,数据流需要分组为多个逻辑流,每个逻辑流都可以应用窗口运算符。
例如,考虑来自多个交通传感器(而不是像前面的示例中的一个传感器)的车辆计数流,其中每个传感器监视一个不同的位置。通过按传感器ID对流进行分组,我们可以并行计算每个位置的窗口流量统计信息。在Flink中,我们称此类分区窗口为_Windows_,因为它们是分布式流的常见情况。下图显示了翻转窗口,该窗口收集了(sensorId, count)
成对元素流中的两个元素。
一般来说,窗口在无界流上定义了一组有限的元素。该集合可以基于时间(如我们前面的示例中所示),元素计数,计数和时间的组合,或一些将元素分配给窗口的自定义逻辑。Flink的DataStream API为最常见的窗口操作提供了简洁的运算符,并提供了一种通用的窗口机制,该机制允许用户定义非常自定义的窗口逻辑。在下面的内容中,我们将介绍Flink的时间和计数窗口,然后再详细讨论其窗口机制。
Window
Windows是处理无限流的核心。Windows将流分成有限大小的“buckets”,我们可以在其上应用计算。
Flink window窗口程序的一般结构如下所示。第一段指的是KeyedStream[T]流,第二段指的是non-keyed**DataStream[T]流。以下两段代码对比,唯一的区别是数据流stream经过keyBy(…)后,需要调用window(…)来创建窗口,而non-keyed stream则需要调用windowAll(…)创建窗口。
Keyed window**
1 | stream |
Non-Keyed Windows**
1 | stream |
在上面,方括号([…])中的命令是可选的。这表明Flink允许您以多种不同方式自定义窗口逻辑,从而使其最适合您的需求。
window生命周期
简而言之,一旦应属于该窗口的第一个元素到达,就会创建一个窗口,并且当时间(event or processing time)超过其结束时间戳加上用户指定的延迟时间,该窗口将被完全删除。Flink保证只删除基于时间(time-based windows)的窗口,而不保证其他类型(_例如 _global windows)的删除。
例如,采用基于event time的窗口化策略,该策略每5分钟创建一次不重叠的滚动窗口,并允许延迟1分钟,因此Flink将会创建一个时间在12:00~12:05之间的串口,当带有时间戳的第一个元素落入此间隔时,当水印通过12:06 时间戳时,它将删除它。
此外,每个窗口将具有Trigger触发器 和一个函数(ProcessWindowFunction,ReduceFunction, AggregateFunction或FoldFunction等窗口功能。该函数将包含要应用于窗口内容的计算,而则Trigger指定了在什么条件下可以将窗口视为要应用该函数的条件。触发策略可能类似于“当窗口中的元素数大于4时”或“当watermark水印通过窗口末尾时”。触发器还可以决定在创建和删除窗口之间的任何时间清除窗口的内容。在这种情况下,清除仅是指窗口中的元素,而不是窗口元数据。这意味着仍可以将新数据添加到该窗口。
除上述内容外,您还可以指定一个Evictor(请参阅Evictors),它将在触发触发器后以及应用此功能之前和/或之后从窗口中删除元素。
在下文中,我们将对上述每个组件进行更详细的介绍。我们先从上面的代码片段中的必需部分开始(请参见Keyed vs Non- Keyed Windows,Window Assigner和 Window Function)。
Keyed vs Non-Keyed Windows
在我们接入流数据后,第一件事情就是我们要在定义window函数前,需要决定你的数据流是否需要进行 keyed。
如果我们使用了keyBy(),那么输入的数据流将会被切分成logical keyed streams,否则,将不会对数据流进行切分,
在使用 keyed streams的情况下,传入事件的任何属性都可以用作key。拥有keyed streams将使您的窗口化计算可以由多个任务并行执行,因为logical keyed streams都可以独立于其余logical keyed streams进行处理。引用同一键的所有元素将被发送到同一并行任务task中。
对于non-keyed streams,您的原始流将不会拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行度为1。
Window Assigners窗口分配器
在决定是否要对数据流keyed后,下一步则是定义window assigners窗口分配器,window assigners定义了数据流元素怎样通过窗口,这完全取决于你的window assigners的选择,window()还是windowAll()
WindowAssigner负责将每个传入元素分配给一个或多个窗口。Flink带有针对最常见用例的预定义窗口分配器,即tumbling windows,滚动窗口, sliding windows滑动窗口,session windows会话窗口和global windows全局窗口 。您还可以通过扩展WindowAssigner类来实现自定义窗口分配器。所有内置窗口分配器(global windows**全局窗口除外**)均基于时间将元素分配给窗口,时间可以是process time,也可以是event time。process time和event time之间的差异以及timestamps和watermarks的生成方式稍后再讲。
基于时间的窗口具有开始时间戳(包括端点)和结束时间戳(包括端点),它们共同描述了窗口的大小。在代码中,Flink在使用TimeWindow基于时间的窗口时使用,该方法具有查询开始和结束时间戳记的方法maxTimestamp(),还具有返回给定窗口允许的最大时间戳的附加方法。
在下面,我们展示Flink的预定义窗口分配器如何工作以及如何在DataStream程序中使用它们。下图显示了每个分配器的工作情况。紫色圆圈表示流的元素,这些元素由某个键(在这种情况下为用户1,用户2和用户3)划分。x轴显示时间进度。
Tumbling Windows滚动窗口
滚动_窗口_分配器分配每个元素到指定_窗口大小_的窗口。滚动窗口具有固定的大小,并且不重叠。例如,如果您指定大小为5分钟的滚动窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口。
那么在滚动窗口上的实现,Spark Streming和Flink有明显的不同,最大的区别是spark streaming在一开始便指定了切分流数据的方式val ssc = new StreamingContext(conf, Seconds(10)) 即10秒切分一次,然后对这一批进行处理。
而Flink是后面自己通过Window算子timeWindow(Time.seconds(10))来指定每10秒作为一个数据窗口进行处理。
以上可以看出Spark Streaming的底层为micro batch处理方式,Flink为流处理方式。
Flink滚动窗口如下图所示。
为了更好的演示Flink的各种窗口操作,我花了一点时间用python写了一个模拟实时产生用户数据的程序,随后将产生的数据写入kafka中,顺便提一下Faker这个python中的第三方库,模拟数据很好用,以前只在爬虫程序中,用过模拟浏览器信息。
Flink tumbling window 代码结构
1 | val input: DataStream[T] = ... |
窗口函数,这里也可以使用timeWindow(Time.seconds(5)),时间间隔可以通过使用一个指定Time.milliseconds(x)
,Time.seconds(x)
, Time.minutes(x)
,等等。
如最后一个示例所示,滚动窗口分配器还采用一个可选offset 参数,该参数可用于更改窗口的对齐方式。例如,如果没有偏移,则每小时滚动窗口与epoch对齐,即您将获得诸如的窗口 1:00:00.000 - 1:59:59.999,2:00:00.000 - 2:59:59.999依此类推。如果要更改,可以提供一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.999,2:15:00.000 - 3:14:59.999等。一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定的偏移量Time.hours(-8)。
Sliding Windows_滑动窗口_
这个滑动窗口概念和Spark Streaming中的也是一样的,在Spark Streaming中是这样实现的
1 | val window_dstream: DStream[(String, Int)] = wordcount_dstream.reduceByKeyAndWindow((x: Int,y: Int)=>x+y,Seconds(30), Seconds(10)) |
_滑动窗口_分配器让元素以固定长度的窗口通过。类似于滚动窗口分配器,_窗口的大小由窗口大小_参数配置。附加的_窗口滑动_参数控制滑动窗口启动的频率。因此,如果滑动参数小于窗口大小,则滑动窗口会重叠。在这种情况下,元素被分配给多个窗口。
例如,您可以将大小为10分钟的窗口滑动5分钟。这样,您每隔5分钟就会得到一个窗口,其中包含最近10分钟内到达的事件,如下图所示。
sliding windows滑动窗口代码结构
**
1 | val input: DataStream[T] = ... |
窗口函数,这里也可以使用timeWindow(Time.seconds(10), Time.seconds(5)),时间间隔可以通过使用一个指定Time.milliseconds(x),Time.seconds(x), Time.minutes(x),等等。
如最后一个示例所示,滑动窗口分配器还采用一个可选offset参数,该参数可用于更改窗口的对齐方式。例如,在没有偏移的情况下,每小时滑动30分钟的窗口将与epoch对齐,即您将获得诸如的窗口 1:00:00.000 - 1:59:59.999,1:30:00.000 - 2:29:59.999依此类推。如果要更改,可以提供一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.999,1:45:00.000 - 2:44:59.999等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定的偏移量Time.hours(-8)。
Session Windows会话窗口
会话窗口中的元素是正在活跃的会话产生的数据。与滚动窗口和滑动窗口相比,会话窗口不重叠且没有固定的窗口开始和结束时间。相反,当会话窗口在一定时间段内未收到元素时(即不发生活动数据间隙时),它将关闭。会话窗口分配器配置静态会话间隔,其限定当会话不活动周期有多长。当此时间段到期时,当前会话将关闭,随后的元素将分配给新的会话窗口。
在现实中的意义就是,当数据处于连续活跃的状态下,并不划分窗口,但是当数据处于不活跃的状态,相邻数据的时间超过了设定的gap,则划定一个新窗口。
1 | val input: DataStream[T] = ... |
静态间隙可以通过使用中的一个来指定Time.milliseconds(x),Time.seconds(x), Time.minutes(x)。
动态间隙是通过实现SessionWindowTimeGapExtractor接口指定的。
由于会话窗口没有固定的开始和结束,因此对它们的评估不同于滚动窗口和滑动窗口。在内部,会话窗口运算符会为每个到达的记录创建一个新窗口,如果窗口彼此之间的距离比已定义的间隔小,则将它们合并在一起。为了可以将数据合并,会话窗口需要一个合并触发器以及合并窗口函数,如ReduceFunction,AggregateFunction,或ProcessWindowFunction (FoldFunction不能合并。)
接下来,所有的窗口函数操作的数据,都是基于下面我用python下的模拟生产用户数据。
python模拟数据代码
1 | import random |
数据样式
ReduceFunction
ReduceFunction指定如何将输入中的两个元素组合在一起以产生相同类型的输出元素。Flink使用a ReduceFunction来逐步增量的聚合窗口元素。ReduceFunction的源码:
1 | /** |
利用tumbling window举例
1 | package data_stream.window |
1 | province_count> (安徽省,2) |
AggregateFunction
AggregateFunction是一个普通版本ReduceFunction,其具有三种类型:输入类型(IN),累加器类型(ACC),和一个输出类型(OUT)。输入类型是输入流中数据的类型,并且AggregateFunction具有将一个输入元素添加到累加器的方法。该接口还具有创建初始累加器,将两个累加器合并为一个累加器以及OUT从累加器提取输出(类型)的方法。在下面的示例中,我们将了解其工作原理。与ReduceFunction一样,Flink将在窗口输入元素到达时逐步增量的聚合它们。
一个AggregateFunction可以被这样定义并使用:
1 | /** |
利用sliding window举例
aggregate()方法中不能像reduce(function: (T, T) => T)一样使用匿名函数,而是需要自定义继承了AggregateFunction的对象。
1 | package data_stream.window |
1 | channel_device:1> ((app,Samsung),27) |
ProcessWindowFunction
ProcessWindowFunction是处理窗口函数中,底层的处理数据API,获取一个Iterable,该Iterable包含窗口的所有元素,以及一个Context对象,该对象可以访问时间和状态信息,从而使其比其他窗口函数更具灵活性。这是以性能和资源消耗为代价的,因为无法增量聚合元素,而是需要在内部对其进行缓冲,直到将窗口视为已准备好进行处理为止。
ProcessWindowFunctionlook 的抽象类如下:
1 | abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function { |
Sliding Window 举例
1 | package data_stream.window |
结果打印
1 | =========2018-05-17 19:29:15============= |