val properties: Properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "kafka_consumer")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// EXACTLY_ONCE checkpointing mode: the basis for end-to-end data consistency across the application
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// Enable checkpointing with a checkpoint interval of 5 s (5000 ms)
env.enableCheckpointing(5000)
// Set the StateBackend and specify where state data is stored
env.setStateBackend(new FsStateBackend("file:///Users/cpeixin/IdeaProjects/code_warehouse/data/KafkaSource"))
val topic: String = "weibo_keyword"
val kafkaSource: FlinkKafkaConsumerBase[String] = KafkaUtil.getKafkaSource(topic)
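The listing calls `KafkaUtil.getKafkaSource` without showing it. A minimal sketch of what such a helper might look like follows. It assumes the universal `FlinkKafkaConsumer` from `flink-connector-kafka` (newer Flink releases deprecate it in favor of `KafkaSource`), plain string payloads, and the same broker and group settings as the `properties` object above. The body is hypothetical, not the author's actual implementation.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}

// Hypothetical helper: the real KafkaUtil is not shown in the source.
object KafkaUtil {
  def getKafkaSource(topic: String): FlinkKafkaConsumerBase[String] = {
    val props = new Properties()
    // Assumed to mirror the connection settings used earlier in the listing.
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "kafka_consumer")
    // Read each Kafka record value as a UTF-8 string.
    new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), props)
  }
}
```

Constructing the consumer does not open a connection; the broker is contacted only once the job runs.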
val word_stream: DataStream[Raw] = env.addSource(kafkaSource)
  .map((x: String) => {
    // Parse each record once, then pick out the two fields
    val parsed = get_value(x)
    val date_time: String = parsed._1
    val keywordList: String = parsed._2
    Raw(date_time, keywordList)
  })
val processStream: DataStream[Raw] = word_stream.process(new SideOutput())
val dirty_stream: DataStream[Raw] = processStream.getSideOutput(new OutputTag[Raw]("dirty_data"))
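The `SideOutput` class used above is not shown. One way it could be written is as a `ProcessFunction` that routes records to the `"dirty_data"` tag. In this sketch the shape of `Raw` is inferred from the `Raw(date_time, keywordList)` call, and the `isDirty` rule (a null or blank field) is purely an assumption for illustration; the author's real filter condition is unknown.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Assumed shape of Raw, inferred from how it is constructed in the listing.
case class Raw(date_time: String, keywordList: String)

object SideOutput {
  // The tag id must match the one passed to getSideOutput ("dirty_data").
  val dirtyTag: OutputTag[Raw] = new OutputTag[Raw]("dirty_data")

  // Hypothetical rule: a record is dirty when either field is null or blank.
  def isDirty(r: Raw): Boolean =
    r.date_time == null || r.date_time.trim.isEmpty ||
      r.keywordList == null || r.keywordList.trim.isEmpty
}

class SideOutput extends ProcessFunction[Raw, Raw] {
  override def processElement(
      value: Raw,
      ctx: ProcessFunction[Raw, Raw]#Context,
      out: Collector[Raw]): Unit = {
    if (SideOutput.isDirty(value)) {
      ctx.output(SideOutput.dirtyTag, value) // goes to the side output
    } else {
      out.collect(value) // stays on the main stream
    }
  }
}
```

Side outputs are matched by tag id and type, so `getSideOutput(new OutputTag[Raw]("dirty_data"))` retrieves exactly the records emitted through `ctx.output` here, while `processStream` itself carries only the clean ones.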
ALL :8> dirty_data
ALL :7> 网购翻车的经历,经历
ALL :1> 彭昱畅的自拍和他拍,彭昱畅
ALL :7> dirty_data
ALL :1> 彭昱畅的自拍和他拍,彭昱畅
ALL :7> dirty_data
ALL :1> 网购翻车的经历,经历
ALL :7> dirty_data
ALL :1> 网购翻车的经历,经历
ALL :8> dirty_data
ALL :7> 痛仰演唱会,演唱会
ALL :1> 痛仰演唱会,演唱会
ALL :8> dirty_data
ALL :1> dirty_data
ALL :7> dirty_data
ALL :1> 痛仰演唱会,演唱会
ALL :1> dirty_data
ALL :8> dirty_data
ALL :7> 痛仰演唱会,演唱会
ALL :8> dirty_data
ALL :1> 痛仰演唱会,演唱会
ALL :8> dirty_data
ALL :1> dirty_data
ALL :7> dirty_data
ALL :1> 幸福触手可及片花,片花
ALL :7> 幸福触手可及片花,片花