Flink DataStream API


Flink中的DataStream程序是常规程序,可对数据流实施转换(例如,过滤,更新状态,定义窗口,聚合)。最初从各种来源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。


Flink程序可以在各种上下文中运行,独立运行或嵌入其他程序中。执行可以在本地JVM或许多计算机的群集中进行。

范例程序

以下程序是流式窗口单词计数应用程序的一个完整的工作示例,该应用程序在5秒的窗口中对来自Web套接字的单词进行计数。您可以复制并粘贴代码以在本地运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WindowWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
counts.print()
env.execute("Window Stream WordCount")
}
}


要运行示例程序,请首先从终端使用netcat启动输入流:

1
nc -lk 9999


只需输入一些单词,然后按回车键即可获得一个新单词。这些将作为单词计数程序的输入。如果您想看到计数大于1,请在5秒钟内一次又一次地键入相同的单词(如果您不能快速键入☺,则将窗口大小从5秒钟增加)。

数据源


源是程序读取其输入的位置。您可以使用将源附加到程序StreamExecutionEnvironment.addSource(sourceFunction)。Flink附带了许多预先实现的源函数,但是您始终可以通过实现SourceFunction for非并行源,实现ParallelSourceFunction接口或扩展 RichParallelSourceFunctionfor并行源来编写自己的自定义源。


可从以下位置访问几个预定义的流源StreamExecutionEnvironment:

基于文件

  • readTextFile(path)- TextInputFormat逐行读取文本文件,即符合规范的文件,并将其作为字符串返回。
  • readFile(fileInputFormat, path) -根据指定的文件输入格式读取(一次)文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter)-这是前两个内部调用的方法。它path根据给定的读取文件fileInputFormat。根据提供的内容watchType,此源可以定期(每intervalms)监视路径中的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或处理一次路径中当前的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步从文件中排除文件。


实施:在后台,Flink将文件读取过程分为两个子任务,即目录监视和数据读取。这些子任务中的每一个都是由单独的实体实现的。监视由单个非并行(并行度= 1)任务实现,而读取由并行运行的多个任务执行。后者的并行性等于作业并行性。单个监视任务的作用是扫描目录(根据定期扫描或仅扫描一次watchType),找到要处理的文件,将它们分成多个部分,并将这些拆分分配给下游阅读器。读者将是阅读实际数据的人。每个拆分只能由一个阅读器读取,而阅读器可以一一阅读多个拆分。重要笔记:

  1. 如果将watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。这可能会破坏“完全一次”的语义,因为在文件末尾附加数据将导致重新处理其所有内容。
  2. 如果将watchType设置为FileProcessingMode.PROCESS_ONCE,则源将扫描路径一次并退出,而无需等待读取器完成文件内容的读取。当然,读者将继续阅读,直到读取了所有文件内容。关闭源将导致在该点之后没有更多检查点。这可能导致节点故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。

    基于套接字

  • socketTextStream-从套接字读取。元素可以由定界符分隔。

### 基于集合
  • fromCollection(Seq)-从Java Java.util.Collection创建数据流。集合中的所有元素必须具有相同的类型。
  • fromCollection(Iterator)-从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
  • fromElements(elements: _*)-从给定的对象序列创建数据流。所有对象必须具有相同的类型。
  • fromParallelCollection(SplittableIterator)-从迭代器并行创建数据流。该类指定迭代器返回的元素的数据类型。
  • generateSequence(from, to) -并行生成给定间隔中的数字序列。

### 自定义
  • addSource-附加新的源功能。例如,要阅读Apache Kafka,可以使用 addSource(new FlinkKafkaConsumer08<>(…))。有关更多详细信息,请参见连接器


Flink 连接 kafka实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package data_stream

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}

object wordcount_stream {

private val KAFKA_TOPIC: String = "weibo_keyword"

def main(args: Array[String]) {

val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "kafka_consumer")

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// exactly-once 语义保证整个应用内端到端的数据一致性
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 开启检查点并指定检查点时间间隔为5s
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
// 设置StateBackend,并指定状态数据存储位置
env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))


