Getting Started with Spark SQL

Program Entry Point


Before Spark 2.0, the first thing a Spark program had to do was create a SparkContext object, which tells Spark how to access the cluster. To create a SparkContext, you first need to build a SparkConf object containing information about your application.
Only one SparkContext may be active per JVM.

val sparkConf: SparkConf = new SparkConf()
.setAppName("transformation_func")
.setMaster("local")

val sc = new SparkContext(sparkConf)


Since Spark 2.0, the SparkSession class is the entry point to all functionality in Spark and introduces the DataFrame and Dataset APIs. To create a basic SparkSession, just use SparkSession.builder(). SparkConf, SparkContext, and SQLContext are all wrapped inside SparkSession, so they no longer need to be created explicitly, and built-in support for Hive is provided as well. The screenshot below shows the SparkSession definition in the source code:
(Screenshot: SparkSession definition in the Spark source code)

Creating a SparkSession

import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

Creating DataFrames


Using a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources.


Converting an RDD into a DataFrame:

package read

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object sparksql_rdd {

case class Person(user_name: String, sex: String)

def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("function_case")
.master("local")
.getOrCreate()

val rdd_ss: RDD[(String, String)] = spark.sparkContext.makeRDD(List(("brent","male"),("haylee","female"),("vicky","male")))

import spark.implicits._
val df2: DataFrame = rdd_ss.map((x: (String, String)) =>{Person(x._1,x._2)}).toDF()

}
}



Creating a DataFrame from a JSON file:

package spark_sql

import org.apache.spark.sql.{DataFrame, SparkSession}

object sparksql_1 {
def main(args: Array[String]): Unit ={
val spark: SparkSession = SparkSession
.builder()
.appName("sql_case")
.master("local")
.getOrCreate()

val df_json: DataFrame = spark.read.json("hdfs://localhost:8020/data/user_data.json")

df_json.show(5)
}
}


Creating a DataFrame from a Hive table:

package spark_sql

import org.apache.spark.sql.{DataFrame, SparkSession}

object sparksql_hive {
def main(args: Array[String]): Unit = {
val warehouseLocation="hdfs://localhost:8020/user/hive/warehouse"
val spark: SparkSession = SparkSession
.builder()
.appName("sql_case")
.master("local")
.config("spark.sql.warehouse.dir",warehouseLocation)
.enableHiveSupport()
.getOrCreate()

val df_databases: DataFrame = spark.sql("show databases")
df_databases.show()
}
}

Note: reading Hive from Spark takes a few extra steps.

  1. If you run on a cluster, note that hive-site.xml must be copied into the conf directory under your Spark installation; if you connect to the cluster's Hive from your local machine, copy hive-site.xml into the resources directory of your IDEA project instead.
  2. If you run on a cluster, note that mysql-connector-java-8.0.19.jar must be copied into the jars directory under your Spark installation; if you connect to the cluster's Hive from your local machine, do not forget to declare mysql-connector-java in your pom file (see the sketch after this list).
  3. When running locally you may run into a permission problem with /tmp/hive; use chmod to change the permissions on /tmp to 777.
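
For step 2, a minimal pom declaration might look like the sketch below. The coordinates are the standard ones for the MySQL JDBC driver; the version is only an assumption taken from the jar name mentioned above, so align it with your own MySQL server and Hive metastore.

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <!-- assumed version, taken from mysql-connector-java-8.0.19.jar above -->
        <version>8.0.19</version>
    </dependency>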


Creating a DataFrame from HBase:
package spark_sql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

import scala.collection.immutable

object sparksql_hbase {

def main(args: Array[String]): Unit = {

val spark: SparkSession = SparkSession
.builder()
.appName("Spark HBase Example")
.master("local[4]")
.getOrCreate()

def catalog: String =
// When reading, set the type field to string for every column regardless of the actual data type; otherwise the read fails
s"""{
|"table":{"namespace":"default", "name":"t_user"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"user_name", "type":"string"},
|"col2":{"cf":"cf1", "col":"customer_id", "type":"string"},
|"col3":{"cf":"cf1", "col":"age", "type":"string"},
|"col4":{"cf":"cf1", "col":"birthday", "type":"string"},
|"col5":{"cf":"cf1", "col":"deposit_amount", "type":"string"},
|"col6":{"cf":"cf1", "col":"last_login_time", "type":"string"},
|"col7":{"cf":"cf1", "col":"flag", "type":"string"}
|}
|}""".stripMargin


// read
val df: DataFrame = spark
.read
.option(HBaseTableCatalog.tableCatalog, catalog)
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()

df.show()

}
}


Note: if we read from and write to HBase frequently, performance becomes a concern. The built-in data source uses TableInputFormat to read data from HBase, and TableInputFormat has several drawbacks:

  • A single task can only start one Scan to read data from HBase;
  • TableInputFormat does not support BulkGet;
  • It cannot benefit from the optimizations of Spark SQL's built-in Catalyst engine.


To address these problems, engineers from Hortonworks built the new Apache Spark - Apache HBase Connector, SHC for short. With this library we can write DataFrame data into HBase directly through Spark SQL, and we can also query HBase with Spark SQL; those queries make full use of the Catalyst engine's optimizations, such as partition pruning, column pruning, predicate pushdown, and data locality, which greatly speeds up querying HBase through Spark.
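
As a rough illustration of what these optimizations buy you, the sketch below reuses the SparkSession, imports, and t_user catalog from the HBase read example above and asks for only two columns plus a row-key equality filter. With SHC, a query shaped like this can be served by column pruning and a pushed-down Scan/Get on the row key instead of a full table read; the exact pushdown behavior depends on your SHC version, so treat this as an assumption-laden sketch rather than guaranteed behavior, and note that "some_row_key" is a placeholder.

// Assumes the same `spark`, `catalog`, and imports as in the HBase read example above
val users: DataFrame = spark
.read
.option(HBaseTableCatalog.tableCatalog, catalog)
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()

// Column pruning: only cf1:user_name (col1) and cf1:age (col3) are requested
// Predicate pushdown: the row-key equality on col0 can be answered by an HBase Get/Scan
users
.select("col1", "col3")
.filter(users("col0") === "some_row_key") // placeholder row key
.show()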


Using SHC is still somewhat awkward at the moment: the Maven dependencies found online may fail to resolve the org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog class, probably because of version mismatches. The recommendation here is to download the source yourself and build it into a jar, or install the build into your own Maven repository:

  1. Download the source from https://github.com/hortonworks-spark/shc and pick the version that is lower than or equal to your Spark and HBase versions.
  2. Open the project locally, open the pom file in the project root, comment out the distributionManagement section, and click install to build the jar into your local Maven repository; you can also deploy it to your remote private Maven repository.


  3. Add the dependency below to your pom file and you are ready to go (pay attention to the version number):
    <dependency>
    <groupId>com.hortonworks</groupId>
    <artifactId>shc-core</artifactId>
    <version>1.1.2-2.2-s_2.11</version>
    </dependency>

DataFrame Operations

In the previous post we mentioned that a DataFrame does not store type information for each column, so type errors cannot be detected at compile time; for that reason these can also be called untyped Dataset operations.

package function

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}



object sparksql_function {
def main(args: Array[String]): Unit ={
val spark: SparkSession = SparkSession
.builder()
.appName("function_case")
.master("local")
.config("spark.sql.crossJoin.enabled", "true")
.getOrCreate()

// Sample data
/**
* {"user_name":"brent","customer_id":12031602,"age": 22,"birthday":"1993-04-05","deposit_amount":3000,"last_login_time":"2017-03-10 14:55:22"}
{"user_name":"haylee","customer_id":12031603,"age":23,"birthday":"1992-08-10","deposit_amount":4000.56,"last_login_time":"2017-03-11 10:55:00"}
{"user_name":"vicky","customer_id":12031604,"age":30,"birthday":"2000-03-02","deposit_amount":200.4,"last_login_time":"2017-03-10 09:10:00"}
*/
val df: DataFrame = spark.read.json("hdfs://localhost:8020/data/user_data.json")

val rdd_row: RDD[Row] = spark.sparkContext
.makeRDD(List(("brent", "male"), ("haylee", "female"), ("vicky", "male")))
.map((x: (String, String)) => Row(x._1, x._2))



// The schema is encoded in a string
val schemaString = "user_name sex"

// Generate the schema based on the string of schema
val fields: Array[StructField] = schemaString.split(" ")
.map((fieldName: String) => StructField(fieldName, StringType, nullable = true))

val schema = StructType(fields)

val df2: DataFrame = spark.createDataFrame(rdd_row, schema)


show_get_data(spark, df)
map_data(spark, df)
filter_data(spark, df)
sort_data(spark, df)
groupBy_data(spark, df)
join_data(spark, df, df2)
intersect_data(spark, df, df2)
withColumn_rename_dataframe(spark, df)
}

def show_get_data(spark: SparkSession, df: DataFrame): Unit = {

df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- birthday: string (nullable = true)
// |-- customer_id: long (nullable = true)
// |-- deposit_amount: double (nullable = true)
// |-- last_login_time: string (nullable = true)
// |-- user_name: string (nullable = true)

df.show(5)
// show() prints the first 20 rows by default
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 22|1993-04-05| 12031602| 3000.0|2017-03-10 14:55:22| brent|
// | 23|1992-08-10| 12031603| 4000.56|2017-03-11 10:55:00| haylee|
// | 30|2000-03-02| 12031604| 200.4|2017-03-10 09:10:00| vicky|
// +---+----------+-----------+--------------+-------------------+---------+

// Select only the "user_name" column
// Columns referenced by name like this cannot be used in expressions
df.select("user_name").show()
// +---------+
// |user_name|
// +---------+
// | brent|
// | haylee|
// | vicky|
// +---------+

// Select everybody, but increment the age by 1
// This import is needed to use the $-notation
import spark.implicits._
df.select($"user_name", $"age" + 1 as "new_age").show()
// +---------+-------+
// |user_name|new_age|
// +---------+-------+
// | brent| 23|
// | haylee| 24|
// | vicky| 31|
// +---------+-------+

import org.apache.spark.sql.functions._
df.select(col("customer_id"), col("deposit_amount")).show()

df.limit(5).show()

df.describe()



}

def map_data(spark: SparkSession, df: DataFrame): Unit = {
import spark.implicits._
// Note: each element here is of type Row
df.map((x: Row) => {"name: "+x.getAs[String]("user_name")}).show()

}

def filter_data(spark: SparkSession, df: DataFrame): Unit = {
import spark.implicits._
// Column equality must use ===
df.filter($"user_name" === "brent").show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 22|1993-04-05| 12031602| 3000.0|2017-03-10 14:55:22| brent|
// +---+----------+-----------+--------------+-------------------+---------+
df.filter($"age" > 25).show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 30|2000-03-02| 12031604| 200.4|2017-03-10 09:10:00| vicky|
// +---+----------+-----------+--------------+-------------------+---------+
df.filter("deposit_amount = 3000.0").show()
df.filter($"deposit_amount" > 200 and $"age" < 25).show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 22|1993-04-05| 12031602| 3000.0|2017-03-10 14:55:22| brent|
// | 23|1992-08-10| 12031603| 4000.56|2017-03-11 10:55:00| haylee|
// +---+----------+-----------+--------------+-------------------+---------+

df.filter("substring(user_name,0,1) = 'h'").show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 23|1992-08-10| 12031603| 4000.56|2017-03-11 10:55:00| haylee|
// +---+----------+-----------+--------------+-------------------+---------+

// As the source code shows, where() is implemented on top of filter().
import org.apache.spark.sql.functions._
df.where(col("age") > 23).show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 30|2000-03-02| 12031604| 200.4|2017-03-10 09:10:00| vicky|
// +---+----------+-----------+--------------+-------------------+---------+

df.where("age> 23").show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 30|2000-03-02| 12031604| 200.4|2017-03-10 09:10:00| vicky|
// +---+----------+-----------+--------------+-------------------+---------+
}

def sort_data(spark: SparkSession, df: DataFrame): Unit = {
import spark.implicits._
df.sort($"age".desc).show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 30|2000-03-02| 12031604| 200.4|2017-03-10 09:10:00| vicky|
// | 23|1992-08-10| 12031603| 4000.56|2017-03-11 10:55:00| haylee|
// | 22|1993-04-05| 12031602| 3000.0|2017-03-10 14:55:22| brent|
// +---+----------+-----------+--------------+-------------------+---------+
df.sort($"age".asc).show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 22|1993-04-05| 12031602| 3000.0|2017-03-10 14:55:22| brent|
// | 23|1992-08-10| 12031603| 4000.56|2017-03-11 10:55:00| haylee|
// | 30|2000-03-02| 12031604| 200.4|2017-03-10 09:10:00| vicky|
// +---+----------+-----------+--------------+-------------------+---------+

// The unary-minus form below, used for descending order, only works on numeric columns
df.orderBy($"age")

df.orderBy(- df("age"))

df.orderBy(df("age").desc)

}

def groupBy_data(spark: SparkSession, df: DataFrame): Unit = {
df.groupBy("age").count().show()
// +---+-----+
// |age|count|
// +---+-----+
// | 22| 1|
// | 30| 1|
// | 23| 1|
// +---+-----+
// max/min/mean/sum only work on numeric columns
df.groupBy("user_name").max("deposit_amount").show()
df.groupBy("user_name").min("deposit_amount").show()
df.groupBy("user_name").mean("deposit_amount").as("mean_deposit_amount").show()
df.groupBy("user_name").sum("deposit_amount").toDF("user_name", "sum_deposit_amount").show()
// +---------+------------------+
// |user_name|sum_deposit_amount|
// +---------+------------------+
// | vicky| 200.4|
// | haylee| 4000.56|
// | brent| 3000.0|
// +---------+------------------+


import org.apache.spark.sql.functions._
df.groupBy("user_name", "age")
.agg(min("deposit_amount").as("min_deposit_amount"))
.show()
// +---------+---+------------------+
// |user_name|age|min_deposit_amount|
// +---------+---+------------------+
// | vicky| 30| 200.4|
// | haylee| 23| 4000.56|
// | brent| 22| 3000.0|
// +---------+---+------------------+


// agg can also be used on its own
df.agg("age" -> "max").show()

}

def distinct_data(spark: SparkSession, df: DataFrame): Unit = {
// distinct() is implemented as dropDuplicates() under the hood
df.distinct()
df.dropDuplicates()
}

def join_data(spark: SparkSession, df: DataFrame, df2: DataFrame): Unit = {
// Cartesian product; Spark 2.x disables cross joins by default, so the "spark.sql.crossJoin.enabled" -> "true" config must be set
df.join(df2).show()

df.join(df2, "user_name").show()

df.join(df2, Seq("user_name"), "left").show()
// +---------+---+----------+-----------+--------------+-------------------+------+
// |user_name|age| birthday|customer_id|deposit_amount| last_login_time| sex|
// +---------+---+----------+-----------+--------------+-------------------+------+
// | vicky| 30|2000-03-02| 12031604| 200.4|2017-03-10 09:10:00| male|
// | haylee| 23|1992-08-10| 12031603| 4000.56|2017-03-11 10:55:00|female|
// | brent| 22|1993-04-05| 12031602| 3000.0|2017-03-10 14:55:22| male|
// +---------+---+----------+-----------+--------------+-------------------+------+

}


def intersect_data(spark: SparkSession, df: DataFrame, df2: DataFrame): Unit = {
// Returns the records that appear in both DataFrames; both sides must have the same schema,
// so intersect only on the shared user_name column here
df.select("user_name").intersect(df2.select("user_name")).show(false)
}

def withColumn_rename_dataframe(spark: SparkSession, df: DataFrame): Unit = {
// Rename a column
df.withColumnRenamed("deposit_amount","withdraw_amount").show()
// Add a new column
import spark.implicits._
df.withColumn("next_year_age", $"age"+1).show()
}

}

Running SQL Queries Programmatically

package sql

import org.apache.spark.sql.{DataFrame, SparkSession}

object spark_use_sql {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("sql_case")
.master("local")
.getOrCreate()

// Sample data
/**
* {"user_name":"brent","customer_id":12031602,"age": 22,"birthday":"1993-04-05","deposit_amount":3000,"last_login_time":"2017-03-10 14:55:22"}
* {"user_name":"haylee","customer_id":12031603,"age":23,"birthday":"1992-08-10","deposit_amount":4000.56,"last_login_time":"2017-03-11 10:55:00"}
* {"user_name":"vicky","customer_id":12031604,"age":30,"birthday":"2000-03-02","deposit_amount":200.4,"last_login_time":"2017-03-10 09:10:00"}
*/
val df: DataFrame = spark.read.json("hdfs://localhost:8020/data/user_data.json")

df.createTempView("t_user")

spark.sql("select * from t_user").show()
// +---+----------+-----------+--------------+-------------------+---------+
// |age| birthday|customer_id|deposit_amount| last_login_time|user_name|
// +---+----------+-----------+--------------+-------------------+---------+
// | 22|1993-04-05| 12031602| 3000.0|2017-03-10 14:55:22| brent|
// | 23|1992-08-10| 12031603| 4000.56|2017-03-11 10:55:00| haylee|
// | 30|2000-03-02| 12031604| 200.4|2017-03-10 09:10:00| vicky|
// +---+----------+-----------+--------------+-------------------+---------+
import org.apache.spark.sql.functions._
spark.sql("select * from t_user").groupBy("user_name").agg("deposit_amount"->"sum").show()
}
}

Creating a Dataset

import org.apache.spark.sql.{Dataset, SparkSession}

object dataset {
case class Person(name: String, age: Long)
def main(args: Array[String]): Unit ={
val spark: SparkSession = SparkSession
.builder()
.appName("dataset_case")
.master("local")
.getOrCreate()

import spark.implicits._
// $example on:create_ds$
// Encoders are created for case classes
val caseClassDS: Dataset[Person] = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS: Dataset[Int] = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS: Dataset[Person] = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:create_ds$
}
}

Saving Data


Files

// $example on:generic_load_save_functions$
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
// $example off:generic_load_save_functions$
// $example on:manual_load_options$
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
// $example off:manual_load_options$
// $example on:manual_load_options_csv$
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
// $example off:manual_load_options_csv$
// $example on:manual_save_options_orc$
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
// $example off:manual_save_options_orc$

// $example on:direct_sql$
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
// $example off:direct_sql$
// $example on:write_sorting_and_bucketing$
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
// $example off:write_sorting_and_bucketing$
// $example on:write_partitioning$
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
// $example off:write_partitioning$
// $example on:write_partition_and_bucket$
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
// $example off:write_partition_and_bucket$

spark.sql("DROP TABLE IF EXISTS people_bucketed")
spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed")


JDBC

package write

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

object write_data {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("write_case")
.master("local")
.getOrCreate()

// read
val jdbcDF: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()


// write
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.mode("append")
.save()

// or
val properties=new Properties()
properties.setProperty("user","root")
properties.setProperty("password","secret_password")
jdbcDF.write
.mode("append")
.jdbc("jdbc:mysql://your_ip:3306/my_test?useUnicode=true&characterEncoding=UTF-8","t_result",properties)

}
}


Hive

import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()


HBase

object Application {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local")
.appName("normal")
.getOrCreate()
spark.sparkContext.setLogLevel("warn")
val data = (0 to 255).map { i => HBaseRecord(i, "extra")}

val df:DataFrame = spark.createDataFrame(data)

df.write
.mode(SaveMode.Overwrite)
.options(Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()
}

def catalog = s"""{
|"table":{"namespace":"rec", "name":"user_rec"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"t", "col":"col1", "type":"boolean"},
|"col2":{"cf":"t", "col":"col2", "type":"double"},
|"col3":{"cf":"t", "col":"col3", "type":"float"},
|"col4":{"cf":"t", "col":"col4", "type":"int"},
|"col5":{"cf":"t", "col":"col5", "type":"bigint"},
|"col6":{"cf":"t", "col":"col6", "type":"smallint"},
|"col7":{"cf":"t", "col":"col7", "type":"string"},
|"col8":{"cf":"t", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
}
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)

object HBaseRecord
{
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s"String$i: $t",
i.toByte)
}
}