Flink Window Operations

Preface


The window concept in Flink is the same as the window in Spark Streaming: a fixed window is drawn over data that otherwise keeps flowing.


Take a traffic sensor as an example, which counts every 15 seconds the number of vehicles that pass a certain location. The resulting stream looks like this:





If you wanted to know how many cars have passed that location, you would simply sum up the individual counts. However, the nature of a sensor stream is that it continuously produces data. Such a stream never ends, and it is not possible to compute a final sum that could be returned. Instead, it is possible to compute a rolling sum, i.e. return an updated sum record for each input event. This yields a new stream of partial sums.
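As a rough illustration (not part of the original article), such a rolling sum could be expressed in Flink's Scala DataStream API roughly as follows; the CarCount case class and the source of counts are assumptions made for this sketch:

import org.apache.flink.streaming.api.scala._

case class CarCount(sensorId: String, count: Int)

// counts: one record per 15-second sensor report, e.g. read from Kafka (assumed here)
def rollingSum(counts: DataStream[CarCount]): DataStream[CarCount] =
  counts
    .keyBy(_.sensorId) // a single sensor in this example, so effectively one group
    .sum("count")      // emits an updated running total for every incoming element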





However, a stream of partial sums might not be what we are looking for, because it constantly updates the count and, more importantly, some information such as variation over time is lost. Hence, we might want to rephrase our question and ask for the number of cars that pass the location every minute. This requires us to group the elements of the stream into finite sets, each set corresponding to sixty seconds. This operation is called a _tumbling windows_ operation (non-overlapping).





Tumbling windows discretize a stream into non-overlapping windows. For certain applications it is important that windows are not disjunct, because an application might require smoothed aggregates. For example, we can compute every thirty seconds the number of cars passed in the last minute. Such windows are called _sliding windows_ (overlapping).





As mentioned before, defining windows on a data stream is a non-parallel operation. This is because each element of a stream must be processed by the same window operator that decides which windows the element should be added to. Windows over a full stream are called _AllWindows_ in Flink. For many applications, a data stream needs to be grouped into multiple logical streams, on each of which a window operator can be applied.


Think, for example, of a stream of vehicle counts from multiple traffic sensors (instead of only one sensor as in the previous example), where each sensor monitors a different location. By grouping the stream by sensor id, we can compute windowed traffic statistics for each location in parallel. In Flink, we call such partitioned windows simply _Windows_, as they are the common case for distributed streams. The following figure shows tumbling windows that collect two elements over a stream of (sensorId, count) pairs.
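A hedged sketch of this per-sensor version, reusing the CarCount type assumed above: grouping by sensorId and applying a one-minute tumbling window yields a windowed count per location, computed in parallel.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// counts: the (sensorId, count) stream described above (assumed to exist)
def perSensorMinuteCounts(counts: DataStream[CarCount]): DataStream[CarCount] =
  counts
    .keyBy(_.sensorId)           // one logical stream per sensor
    .timeWindow(Time.minutes(1)) // tumbling one-minute windows
    .sum("count")                // total cars per sensor and per window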





In general, windows define finite sets of elements on an unbounded stream. This set can be based on time (as in our previous examples), element counts, a combination of counts and time, or some custom logic to assign elements to windows. Flink's DataStream API provides concise operators for the most common window operations, as well as a generic windowing mechanism that allows users to define very custom windowing logic. In the following we present Flink's time and count windows before discussing its windowing mechanism in detail.
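Count-based windows are mentioned here but not demonstrated later in this article, so here is a minimal, hedged sketch of the two keyed count-window variants (again reusing the CarCount type assumed in the sketches above):

import org.apache.flink.streaming.api.scala._

def countWindows(counts: DataStream[CarCount]): Unit = {
  // Tumbling count window: fires once 100 elements have arrived for a key.
  counts.keyBy(_.sensorId).countWindow(100).sum("count")

  // Sliding count window: evaluates the last 100 elements every 10 elements.
  counts.keyBy(_.sensorId).countWindow(100, 10).sum("count")
}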

Window



Windows are at the heart of processing infinite streams. Windows split the stream into "buckets" of finite size, over which we can apply computations.


The general structure of a Flink windowed program is shown below. The first snippet refers to keyed streams (KeyedStream[T]), the second to non-keyed streams (DataStream[T]). Comparing the two, the only difference is that a stream which went through keyBy(...) creates its window with window(...), whereas a non-keyed stream creates it with windowAll(...).


Keyed Windows

stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"


Non-Keyed Windows

stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"


In the above, the commands in square brackets ([…]) are optional. This reveals that Flink allows you to customize your windowing logic in many different ways so that it best fits your needs.

Window Lifecycle


In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed once the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness. Flink guarantees removal only for time-based windows and not for other types (e.g. global windows).


For example, with an event-time based windowing strategy that creates non-overlapping (tumbling) windows every 5 minutes and has an allowed lateness of 1 minute, Flink will create a window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove the window when the watermark passes the 12:06 timestamp.
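Expressed in the same placeholder style as the snippets above, this strategy would look roughly like the following sketch (not code from the article):

input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) // windows such as [12:00, 12:05)
    .allowedLateness(Time.minutes(1))                     // window kept until the watermark passes 12:06
    .<windowed transformation>(<window function>)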


Furthermore, each window will have a Trigger and a function (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction) attached to it. The function contains the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied. A triggering policy might be something like "when the number of elements in the window is more than 4", or "when the watermark passes the end of the window". A trigger can also decide to purge a window's contents at any time between its creation and removal. Purging in this case only refers to the elements in the window, not the window metadata. This means new data can still be added to that window.
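For instance, the "more than 4 elements" policy above could be approximated with Flink's built-in CountTrigger, which fires once the element count reaches the given value (a hedged sketch in the same placeholder style):

input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .trigger(CountTrigger.of(5)) // fire as soon as the window holds 5 elements
    .<windowed transformation>(<window function>)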


Apart from the above, you can specify an Evictor (see Evictors), which is able to remove elements from the window after the trigger fires and before and/or after the function is applied.
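A sketch of an evictor in the same style: the built-in CountEvictor keeps only the last n elements of the window and removes the rest before the window function runs.

input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .evictor(CountEvictor.of(10)) // keep at most the last 10 elements of each window
    .<windowed transformation>(<window function>)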


In the following we go into more detail for each of the components above. We start with the required parts of the snippets above (see Keyed vs Non-Keyed Windows, Window Assigner, Window Function).

Keyed vs Non-Keyed Windows



After we start consuming the stream, the first thing to decide, before defining any window, is whether your stream should be keyed or not. If we use keyBy(), the incoming stream is split into logical keyed streams; otherwise the stream is not split.

In the case of keyed streams, any attribute of the incoming events can be used as a key. Having a keyed stream allows your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest. All elements referring to the same key are sent to the same parallel task.


In the case of non-keyed streams, your original stream is not split into multiple logical streams, and all the windowing logic is performed by a single task, i.e. with a parallelism of 1.
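The difference can be sketched as follows, assuming a stream of (province, 1) pairs shaped like the ones built in the examples further below:

val counts: DataStream[(String, Int)] = ...

// keyed: one logical window per province, evaluated in parallel across tasks
counts
    .keyBy((_: (String, Int))._1)
    .timeWindow(Time.seconds(5))
    .sum(1)

// non-keyed: a single window over the whole stream, executed with parallelism 1
counts
    .timeWindowAll(Time.seconds(5))
    .sum(1)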

Window Assigners


Having decided whether the stream is keyed or not, the next step is to define a window assigner. The window assigner defines how elements are assigned to windows; it is specified as the argument of the window(...) call (or windowAll(...) for non-keyed streams).

A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can be either processing time or event time. The difference between processing time and event time, and how timestamps and watermarks are generated, will be discussed later.


Time-based windows have a start timestamp (inclusive) and an end timestamp (exclusive) that together describe the size of the window. In code, Flink uses TimeWindow when working with time-based windows; it has methods for querying the start and end timestamps, as well as an additional method maxTimestamp() that returns the largest timestamp allowed for a given window.
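A tiny self-contained sketch (not from the article) showing the relationship between these timestamps, constructing a TimeWindow directly just for illustration:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

val w = new TimeWindow(0L, 5000L) // start inclusive, end exclusive: [0, 5000)
println(w.getStart)               // 0
println(w.getEnd)                 // 5000
println(w.maxTimestamp())         // 4999, the largest timestamp that still belongs to the window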


In the following, we show how Flink's pre-defined window assigners work and how they are used in a DataStream program. The figures referenced below visualize the workings of each assigner: the purple circles represent elements of the stream, which are partitioned by some key (in this case user 1, user 2 and user 3), and the x-axis shows the progress of time.


Tumbling Windows

The tumbling windows assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes.


The implementation of tumbling windows differs noticeably between Spark Streaming and Flink. The biggest difference is that Spark Streaming fixes how the stream is sliced right at the start, e.g. val ssc = new StreamingContext(conf, Seconds(10)), i.e. the stream is cut every 10 seconds and each batch is then processed.


Flink, by contrast, specifies the window later through a window operator such as timeWindow(Time.seconds(10)), which treats every 10 seconds of data as one window to process.
This shows that Spark Streaming is a micro-batch engine under the hood, whereas Flink is a true stream-processing engine.


Flink's tumbling windows are illustrated in the figure below.
(Figure: tumbling-windows.svg)
To better demonstrate Flink's various window operations, I spent a little time writing a Python program that simulates user data in real time and writes it to Kafka. As a side note, Faker, a third-party Python library, is very handy for generating mock data; previously I had only used it in crawlers to fake browser information.


Flink tumbling window code structure

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

Instead of the explicit assigner, you can also use timeWindow(Time.seconds(5)) here. Time intervals can be specified using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.


As shown in the last example, tumbling window assigners also take an optional offset parameter that can be used to change the alignment of windows. For example, without an offset, hourly tumbling windows are aligned with the epoch, i.e. you will get windows such as 1:00:00.000 - 1:59:59.999, 2:00:00.000 - 2:59:59.999 and so on. If you want to change that, you can give an offset. With an offset of 15 minutes you would, for example, get 1:15:00.000 - 2:14:59.999, 2:15:00.000 - 3:14:59.999 and so on. An important use case for offsets is to adjust windows to time zones other than UTC-0. For example, in China you have to specify an offset of Time.hours(-8).

Sliding Windows

The sliding window concept is also the same as in Spark Streaming, where it is implemented like this:

val window_dstream: DStream[(String, Int)] = wordcount_dstream.reduceByKeyAndWindow((x: Int,y: Int)=>x+y,Seconds(30), Seconds(10))

The sliding windows assigner assigns elements to windows of fixed length. Similar to the tumbling windows assigner, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can overlap if the slide is smaller than the window size. In this case elements are assigned to multiple windows.


For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get every 5 minutes a window that contains the events that arrived during the last 10 minutes, as depicted in the figure below.


(Figure: sliding-windows.svg)
Sliding windows code structure

val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)


Instead of the explicit assigner, you can also use timeWindow(Time.seconds(10), Time.seconds(5)) here. Time intervals can be specified using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.


As shown in the last example, sliding window assigners also take an optional offset parameter that can be used to change the alignment of windows. For example, without an offset, hourly windows sliding by 30 minutes are aligned with the epoch, i.e. you will get windows such as 1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 and so on. If you want to change that, you can give an offset. With an offset of 15 minutes you would, for example, get 1:15:00.000 - 2:14:59.999, 1:45:00.000 - 2:44:59.999 and so on. An important use case for offsets is to adjust windows to time zones other than UTC-0. For example, in China you have to specify an offset of Time.hours(-8).

Session Windows


The elements of a session window come from a session of activity. In contrast to tumbling and sliding windows, session windows do not overlap and do not have a fixed start and end time. Instead, a session window closes when it does not receive elements for a certain period of time, i.e. when a gap of inactivity occurs. A session window assigner is configured with a static session gap that defines how long this period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.


In practical terms: as long as data keeps arriving continuously, no new window is started; but once the stream becomes inactive and the time between adjacent elements exceeds the configured gap, a new window is started.

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

Static gaps can be specified using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x).


Dynamic gaps are specified by implementing the SessionWindowTimeGapExtractor interface.


Since session windows do not have a fixed start and end, they are evaluated differently from tumbling and sliding windows. Internally, a session window operator creates a new window for each arriving record and merges windows together if they are closer to each other than the defined gap. In order to be mergeable, a session window operator requires a merging Trigger as well as a merging window function, such as ReduceFunction, AggregateFunction, or ProcessWindowFunction (a FoldFunction cannot merge).
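As a hedged sketch of such a mergeable setup, the following combines an event-time session window with a ReduceFunction, reusing (province, 1) pairs of the shape used in the examples below:

val counts: DataStream[(String, Int)] = ...

counts
    .keyBy((_: (String, Int))._1)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .reduce((a: (String, Int), b: (String, Int)) => (a._1, a._2 + b._2))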


From here on, all the data processed by the window-function examples below is based on the simulated user data produced by the following Python program.


Python mock-data generator

import random
import time
from datetime import datetime
from faker import Faker
from kafka import KafkaProducer
import json

# initialization
fake = Faker(locale='zh_CN')

producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=['localhost:9092']
)

"""Generate random times (registration time and last login time)"""
register_start = '2017-06-02 12:12:12'
register_end = '2017-11-01 00:00:00'

login_start = '2018-05-01 12:12:12'
login_end = '2018-12-01 00:00:00'

def strTimeProp(start, end, prop, frmt):
    stime = time.mktime(time.strptime(start, frmt))
    etime = time.mktime(time.strptime(end, frmt))
    ptime = stime + prop * (etime - stime)
    return int(ptime)

def register_randomDate(register_start, register_end, frmt='%Y-%m-%d %H:%M:%S'):
    register_time = time.strftime(frmt, time.localtime(strTimeProp(register_start, register_end, random.random(), frmt)))
    return register_time

def login_randomDate(login_start, login_end, frmt='%Y-%m-%d %H:%M:%S'):
    login_time = time.strftime(frmt, time.localtime(strTimeProp(login_start, login_end, random.random(), frmt)))
    return login_time

"""Generate channel and device"""

def get_device():
    web_devices = ['Chrome', '360_browser', 'QQ_browser', 'Firefox', 'Opera', 'UC_browser']
    app_devices = ['xiaomi', 'iphone', 'Samsung', 'meizu', '1+', 'huawei', 'vivo', 'oppo']
    pc_devices = ['Windows', 'Mac']
    channels = {'web': web_devices, 'app': app_devices, 'pc': pc_devices}
    channel = random.choice(list(channels.keys()))
    device = random.choice(channels[channel])
    return channel, device


def main():
    sex_list = ['male', 'female']

    while True:
        channel, device = get_device()
        register_date_time = register_randomDate(register_start, register_end, frmt='%Y-%m-%d %H:%M:%S')
        last_login_time = login_randomDate(login_start, login_end, frmt='%Y-%m-%d %H:%M:%S')
        real_name, user_name, age, sex, phone_number, province, email, ip_address, company, channel, device, register_date_time, last_login_time = fake.name(), fake.user_name(), random.choice(range(18, 65)), random.choice(sex_list), fake.phone_number(), fake.province(), fake.ascii_free_email(), fake.ipv4(), fake.company(), channel, device, register_date_time, last_login_time
        raw_data = {'real_name': real_name, 'user_name': user_name, 'age': age, 'sex': sex, 'phone_number': phone_number, 'province': province, 'email': email, 'ip_address': ip_address, 'company': company, 'channel': channel, 'device': device, 'register_date_time': register_date_time, 'last_login_time': last_login_time}
        print(raw_data)
        producer.send('user_information', raw_data)
        time.sleep(0.5)

if __name__ == '__main__':
    main()

Sample data
(Figure: screenshot of sample records)

ReduceFunction


A ReduceFunction specifies how two elements of the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate the elements of a window. The source of the reduce method on a windowed stream:

/**
* Applies a reduce function to the window. The window function is called for each evaluation
* of the window for each key individually. The output of the reduce function is interpreted
* as a regular non-windowed stream.
*
* This window will try and pre-aggregate data as much as the window policies permit. For example,
* tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
* key is stored. Sliding time windows will pre-aggregate on the granularity of the slide
* interval, so a few elements are stored per key (one per slide interval).
* Custom windows may not be able to pre-aggregate, or may need to store extra values in an
* aggregation tree.
*
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
*/
def reduce(function: ReduceFunction[T]): DataStream[T] = {
  asScalaStream(javaStream.reduce(clean(function)))
}


Tumbling window example

package data_stream.window

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import utils.KafkaUtil

/**
 * ReduceFunction
 * incremental aggregation
 * input and output types must be identical
 */
object window_function_reduce {

  case class UserLogData(real_name: String, user_name: String, age: Int, sex: String, phone_number: String,
                         province: String, email: String, ip_address: String, company: String, channel: String,
                         device: String, register_date_time: String, last_login_time: String)

  private val KAFKA_TOPIC: String = "user_information"

  def main(args: Array[String]) {

    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // exactly-once semantics for end-to-end consistency within the application
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // enable checkpointing with a 5-second interval
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    // set the StateBackend and the location where checkpointed state is stored
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource/tumbling"))

    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(KAFKA_TOPIC)

    // compute over the data of each 5-second window
    val original_stream: DataStream[String] = env.addSource(kafkaSource)
    """
      |original_stream> UserLogData(纪淑兰,ehao,37,female,18529008264,辽宁省,apeng@gmail.com,110.75.179.140,易动力科技有限公司,app,xiaomi,2017-08-18 12:29:29,2018-09-02 05:05:56)
      |original_stream> UserLogData(刘玉英,caojing,42,male,15647635274,香港特别行政区,houwei@hotmail.com,50.188.173.136,同兴万点传媒有限公司,web,Firefox,2017-10-27 12:20:20,2018-10-11 09:26:20)
      |original_stream> UserLogData(薛萍,leizhong,32,male,18928404956,云南省,xxiao@yahoo.com,115.123.146.193,网新恒天传媒有限公司,web,360_browser,2017-06-17 16:42:24,2018-09-19 19:36:39)
    """.stripMargin

    /**
     * Concrete requirement: count, over the last 5 seconds, the number of visiting users per province.
     * The second map below produces (String, Int) because the reduce function used afterwards
     * requires the input and output types to be identical.
     *
     * The transformation steps could be chained together; they are written separately here
     * to show clearly how the stream type changes after each step.
     */

    val original_format_stream: DataStream[(String, Int)] = original_stream
      .map(match_data(_: String))
      .map((raw: UserLogData) => (raw.province, 1))

    val province_window_stream: WindowedStream[(String, Int), String, TimeWindow] = original_format_stream
      .keyBy((_: (String, Int))._1)
      .timeWindow(Time.seconds(5))

    val result_stream: DataStream[(String, Int)] = province_window_stream
      .reduce((p1: (String, Int), p2: (String, Int)) => (p1._1, p1._2 + p2._2))

    result_stream.print("province_count").setParallelism(1)
    env.execute("province count stream")

  }

  def match_data(original_str: String): UserLogData = {
    val original_json: JSONObject = JSON.parseObject(original_str)
    UserLogData(original_json.getString("real_name"), original_json.getString("user_name"), original_json.getInteger("age"),
      original_json.getString("sex"), original_json.getString("phone_number"), original_json.getString("province"),
      original_json.getString("email"), original_json.getString("ip_address"), original_json.getString("company"),
      original_json.getString("channel"), original_json.getString("device"), original_json.getString("register_date_time"),
      original_json.getString("last_login_time"))
  }
}
province_count> (安徽省,2)
province_count> (贵州省,1)
province_count> (江苏省,2)
province_count> (北京市,1)
province_count> (重庆市,1)
province_count> (湖南省,1)
province_count> (海南省,2)
province_count> (甘肃省,2)
province_count> (河南省,2)
province_count> (香港特别行政区,4)
province_count> (四川省,2)
province_count> (云南省,2)
province_count> (辽宁省,2)
province_count> (青海省,3)
province_count> (澳门特别行政区,1)
province_count> (天津市,1)
province_count> (新疆维吾尔自治区,1)
province_count> (河北省,2)
province_count> (吉林省,1)
province_count> (宁夏回族自治区,1)
province_count> (江西省,1)
province_count> (黑龙江省,1)
province_count> (山西省,5)
province_count> (西藏自治区,1)

AggregateFunction


An AggregateFunction is a generalized version of a ReduceFunction that has three types: an input type (IN), an accumulator type (ACC), and an output type (OUT). The input type is the type of elements in the input stream, and the AggregateFunction has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one, and for extracting an output (of type OUT) from an accumulator. We will see how this works in the example below. As with a ReduceFunction, Flink incrementally aggregates the input elements of a window as they arrive.


An AggregateFunction can be defined and used like this:

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  // use a floating-point division so the average is not truncated to a whole number
  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)


Sliding window example


Unlike reduce(function: (T, T) => T), the aggregate() method cannot be given an anonymous function; instead you need to pass an object that extends AggregateFunction.

package data_stream.window

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.flink.util.Collector
import utils.KafkaUtil

object window_function_aggregate {

  /**
   * Incremental processing: each element is aggregated as soon as it arrives.
   */
  class aggregate_channel_devices extends AggregateFunction[((String, String), Int), Long, Long] {
    // accumulation logic
    override def add(in: ((String, String), Int), acc: Long): Long = acc + in._2
    // accumulator initialization
    override def createAccumulator(): Long = 0L
    // result extracted from the accumulator
    override def getResult(acc: Long): Long = acc
    // merge accumulators coming from different partitions
    override def merge(acc: Long, acc1: Long): Long = acc + acc1
  }

  /**
   * Function applied to the aggregated window result, executed when the window fires.
   * Based on the interface WindowFunction[IN, OUT, KEY, W <: Window]
   * IN : input type, i.e. the output of the AggregateFunction
   * OUT: output type
   */
  class channel_devices_windowFuction extends WindowFunction[Long, ((String, String), Long), (String, String), TimeWindow] {
    override def apply(key: (String, String), window: TimeWindow, input: Iterable[Long], out: Collector[((String, String), Long)]): Unit = {
      out.collect((key, input.iterator.next()))
    }
  }


  case class UserLogData(real_name: String, user_name: String, age: Int, sex: String, phone_number: String,
                         province: String, email: String, ip_address: String, company: String, channel: String,
                         device: String, register_date_time: String, last_login_time: String)

  private val KAFKA_TOPIC: String = "user_information"

  def main(args: Array[String]) {

    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // exactly-once semantics for end-to-end consistency within the application
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // enable checkpointing with a 5-second interval
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    // set the StateBackend and the location where checkpointed state is stored
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource/tumbling"))

    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(KAFKA_TOPIC)

    // raw records consumed from Kafka
    val original_stream: DataStream[String] = env.addSource(kafkaSource)

    """
      |original_stream> UserLogData(纪淑兰,ehao,37,female,18529008264,辽宁省,apeng@gmail.com,110.75.179.140,易动力科技有限公司,app,xiaomi,2017-08-18 12:29:29,2018-09-02 05:05:56)
      |original_stream> UserLogData(刘玉英,caojing,42,male,15647635274,香港特别行政区,houwei@hotmail.com,50.188.173.136,同兴万点传媒有限公司,web,Firefox,2017-10-27 12:20:20,2018-10-11 09:26:20)
      |original_stream> UserLogData(薛萍,leizhong,32,male,18928404956,云南省,xxiao@yahoo.com,115.123.146.193,网新恒天传媒有限公司,web,360_browser,2017-06-17 16:42:24,2018-09-19 19:36:39)
    """.stripMargin


    /**
     * Concrete requirement: every 5 seconds, rank the users' channel sources and devices over the last 10 seconds.
     */

    val original_format_stream: DataStream[((String, String), Int)] = original_stream
      .map(match_data(_: String))
      .map((raw: UserLogData) => ((raw.channel, raw.device), 1))

    original_format_stream
      .keyBy((_: ((String, String), Int))._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .aggregate(new aggregate_channel_devices, new channel_devices_windowFuction)
      .print("channel_device")
    env.execute("channel_device stream")

  }

  def match_data(original_str: String): UserLogData = {
    val original_json: JSONObject = JSON.parseObject(original_str)
    UserLogData(original_json.getString("real_name"), original_json.getString("user_name"), original_json.getInteger("age"),
      original_json.getString("sex"), original_json.getString("phone_number"), original_json.getString("province"),
      original_json.getString("email"), original_json.getString("ip_address"), original_json.getString("company"),
      original_json.getString("channel"), original_json.getString("device"), original_json.getString("register_date_time"),
      original_json.getString("last_login_time"))
  }
}
channel_device:1> ((app,Samsung),27)
channel_device:2> ((app,vivo),20)
channel_device:3> ((app,1+),19)
channel_device:8> ((pc,Mac),84)
channel_device:4> ((web,Chrome),26)
channel_device:6> ((app,meizu),22)
channel_device:4> ((app,oppo),21)
channel_device:8> ((pc,Windows),90)
channel_device:3> ((web,360_browser),25)
channel_device:1> ((web,QQ_browser),32)
channel_device:2> ((web,Opera),26)
channel_device:3> ((web,Firefox),34)
channel_device:8> ((web,UC_browser),25)
channel_device:3> ((app,huawei),19)

ProcessWindowFunction


ProcessWindowFunction is the lowest-level API among the window functions. It gets an Iterable containing all the elements of the window, together with a Context object that gives access to time and state information, which makes it more flexible than the other window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated and instead need to be buffered internally until the window is considered ready for processing.


The abstract class of ProcessWindowFunction looks as follows:

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key      The key for which this window is evaluated.
   * @param context  The context in which the window is being evaluated.
   * @param elements The elements in the window being evaluated.
   * @param out      A collector for emitting elements.
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
   * The context holding window metadata
   */
  abstract class Context {
    /**
     * Returns the window that is being evaluated.
     */
    def window: W

    /**
     * Returns the current processing time.
     */
    def currentProcessingTime: Long

    /**
     * Returns the current event-time watermark.
     */
    def currentWatermark: Long

    /**
     * State accessor for per-key and per-window state.
     */
    def windowState: KeyedStateStore

    /**
     * State accessor for per-key global state.
     */
    def globalState: KeyedStateStore
  }

}


Sliding window example

package data_stream.window

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.flink.util.Collector
import utils.KafkaUtil

object window_function_process {

  case class UserLogData(real_name: String, user_name: String, age: Int, sex: String, phone_number: String,
                         province: String, email: String, ip_address: String, company: String, channel: String,
                         device: String, register_date_time: String, last_login_time: String)

  private val KAFKA_TOPIC: String = "user_information"

  def main(args: Array[String]) {

    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    // exactly-once semantics for end-to-end consistency within the application
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // enable checkpointing with a 5-second interval
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    // set the StateBackend and the location where checkpointed state is stored
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource/tumbling"))

    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(KAFKA_TOPIC)

    // raw records consumed from Kafka
    val original_stream: DataStream[String] = env.addSource(kafkaSource)

    """
      |original_stream> UserLogData(纪淑兰,ehao,37,female,18529008264,辽宁省,apeng@gmail.com,110.75.179.140,易动力科技有限公司,app,xiaomi,2017-08-18 12:29:29,2018-09-02 05:05:56)
      |original_stream> UserLogData(刘玉英,caojing,42,male,15647635274,香港特别行政区,houwei@hotmail.com,50.188.173.136,同兴万点传媒有限公司,web,Firefox,2017-10-27 12:20:20,2018-10-11 09:26:20)
      |original_stream> UserLogData(薛萍,leizhong,32,male,18928404956,云南省,xxiao@yahoo.com,115.123.146.193,网新恒天传媒有限公司,web,360_browser,2017-06-17 16:42:24,2018-09-19 19:36:39)
    """.stripMargin


    /**
     * Concrete requirement: every 5 seconds, rank the users' channel sources and devices over the last 10 seconds.
     * Here the low-level ProcessWindowFunction is used to do the processing.
     * For clarity, the processing time is printed for each window evaluation.
     */

    val original_format_stream: DataStream[((String, String), Int)] = original_stream
      .map(match_data(_: String))
      .map((raw: UserLogData) => ((raw.channel, raw.device), 1))


    original_format_stream
      .keyBy((_: ((String, String), Int))._1)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .process(new ProcessWindowFunction[((String, String), Int), ((String, String), Long), (String, String), TimeWindow] {
        override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Int)], out: Collector[((String, String), Long)]): Unit = {
          val process_time: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date)
          println(s"=========$process_time=============")
          out.collect((key, elements.size))
        }
      }).print("processFunctionWindow")


    env.execute("process function stream")

  }


  def match_data(original_str: String): UserLogData = {
    val original_json: JSONObject = JSON.parseObject(original_str)
    UserLogData(original_json.getString("real_name"), original_json.getString("user_name"), original_json.getInteger("age"),
      original_json.getString("sex"), original_json.getString("phone_number"), original_json.getString("province"),
      original_json.getString("email"), original_json.getString("ip_address"), original_json.getString("company"),
      original_json.getString("channel"), original_json.getString("device"), original_json.getString("register_date_time"),
      original_json.getString("last_login_time"))
  }

}


Printed output

=========2018-05-17 19:29:15=============
processFunctionWindow> ((web,QQ_browser),76)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((pc,Mac),217)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((app,oppo),50)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((app,Samsung),58)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((app,huawei),54)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((web,UC_browser),54)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((web,Firefox),63)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((web,360_browser),66)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((app,meizu),56)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((app,xiaomi),53)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((app,vivo),49)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((app,iphone),61)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((web,Chrome),57)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((pc,Windows),230)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((web,Opera),73)
=========2018-05-17 19:29:15=============
processFunctionWindow> ((app,1+),50)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((web,UC_browser),55)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((web,Chrome),60)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((app,1+),50)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((app,huawei),54)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((web,QQ_browser),76)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((app,Samsung),58)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((pc,Windows),231)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((app,oppo),51)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((web,Opera),74)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((web,Firefox),64)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((app,vivo),49)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((app,xiaomi),53)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((app,meizu),57)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((pc,Mac),218)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((app,iphone),61)
=========2018-05-17 19:29:20=============
processFunctionWindow> ((web,360_browser),66)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((web,Opera),3)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((web,Chrome),3)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((app,meizu),1)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((web,Firefox),5)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((pc,Mac),2)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((app,oppo),1)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((pc,Windows),1)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((web,UC_browser),3)
=========2018-05-17 19:29:25=============
processFunctionWindow> ((app,huawei),1)