Flink apply() & process() Explained

When computing over streaming data, after applying keyBy() and window() to a stream we need to process the grouped data. Besides running aggregations such as reduce() or aggregate() directly on each group, there is another common scenario: performing slightly more complex computation, consolidation, or transformation on the Iterable of elements that belongs to each key.


In that case we can use apply() or process() to implement the lower-level computation logic ourselves. So what is the difference between the two? Let's look at the source code of both:
[Screenshot: source of apply() and process() on WindowedStream]
As the screenshot above shows, the two methods surprisingly carry the same documentation comment. Their internal implementations differ, though: they delegate to ScalaProcessWindowFunctionWrapper and ScalaWindowFunctionWrapper respectively.


[Screenshots: implementations of ScalaProcessWindowFunctionWrapper and ScalaWindowFunctionWrapper]

Looking at ScalaProcessWindowFunctionWrapper, the process() it implements internally is passed the Context object along with other useful window-related parameters. Functionally, process() already covers everything apply() can do: apply() is the older version of process(), and it exposes neither the context information nor the other advanced window features, such as per-window keyed state. apply() will be deprecated at some point. The common process-style functions come in two flavors, ProcessFunction and KeyedProcessFunction; see the source code for their concrete implementations.
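To make the difference concrete, here are the two signatures, abridged from the Flink 1.10 Scala API (bodies and unrelated members are omitted, so treat this as a sketch rather than the full source):

// org.apache.flink.streaming.api.scala.function.WindowFunction
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
  // apply() only sees the key, the window, and the buffered elements.
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

// org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends AbstractRichFunction {
  // process() additionally receives a Context exposing the window, per-window
  // keyed state, the current watermark, and side outputs.
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit

  abstract class Context {
    def window: W
    def windowState: KeyedStateStore   // keyed state scoped to this window
    def globalState: KeyedStateStore   // keyed state shared across windows
  }
}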

The biggest difference between the process and apply operators is that process can register timers that trigger computation on its own schedule; inside processElement you register one via context.timerService().registerEventTimeTimer(timestamp).


When a timer's time arrives, the onTimer() method is called back to run the scheduled computation. Timers let an application react to changes in processing time and event time. Each call to processElement() receives a Context object that gives access to the element's event-time timestamp and to the TimerService. The TimerService can register callbacks for event-time or processing-time instants that have not yet occurred; when a timer fires, onTimer(...) is invoked. During that call, all state is scoped to the key for which the timer was created, so the timer can manipulate keyed state.
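As an illustration, here is a minimal sketch of a timer-driven KeyedProcessFunction. The class name, the (String, Int) element type, and the 10-second threshold are all illustrative choices, not part of the original program; it uses processing-time timers, but event-time timers work the same way via registerEventTimeTimer():

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Emits an alert when a key receives no events for 10 s of processing time.
// Usage: stream.keyBy(_._1).process(new InactivityAlert)
class InactivityAlert extends KeyedProcessFunction[String, (String, Int), String] {

  // Keyed state holding the timestamp of the currently registered timer.
  lazy val timerTs: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))

  override def processElement(value: (String, Int),
                              ctx: KeyedProcessFunction[String, (String, Int), String]#Context,
                              out: Collector[String]): Unit = {
    // Drop the previous timer for this key, if one is pending.
    if (timerTs.value() != 0L) {
      ctx.timerService().deleteProcessingTimeTimer(timerTs.value())
    }
    // Register a new timer 10 s from now; onTimer() fires when it is reached.
    val ts = ctx.timerService().currentProcessingTime() + 10000L
    ctx.timerService().registerProcessingTimeTimer(ts)
    timerTs.update(ts)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, (String, Int), String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // All state here is scoped to the key that registered this timer.
    out.collect(s"key ${ctx.getCurrentKey}: no events in the last 10 s")
    timerTs.clear()
  }
}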

The apply operator and the process operator

(1) Functions corresponding to the apply operator on window data:
WindowFunction (keyed window) and AllWindowFunction (non-keyed window)


(2) Functions corresponding to the process operator on window data:
the process operator can access the window's Context, which the apply operator cannot (see the sketch after this list)
ProcessWindowFunction (keyed window) and ProcessAllWindowFunction (non-keyed window)


(3) Functions corresponding to the process operator on a plain (non-windowed) stream:
KeyedProcessFunction: a keyed function that processes elements of a stream (can run timer-driven tasks, as sketched above)
ProcessFunction: a function that processes elements of a stream (can run timer-driven tasks)
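Here is a minimal sketch of what that Context access looks like in practice; the class name and the (String, Int) element type are illustrative assumptions:

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Tags each key's per-window count with the window's start/end bounds,
// read from the Context that apply() never receives.
class CountWithWindowInfo extends ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
  override def process(key: String, context: Context,
                       elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
    val w: TimeWindow = context.window  // window metadata from the Context
    out.collect(s"key=$key count=${elements.size} window=[${w.getStart}, ${w.getEnd})")
  }
}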




Below is a small example demonstrating the use of both methods.


Requirement: for each user, produce the sorted list of that user's daily grades for Monday through Sunday.


Data format: user001,1,95 (user ID, day of week, grade)


The program is as follows:

package data_stream.function

import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object keyby_datastream {

  case class UserGrade(user_id: String, weekday: String, grade: Int)
  case class UserGradeList(user_id: String, gradeList: List[Int])

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val socketStream: DataStream[String] = env.socketTextStream("localhost", 8888)

    // Parse "user001,1,95" into UserGrade(user_id, weekday, grade).
    val userGradeStream: DataStream[UserGrade] = socketStream.map((data: String) => {
      val user_array: Array[String] = data.split(",")
      UserGrade(user_array(0), user_array(1), user_array(2).toInt)
    })

    val keyWindowStream: WindowedStream[UserGrade, String, TimeWindow] = userGradeStream
      .keyBy((_: UserGrade).user_id)
      .timeWindow(Time.seconds(5))

    // Variant 1: apply() with a WindowFunction, emitting a (key, sorted grades) tuple.
    // Base interface for functions that are evaluated over keyed (grouped) windows:
    // trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable
    // val resultStream: DataStream[(String, List[Int])] = keyWindowStream.apply(new WindowFunction[UserGrade, (String, List[Int]), String, TimeWindow] {
    //   override def apply(key: String, window: TimeWindow, input: Iterable[UserGrade], out: Collector[(String, List[Int])]): Unit = {
    //     val gradeList: ListBuffer[Int] = ListBuffer[Int]()
    //     for (data <- input) {
    //       gradeList += data.grade
    //     }
    //     out.collect((key, gradeList.toList.sorted))
    //   }
    // })

    // Variant 2: apply() with a WindowFunction, emitting a formatted string.
    // val resultStream: DataStream[String] = keyWindowStream.apply(new WindowFunction[UserGrade, String, String, TimeWindow] {
    //   override def apply(key: String, window: TimeWindow, input: Iterable[UserGrade], out: Collector[String]): Unit = {
    //     val gradeList: ListBuffer[Int] = ListBuffer[Int]()
    //     for (data <- input) {
    //       gradeList += data.grade
    //     }
    //     out.collect(key + "===" + gradeList.toList.sorted.toString)
    //   }
    // })

    // Variant 3: process() with a ProcessWindowFunction, collecting each key's
    // grades in the window and emitting them sorted.
    val resultStream: DataStream[UserGradeList] = keyWindowStream.process(new ProcessWindowFunction[UserGrade, UserGradeList, String, TimeWindow] {
      override def process(key: String, context: Context, elements: Iterable[UserGrade], out: Collector[UserGradeList]): Unit = {
        val gradeList: ListBuffer[Int] = ListBuffer[Int]()
        for (data <- elements) {
          gradeList += data.grade
        }
        out.collect(UserGradeList(key, gradeList.toList.sorted))
      }
    })

    resultStream.print("===")
    env.execute("process apply function")
  }
}


Console input:

user001,1,95
user001,2,200
user001,3,97
user001,4,189
user001,5,99
user001,6,100
user001,7,40
user002,1,1
user002,2,5
user002,3,2
user002,4,88
user002,5,4
user002,6,33
user002,7,22


Printed result:

===:3> UserGradeList(user001,List(40, 95, 97, 99, 100, 189, 200))
===:2> UserGradeList(user002,List(1, 2, 4, 5, 22, 33, 88))


One thing to note here: I used the Scala mutable collection ListBuffer in the program, and when the program ran it failed with Caused by: java.lang.NumberFormatException: Not a version: 9

Some Googling revealed that this is an error caused by a mismatch between the Flink and Scala versions. Take note: the original pairing was Flink 1.10 with Scala 2.11.8; after changing the Scala version to 2.11.12, the problem was solved.
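For reference, a minimal sketch of the corresponding build settings, assuming an sbt project (adjust the coordinates to your own build tool and patch versions):

// build.sbt: pin Scala to 2.11.12 so it matches the flink-*_2.11 artifacts
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.10.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.10.0"
)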