SHC:使用 Spark SQL 高效地读写 HBase


Apache Spark 和 Apache HBase 是两个使用比较广泛的大数据组件。很多场景需要使用 Spark 分析/查询 HBase 中的数据,而目前 Spark 内置是支持很多数据源的,其中就包括了 HBase,但是内置的读取数据源还是使用了 TableInputFormat 来读取 HBase 中的数据。这个 TableInputFormat 有一些缺点:

  • 一个 Task 里面只能启动一个 Scan 去 HBase 中读取数据;
  • TableInputFormat 中不支持 BulkGet;
  • 不能享受到 Spark SQL 内置的 catalyst 引擎的优化。


从另一个方面来讲,如果先不谈关于性能的方面,那么你在写HBase的过程中,是不是步骤感觉很麻烦呢,Foreach每条数据Put到HBase中,往往写入逻辑部分的代码就要写很长很长的一段。


基于这些问题,来自 Hortonworks 的工程师们为我们带来了全新的 Apache Spark—Apache HBase Connector,下面简称 SHC。通过这个类库,我们可以直接使用 Spark SQL 将 DataFrame 中的数据写入到 HBase 中;而且我们也可以使用 Spark SQL 去查询 HBase 中的数据,在查询 HBase 的时候充分利用了 catalyst 引擎做了许多优化,比如分区修剪(partition pruning),列修剪(column pruning),谓词下推(predicate pushdown)和数据本地性(data locality)等等。因为有了这些优化,通过 Spark 查询 HBase 的速度有了很大的提升。


项目地址
具体的引入方式,在之前的Spark SQL入门中有写到

Catalog

对于每个表,必须提供一个目录,其中包括行键和具有预定义列族的数据类型的列,并定义hbase列与表模式之间的映射。目录是用户定义的json格式。

数据类型转换

支持Java基本类型。将来,将支持其他数据类型,这些数据类型依赖于用户指定的Serdes。SHC支持三种内部Serdes:Avro,Phoenix和PrimitiveType。用户可以通过在目录中定义“ tableCoder”来指定要使用的serde。为此,请参考示例和单元测试。以Avro为例。用户定义的Serdes将负责将字节数组转换为Avro对象,而连接器将负责将Avro对象转换为催化剂支持的数据类型。用户定义新Serde时,需要使其“实现”特征’SHCDataType’。


请注意,如果用户希望DataFrame仅处理字节数组,则可以指定二进制类型。然后,用户可以获得每个列为字节数组的催化剂行。用户可以使用定制的解串器进一步对它进行反序列化,或者直接在DataFrame的RDD上进行操作。

数据局部性

当Spark Worker节点与hbase区域服务器位于同一位置时,通过标识区域服务器位置并将执行程序与区域服务器一起定位来实现数据局部性。每个执行程序将仅对位于同一主机上的同一部分数据执行Scan / BulkGet。

谓词下推

该库使用HBase提供的现有标准HBase过滤器,并且不能在协处理器上运行。

分区修剪

通过从谓词中提取行键,我们将scan / BulkGet划分为多个非重叠区域,只有具有请求数据的区域服务器才会执行scan / BulkGet。当前,分区修剪是在行键的第一维上执行的。请注意,需要仔细定义WHERE条件。否则,结果扫描可能会包含一个比用户预期大的区域。例如,以下条件将导致完全扫描(rowkey1是行键的第一维,而column是常规的hbase列)。其中rowkey1>“ abc” OR列=“ xyz”

扫描和批量获取

都通过指定WHERE子句向用户公开,例如,其中column> x和column <y用于扫描,而column = x用于get。所有操作都在执行程序中执行,而驱动程序仅构造这些操作。在内部,我们将它们转换为扫描或获取或两者结合,从而将Iterator [Row]返回至催化剂引擎。

可创建的数据源

该库支持从HBase读取/向HBase写入。

应用用途

下面说明了如何使用连接器的基本步骤。有关更多详细信息和高级用例(例如Avro和复合键支持),请参考存储库中的示例。

定义了HBase目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin

上面定义了一个名称为table1,行键为键,列数为(col1-col8)的HBase表的架构。请注意,行键还必须详细定义为具有特定cf(行键)的列(col0)。

写入HBase表以填充数据

