程序起点
在Spark2.0之前, Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建一个SparkContext您首先需要构建一个SparkConf对象,其中包含有关您的应用程序的信息。
每个JVM只能激活一个SparkContext。
1 | val sparkConf: SparkConf = new SparkConf() |
在Spark2.0之后, SparkSession类是Spark中所有功能的入口点。为了引入dataframe和dataset的API,要创建一个基本的SparkSession,只需使用SparkSession.builder()。SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中,不需要显示的创建。并且提供了对Hive功能的内置支持,下图是SparkSession的源码定义:
SparkSession创建**
1 | import org.apache.spark.sql.SparkSession |
创建DataFrame
使用SparkSession,应用程序可以从现有的RDD,Hive表的或Spark数据源创建DataFrame 。
基于RDD转化DataFrame:
1 | package read |
基于JSON文件的内容创建一个DataFrame:
1 | package spark_sql |
基于Hive表内容创建一个DataFrame:
1 | package spark_sql |
注意: Spark读取Hive是需要两三个步骤的
- 如果你是在集群上运行,需要注意,要将hive-site.xml复制一份到spark目录下的conf文件夹中,如果你是在本地连接集群中的Hive,那么请将hive-site.xml复制一份到你IDEA中,resources目录下。
- 如果你是在集群上运行,需要注意,要将mysql-connector-java-8.0.19.jar复制一份到spark目录下的jars文件夹中,如果你是在本地连接集群中的Hive,那么请在pom文件中不要忘记引入mysql-connector-java
- 在本地运行,可能会遇到/tmp/hive的权限问题,请用chmod修改/tmp权限为777
**基于HBase内容创建一个DataFrame:**
1 | package spark_sql |
注意:如果我们对于读取和写入HBase的场景很频繁的话,就需要考虑性能的问题,内置的读取数据源是使用了 TableInputFormat 来读取 HBase 中的数据。这个 TableInputFormat 有一些缺点:
- 一个 Task 里面只能启动一个 Scan 去 HBase 中读取数据;
- TableInputFormat 中不支持 BulkGet;
- 不能享受到 Spark SQL 内置的 catalyst 引擎的优化。
基于这些问题,来自 Hortonworks 的工程师们为我们带来了全新的 Apache Spark—Apache HBase Connector,下面简称 SHC。通过这个类库,我们可以直接使用 Spark SQL 将 DataFrame 中的数据写入到 HBase 中;而且我们也可以使用 Spark SQL 去查询 HBase 中的数据,在查询 HBase 的时候充分利用了 catalyst 引擎做了许多优化,比如分区修剪(partition pruning),列修剪(column pruning),谓词下推(predicate pushdown)和数据本地性(data locality)等等。因为有了这些优化,通过 Spark 查询 HBase 的速度有了很大的提升。
但是对于使用SHC,目前还是有些麻烦的,网上的maven依赖可能是因为版本的原因,程序引入找不到org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog类,这里推荐自己下载源码,进行编译成jar文件或者编译后上传到自己的maven库中进行使用
- 下载源码 https://github.com/hortonworks-spark/shc,选择相应低于或者等于spark,hbase的版本
- 本地中打开,点击程序根目录下的pom文件,注释掉distributionManagement,直接点击install,将jar包生成到你本地的maven库中,当然你也可以上传到你远程的私有Maven 库中。
- pom文件中,引入下面依赖,就可以使用了(注意 version 版本号)
1
2
3
4
5<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc-core</artifactId>
<version>1.1.2-2.2-s_2.11</version>
</dependency>
DataFrame操作
上篇文章中,我们讲到DataFrame每一列并不存储类型信息,所以在编译时并不能发现类型错误,所以在这里我们也可以叫做** 无类型的数据集操作。
1 | package function |
以编程方式运行SQL查询
1 | package sql |
DataSet创建
1 | import org.apache.spark.sql.{Dataset, SparkSession} |
数据存储
文件
1 | // $example on:generic_load_save_functions$ |
jdbc
1 | package write |
hive
1 | import java.io.File |
hbase
1 | object Application { |