Flink Data Output: Sinks

A Flink job is essentially made up of three parts: source, transform, and sink. Earlier we used Flink's built-in connector to read from Kafka; that was the source part. The sink stage works the same way: we can only write data to the target storage through predefined sink methods. This may look less flexible than Spark, where you can iterate and emit output yourself with foreachRDD or foreachPartition, or call a third-party client directly, but Flink's design keeps the code simpler and lets engineers spend their time on business logic.

Flink ships with a number of basic sinks. A data sink consumes a DataStream and forwards its elements to files, sockets, external systems, or simply prints them. The built-in output formats are exposed as operations on DataStream (a short usage sketch follows the list):

  • writeAsText() / TextOutputFormat - writes elements line by line as strings. The string is obtained by calling each element's toString() method.
  • writeAsCsv(…) / CsvOutputFormat - writes tuples as comma-separated value files. Row and field delimiters are configurable. The value of each field comes from the object's toString() method.
  • print() / printToErr() - prints each element's toString() value to standard out / standard error. Optionally, a prefix (msg) can be provided, which is prepended to the output; this helps distinguish different print calls. If the parallelism is greater than 1, the output is also prefixed with the identifier of the task that produced it.
  • writeUsingOutputFormat() / FileOutputFormat - method and base class for custom file output. Supports custom object-to-bytes conversion.
  • writeToSocket - writes elements to a socket according to a SerializationSchema.
  • addSink - invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.

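As a quick illustration, here is a minimal sketch of wiring two of these built-in sinks onto a stream (the output path and prefix are placeholders, not from the original):

// Minimal sketch: built-in sinks on a DataStream
import org.apache.flink.streaming.api.scala._

object BuiltinSinksDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.fromElements("a", "b", "c")

    // Print each element's toString() to stdout, prefixed with "demo"
    stream.print("demo")

    // Write elements as text lines; for debugging only, not checkpointed
    stream.writeAsText("file:///tmp/flink-demo-out")

    env.execute("builtin sinks demo")
  }
}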

Note that the write*() methods on DataStream are mainly intended for debugging. They do not participate in Flink's checkpointing, which means they typically provide at-least-once semantics. How data is flushed to the target system depends on the OutputFormat implementation.


This means that not every element sent to the OutputFormat immediately shows up in the target system, and in case of failure those records may be lost. For reliable, exactly-once delivery of a stream to a filesystem, use flink-connector-filesystem. Likewise, custom implementations passed to .addSink(…) can participate in Flink's checkpointing and achieve exactly-once semantics.
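For example, a checkpoint-aware file sink can be built with StreamingFileSink. The following is a minimal sketch, assuming a Flink version (1.6+) that provides StreamingFileSink and a job with checkpointing enabled; the output path is a placeholder:

// Sketch of a checkpoint-aware file sink
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object FileSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // required: in-progress files are committed on checkpoints

    val stream: DataStream[String] = env.fromElements("a", "b", "c")

    val fileSink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("file:///tmp/flink-file-sink"), new SimpleStringEncoder[String]("UTF-8"))
      .build()

    stream.addSink(fileSink)
    env.execute("streaming file sink demo")
  }
}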

How Sinks Work


SinkFunction is an interface, similar to the SourceFunction interface. Its central method is invoke, which performs the output: it is called once for every record.

// Writes the given value to the sink. This function is called for every record.
default void invoke(IN value) throws Exception
default void invoke(IN value, Context context) throws Exception

// The Context interface exposes time-related information
interface Context<T> {

    /** Returns the current processing time. */
    long currentProcessingTime();

    /** Returns the current event-time watermark. */
    long currentWatermark();

    /**
     * Returns the timestamp of the current input record or {@code null} if the element does not
     * have an assigned timestamp.
     */
    Long timestamp();
}
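To see the interface in action, here is a minimal sketch of an anonymous SinkFunction passed to addSink (purely illustrative; for anything stateful you would normally extend RichSinkFunction, as discussed next):

// Sketch: the simplest possible custom sink
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

object SimpleSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.fromElements("a", "b", "c")

    // invoke() is called once per record; here we just log the element
    stream.addSink(new SinkFunction[String] {
      override def invoke(value: String): Unit = {
        println(s"sinking record: $value")
      }
    })

    env.execute("simple sink demo")
  }
}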


When we implement a custom sink, we usually build on AbstractRichFunction (in practice by extending its subclass RichSinkFunction, as the examples below do). It is an abstract class that implements the RichFunction interface.

public abstract class AbstractRichFunction implements RichFunction, Serializable


It provides access to the RuntimeContext as well as the open and close lifecycle methods. AbstractRichFunction has many implementations, for example JDBCSinkFunction for writing to MySQL via JDBC, or PrintSinkFunction, which simply prints the result. During development we often call print() to inspect results; internally, print() just wraps a PrintSinkFunction and passes it to addSink:

public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}


PrintSinkFunction
Let's take a closer look at the PrintSinkFunction class, which writes every element to standard output or standard error.

public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    /**
     * Instantiates a print sink function that prints to standard out.
     */
    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    /**
     * Instantiates a print sink function that prints to standard out.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     */
    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    /**
     * Instantiates a print sink function that prints to standard out and gives a sink identifier.
     *
     * @param stdErr True, if the format should print to standard error instead of standard out.
     * @param sinkIdentifier Message that identify sink and is prefixed to the output of the value
     */
    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}

Analysis:
1. The constructor creates a PrintSinkOutputWriter.
2. open() calls the writer's open() method to initialize it with the subtask index and parallelism.
3. invoke() outputs each record through the writer's write() method.
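The same class can also be attached manually through addSink. A small sketch, equivalent to printing to stderr with a prefix (the identifier "manual-print" is a placeholder):

// Sketch: attaching PrintSinkFunction explicitly instead of calling print()/printToErr()
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import org.apache.flink.streaming.api.scala._

object ManualPrintSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.fromElements("a", "b", "c")

    // Same PrintSinkFunction that print()/printToErr() use, here writing to stderr
    stream.addSink(new PrintSinkFunction[String]("manual-print", true))

    env.execute("manual print sink demo")
  }
}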

Third-Party Sinks

Below we use the Elasticsearch Connector as an example of a third-party sink. The Elasticsearch Connector provides at-least-once semantics, which relies on Flink's checkpoint mechanism. To use it, add the dependency matching your Elasticsearch version:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>


A write example against Elasticsearch 6.7:

package data_stream

import java.net.{InetAddress, InetSocketAddress}
import java.util
import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.flink.util.ExceptionUtils
import org.apache.http.HttpHost
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException
import utils.KafkaUtil


object datastream_2_es_HandFail {

  case class Raw(date_time: String, keywordList: String)

  private val KAFKA_TOPIC: String = "weibo_keyword"

  def main(args: Array[String]) {

    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // exactly-once mode guarantees end-to-end data consistency within the application
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // enable checkpointing with a 5s interval
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    // set the StateBackend and the location where state is stored
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))

    val topic: String = "weibo_keyword"
    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)

    val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
      .map((x: String) => {
        val date_time: String = get_value(x)._1
        val keywordList: String = get_value(x)._2
        Raw(date_time, keywordList)
      })

    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))

    val esSinkBuilder = new ElasticsearchSink.Builder[Raw](
      httpHosts, new ElasticsearchSinkFunction[Raw] {
        override def process(data: Raw, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          print("saving data" + data)
          // wrap the record into a Map (or a JsonObject)
          val hashMap = new util.HashMap[String, String]()
          hashMap.put("date_time", data.date_time)
          hashMap.put("keyword_list", data.keywordList)
          hashMap.put("rowkey", "i am rowkey haha")
          // build the index request
          val indexRequest: IndexRequest = Requests.indexRequest().index("weibo_keyword-2018-04-30").`type`("default").source(hashMap)
          // hand the request to the indexer, which sends it to Elasticsearch
          requestIndexer.add(indexRequest)
          println("data saved successfully")
        }
      }
    )

    esSinkBuilder.setBulkFlushMaxActions(2)
    esSinkBuilder.setBulkFlushInterval(1000L)
    // custom failure handling
    esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler {
      override def onFailure(actionRequest: ActionRequest, throwable: Throwable, i: Int, requestIndexer: RequestIndexer): Unit = {
        println("@@@@@@@On failure from ElasticsearchSink:-->" + throwable.getMessage)
      }
    })

    word_stream.addSink(esSinkBuilder.build())

    env.execute("Flink Streaming—————es sink")
  }

  def get_value(string_data: String): (String, String) = {
    val json_data: JSONObject = JSON.parseObject(string_data)
    val date_time: String = json_data.get("datetime").toString
    val keywordList: String = json_data.get("keywordList").toString
    (date_time, keywordList)
  }
}
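The failure handler above only logs the error, so failed requests are effectively dropped. Since the program already imports ExceptionUtils, ElasticsearchParseException and EsRejectedExecutionException, a more robust handler could be dropped in place of the setFailureHandler call above. This is a hedged sketch (retry on bulk-queue rejection, drop malformed documents, fail the sink otherwise), not the author's original handler:

// Sketch: retry rejected requests, drop malformed ones, fail on anything else
esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler {
  override def onFailure(actionRequest: ActionRequest,
                         throwable: Throwable,
                         restStatusCode: Int,
                         requestIndexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowable(throwable, classOf[EsRejectedExecutionException]).isPresent) {
      // the ES bulk queue is full: re-queue the request so it is retried
      // (safe cast here: this program only creates IndexRequests)
      requestIndexer.add(actionRequest.asInstanceOf[IndexRequest])
    } else if (ExceptionUtils.findThrowable(throwable, classOf[ElasticsearchParseException]).isPresent) {
      // malformed document: drop it and keep going
    } else {
      // anything else: rethrow so the sink (and the job) fails visibly
      throw throwable
    }
  }
})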

Custom Sinks

MySQL


When writing data to MySQL, don't forget to add the JDBC driver dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>


Example code:


The custom sink class:

import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}

import data_stream.datastream_2_mysql.Raw
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.slf4j.{Logger, LoggerFactory}

class MysqlSink_v2 extends RichSinkFunction[Raw] {

  val logger: Logger = LoggerFactory.getLogger("MysqlSink")
  var conn: Connection = _
  var ps: PreparedStatement = _
  val jdbcUrl = "jdbc:mysql://localhost:3306?useSSL=false&allowPublicKeyRetrieval=true"
  val username = "root"
  val password = "password"
  val driverName = "com.mysql.jdbc.Driver"

  override def open(parameters: Configuration): Unit = {
    try {
      Class.forName(driverName)
      conn = DriverManager.getConnection(jdbcUrl, username, password)
      // disable auto-commit so we control the transaction boundaries ourselves
      conn.setAutoCommit(false)
    } catch {
      case e@(_: ClassNotFoundException | _: SQLException) =>
        logger.error("init mysql error")
        e.printStackTrace()
        System.exit(-1)
    }
  }

  /**
   * If throughput is not high enough, records can be buffered (e.g. in state) and
   * committed in batches. (If you hit OOM, the batch is probably too large or
   * resources are not released in time.)
   *
   * @param raw
   * @param context
   */
  override def invoke(raw: Raw, context: SinkFunction.Context[_]): Unit = {
    println("data : " + raw.toString)
    ps = conn.prepareStatement("insert into test.t_weibo_keyword(date_time,keywordList) values(?,?)")
    ps.setString(1, raw.date_time)
    ps.setString(2, raw.keywordList)
    ps.execute()
    conn.commit()
  }

  override def close(): Unit = {
    if (conn != null) {
      conn.commit()
      conn.close()
    }
  }
}
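As the comment on invoke() suggests, committing one row per record limits throughput. The following is a hedged sketch of a batched variant (the class name, batch size, and flush-on-close behaviour are illustrative, not from the original); note that records buffered only in memory can be lost on failure, so for real reliability the buffer should live in Flink state and be flushed on checkpoints:

// Sketch: buffer records and write them with JDBC batching
import java.sql.{Connection, DriverManager, PreparedStatement}

import scala.collection.mutable.ArrayBuffer

import data_stream.datastream_2_mysql.Raw
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MysqlBatchSink(batchSize: Int = 100) extends RichSinkFunction[Raw] {

  var conn: Connection = _
  var ps: PreparedStatement = _
  private val buffer = ArrayBuffer[Raw]()

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306?useSSL=false&allowPublicKeyRetrieval=true", "root", "password")
    conn.setAutoCommit(false)
    // prepare the statement once and reuse it for every batch
    ps = conn.prepareStatement("insert into test.t_weibo_keyword(date_time,keywordList) values(?,?)")
  }

  override def invoke(raw: Raw, context: SinkFunction.Context[_]): Unit = {
    buffer += raw
    if (buffer.size >= batchSize) flush()
  }

  private def flush(): Unit = {
    buffer.foreach { r =>
      ps.setString(1, r.date_time)
      ps.setString(2, r.keywordList)
      ps.addBatch()
    }
    ps.executeBatch()
    conn.commit()
    buffer.clear()
  }

  override def close(): Unit = {
    if (conn != null) {
      flush() // write out whatever is still buffered
      conn.close()
    }
  }
}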

Main program:

package data_stream

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import utils.KafkaUtil


object datastream_2_mysql {

  case class Raw(date_time: String, keywordList: String)

  private val KAFKA_TOPIC: String = "weibo_keyword"

  def main(args: Array[String]) {

    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // exactly-once mode guarantees end-to-end data consistency within the application
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // enable checkpointing with a 5s interval
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    // set the StateBackend and the location where state is stored
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))

    val topic: String = "weibo_keyword"
    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)

    val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
      .map((x: String) => {
        val date_time: String = get_value(x)._1
        val keywordList: String = get_value(x)._2
        Raw(date_time, keywordList)
      })

    val sink = new MysqlSink_v2()
    word_stream.addSink(sink)

    env.execute("Flink Streaming—————mysql sink")
  }

  def get_value(string_data: String): (String, String) = {
    val json_data: JSONObject = JSON.parseObject(string_data)
    val date_time: String = json_data.get("datetime").toString
    val keywordList: String = json_data.get("keywordList").toString
    (date_time, keywordList)
  }
}

HBase


The custom sink class:

package data_stream

import data_stream.datastream_2_hbase.Raw
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes


class HBaseSink_v4(tableName: String, family: String) extends RichSinkFunction[Raw] {

  var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181")
    conn = ConnectionFactory.createConnection(conf)
  }

  override def invoke(value: Raw, context: SinkFunction.Context[_]): Unit = {
    println(value)
    val t: Table = conn.getTable(TableName.valueOf(tableName))

    val put: Put = new Put(Bytes.toBytes(value.date_time))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("name"), Bytes.toBytes(value.date_time.toString))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("id_no"), Bytes.toBytes(value.keywordList.toString))
    t.put(put)
    t.close()
  }

  override def close(): Unit = {
    super.close()
    conn.close()
  }
}
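This version opens and closes a Table for every record, which is simple but costly under load. A hedged sketch of a variant that batches writes through HBase's BufferedMutator follows (the class name and write-buffer size are illustrative, not from the original):

// Sketch: reuse one connection and a BufferedMutator so puts are flushed in batches
import data_stream.datastream_2_hbase.Raw
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

class HBaseBufferedSink(tableName: String, family: String) extends RichSinkFunction[Raw] {

  var conn: Connection = _
  var mutator: BufferedMutator = _

  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181")
    conn = ConnectionFactory.createConnection(conf)
    val params = new BufferedMutatorParams(TableName.valueOf(tableName))
      .writeBufferSize(2 * 1024 * 1024) // flush roughly every 2 MB of pending puts
    mutator = conn.getBufferedMutator(params)
  }

  override def invoke(value: Raw, context: SinkFunction.Context[_]): Unit = {
    val put = new Put(Bytes.toBytes(value.date_time))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("name"), Bytes.toBytes(value.date_time))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("id_no"), Bytes.toBytes(value.keywordList))
    mutator.mutate(put) // buffered; flushed when the write buffer fills up
  }

  override def close(): Unit = {
    if (mutator != null) {
      mutator.flush()
      mutator.close()
    }
    if (conn != null) conn.close()
  }
}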


Main program:

package data_stream

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import utils.KafkaUtil


object datastream_2_hbase {

  case class Raw(date_time: String, keywordList: String)

  private val KAFKA_TOPIC: String = "weibo_keyword"

  def main(args: Array[String]) {

    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer_hbase")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource/hbase"))

    val topic: String = "weibo_keyword"
    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic).setStartFromLatest()

    val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
      .map((x: String) => {
        val date_time: String = get_value(x)._1
        val keywordList: String = get_value(x)._2
        Raw(date_time, keywordList)
      })

    val value: DataStreamSink[Raw] = word_stream.addSink(new HBaseSink_v4("t_weibo_keyword_2", "cf1")).name("write_2_hbase")

    env.execute("Flink Streaming—————hbase sink")
  }

  def get_value(string_data: String): (String, String) = {
    val json_data: JSONObject = JSON.parseObject(string_data)
    val date_time: String = json_data.get("datetime").toString
    val keywordList: String = json_data.get("keywordList").toString
    (date_time, keywordList)
  }
}

Redis

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>


Create a sink utility class:

package utils

import data_stream.datastream_2_redis.Raw
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisUtil {

  private val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build()

  def getRedisSink(): RedisSink[Raw] = {
    new RedisSink[Raw](config, new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[Raw] {
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "weibo_keyword")
    }
    // value
    override def getValueFromData(t: Raw): String = t.keywordList
    // key
    override def getKeyFromData(t: Raw): String = t.date_time
  }
}
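With HSET, the second argument of RedisCommandDescription ("weibo_keyword") names the Redis hash; getKeyFromData supplies the field and getValueFromData the value, so each record ends up roughly as HSET weibo_keyword <date_time> <keywordList>. If you wanted plain string keys instead, a hedged sketch of a SET-based mapper (class name is illustrative):

// Sketch: write each record as a plain Redis string (SET <date_time> <keywordList>)
import data_stream.datastream_2_redis.Raw
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

class StringRedisMapper extends RedisMapper[Raw] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.SET) // SET needs no additional key

  override def getKeyFromData(t: Raw): String = t.date_time
  override def getValueFromData(t: Raw): String = t.keywordList
}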


Main program:

package data_stream

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.flink.streaming.connectors.redis.RedisSink
import utils.{KafkaUtil, RedisUtil}


object datastream_2_redis {

  case class Raw(date_time: String, keywordList: String)

  private val KAFKA_TOPIC: String = "weibo_keyword"

  def main(args: Array[String]) {

    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // exactly-once mode guarantees end-to-end data consistency within the application
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // enable checkpointing with a 5s interval
    env.enableCheckpointing(5000) // checkpoint every 5000 msecs
    // set the StateBackend and the location where state is stored
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))

    val topic: String = "weibo_keyword"
    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)

    val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
      .map((x: String) => {
        val date_time: String = get_value(x)._1
        val keywordList: String = get_value(x)._2
        Raw(date_time, keywordList)
      })

    word_stream.print("2_redis").setParallelism(1)

    val redis_sink: RedisSink[Raw] = RedisUtil.getRedisSink()
    word_stream.addSink(redis_sink).name("write_2_redis")

    env.execute("Flink Streaming—————redis sink")
  }

  def get_value(string_data: String): (String, String) = {
    val json_data: JSONObject = JSON.parseObject(string_data)
    val date_time: String = json_data.get("datetime").toString
    val keywordList: String = json_data.get("keywordList").toString
    (date_time, keywordList)
  }
}