1
2
3
4
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.spark.sql.execution.datasources.hbase")
.save()

给定具有指定架构的DataFrame,上面将创建一个具有5个区域的HBase表并将该DataFrame保存在其中。请注意,如果未指定HBaseTableCatalog.newTable,则必须预先创建表。

在HBase表的顶部执行DataFrame操作

1
2
3
4
5
6
7
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}

复杂查询

1
2
3
4
5
6
7
8
9
10
val df = withCatalog(catalog)
val s = df.filter((($"col0" <= "row050" && $"col0" > "row040") ||
$"col0" === "row005" ||
$"col0" === "row020" ||
$"col0" === "r20" ||
$"col0" <= "row005") &&
($"col4" === 1 ||
$"col4" === 42))
.select("col0", "col1", "col4")
s.show

SQL支持

1
2
3
4
5
// Load the dataframe
val df = withCatalog(catalog)
//SQL example
df.createOrReplaceTempView("table")
sqlContext.sql("select count(col1) from table").show

## 支持Avro模式 该连接器完全支持所有avro模式。用户可以在其目录中使用完整记录模式或部分字段模式作为数据类型(有关更多详细信息,请参阅[此处](https://github.com/hortonworks-spark/shc/wiki/2.-Native-Avro-Support))。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
val schema_array = s"""{"type": "array", "items": ["string","null"]}""".stripMargin
val schema_record =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [ {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]} ] }""".stripMargin
val catalog = s"""{
|"table":{"namespace":"default", "name":"htable"},
|"rowkey":"key1",
|"columns":{
|"col1":{"cf":"rowkey", "col":"key1", "type":"double"},
|"col2":{"cf":"cf1", "col":"col1", "avro":"schema_array"},
|"col3":{"cf":"cf1", "col":"col2", "avro":"schema_record"},
|"col4":{"cf":"cf1", "col":"col3", "type":"double"},
|"col5":{"cf":"cf1", "col":"col4", "type":"string"}
|}
|}""".stripMargin
val df = sqlContext.read.options(Map("schema_array"->schema_array,"schema_record"->schema_record, HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").load()
df.write.options(Map("schema_array"->schema_array,"schema_record"->schema_record, HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").save()
####
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val complex = s"""MAP<int, struct<varchar:string>>"""
val schema =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [ {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]} ] }""".stripMargin
val catalog = s"""{
|"table":{"namespace":"default", "name":"htable"},
|"rowkey":"key1:key2",
|"columns":{
|"col1":{"cf":"rowkey", "col":"key1", "type":"binary"},
|"col2":{"cf":"rowkey", "col":"key2", "type":"double"},
|"col3":{"cf":"cf1", "col":"col1", "avro":"schema1"},
|"col4":{"cf":"cf1", "col":"col2", "type":"string"},
|"col5":{"cf":"cf1", "col":"col3", "type":"double", "sedes":"org.apache.spark.sql.execution.datasources.hbase.DoubleSedes"},
|"col6":{"cf":"cf1", "col":"col4", "type":"$complex"}
|}
|}""".stripMargin