val dataSource: FlinkKafkaConsumerBase[String] = new FlinkKafkaConsumer(
KAFKA_TOPIC,
new SimpleStringSchema(),
properties)
// .setStartFromEarliest() // 指定从最早offset开始消费
.setStartFromLatest() // 指定从最新offset开始消费

env.addSource(dataSource)
.flatMap(get_value(_: String).split(","))
.map((_: String, 1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
// .setParallelism(1) 设置并行度
env.execute("Flink Streaming—————KafkaSource")
}

def get_value(string_data: String): String = {
val json_data: JSONObject = JSON.parseObject(string_data)
val date_time: String = json_data.get("datetime").toString
val keywordList: String = json_data.get("keywordList").toString
keywordList
}
}

数据接收器


数据接收器使用DataStream,并将其转发到文件,套接字,外部系统或打印它们。Flink带有各种内置的输出格式,这些格式封装在DataStream的操作后面:

  • writeAsText()/ TextOutputFormat-将元素按行写为字符串。通过调用每个元素的toString()方法获得字符串。
  • writeAsCsv(…)/ CsvOutputFormat-将元组写为逗号分隔的值文件。行和字段定界符是可配置的。每个字段的值来自对象的toString()方法。
  • print()/ printToErr() - 在标准输出/标准错误流上打印每个元素的toString()值。可选地,可以提供前缀(msg),该前缀在输出之前。这可以帮助区分不同的打印调用。如果并行度大于1,则输出之前还将带有产生输出的任务的标识符。
  • writeUsingOutputFormat()/ FileOutputFormat-自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • writeToSocket -根据以下内容将元素写入套接字 SerializationSchema
  • addSink-调用自定义接收器功能。Flink捆绑有连接到其他系统(例如Apache Kafka)的连接器,这些连接器实现为接收器功能。


请注意,上的write*()方法DataStream主要用于调试目的。它们不参与Flink的检查点,这意味着这些功能通常具有至少一次的语义。刷新到目标系统的数据取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都立即显示在目标系统中。同样,在失败的情况下,这些记录可能会丢失。
为了将流可靠,准确地一次传输到文件系统中,请使用flink-connector-filesystem。同样,通过该.addSink(…)方法的自定义实现可以参与Flink一次精确语义的检查点。

执行参数

在StreamExecutionEnvironment包含了ExecutionConfig允许用于运行组工作的具体配置值。
请参考执行配置 以获取大多数参数的说明。这些参数专门与DataStream API有关:

  • setAutoWatermarkInterval(long milliseconds):设置自动水印发射的间隔。您可以使用获取当前值long getAutoWatermarkInterval()

容错能力

状态和检查点描述了如何启用和配置Flink的检查点机制。

控制延迟

默认情况下,元素不会在网络上一对一传输(这会导致不必要的网络通信),但是会进行缓冲。缓冲区的大小(实际上是在计算机之间传输的)可以在Flink配置文件中设置。尽管此方法可以优化吞吐量,但是当传入流不够快时,它可能会导致延迟问题。为了控制吞吐量和延迟,您可以env.setBufferTimeout(timeoutMillis)在执行环境(或各个运算符)上使用来设置缓冲区填充的最大等待时间。在此时间之后,即使缓冲区未满,也会自动发送缓冲区。此超时的默认值为100毫秒。


用法:

1
2
3
val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)
env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

为了最大化吞吐量,请设置set setBufferTimeout(-1)来消除超时,并且仅在缓冲区已满时才刷新它们。为了使延迟最小化,请将超时设置为接近0的值(例如5或10 ms)。应避免将缓冲区超时设置为0,因为它可能导致严重的性能下降。

调试

在分布式群集中运行流式程序之前,最好确保已实现的算法按预期工作。因此,实施数据分析程序通常是检查结果,调试和改进的增量过程。
Flink提供的功能可通过在IDE中支持本地调试,注入测试数据和收集结果数据来大大简化数据分析程序的开发过程。本节提供一些提示,说明如何简化Flink程序的开发。

本地执行环境


A LocalStreamEnvironment在创建该JVM的同一JVM进程内启动Flink系统。如果从IDE启动LocalEnvironment,则可以在代码中设置断点并轻松调试程序。
创建并按如下方式使用LocalEnvironment:

1
2
3
4
val env = StreamExecutionEnvironment.createLocalEnvironment()
val lines = env.addSource(/* some source */)
// build your program
env.execute()

收集数据源


Flink提供了由Java集合支持的特殊数据源,以简化测试。一旦测试了程序,就可以轻松地将源和接收器替换为可读取/写入外部系统的源和接收器。
收集数据源可以按如下方式使用:

1
2
3
4
5
6
7
8
9
val env = StreamExecutionEnvironment.createLocalEnvironment()
// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)
// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)
// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意:当前,集合数据源要求数据类型和迭代器实现 Serializable。此外,收集数据源不能并行执行(并行度= 1)。

迭代器数据接收器


Flink还提供接收器以收集DataStream结果以进行测试和调试。可以如下使用:

1
2
3
4
import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter
val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala


注意: flink-streaming-contrib模块已从Flink 1.5.0中删除。已移入flink-streaming-java和flink-streaming-scala。

DataStream API

Map

Map 接受一个元素作为输入,并且根据开发者自定义的逻辑处理后输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class StreamingDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1);
//Map
SingleOutputStreamOperator<Object> mapItems = items.map(new MapFunction<MyStreamingSource.Item, Object>() {
@Override
public Object map(MyStreamingSource.Item item) throws Exception {
return item.getName();
}
});
//打印结果
mapItems.print().setParallelism(1);
String jobName = "user defined streaming source";
env.execute(jobName);
}
}

我们只取出每个 Item 的 name 字段进行打印。

注意,Map 算子是最常用的算子之一,官网中的表述是对一个 DataStream 进行映射,每次进行转换都会调用 MapFunction 函数。从源 DataStream 到目标 DataStream 的转换过程中,返回的是 SingleOutputStreamOperator。当然了,我们也可以在重写的 map 函数中使用 lambda 表达式。

1
2
3
SingleOutputStreamOperator<Object> mapItems = items.map(
item -> item.getName()
);

甚至,还可以自定义自己的 Map 函数。通过重写 MapFunction 或 RichMapFunction 来自定义自己的 map 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class StreamingDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1);
SingleOutputStreamOperator<String> mapItems = items.map(new MyMapFunction());
//打印结果
mapItems.print().setParallelism(1);
String jobName = "user defined streaming source";
env.execute(jobName);
}
static class MyMapFunction extends RichMapFunction<MyStreamingSource.Item,String> {
@Override
public String map(MyStreamingSource.Item item) throws Exception {
return item.getName();
}
}
}

此外,在 RichMapFunction 中还提供了 open、close 等函数方法,重写这些方法还能实现更为复杂的功能,比如获取累加器、计数器等。

FlatMap

FlatMap 接受一个元素,返回零到多个元素。FlatMap 和 Map 有些类似,但是当返回值是列表的时候,FlatMap 会将列表“平铺”,也就是以单个元素的形式进行输出。

1
2
3
4
5
6
7
SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() {
@Override
public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {
String name = item.getName();
collector.collect(name);
}
});

上面的程序会把名字逐个输出。我们也可以在 FlatMap 中实现更为复杂的逻辑,比如过滤掉一些我们不需要的数据等。

Filter

顾名思义,Fliter 的意思就是过滤掉不需要的数据,每个元素都会被 filter 函数处理,如果 filter 函数返回 true 则保留,否则丢弃。

例如,我们只保留 id 为偶数的那些 item。

1
2
3
4
5
6
SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() {
@Override
public boolean filter(MyStreamingSource.Item item) throws Exception {
return item.getId() % 2 == 0;
}
});


当然,我们也可以在 filter 中使用 lambda 表达式:

1
2
3
SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter( 
item -> item.getId() % 2 == 0
);

KeyBy

在介绍 KeyBy 函数之前,需要你理解一个概念:KeyedStream。 在实际业务中,我们经常会需要根据数据的某种属性或者单纯某个字段进行分组,然后对不同的组进行不同的处理。举个例子,当我们需要描述一个用户画像时,则需要根据用户的不同行为事件进行加权;再比如,我们在监控双十一的交易大盘时,则需要按照商品的品类进行分组,分别计算销售额。

