Spark Core 数据存储操作


上文讲解了RDD的概念以及转换算子、行动算子的操作内容。算子操作属于Spark针对数据进行计算的过程。回到现实场景,我们使用Spark完成一个任务,其实就是下面三个阶段(列表后附一段最小的骨架示例):

  • 读取数据源
  • 对源数据进行计算,业务逻辑的编写
  • 数据存储,数据落地
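
下面用一段最小的骨架代码示意这三个阶段。其中的路径和处理逻辑只是假设的示例,可按实际业务替换:

package rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object three_stage_demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("three_stage_demo").setMaster("local"))

    // 1. 读取数据源(示例路径)
    val source: RDD[String] = sc.textFile("hdfs://localhost:8020/data/input")

    // 2. 对源数据进行计算,编写业务逻辑(这里仅做简单转换)
    val result: RDD[String] = source.map(line => line.trim.toLowerCase)

    // 3. 数据存储,数据落地(示例路径)
    result.saveAsTextFile("hdfs://localhost:8020/data/output")

    sc.stop()
  }
}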




在日常的工作中,Spark的任务可以分为两类,离线计算和实时计算。


针对实时计算,绝大多数情况下我们使用Kafka作为消息队列,与Spark进行数据对接。
针对离线计算,数据源就多种多样了,可以是产品业务线的业务数据库,可以是大数据平台的Hive、HBase等,也可以是CEO办公室给出的报表数据。


总结一下:数据源多种多样,选择合适的组件和API把数据读进来即可。在现实业务场景中,通常会选择上层组件Spark SQL和Spark Streaming来读取、计算和存储数据。所以这里不展开复杂的数据操作,只针对不需要Spark SQL和Spark Streaming的简单业务场景,给出使用Spark Core完成读取、计算和存储的任务示例。

数据落地至HDFS文件系统

package rdd

import com.google.gson.{JsonObject, JsonParser}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object process_json {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("process_json")
      .setMaster("local")

    val sc = new SparkContext(sparkConf)

    // data_demo(源文件每行一条JSON):
    // {"user_name":"brent","customer_id":12031602,"age":22,"birthday":"1993-04-05","deposit_amount":3000,"last_login_time":"2017-03-10 14:55:22"}
    // {"user_name":"haylee","customer_id":12031603,"age":23,"birthday":"1992-08-10","deposit_amount":4000.56,"last_login_time":"2017-03-11 10:55:00"}
    // {"user_name":"vicky","customer_id":12031604,"age":30,"birthday":"2000-03-02","deposit_amount":200.4,"last_login_time":"2017-03-10 09:10:00"}

    // 读取数据源
    val json_rdd: RDD[String] = sc.textFile("hdfs://localhost:8020/data/user_data.json")

    // 数据计算:解析每行JSON并追加flag字段
    val result_rdd: RDD[JsonObject] = json_rdd.map((x: String) => {
      val parser = new JsonParser()
      val json_item: JsonObject = parser.parse(x).getAsJsonObject
      json_item.addProperty("flag", 1)
      json_item
    })

    // 数据落地:JsonObject.toString输出的就是JSON字符串
    result_rdd.saveAsTextFile("hdfs://localhost:8020/data/result/user_data")

    sc.stop()
  }
}
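
落地完成后,可以在上面程序的sc.stop()之前追加几行,读取结果目录做个简单验证(仅为验证思路的示意,路径与上文一致):

// 简单验证:读取落地目录,打印前几条结果(需在sc.stop()之前执行)
val check_rdd: RDD[String] = sc.textFile("hdfs://localhost:8020/data/result/user_data")
check_rdd.take(3).foreach(println)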



数据落地至MySQL

package rdd

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object process_json {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("process_json")

    // data_demo(源文件每行一条JSON):
    // {"user_name":"brent","customer_id":12031602,"age":22,"birthday":"1993-04-05","deposit_amount":3000,"last_login_time":"2017-03-10 14:55:22"}
    // {"user_name":"haylee","customer_id":12031603,"age":23,"birthday":"1992-08-10","deposit_amount":4000.56,"last_login_time":"2017-03-11 10:55:00"}
    // {"user_name":"vicky","customer_id":12031604,"age":30,"birthday":"2000-03-02","deposit_amount":200.4,"last_login_time":"2017-03-10 09:10:00"}

    val sc = new SparkContext(sparkConf)

    // 读取数据源
    val json_rdd: RDD[String] = sc.textFile("hdfs://localhost:8020/data/user_data.json")

    // 数据计算:解析每行JSON并追加flag字段
    val result_rdd: RDD[JSONObject] = json_rdd.map((x: String) => {
      val json_item: JSONObject = JSON.parseObject(x)
      json_item.put("flag", 1)
      json_item
    })

    // 数据落地:按分区建立连接,避免为每条记录都创建一次连接
    val driverClassName = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/test?characterEncoding=utf8&useSSL=false"
    val user = "root"
    val password = "cpx726175"

    result_rdd.foreachPartition((partition: Iterator[JSONObject]) => {
      Class.forName(driverClassName)
      val connection: Connection = DriverManager.getConnection(url, user, password)
      val sql = "insert into t_user(user_name, customer_id, age, birthday, deposit_amount, last_login_time, flag) values(?,?,?,?,?,?,?)"
      val statement: PreparedStatement = connection.prepareStatement(sql)
      try {
        partition.foreach { json_data: JSONObject =>
          statement.setString(1, json_data.getString("user_name"))
          statement.setInt(2, json_data.getInteger("customer_id"))
          statement.setInt(3, json_data.getInteger("age"))
          statement.setString(4, json_data.getString("birthday"))
          statement.setFloat(5, json_data.getFloat("deposit_amount"))
          statement.setString(6, json_data.getString("last_login_time"))
          statement.setInt(7, json_data.getInteger("flag"))
          statement.executeUpdate()
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (statement != null) statement.close()
        if (connection != null) connection.close()
      }
    })

    sc.stop()
  }
}
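
上面的写法是逐条executeUpdate。数据量较大时,也可以在每个分区内改用JDBC批量提交,减少与MySQL的交互次数。下面是一个按批提交的示意,沿用上文的driverClassName、url、user、password和result_rdd,批大小500只是举例,可按实际情况调整:

// 替换上面程序中的foreachPartition部分:按批提交的写法
result_rdd.foreachPartition((partition: Iterator[JSONObject]) => {
  Class.forName(driverClassName)
  val connection: Connection = DriverManager.getConnection(url, user, password)
  connection.setAutoCommit(false)
  val sql = "insert into t_user(user_name, customer_id, age, birthday, deposit_amount, last_login_time, flag) values(?,?,?,?,?,?,?)"
  val statement: PreparedStatement = connection.prepareStatement(sql)
  try {
    var count = 0
    partition.foreach { json_data: JSONObject =>
      statement.setString(1, json_data.getString("user_name"))
      statement.setInt(2, json_data.getInteger("customer_id"))
      statement.setInt(3, json_data.getInteger("age"))
      statement.setString(4, json_data.getString("birthday"))
      statement.setFloat(5, json_data.getFloat("deposit_amount"))
      statement.setString(6, json_data.getString("last_login_time"))
      statement.setInt(7, json_data.getInteger("flag"))
      statement.addBatch()          // 先攒一批
      count += 1
      if (count % 500 == 0) {
        statement.executeBatch()    // 每500条提交一次
        connection.commit()
      }
    }
    statement.executeBatch()        // 提交最后不足一批的数据
    connection.commit()
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
})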

注:在MySQL中建表时,float、double字段需要指定小数位数,否则数值会被四舍五入成整数显示。

CREATE TABLE `t_user` (
`user_name` varchar(50) DEFAULT NULL,
`customer_id` int(50) DEFAULT NULL,
`age` int(20) DEFAULT NULL,
`birthday` varchar(50) DEFAULT NULL,
`deposit_amount` float(20,3) DEFAULT NULL,
`last_login_time` varchar(100) DEFAULT NULL,
`flag` int(10) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

落地后的数据可在MySQL的t_user表中查到(截图略)。

数据落地至HBase

package rdd

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object process_json {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("process_json")

    // data_demo(源文件每行一条JSON):
    // {"user_name":"brent","customer_id":12031602,"age":22,"birthday":"1993-04-05","deposit_amount":3000,"last_login_time":"2017-03-10 14:55:22"}
    // {"user_name":"haylee","customer_id":12031603,"age":23,"birthday":"1992-08-10","deposit_amount":4000.56,"last_login_time":"2017-03-11 10:55:00"}
    // {"user_name":"vicky","customer_id":12031604,"age":30,"birthday":"2000-03-02","deposit_amount":200.4,"last_login_time":"2017-03-10 09:10:00"}

    val sc = new SparkContext(sparkConf)

    // 读取数据源
    val json_rdd: RDD[String] = sc.textFile("hdfs://localhost:8020/data/user_data.json")

    // 数据计算:解析每行JSON并追加flag字段
    val result_rdd: RDD[JSONObject] = json_rdd.map((x: String) => {
      val json_item: JSONObject = JSON.parseObject(x)
      json_item.put("flag", 1)
      json_item
    })

    val resultConf: Configuration = HBaseConfiguration.create()
    // 设置zookeeper集群地址,也可以将hbase-site.xml导入classpath,但建议在程序里显式设置
    resultConf.set("hbase.zookeeper.quorum", "localhost")
    // 设置zookeeper连接端口,默认2181
    resultConf.set("hbase.zookeeper.property.clientPort", "2181")
    // 注意这里是output
    resultConf.set(TableOutputFormat.OUTPUT_TABLE, "t_user")

    val job: Job = Job.getInstance(resultConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[org.apache.hadoop.hbase.client.Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // 以customer_id作为rowkey,所有字段写入列族cf1
    val hbaseOut: RDD[(ImmutableBytesWritable, Put)] = result_rdd.map((json_data: JSONObject) => {
      val put = new Put(Bytes.toBytes(json_data.getInteger("customer_id").toString))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("user_name"), Bytes.toBytes(json_data.getString("user_name")))
      // 直接写入整型会以十六进制存储,所以统一转成字符串再写入
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("customer_id"), Bytes.toBytes(json_data.get("customer_id").toString))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(json_data.get("age").toString))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("birthday"), Bytes.toBytes(json_data.getString("birthday")))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("deposit_amount"), Bytes.toBytes(json_data.get("deposit_amount").toString))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("last_login_time"), Bytes.toBytes(json_data.getString("last_login_time")))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("flag"), Bytes.toBytes(json_data.get("flag").toString))
      (new ImmutableBytesWritable, put)
    })

    // 数据落地至HBase
    hbaseOut.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}


hbase建表语句:

create 't_user', 'cf1'

落地后的数据在HBase的t_user表中即可查到(截图略)。
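
作为补充,下面给出一个用newAPIHadoopRDD把刚写入的数据读回来的示意。这里假设仍使用上文的localhost环境和t_user表,仅演示读取思路:

package rdd

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object read_hbase_demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("read_hbase_demo").setMaster("local"))

    val readConf = HBaseConfiguration.create()
    readConf.set("hbase.zookeeper.quorum", "localhost")
    readConf.set("hbase.zookeeper.property.clientPort", "2181")
    // 注意这里是input
    readConf.set(TableInputFormat.INPUT_TABLE, "t_user")

    // 以(rowkey, Result)的形式读取整张表
    val hbaseRdd = sc.newAPIHadoopRDD(
      readConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // 取出cf1:user_name列做简单验证
    hbaseRdd.map { case (_, result) =>
      val rowkey = Bytes.toString(result.getRow)
      val userName = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("user_name")))
      (rowkey, userName)
    }.collect().foreach(println)

    sc.stop()
  }
}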

以上就是针对RDD进行数据写入的操作。过程并不复杂,但需要注意在写入过程中对应好数据类型。