val df = sqlContext.read.options(Map("schema1"->schema, HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").load()
df.write.options(Map("schema1"->schema, HBaseTableCatalog.tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase").save()

以上说明了我们的下一步,其中包括复合键支持,复杂数据类型,客户化Serde和Avro的支持。请注意,尽管所有主要部分都包含在当前代码库中,但现在可能无法运行。


SHC查询优化


SHC 主要使用下面的几种优化,使得 Spark 获取 HBase 的数据扫描范围得到减少,提高了数据读取的效率。

将使用 Rowkey 的查询转换成 get 查询**
我们都知道,HBase 中使用 Get 查询的效率是非常高的,所以如果查询的过滤条件是针对 RowKey 进行的,那么我们可以将它转换成 Get 查询。为了说明这点,我们使用下面的例子进行说明。假设我们定义好的 HBase catalog 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val catalog = s"""{
|"table":{"namespace":"default", "name":"iteblog", "tableCoder":"PrimitiveType"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"id", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin

那么如果有类似下面的查询

1
2
3
4
5
val df = withCatalog(catalog)
df.createOrReplaceTempView("iteblog_table")
sqlContext.sql("select * from iteblog_table where id = 1")
sqlContext.sql("select * from iteblog_table where id = 1 or id = 2")
sqlContext.sql("select * from iteblog_table where id in (1, 2)")

因为查询条件直接是针对 RowKey 进行的,所以这种情况直接可以转换成 Get 或者 BulkGet 请求的。第一个 SQL 查询过程类似于下面过程



但是,如果碰到非 RowKey 的过滤,那么这种查询是需要扫描 HBase 的全表的。上面的查询在 shc 里面就是将 HBase 里面的所有数据拿到,然后传输到 Spark ,再通过 Spark 里面进行过滤,可见 shc 在这种情况下效率是很低下的。


注意,上面的查询在 shc 返回的结果是错误的。具体原因是在将 id = 1 or col7 = ‘xxx’ 查询条件进行合并时,丢弃了所有的查找条件,相当于返回表的所有数据。定位到代码可以参见下面的

1
2
3
4
5
6
def or[T](left: HRF[T],
right: HRF[T])(implicit ordering: Ordering[T]): HRF[T] = {
val ranges = ScanRange.or(left.ranges, right.ranges)
val typeFilter = TypedFilter.or(left.tf, right.tf)
HRF(ranges, typeFilter, left.handled && right.handled)
}

同理,类似于下面的查询在 shc 里面其实都是全表扫描,并且将所有的数据返回到 Spark 层面上再进行一次过滤。

1
2
3
sqlContext.sql("select id, col6, col8 from iteblog_table where id = 1 or col7 <= 'xxx'")
sqlContext.sql("select id, col6, col8 from iteblog_table where id = 1 or col7 >= 'xxx'")
sqlContext.sql("select id, col6, col8 from iteblog_table where col7 = 'xxx'")

很显然,这种方式查询效率并不高,一种可行的方案是将算子下推到 HBase 层面,在 HBase 层面通过 SingleColumnValueFilter 过滤一部分数据,然后再返回到 Spark,这样可以节省很多数据的传输。

组合 RowKey 的查询优化


shc 还支持组合 RowKey 的方式来建表,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def cat =
s"""{
|"table":{"namespace":"default", "name":"iteblog", "tableCoder":"PrimitiveType"},
|"rowkey":"key1:key2",
|"columns":{
|"col00":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"},
|"col01":{"cf":"rowkey", "col":"key2", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin

上面的 col00 和 col01 两列组合成一个 rowkey,并且 col00 排在前面,col01 排在后面。比如 col00 =’row002’,col01 = 2,那么组合的 rowkey 为 row002\x00\x00\x00\x02。那么在组合 Rowkey 的查询 shc 都有哪些优化呢?现在我们有如下查询

1
df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000' and col01 = 0").show()

根据上面的信息,RowKey 其实是由 col00 和 col01 组合而成的,那么上面的查询其实可以将 col00 和 col01 进行拼接,然后组合成一个 RowKey,然后上面的查询其实可以转换成一个 Get 查询。但是在 shc 里面,上面的查询是转换成一个 scan 和一个 get 查询的。scan 的 startRow 为 row000,endRow 为 row000\xff\xff\xff\xff;get 的 rowkey 为 row000\xff\xff\xff\xff,然后再将所有符合条件的数据返回,最后再在 Spark 层面上做一次过滤,得到最后查询的结果。因为 shc 里面组合键查询的代码还没完善,所以当前实现应该不是最终的。
在 shc 里面下面两条 SQL 查询下沉到 HBase 的逻辑一致

1
df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000'").show()df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000' and col01 = 0").show()

唯一区别是在 Spark 层面上的过滤。

scan 查询优化

如果我们的查询有 < 或 > 等查询过滤条件,比如下面的查询条件:

1
df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 > 'row000' and col00 < 'row005'").show()

这个在 shc 里面转换成 HBase 的过滤为一条 get 和 一个 scan,具体为 get 的 Rowkey 为 row0005\xff\xff\xff\xff;scan 的 startRow 为 row000,endRow 为 row005\xff\xff\xff\xff,然后将查询的结果返回到 spark 层面上进行过滤。
总体来说,shc 能在一定程度上对查询进行优化,避免了全表扫描。但是经过评测,shc 其实还有很多地方不够完善,算子下沉并没有下沉到 HBase 层面上进行。