package write
import java.security.MessageDigest
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.elasticsearch.spark.streaming.EsSparkStreaming
import utils.HBaseUtil
object streaming_to_hbase_1 {

  val logger: Logger = Logger.getRootLogger
  Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    // Spark configuration: local mode plus the Elasticsearch connection settings.
    val conf: SparkConf = new SparkConf()
      .setAppName("spark streaming window")
      .setMaster("local[2]")
      .set("spark.es.nodes", "localhost")
      .set("spark.es.port", "9200")
      .set("es.index.auto.create", "true")

    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka consumer settings; automatic offset commits are disabled.
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka_spark_streaming",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("weibo_keyword")
    // Consume JSON messages from the weibo_keyword topic and extract the
    // datetime and keywordList fields from each record.
    val kafkaStream: DStream[(String, String)] = KafkaUtils
      .createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
      .map((x: ConsumerRecord[String, String]) => {
        val json_data: JSONObject = JSON.parseObject(x.value())
        val date_time: String = json_data.get("datetime").toString
        val keywordList: String = json_data.get("keywordList").toString
        (date_time, keywordList)
      })

    // Re-serialize each record as a JSON document for Elasticsearch; the rowkey
    // field is the MD5 of the datetime, matching the HBase row key below.
    val es_dstream: DStream[String] = kafkaStream.map((x: (String, String)) => {
      val date_time: String = x._1
      val keywordList: String = x._2
      val data_es_json: JSONObject = new JSONObject()
      data_es_json.put("date_time", date_time)
      data_es_json.put("keyword_list", keywordList)
      data_es_json.put("rowkey", MD5Encode(date_time))
      data_es_json.toJSONString
    })
    // Write every micro-batch to Elasticsearch as JSON documents.
    EsSparkStreaming.saveJsonToEs(es_dstream, "weibo_keyword-2017-04-25/default")

    // Write every micro-batch to HBase, opening one Table handle per partition
    // instead of one per record, and always closing it when the partition is done.
    kafkaStream.foreachRDD((rdd: RDD[(String, String)]) => {
      rdd.foreachPartition((partitionRecords: Iterator[(String, String)]) => {
        try {
          val connection: Connection = HBaseUtil.getHBaseConn
          val table: Table = connection.getTable(TableName.valueOf("t_weibo_keyword"))
          try {
            partitionRecords.foreach((s: (String, String)) => {
              val date_time: String = s._1
              val keywordList: String = s._2
              val put = new Put(Bytes.toBytes(MD5Encode(date_time)))
              put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("keywordList"), Bytes.toBytes(keywordList))
              table.put(put)
            })
          } finally {
            table.close()
          }
        } catch {
          case e: Exception =>
            logger.info(e)
            logger.info("Failed to write to HBase")
        }
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
  /** Hex-encoded MD5 digest of the input string, used as the row key. */
  def MD5Encode(input: String): String = {
    val md5: MessageDigest = MessageDigest.getInstance("MD5")
    val encoded: Array[Byte] = md5.digest(input.getBytes)
    encoded.map("%02x".format(_: Byte)).mkString
  }
}
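
// ---------------------------------------------------------------------------
// The listing above calls utils.HBaseUtil.getHBaseConn, which is not part of
// this file. The sketch below is an assumption of what that helper could look
// like (a separate file, utils/HBaseUtil.scala), not the original code: it
// shares a single HBase Connection per JVM via the standard client API.
// ---------------------------------------------------------------------------
package utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object HBaseUtil {

  // HBase Connections are heavyweight and thread-safe, so create one lazily
  // and reuse it across partitions instead of opening one per task.
  private lazy val conf: Configuration = {
    val c = HBaseConfiguration.create()
    c.set("hbase.zookeeper.quorum", "localhost")          // assumed ZooKeeper host
    c.set("hbase.zookeeper.property.clientPort", "2181")  // assumed ZooKeeper port
    c
  }

  private lazy val connection: Connection = ConnectionFactory.createConnection(conf)

  def getHBaseConn: Connection = connection
}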