1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package rdd import java.sql.{Connection, DriverManager, PreparedStatement}
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}
/**
 * Spark job that reads line-delimited JSON user records from HDFS, adds a
 * `flag` field set to 1 to every record, and inserts the records into the
 * MySQL table `t_user` over JDBC (one connection per partition).
 *
 * Sample input (`data_demo`), one JSON object per line:
 * {{{
 * {"user_name":"brent","customer_id":12031602,"age":22,"birthday":"1993-04-05","deposit_amount":3000,"last_login_time":"2017-03-10 14:55:22"}
 * {"user_name":"haylee","customer_id":12031603,"age":23,"birthday":"1992-08-10","deposit_amount":4000.56,"last_login_time":"2017-03-11 10:55:00"}
 * {"user_name":"vicky","customer_id":12031604,"age":30,"birthday":"2000-03-02","deposit_amount":200.4,"last_login_time":"2017-03-10 09:10:00"}
 * }}}
 */
object process_json {

  import scala.util.control.NonFatal

  /**
   * Entry point: builds the Spark context, runs the load-and-insert pipeline,
   * and always stops the context afterwards.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("process_json")

    val sc = new SparkContext(sparkConf)
    try {
      val json_rdd: RDD[String] = sc.textFile("hdfs://localhost:8020/data/user_data.json")

      // Parse each line and tag the record with flag = 1.
      val result_rdd: RDD[JSONObject] = json_rdd.map { (line: String) =>
        val json_item: JSONObject = JSON.parseObject(line)
        json_item.put("flag", 1)
        json_item
      }

      val driverClassName = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=false"
      val user = "root"
      // NOTE(review): credentials are hard-coded in source; consider moving them
      // to configuration or a secret store.
      val password = "cpx726175"

      result_rdd.foreachPartition { (partition: Iterator[JSONObject]) =>
        // Connections are not serializable, so each partition opens its own.
        Class.forName(driverClassName)
        val connection: Connection = DriverManager.getConnection(url, user, password)
        try {
          val sql = "insert into t_user(user_name, customer_id, age, birthday, deposit_amount, last_login_time,flag) values(?,?,?,?,?,?,?)"
          val statement: PreparedStatement = connection.prepareStatement(sql)
          try {
            partition.foreach { (json_data: JSONObject) =>
              statement.setString(1, json_data.getString("user_name"))
              statement.setInt(2, json_data.getInteger("customer_id"))
              statement.setInt(3, json_data.getInteger("age"))
              statement.setString(4, json_data.getString("birthday"))
              // NOTE(review): Float may lose precision for monetary amounts
              // such as 4000.56 — confirm against the column type.
              statement.setFloat(5, json_data.getFloat("deposit_amount"))
              statement.setString(6, json_data.getString("last_login_time"))
              statement.setInt(7, json_data.getInteger("flag"))
              statement.executeUpdate()
            }
          } catch {
            // Best-effort insert: a failure is reported and the rest of this
            // partition is skipped, but the task itself is not failed.
            case NonFatal(e) => e.printStackTrace()
          } finally {
            statement.close()
          }
        } finally {
          // Runs even if prepareStatement or statement.close() throws.
          connection.close()
        }
      }
    } finally {
      sc.stop()
    }
  }
}
|