| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 
 | package rddimport java.sql.{Connection, DriverManager, PreparedStatement}
 
 import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{SparkConf, SparkContext}
 
 object process_json {
 def main(args: Array[String]): Unit={
 val sparkConf: SparkConf = new SparkConf()
 .setAppName("process_json")
 
 """
 data_demo:
 {"user_name":"brent","customer_id":12031602,"age":22,"birthday":"1993-04-05","deposit_amount":3000,"last_login_time":"2017-03-10 14:55:22"}
 {"user_name":"haylee","customer_id":12031603,"age":23,"birthday":"1992-08-10","deposit_amount":4000.56,"last_login_time":"2017-03-11 10:55:00"}
 {"user_name":"vicky","customer_id":12031604,"age":30,"birthday":"2000-03-02","deposit_amount":200.4,"last_login_time":"2017-03-10 09:10:00"}
 """.stripMargin
 val sc = new SparkContext(sparkConf)
 
 var json_rdd: RDD[String] = sc.textFile("hdfs://localhost:8020/data/user_data.json")
 
 val result_rdd: RDD[JSONObject] = json_rdd.map((x: String) => {
 val json_item: JSONObject = JSON.parseObject(x)
 json_item.put("flag", 1)
 json_item
 })
 
 val driverClassName = "com.mysql.jdbc.Driver"
 val url = "jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=false"
 val user = "root"
 val password = "cpx726175"
 
 result_rdd.foreachPartition((partition: Iterator[JSONObject]) => {
 Class.forName(driverClassName)
 val connection: Connection = DriverManager.getConnection(url, user, password)
 val sql = "insert into t_user(user_name, customer_id, age, birthday, deposit_amount, last_login_time,flag) values(?,?,?,?,?,?,?)"
 val statement: PreparedStatement = connection.prepareStatement(sql)
 try {
 partition.foreach {
 json_data: JSONObject => {
 statement.setString(1, json_data.getString("user_name"))
 statement.setInt(2, json_data.getInteger("customer_id"))
 statement.setInt(3, json_data.getInteger("age"))
 statement.setString(4, json_data.getString("birthday"))
 statement.setFloat(5, json_data.getFloat("deposit_amount"))
 statement.setString(6, json_data.getString("last_login_time"))
 statement.setInt(7, json_data.getInteger("flag"))
 statement.executeUpdate()
 }
 }
 }catch {
 case e: Exception =>  println(e.printStackTrace())
 }
 finally {
 if(statement!=null) statement.close()
 if(connection!=null) connection.close()
 }
 connection.close()
 }
 )
 sc.stop()
 }
 }
 
 |