我们在使用 KeyBy 函数时会把 DataStream 转换成为 KeyedStream,事实上 KeyedStream 继承了 DataStream,KeyedStream 中的元素会根据用户传入的参数进行分组。
我们在第 02 课时中讲解的 WordCount 程序,曾经使用过 KeyBy:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 将接收的数据进行拆分,分组,窗口计算并且进行聚合输出
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds
....

在生产环境中使用 KeyBy 函数时要十分注意!该函数会把数据按照用户指定的 key 进行分组,那么相同分组的数据会被分发到一个 subtask 上进行处理,在大数据量和 key 分布不均匀的时非常容易出现数据倾斜和反压,导致任务失败。

常见的解决方式是把所有数据加上随机前后缀,这些我们会在后面的课时中进行深入讲解。

Aggregations

Aggregations 为聚合函数的总称,常见的聚合函数包括但不限于 sum、max、min 等。Aggregations 也需要指定一个 key 进行聚合,官网给出了几个常见的例子:

1
2
3
4
5
6
7
8
9
10
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

在上面的这几个函数中,max、min、sum 会分别返回最大值、最小值和汇总值;而 minBy 和 maxBy 则会把最小或者最大的元素全部返回。
我们拿 max 和 maxBy 举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));
DataStreamSource<MyStreamingSource.Item> items = env.fromCollection(data);
items.keyBy(0).max(2).printToErr();
//打印结果
String jobName = "user defined streaming source";
env.execute(jobName);

我们直接运行程序,会发现奇怪的一幕:

从上图中可以看到,我们希望按照 Tuple3 的第一个元素进行聚合,并且按照第三个元素取最大值。结果如我们所料,的确是按照第三个元素大小依次进行的打印,但是结果却出现了一个这样的元素 (0,1,2),这在我们的源数据中并不存在。
我们在 Flink 官网中的文档可以发现:

The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

文档中说:min 和 minBy 的区别在于,min 会返回我们制定字段的最大值,minBy 会返回对应的元素(max 和 maxBy 同理)
网上很多资料也这么写:min 和 minBy 的区别在于 min 返回最小的值,而 minBy 返回最小值的key,严格来说这是不正确的。
min 和 minBy 都会返回整个元素,只是 min 会根据用户指定的字段取最小值,并且把这个值保存在对应的位置,而对于其他的字段,并不能保证其数值正确。max 和 maxBy 同理。
事实上,对于 Aggregations 函数,Flink 帮助我们封装了状态数据,这些状态数据不会被清理,所以在实际生产环境中应该尽量避免在一个无限流上使用 Aggregations。而且,对于同一个 keyedStream ,只能调用一次 Aggregation 函数。

Reduce

Reduce 函数的原理是,会在每一个分组的 keyedStream 上生效,它会按照用户自定义的聚合逻辑进行分组聚合。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));
DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
//items.keyBy(0).max(2).printToErr();
SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> reduce = items.keyBy(0).reduce(new ReduceFunction<Tuple3<Integer, Integer, Integer>>() {
@Override
public Tuple3<Integer,Integer,Integer> reduce(Tuple3<Integer, Integer, Integer> t1, Tuple3<Integer, Integer, Integer> t2) throws Exception {
Tuple3<Integer,Integer,Integer> newTuple = new Tuple3<>();
newTuple.setFields(0,0,(Integer)t1.getField(2) + (Integer) t2.getField(2));
return newTuple;
}
});
reduce.printToErr().setParallelism(1);

我们对下面的元素按照第一个元素进行分组,第三个元素分别求和,并且把第一个和第二个元素都置为 0:

1
2
3
4
5
6
7
8
data.add(new Tuple3<>(0,1,0));
data.add(new Tuple3<>(0,1,1));
data.add(new Tuple3<>(0,2,2));
data.add(new Tuple3<>(0,1,3));
data.add(new Tuple3<>(1,2,5));
data.add(new Tuple3<>(1,2,9));
data.add(new Tuple3<>(1,2,11));
data.add(new Tuple3<>(1,2,13));

那么最终会得到:(0,0,6) 和 (0,0,38)。