Flink 程序是实现了分布式集合转换(例如过滤、映射、更新状态、join、分组、定义窗口、聚合)的规范化程序。集合初始创建自 source(例如读取文件、kafka 主题,或本地内存中的集合)。结果通过 sink 返回,例如,它可以将数据写入(分布式)文件,或标准输出(例如命令行终端)。 Flink 程序可以在多种环境中运行,独立运行或嵌入到其他程序中。可以在本地 JVM 中执行,也可以在多台机器的集群上执行。 针对有界和无界两种数据 source 类型,你可以使用 DataSet API 来编写批处理程序或使用 DataStream API 来编写流处理程序。本篇指南将介绍这两种 API 通用的基本概念,关于使用 API 编写程序的具体信息请查阅 流处理指南 和 批处理指南 。 请注意:** 当展示如何使用 API 的实际示例时我们使用 StreamingExecutionEnvironment
和 DataStream API
。对于批处理,将他们替换为 ExecutionEnvironment
和 DataSet API
即可,概念是完全相同的。
DataSet 和 DataStream Flink 用特有的 DataSet
和 DataStream
类来表示程序中的数据。你可以将他们视为可能包含重复项的不可变数据集合。对于 DataSet
,数据是有限的 ,而对于 DataStream
,元素的数量可以是无限的。 这些集合与标准的 Java 集合有一些关键的区别。首先它们是不可变的,也就是说它们一旦被创建你就不能添加或删除元素了。你也不能简单地检查它们内部的元素。 在 Flink 程序中,集合最初通过添加数据 source 来创建,通过使用诸如 map
、filter
等 API 方法对数据 source 进行转换从而派生新的集合。
剖析一个 Flink 程序 Flink 程序看起来像是转换数据集合的规范化程序。每个程序由一些基本的部分组成:
获取执行环境, 加载/创建初始数据, 指定对数据的转换操作, 指定计算结果存放的位置, 触发程序执行 我们现在将概述每个步骤,详细信息请参阅相应章节。请注意所有 Scala DataSet API 可以在这个包 org.apache.flink.api.scala 中找到,同时,所有 Scala DataStream API 可以在这个包 org.apache.flink.streaming.api.scala 中找到。StreamExecutionEnvironment
是所有 Flink 程序的基础。你可以使用它的这些静态方法获取:
1 2 3 getExecutionEnvironment() createLocalEnvironment() createRemoteEnvironment(host: String , port: Int , jarFiles: String *)
通常你只需要使用 getExecutionEnvironment()
,因为它会根据上下文环境完成正确的工作,如果你在 IDE 中执行程序或者作为标准的 Java 程序来执行,它会创建你的本机执行环境。如果你将程序封装成 JAR 包,然后通过命令行 调用,Flink 集群管理器会执行你的 main 方法并且 getExecutionEnvironment()
会返回在集群上执行程序的执行环境。 针对不同的数据 source,执行环境有若干不同的读取文件的方法:你可以逐行读取 CSV 文件,或者使用完全自定义的输入格式。要将文本文件作为一系列行读取,你可以使用:
1 2 val env = StreamExecutionEnvironment .getExecutionEnvironment()val text: DataStream [String ] = env.readTextFile("file:///path/to/file" )
如此你会得到一个 DataStream 然后对其应用转换操作从而创建新的派生 DataStream。 通过调用 DataStream 的转换函数来进行转换。下面是一个映射转换的实例:
1 2 val input: DataSet[String] = ... val mapped = input.map { x => x.toInt }
这会通过把原始数据集合的每个字符串转换为一个整数创建一个新的 DataStream。 一旦你得到了包含最终结果的 DataStream,就可以通过创建 sink 将其写入外部系统。如下是一些创建 sink 的示例:
1 2 writeAsText(path: String ) print()
当设定好整个程序以后你需要调用 StreamExecutionEnvironment
的 execute()
方法触发程序执行 。至于在你的本机触发还是提交到集群运行取决于 ExecutionEnvironment
的类型。execute()
方法返回 JobExecutionResult
,它包括执行耗时和一个累加器的结果。 如果你不需要等待作业的结束,只是想要触发程序执行,你可以调用 StreamExecutionEnvironment
的 executeAsync()
方法。这个方法将返回一个 JobClient
对象,通过 JobClient
能够与程序对应的作业进行交互。作为例子,这里介绍通过 executeAsync()
实现与 execute()
相同行为的方法。
1 2 final JobClient jobClient = env.executeAsync();final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
有关流数据的 source 和 sink 以及有关 DataStream 支持的转换操作的详细信息请参阅流处理指南 。 有关批数据的 source 和 sink 以及有关 DataSet 支持的转换操作的详细信息请参阅批处理指南 。
延迟计算 无论在本地还是集群执行,所有的 Flink 程序都是延迟执行的:当程序的 main 方法被执行时,并不立即执行数据的加载和转换,而是创建每个操作并将其加入到程序的执行计划中。当执行环境调用 execute()
方法显式地触发执行的时候才真正执行各个操作。 延迟计算允许你构建复杂的程序,Flink 将其作为整体计划单元来执行。
指定键 一些转换操作(join, coGroup, keyBy, groupBy)要求在元素集合上定义键。另外一些转换操作 (Reduce, GroupReduce, Aggregate, Windows)允许在应用这些转换之前将数据按键分组。 如下对 DataSet 分组
1 2 3 4 DataSet<...> input = DataSet<...> reduced = input .groupBy() .reduceGroup();
如下对 DataStream 指定键
1 2 3 4 DataStream<...> input = DataStream<...> windowed = input .keyBy() .window();
Flink 的数据模型不是基于键值对的。因此你不需要将数据集类型物理地打包到键和值中。键都是“虚拟的”:它们的功能是指导分组算子用哪些数据来分组。 请注意:*下面的讨论中我们将以 * DataStream
** 和 ** keyby
** 为例。 对于 DataSet API 你只需要用 ** DataSet
** 和 ** groupBy
** 替换即可。 **
为 Tuple 定义键 最简单的方式是按照 Tuple 的一个或多个字段进行分组:
1 2 val input: DataStream [(Int , String , Long )] = val keyed = input.keyBy(0 )
按照第一个字段(整型字段)对 Tuple 分组。
1 2 val input: DataSet [(Int , String , Long )] = val grouped = input.groupBy(0 ,1 )
这里我们用第一个字段和第二个字段组成的组合键对 Tuple 分组 对于嵌套 Tuple 请注意: 如果你的 DataStream 是嵌套 Tuple,例如:
1 DataStream <Tuple3 <Tuple2 <Integer , Float >,String ,Long >> ds;
指定 keyBy(0)
将导致系统使用整个 Tuple2
作为键(一个整数和一个浮点数)。 如果你想“进入”到 Tuple2
的内部,你必须使用如下所述的字段表达式键。
使用字段表达式定义键 可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组、排序、join 或 coGrouping 的键。 字段表达式可以很容易地选取复合(嵌套)类型中的字段,例如 Tuple 和 POJO 类型。 下例中,我们有一个包含“word”和“count”两个字段的 POJO:WC
。要用 word
字段分组,我们只需要把它的名字传给 keyBy()
函数即可。
1 2 3 4 5 6 7 8 9 10 class WC (var word: String , var count: Int ) { def this () { this ("" , 0 L) } } val words: DataStream [WC ] = val wordCounts = words.keyBy("word" ).window()case class WC (word: String , count: Int )val words : DataStream [WC ] = val wordCounts = words.keyBy("word" ).window()
字段表达式语法**:
根据字段名称选择 POJO 的字段。例如 “user”
就是指 POJO 类型的“user”字段。 根据 1 开始的字段名称或 0 开始的字段索引选择 Tuple 的字段。例如 “_1”
和 “5”
分别指 Java Tuple 类型的第一个和第六个字段。 可以选择 POJO 和 Tuple 的嵌套字段。 例如,一个 POJO 类型有一个“user”字段还是一个 POJO 类型,那么 “user.zip”
即指这个“user”字段的“zip”字段。任意嵌套和混合的 POJO 和 Tuple都是支持的,例如 “_2.user.zip”
或 “user._4.1.zip”
。 可以使用 "*"
通配符表达式选择完整的类型。这也适用于非 Tuple 或 POJO 类型。 字段表达式示例**:
1 2 3 4 5 6 7 8 9 10 class WC (var complex: ComplexNestedClass , var count: Int ) { def this () { this (null , 0 ) } } class ComplexNestedClass ( var someNumber: Int , someFloat: Float , word: (Long , Long , String ), hadoopCitizen : IntWritable ) { def this () { this (0 , 0 , (0 , 0 , "" ), new IntWritable (0 )) } }
这些字段表达式对于以上代码示例都是合法的:
"count"
:WC
类的 count 字段。"complex"
:递归选择 POJO 类型 ComplexNestedClass
的 complex 字段的全部字段。"complex.word._3"
:选择嵌套 Tuple3
类型的最后一个字段。"complex.hadoopCitizen"
:选择 hadoop 的 IntWritable
类型。使用键选择器函数定义键 定义键的另一种方法是“键选择器”函数。键选择器函数将单个元素作为输入并返回元素的键。键可以是任意类型,并且可以由确定性计算得出。 下例展示了一个简单返回对象字段的键选择器函数:1 2 3 4 // 普通的 case class case class WC(word: String, count: Int) val words: DataStream[WC] = // [...] val keyed = words.keyBy( _.word )
指定转换函数 大多数转换操作需要用户定义函数。本节列举了指定它们的不同方法。
Lambda 函数 正如前面的例子中所见,所有操作都接受 lambda 函数来描述操作:
1 2 val data: DataSet [String ] = data.filter { _.startsWith("http://" ) }
1 2 3 4 val data: DataSet [Int ] = data.reduce { (i1,i2) => i1 + i2 } data.reduce { _ + _ }
富函数 所有需要用户定义函数的转换操作都可以将富 函数作为参数。例如,对于
1 data.map { x => x.toInt }
你可以替换成
1 2 3 class MyMapFunction extends RichMapFunction [String , Int ] { def map (in: String ):Int = { in.toInt } };
并像往常一样将函数传递给 map
转换操作:
1 data.map(new MyMapFunction ())
富函数也可以被定义为匿名类:
1 2 3 data.map (new RichMapFunction[String, Int] { def map(in: String):Int = { in.toInt } })
富函数为用户定义函数(map、reduce 等)额外提供了 4 个方法: open
、close
、getRuntimeContext
和 setRuntimeContext
。 这些方法有助于向函数传参(请参阅 向函数传递参数 )、 创建和终止本地状态、访问广播变量(请参阅 广播变量 )、访问诸如累加器和计数器等运行时信息(请参阅 累加器和计数器 )和迭代信息(请参阅 迭代 )。
支持的数据类型 Flink 对于 DataSet 或 DataStream 中可以包含的元素类型做了一些限制。这么做是为了使系统能够分析类型以确定有效的执行策略。 有七种不同的数据类型:
Java Tuple 和 Scala Case Class Java POJO 基本数据类型 常规的类 值 Hadoop Writable 特殊类型 Tuple 和 Case Class Scala Case Class(以及作为 Case Class 的特例的 Scala Tuple)是复合类型,包含固定数量的各种类型的字段。Tuple 的字段从 1 开始索引。例如 _1
指第一个字段。Case Class 字段用名称索引。 1 2 3 4 5 6 7 8 9 case class WordCount (word: String , count: Int )val input = env.fromElements( WordCount ("hello" , 1 ), WordCount ("world" , 2 )) input.keyBy("word" ) val input2 = env.fromElements(("hello" , 1 ), ("world" , 2 )) input2.keyBy(0 , 1 )
POJO Flink 将满足如下条件的 Java 和 Scala 的类作为特殊的 POJO 数据类型处理:类必须是公有的。 它必须有一个公有的无参构造器(默认构造器)。 所有的字段要么是公有的要么必须可以通过 getter 和 setter 函数访问。例如一个名为 foo
的字段,它的 getter 和 setter 方法必须命名为 getFoo()
和 setFoo()
。 字段的类型必须被已注册的序列化程序所支持。 POJO 通常用 PojoTypeInfo
表示,并使用 PojoSerializer
(Kryo 作为可配置的备用序列化器)序列化。 例外情况是 POJO 是 Avro 类型(Avro 指定的记录)或作为“Avro 反射类型”生成时。 在这种情况下 POJO 由 AvroTypeInfo
表示,并且由 AvroSerializer
序列化。 如果需要,你可以注册自己的序列化器;更多信息请参阅 序列化 。 Flink 分析 POJO 类型的结构,也就是说,它会推断出 POJO 的字段。因此,POJO 类型比常规类型更易于使用。此外,Flink 可以比一般类型更高效地处理 POJO。 下例展示了一个拥有两个公有字段的简单 POJO。
1 2 3 4 5 6 7 8 9 class WordWithCount (var word: String , var count: Int ) { def this () { this (null , -1 ) } } val input = env.fromElements( new WordWithCount ("hello" , 1 ), new WordWithCount ("world" , 2 )) input.keyBy("word" )
基本数据类型 Flink 支持所有 Java 和 Scala 的基本数据类型如 Integer
、 String
、和 Double
。
常规的类 Flink 支持大部分 Java 和 Scala 的类(API 和自定义)。 除了包含无法序列化的字段的类,如文件指针,I / O流或其他本地资源。遵循 Java Beans 约定的类通常可以很好地工作。 Flink 对于所有未识别为 POJO 类型的类(请参阅上面对于的 POJO 要求)都作为常规类处理。 Flink 将这些数据类型视为黑盒,并且无法访问其内容(为了诸如高效排序等目的)。常规类使用 Kryo 序列化框架进行序列化和反序列化。
值 值 类型手工描述其序列化和反序列化。它们不是通过通用序列化框架,而是通过实现org.apache.flinktypes.Value
接口的 read
和 write
方法来为这些操作提供自定义编码。当通用序列化效率非常低时,使用值类型是合理的。例如,用数组实现稀疏向量。已知数组大部分元素为零,就可以对非零元素使用特殊编码,而通用序列化只会简单地将所有数组元素都写入。org.apache.flinktypes.CopyableValue
接口以类似的方式支持内部手工克隆逻辑。 Flink 有与基本数据类型对应的预定义值类型。(ByteValue
、 ShortValue
、 IntValue
、LongValue
、 FloatValue
、DoubleValue
、 StringValue
、CharValue
、 BooleanValue
)。这些值类型充当基本数据类型的可变变体:它们的值可以改变,允许程序员重用对象并减轻垃圾回收器的压力。
Hadoop Writable 可以使用实现了 org.apache.hadoop.Writable
接口的类型。它们会使用 write()
和 readFields()
方法中定义的序列化逻辑。
特殊类型 可以使用特殊类型,包括 Scala 的 Either
、Option
和 Try
。 Java API 有对 Either
的自定义实现。 类似于 Scala 的 Either
,它表示一个具有 Left 或 Right 两种可能类型的值。 Either
可用于错误处理或需要输出两种不同类型记录的算子。
类型擦除和类型推断 _ Java 编译器在编译后抛弃了大量泛型类型信息。这在 Java 中被称作 _类型擦除_。它意味着在运行时,对象的实例已经不知道它的泛型类型了。例如 DataStream<String>
和 DataStream<Long>
的实例在 JVM 看来是一样的。 Flink 在准备程序执行时(程序的 main 方法被调用时)需要类型信息。Flink Java API 尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和算子中。你可以通过 DataStream.getType()
获取数据类型。此方法返回 TypeInformation
的一个实例,这是 Flink 内部表示类型的方式。 类型推断有其局限性,在某些情况下需要程序员的“配合”。 这方面的示例是从集合创建数据集的方法,例如 ExecutionEnvironment.fromCollection()
,你可以在这里传递一个描述类型的参数。 像MapFunction<I, O>
这样的泛型函数同样可能需要额外的类型信息。 可以通过输入格式和函数实现 ResultTypeQueryable 接口,以明确告知 API 其返回类型。 被调函数的_输入类型_通常可以通过先前操作的结果类型来推断。
累加器和计数器 累加器简单地由 加法操作 和 最终累加结果 构成,可在作业结束后使用。 最简单的累加器是一个 计数器 :你可以使用 Accumulator.add(V value)
方法递增它。作业结束时 Flink 会合计(合并)所有的部分结果并发送给客户端。累加器在 debug 或者你想快速了解数据的时候非常有用。 Flink 目前有如下 内置累加器 。它们每一个都实现了 Accumulator 接口。
如何使用累加器:** 首先你必须在要使用它的用户定义转换函数中创建累加器对象(下例为计数器)。
1 private IntCounter numLines = new IntCounter ();
其次,你必须注册累加器对象,通常在富函数的 open()
方法中。在这里你还可以定义名称。
1 getRuntimeContext().addAccumulator("num-lines" , this .numLines);
你现在可以在算子函数中的任何位置使用累加器,包括 open()
和 close()
方法。
总体结果将存储在 JobExecutionResult
对象中,该对象是从执行环境的 execute()
方法返回的 (目前这仅在执行等待作业完成时才有效)。
1 myJobExecutionResult.getAccumulatorResult("num-lines")
每个作业的所有累加器共享一个命名空间。 这样你就可以在作业的不同算子函数中使用相同的累加器。Flink 会在内部合并所有同名累加器。 关于累加器和迭代请注意: 目前,累加器的结果只有在整个作业结束以后才可用。我们还计划实现在下一次迭代中使前一次迭代的结果可用。你可以使用 Aggregators 计算每次迭代的统计信息,并根据这些信息确定迭代何时终止。自定义累加器: 要实现你自己的累加器,只需编写累加器接口的实现即可。如果你认为 Flink 应该提供你的自定义累加器,请创建 pull request。 你可以选择实现 Accumulator 或者 SimpleAccumulator 。Accumulator<V,R>
最灵活:它为要递增的值定义类型 V
,为最终结果定义类型 R
。例如对于 histogram,V
是数字而 R
是 histogram。SimpleAccumulator
则适用于两个类型相同的情况,例如计数器。
Scala API扩展 为了在Scala和Java API之间保持相当程度的一致性,用于批处理和流传输的标准API省略了一些允许在Scala中进行高水平表达的功能。 如果您想_享受完整的Scala体验_,则可以选择加入通过隐式转换来增强Scala API的扩展。 要使用所有可用的扩展,您只需import
为DataSet API 添加一个简单的
1 import org.apache.flink.api.scala.extensions._
或DataStream API
1 import org.apache.flink.streaming.api.scala.extensions._
另外,您也可以导入单个扩展_一个点菜_只使用那些你喜欢。 通常,DataSet和DataStream API都不接受匿名模式匹配函数来解构元组,案例类或集合,如下所示:
1 2 3 4 5 6 val data: DataSet [(Int , String , Double )] = data.map { case (id, name, temperature) => }
此扩展在DataSet和DataStream Scala API中引入了新方法,这些新方法在扩展API中具有一对一的对应关系。这些委托方法确实支持匿名模式匹配功能。
DataSet API Method Original Example mapWith map (DataSet) ``` data.mapWith { case (_, value) => value.toString } 1 2 | | **mapPartitionWith** | **mapPartition (DataSet)** |
data.mapPartitionWith { case head #:: _ => head } 1 2 | | **flatMapWith** | **flatMap (DataSet)** |
data.flatMapWith { case (_, name, visitTimes) => visitTimes.map(name -> _) } 1 2 | | **filterWith** | **filter (DataSet)** |
data.filterWith { case Train(_, isOnTime) => isOnTime } 1 2 | | **reduceWith** | **reduce (DataSet, GroupedDataSet)** |
data.reduceWith { case ((, amount1), ( , amount2)) => amount1 + amount2 } 1 2 | | **reduceGroupWith** | **reduceGroup (GroupedDataSet)** |
data.reduceGroupWith { case id #:: value #:: _ => id -> value } 1 2 | | **groupingBy** | **groupBy (DataSet)** |
data.groupingBy { case (id, _, _) => id } 1 2 | | **sortGroupWith** | **sortGroup (GroupedDataSet)** |
grouped.sortGroupWith(Order.ASCENDING) { case House(_, value) => value } 1 2 | | **combineGroupWith** | **combineGroup (GroupedDataSet)** |
grouped.combineGroupWith { case header #:: amounts => amounts.sum } 1 2 | | **projecting** | **apply (JoinDataSet, CrossDataSet)** |
data1.join(data2). whereClause(case (pk, _) => pk). isEqualTo(case (_, fk) => fk). projecting { case ((pk, tx), (products, fk)) => tx -> products } data1.cross(data2).projecting { case ((a, ), ( , b) => a -> b } 1 2 | | **projecting** | **apply (CoGroupDataSet)** |
data1.coGroup(data2). whereClause(case (pk, _) => pk). isEqualTo(case (_, fk) => fk). projecting { case (head1 #:: _, head2 #:: _) => head1 -> head2 } } 1 2 3 4 5 6 7 8 9 | <a name="datastream-api"></a> #### <a name="n1Myp"></a> #### DataStream API | Method | Original | Example | | :--- | :--- | :---: | | **mapWith** | **map (DataStream)** |
data.mapWith { case (_, value) => value.toString } 1 2 | | **flatMapWith** | **flatMap (DataStream)** |
data.flatMapWith { case (_, name, visits) => visits.map(name -> _) } 1 2 | | **filterWith** | **filter (DataStream)** |
data.filterWith { case Train(_, isOnTime) => isOnTime } 1 2 | | **keyingBy** | **keyBy (DataStream)** |
data.keyingBy { case (id, _, _) => id } 1 2 | | **mapWith** | **map (ConnectedDataStream)** |
data.mapWith( map1 = case (_, value) => value.toString, map2 = case (_, _, value, _) => value + 1 ) 1 2 | | **flatMapWith** | **flatMap (ConnectedDataStream)** |
data.flatMapWith( flatMap1 = case (_, json) => parse(json), flatMap2 = case (_, _, json, _) => parse(json) ) 1 2 | | **keyingBy** | **keyBy (ConnectedDataStream)** |
data.keyingBy( key1 = case (_, timestamp) => timestamp, key2 = case (id, _, _) => id ) 1 2 | | **reduceWith** | **reduce (KeyedStream, WindowedStream)** |
data.reduceWith { case ((, sum1), ( , sum2) => sum1 + sum2 } 1 2 | | **foldWith** | **fold (KeyedStream, WindowedStream)** |
data.foldWith(User(bought = 0)) { case (User(b), (_, items)) => User(b + items.size) } 1 2 | | **applyWith** | **apply (WindowedStream)** |
data.applyWith(0)( foldFunction = case (sum, amount) => sum + amount windowFunction = case (k, w, sum) => // […] ) 1 2 | | **projecting** | **apply (JoinedStream)** |
data1.join(data2). whereClause(case (pk, _) => pk). isEqualTo(case (_, fk) => fk). projecting { case ((pk, tx), (products, fk)) => tx -> products } 1 2 3 4 5 6 7 | <br />有关每种方法的语义的更多信息,请参考 [DataSet](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/batch/index.html)和[DataStream](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html) API文档。<br /> <br />要专门使用此扩展,可以添加以下内容`import`: ```scala import org.apache.flink.api.scala.extensions.acceptPartialFunctions
用于数据集扩展和
1 import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions
以下代码片段显示了如何一起使用这些扩展方法(与DataSet API一起使用)的最小示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 object Main { import org.apache.flink.api.scala.extensions._ case class Point (x: Double , y: Double ) def main (args: Array [String ] ) : Unit = { val env = ExecutionEnvironment .getExecutionEnvironment val ds = env.fromElements(Point (1 , 2 ), Point (3 , 4 ), Point (5 , 6 )) ds.filterWith { case Point (x, _) => x > 1 }.reduceWith { case (Point (x1, y1), (Point (x2, y2))) => Point (x1 + y1, x2 + y2) }.mapWith { case Point (x, y) => (x, y) }.flatMapWith { case (x, y) => Seq ("x" -> x, "y" -> y) }.groupingBy { case (id, value) => id } } }