A Flink job is made up of three parts: source, transform, and sink. Earlier we used Flink's built-in wrappers to consume from Kafka, which is the source part. The sink stage works the same way: data can only be written out through predefined sink methods to wherever we want to store it. This may look less flexible than Spark, where you can iterate and emit with foreachRDD or foreachPartition, or push data out through a third-party client, but Flink's design keeps user code simpler and lets engineers focus on the business logic.
A number of basic sinks ship with Flink. Data sinks consume a DataStream and forward its elements to files, sockets, or external systems, or simply print them. Several built-in output formats are exposed as operations on DataStream:
- writeAsText() / TextOutputFormat: writes elements line by line as strings, obtained by calling toString() on each element.
- writeAsCsv(...) / CsvOutputFormat: writes tuples as comma-separated value files. Row and field delimiters are configurable. The value of each field comes from the object's toString() method.
- print() / printToErr(): prints the toString() value of every element to standard output / standard error. An optional prefix (msg) can be supplied and is prepended to the output, which helps distinguish different print calls. If the parallelism is greater than 1, the output is also prefixed with the identifier of the subtask that produced it.
- writeUsingOutputFormat() / FileOutputFormat: method and base class for custom file output. Supports custom object-to-bytes conversion.
- writeToSocket: writes elements to a socket according to a SerializationSchema.
- addSink: invokes a custom sink function. Flink bundles connectors to other systems (such as Apache Kafka) that are implemented as sink functions.

Note that the write*() methods on DataStream are mainly intended for debugging. They do not participate in Flink's checkpointing, which means they usually provide at-least-once semantics at best. When the data is flushed to the target system depends on the OutputFormat implementation, so not every element handed to the OutputFormat shows up in the target system immediately, and in case of failure those records may be lost. To write a stream to a file system reliably and exactly once, use flink-connector-filesystem. Likewise, a custom implementation passed to .addSink(...) can participate in Flink's checkpointing and achieve exactly-once semantics.
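To make the list concrete, here is a minimal sketch (not from the original post; the path and sample elements are placeholders) showing how a couple of these built-in sinks are attached to a DataStream in Scala:

```scala
import org.apache.flink.streaming.api.scala._

object BuiltinSinksSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words: DataStream[String] = env.fromElements("flink", "sink", "demo")

    // debugging-style sinks: not part of checkpointing, at-least-once at best
    words.print("debug")                 // optional prefix to tell print calls apart
    words.writeAsText("/tmp/words.txt")  // one toString() per line; the path is a placeholder

    env.execute("builtin sinks sketch")
  }
}
```

Both calls here are debugging aids in the sense described above; for reliable delivery you would attach a connector via addSink.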
How Sinks Work

SinkFunction is an interface, similar to the SourceFunction interface. Its core method is invoke, which performs the actual output; it is called once for every record.
```java
default void invoke(IN value) throws Exception

default void invoke(IN value, Context context) throws Exception

interface Context<T> {

    long currentProcessingTime();

    long currentWatermark();

    Long timestamp();
}
```
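As a minimal illustration of that contract (a hypothetical sketch, not code from Flink or the original post), a Scala sink only has to implement invoke; every record that reaches the sink triggers one call:

```scala
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Hypothetical sink used only to illustrate the contract:
// invoke() is called once for every record that reaches the sink.
class StdOutSink[T] extends SinkFunction[T] {
  override def invoke(value: T, context: SinkFunction.Context[_]): Unit = {
    // the actual output logic goes here; println is just a stand-in
    println(s"sink received: $value")
  }
}

// usage: someStream.addSink(new StdOutSink[String])
```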
When we write a custom sink, we usually extend AbstractRichFunction, an abstract class that implements the RichFunction interface.
```java
public abstract class AbstractRichFunction implements RichFunction, Serializable
```
It also provides access to the RuntimeContext and the open and close lifecycle methods. AbstractRichFunction has many implementations, for example JDBCSinkFunction for writing to MySQL, or PrintSinkFunction, which simply prints the result. During development we often call print to inspect results, and the print method does nothing more than pass a PrintSinkFunction to addSink:
```java
public DataStreamSink<T> print() {
    PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
    return addSink(printFunction).name("Print to Std. Out");
}
```
PrintSinkFunction

Let's take a closer look at the PrintSinkFunction class, which writes every element to standard output or standard error.
```java
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private final PrintSinkOutputWriter<IN> writer;

    public PrintSinkFunction() {
        writer = new PrintSinkOutputWriter<>(false);
    }

    public PrintSinkFunction(final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(stdErr);
    }

    public PrintSinkFunction(final String sinkIdentifier, final boolean stdErr) {
        writer = new PrintSinkOutputWriter<>(sinkIdentifier, stdErr);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
        writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(IN record) {
        writer.write(record);
    }

    @Override
    public String toString() {
        return writer.toString();
    }
}
```
Analysis:
1. The constructor creates a PrintSinkOutputWriter.
2. open() calls the PrintSinkOutputWriter's open method to initialize it.
3. invoke() writes each record through the PrintSinkOutputWriter's write method.
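Since print() is only a thin wrapper around addSink(new PrintSinkFunction), you can also attach the function yourself, for example when you want the stderr variant together with an identifier. A small sketch (stream contents are placeholders):

```scala
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction
import org.apache.flink.streaming.api.scala._

object PrintSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.fromElements("a", "b", "c")

    // equivalent to stream.printToErr("debug"): identifier "debug", stdErr = true
    stream.addSink(new PrintSinkFunction[String]("debug", true)).name("print to stderr")

    env.execute("print sink sketch")
  }
}
```

With parallelism greater than 1, each printed line should come out prefixed with the identifier and the producing subtask, e.g. debug:2> c.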
Third-party Sinks

Below we use the Elasticsearch connector as an example of a third-party sink. The Elasticsearch connector provides at-least-once semantics, which relies on Flink's checkpoint mechanism. To use it, add the dependency matching your Elasticsearch version:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```
Writing to Elasticsearch 6.7, an example:
```scala
package data_stream

import java.net.{InetAddress, InetSocketAddress}
import java.util
import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.flink.util.ExceptionUtils
import org.apache.http.HttpHost
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException
import utils.KafkaUtil

object datastream_2_es_HandFail {

  case class Raw(date_time: String, keywordList: String)

  private val KAFKA_TOPIC: String = "weibo_keyword"

  def main(args: Array[String]) {
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))

    val topic: String = "weibo_keyword"
    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)

    val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
      .map((x: String) => {
        val date_time: String = get_value(x)._1
        val keywordList: String = get_value(x)._2
        Raw(date_time, keywordList)
      })

    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))

    val esSinkBuilder = new ElasticsearchSink.Builder[Raw](
      httpHosts,
      new ElasticsearchSinkFunction[Raw] {
        override def process(data: Raw, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          print("saving data" + data)
          val hashMap = new util.HashMap[String, String]()
          hashMap.put("date_time", data.date_time)
          hashMap.put("keyword_list", data.keywordList)
          hashMap.put("rowkey", "i am rowkey haha")
          val indexRequest: IndexRequest =
            Requests.indexRequest().index("weibo_keyword-2018-04-30").`type`("default").source(hashMap)
          // send the request, i.e. write the data
          requestIndexer.add(indexRequest)
          println("data saved successfully")
        }
      }
    )

    esSinkBuilder.setBulkFlushMaxActions(2)
    esSinkBuilder.setBulkFlushInterval(1000L)

    // custom failure handler
    esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler {
      override def onFailure(actionRequest: ActionRequest, throwable: Throwable, i: Int, requestIndexer: RequestIndexer): Unit = {
        println("@@@@@@@On failure from ElasticsearchSink:-->" + throwable.getMessage)
      }
    })

    word_stream.addSink(esSinkBuilder.build())

    env.execute("Flink Streaming—————es sink")
  }

  def get_value(string_data: String): (String, String) = {
    val json_data: JSONObject = JSON.parseObject(string_data)
    val date_time: String = json_data.get("datetime").toString
    val keywordList: String = json_data.get("keywordList").toString
    (date_time, keywordList)
  }
}
```
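A hedged follow-up on the builder settings above: swallowing every failure with a println, as the custom handler does, risks silent data loss. Assuming the same esSinkBuilder from the example, the ES6 builder also exposes bulk-flush backoff settings, and Flink ships a RetryRejectedExecutionFailureHandler that retries only bulk-queue rejections; a sketch:

```scala
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler

// sketch only: retry bulk-queue rejections with exponential backoff instead of
// swallowing every failure in a custom handler
esSinkBuilder.setBulkFlushBackoff(true)
esSinkBuilder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)
esSinkBuilder.setBulkFlushBackoffRetries(3)
esSinkBuilder.setBulkFlushBackoffDelay(500L) // milliseconds
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler)
```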
Custom Sinks
MySQL

When writing to MySQL, don't forget to add the JDBC driver dependency. (With mysql-connector-java 8.x the preferred driver class is com.mysql.cj.jdbc.Driver; the older com.mysql.jdbc.Driver name used below still works but logs a deprecation warning.)
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.15</version>
</dependency>
```
Example code, the custom sink:
```scala
import java.sql.{Connection, DriverManager, PreparedStatement, SQLException}

import data_stream.datastream_2_mysql.Raw
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.slf4j.{Logger, LoggerFactory}

class MysqlSink_v2 extends RichSinkFunction[Raw] {

  val logger: Logger = LoggerFactory.getLogger("MysqlSink")

  var conn: Connection = _
  var ps: PreparedStatement = _

  val jdbcUrl = "jdbc:mysql://localhost:3306?useSSL=false&allowPublicKeyRetrieval=true"
  val username = "root"
  val password = "password"
  val driverName = "com.mysql.jdbc.Driver"

  override def open(parameters: Configuration): Unit = {
    try {
      Class.forName(driverName)
      conn = DriverManager.getConnection(jdbcUrl, username, password)
      conn.setAutoCommit(false)
    } catch {
      case e@(_: ClassNotFoundException | _: SQLException) =>
        logger.error("init mysql error")
        e.printStackTrace()
        System.exit(-1)
    }
  }

  override def invoke(raw: Raw, context: SinkFunction.Context[_]): Unit = {
    println("data : " + raw.toString)
    ps = conn.prepareStatement("insert into test.t_weibo_keyword(date_time,keywordList) values(?,?)")
    ps.setString(1, raw.date_time)
    ps.setString(2, raw.keywordList)
    ps.execute()
    conn.commit()
    ps.close()
  }

  override def close(): Unit = {
    if (conn != null) {
      conn.commit()
      conn.close()
    }
  }
}
```
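Preparing and executing a statement for every record keeps the example easy to follow but is slow. A hedged variation (not from the original post; the class name and batch size are hypothetical) prepares the statement once in open() and flushes small batches with plain JDBC addBatch/executeBatch. The main program below still uses MysqlSink_v2.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import data_stream.datastream_2_mysql.Raw
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

// Sketch only: same table as above, but with a reusable PreparedStatement and batching.
class MysqlBatchSink(batchSize: Int = 100) extends RichSinkFunction[Raw] {

  var conn: Connection = _
  var ps: PreparedStatement = _
  var pending = 0

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306?useSSL=false&allowPublicKeyRetrieval=true", "root", "password")
    conn.setAutoCommit(false)
    ps = conn.prepareStatement("insert into test.t_weibo_keyword(date_time,keywordList) values(?,?)")
  }

  override def invoke(raw: Raw, context: SinkFunction.Context[_]): Unit = {
    ps.setString(1, raw.date_time)
    ps.setString(2, raw.keywordList)
    ps.addBatch()
    pending += 1
    if (pending >= batchSize) {
      ps.executeBatch()
      conn.commit()
      pending = 0
    }
  }

  override def close(): Unit = {
    if (ps != null) { ps.executeBatch(); conn.commit(); ps.close() }
    if (conn != null) conn.close()
  }
}
```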
The main program:
```scala
package data_stream

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import utils.KafkaUtil

object datastream_2_mysql {

  case class Raw(date_time: String, keywordList: String)

  private val KAFKA_TOPIC: String = "weibo_keyword"

  def main(args: Array[String]) {
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))

    val topic: String = "weibo_keyword"
    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)

    val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
      .map((x: String) => {
        val date_time: String = get_value(x)._1
        val keywordList: String = get_value(x)._2
        Raw(date_time, keywordList)
      })

    val sink = new MysqlSink_v2()
    word_stream.addSink(sink)

    env.execute("Flink Streaming—————mysql sink")
  }

  def get_value(string_data: String): (String, String) = {
    val json_data: JSONObject = JSON.parseObject(string_data)
    val date_time: String = json_data.get("datetime").toString
    val keywordList: String = json_data.get("keywordList").toString
    (date_time, keywordList)
  }
}
```
HBase

The custom sink:
```scala
package data_stream

import data_stream.datastream_2_hbase.Raw
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

class HBaseSink_v4(tableName: String, family: String) extends RichSinkFunction[Raw] {

  var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181")
    conn = ConnectionFactory.createConnection(conf)
  }

  override def invoke(value: Raw, context: SinkFunction.Context[_]): Unit = {
    println(value)
    val t: Table = conn.getTable(TableName.valueOf(tableName))
    val put: Put = new Put(Bytes.toBytes(value.date_time))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("name"), Bytes.toBytes(value.date_time.toString))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("id_no"), Bytes.toBytes(value.keywordList.toString))
    t.put(put)
    t.close()
  }

  override def close(): Unit = {
    super.close()
    conn.close()
  }
}
```
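Fetching a Table per record works but adds overhead. A hedged alternative (hypothetical class; the keyword_list column name is illustrative) keeps a BufferedMutator open for the lifetime of the task and lets the HBase client batch the Puts. The main program below still uses HBaseSink_v4.

```scala
import data_stream.datastream_2_hbase.Raw
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.client.{BufferedMutator, Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}

// Sketch only: buffered writes to the same table and column family as above.
class HBaseBufferedSink(tableName: String, family: String) extends RichSinkFunction[Raw] {

  var conn: Connection = _
  var mutator: BufferedMutator = _

  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181")
    conn = ConnectionFactory.createConnection(conf)
    mutator = conn.getBufferedMutator(TableName.valueOf(tableName))
  }

  override def invoke(value: Raw, context: SinkFunction.Context[_]): Unit = {
    val put = new Put(Bytes.toBytes(value.date_time))
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes("keyword_list"), Bytes.toBytes(value.keywordList))
    mutator.mutate(put) // buffered; flushed when the client's internal buffer fills
  }

  override def close(): Unit = {
    if (mutator != null) { mutator.flush(); mutator.close() }
    if (conn != null) conn.close()
  }
}
```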
The main program:
```scala
package data_stream

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import utils.KafkaUtil

object datastream_2_hbase {

  case class Raw(date_time: String, keywordList: String)

  private val KAFKA_TOPIC: String = "weibo_keyword"

  def main(args: Array[String]) {
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer_hbase")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource/hbase"))

    val topic: String = "weibo_keyword"
    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic).setStartFromLatest()

    val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
      .map((x: String) => {
        val date_time: String = get_value(x)._1
        val keywordList: String = get_value(x)._2
        Raw(date_time, keywordList)
      })

    val value: DataStreamSink[Raw] = word_stream.addSink(new HBaseSink_v4("t_weibo_keyword_2", "cf1")).name("write_2_hbase")

    env.execute("Flink Streaming—————hbase sink")
  }

  def get_value(string_data: String): (String, String) = {
    val json_data: JSONObject = JSON.parseObject(string_data)
    val date_time: String = json_data.get("datetime").toString
    val keywordList: String = json_data.get("keywordList").toString
    (date_time, keywordList)
  }
}
```
Redis

The Redis connector is provided by Apache Bahir; add the dependency:

```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```
Create a sink utility class:
```scala
package utils

import data_stream.datastream_2_redis.Raw
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisUtil {

  private val config: FlinkJedisPoolConfig =
    new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build()

  def getRedisSink(): RedisSink[Raw] = {
    new RedisSink[Raw](config, new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[Raw] {
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "weibo_keyword")
    }

    override def getValueFromData(t: Raw): String = t.keywordList

    override def getKeyFromData(t: Raw): String = t.date_time
  }
}
```
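The mapper above issues HSET weibo_keyword <date_time> <keywordList>, so every record becomes one field in a single hash. Other RedisCommand values follow the same pattern; as a hypothetical sketch (class name is illustrative), a mapper that instead pushes keywords onto a Redis list keyed by date_time could look like this:

```scala
import data_stream.datastream_2_redis.Raw
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Sketch only: push each keywordList value onto a list keyed by date_time.
class KeywordListMapper extends RedisMapper[Raw] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.LPUSH) // list commands need no additional key

  override def getKeyFromData(t: Raw): String = t.date_time

  override def getValueFromData(t: Raw): String = t.keywordList
}
```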
The main program:
```scala
package data_stream

import java.util.Properties

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import org.apache.flink.streaming.connectors.redis.RedisSink
import utils.{KafkaUtil, RedisUtil}

object datastream_2_redis {

  case class Raw(date_time: String, keywordList: String)

  private val KAFKA_TOPIC: String = "weibo_keyword"

  def main(args: Array[String]) {
    val properties: Properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "kafka_consumer")

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)
    env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))

    val topic: String = "weibo_keyword"
    val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)

    val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
      .map((x: String) => {
        val date_time: String = get_value(x)._1
        val keywordList: String = get_value(x)._2
        Raw(date_time, keywordList)
      })

    word_stream.print("2_redis").setParallelism(1)

    val redis_sink: RedisSink[Raw] = RedisUtil.getRedisSink()
    word_stream.addSink(redis_sink).name("write_2_redis")

    env.execute("Flink Streaming—————redis sink")
  }

  def get_value(string_data: String): (String, String) = {
    val json_data: JSONObject = JSON.parseObject(string_data)
    val date_time: String = json_data.get("datetime").toString
    val keywordList: String = json_data.get("keywordList").toString
    (date_time, keywordList)
  }
}
```