Flink apply()&process() 讲解 在处理流数据计算时,我们在对流数据使用了keyby()和window()后,需要对分组后的数据做分组处理,那么除了对分组数据直接做reduce(),aggregate()等聚合操作之外,还有另一种场景就是对分组后的数据,每一个key对应的Iterable做稍微复杂一点的数据计算或者数据的整合或者变换。 那么这里我们就可以使用apply()或者process()来实现更底层的计算逻辑。那么两者之间有什么区别呢?我们来看一下两者的源码: 从上面的图片中,我们可以看到,两个方法的注释居然是一样的。内部的实现则不同,分别调用了ScalaProcessWindowFunctionWrapper和ScalaWindowFunctionWrapper
我们来看ScalaProcessWindowFunctionWrapper,内部实现的process中,传入了context上下文,还有其他window相关有用的参数,从功能上来讲,process已经包含了apply能提供的功能,apply()是旧版的process(),并且没有提供上下文信息和其他window的高级功能,例如每个窗口的keyed state。apply()将在某个时间被弃用。process算子常见的有ProcessFunction和KeyedProcessFunction两种计算的方式,具体的实现可以看源码。
process和apply算子最大的区别在于process可以自己定时触发计算的定时器,在processElement方法定义定时器 context.timerService().registerEventTimeTimer(timestamp); 当定时器时间到达,会回调onTimer()方法的计算任务时器允许应用程序对processing time和event time的变化做出反应,每次对processElement()的调用都会得到一个Context对象,该对象允许访问元素事件时间的时间戳和TimeServer。TimeServer可以用来为尚未发生的event-time或者processing-time注册回调,当定时器的时间到达时,onTimer(…)方法会被调用。在这个调用期间,所有的状态都会限定到创建定时器的键,并允许定时器操纵键控状态(keyed states)。
apply算子和process算子 ** (1) window数据apply算子对应的fuction WindowFunction(keyed window) 与 AllWindowFunction(no key window) (2) window数据process算子对应的fuction process算子可以获取到window窗口的Context信息 而 apply算子无法获取到 ProcessWindowFunction(keyed-window) 和 ProcessAllWindowFunction(no keyd window) keyed-window 和 nokeyed-window (3) 普通流数据,process算子,对应的fuction KeyedProcessFunction: A keyed function that processes elements of a stream. (可实现定时任务) ProcessFunction:A function that processes elements of a stream.(可实现定时任务) 下面我们用一个小实例来演示两种方法的使用 需求:求每个user,周一至周日,每天成绩的排序详情 数据格式:user001,1,95(用户ID,星期几,成绩) 程序如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package data_stream.functionimport org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction , WindowFunction }import org.apache.flink.streaming.api.scala.{DataStream , StreamExecutionEnvironment , WindowedStream }import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.api.scala._import org.apache.flink.util.Collector import scala.collection.mutable.ListBuffer import scala.collection.parallel.immutableobject keyby_datastream { case class UserGrade (user_id: String , weekday: String , grade: Int ) case class UserGradeList (user_id: String , gradeList: List [Int ] ) def main (args: Array [String ] ) : Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment .getExecutionEnvironment val socketStream: DataStream [String ] = env.socketTextStream("localhost" , 8888 ) val userGradeStream: DataStream [UserGrade ] = socketStream.map((data: String ) => { val user_array: Array [String ] = data.split("," ) UserGrade (user_array(0 ), user_array(1 ), user_array(2 ).toInt) }) val keyWondowStream: WindowedStream [UserGrade , String , TimeWindow ] = userGradeStream.keyBy((_: UserGrade ).user_id) .timeWindow(Time .seconds(5 )) val resultStream: DataStream [UserGradeList ] = keyWondowStream.process(new ProcessWindowFunction [UserGrade , UserGradeList , String , TimeWindow ]{ override def process (key: String , context: Context , elements: Iterable [UserGrade ], out: Collector [UserGradeList ]): Unit = { var gradeList: ListBuffer [Int ] = ListBuffer [Int ]() for (data <- elements) { gradeList += data.grade } out.collect(UserGradeList (key,gradeList.toList.sorted)) } }) resultStream.print("===" ) env.execute("process apply function" ) } }
控制台输入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 user001,1,95 user001,2,200 user001,3,97 user001,4,189 user001,5,99 user001,6,100 user001,7,40 user002,1,1 user002,2,5 user002,3,2 user002,4,88 user002,5,4 user002,6,33 user002,7,22
结果打印:
1 2 ===:3> UserGradeList(user001,List(40, 95, 97, 99, 100, 189, 200)) ===:2> UserGradeList(user002,List(1, 2, 4, 5, 22, 33, 88))
这里有一点需要注意,我在程序中使用了scala可变集合ListBuffer,程序执行后,则报错Caused by: java.lang.NumberFormatException: Not a version: 9 后来Google得到,这是Flink版本和Scala版本之间引起的错误,这里要注意,原来的版本对应是Flink 1.10和Scala 2.11.8,后来Scala版本修改为2.11.12,这个问题就解决了。**