Spark  和 Apache HBase  是两个使用比较广泛的大数据组件。很多场景需要使用 Spark  分析/查询 HBase  中的数据,而目前 Spark  内置是支持很多数据源的,其中就包括了 HBase,但是内置的读取数据源还是使用了 TableInputFormat 来读取 HBase 中的数据。这个 TableInputFormat 有一些缺点:
一个 Task 里面只能启动一个 Scan 去 HBase 中读取数据; TableInputFormat 中不支持 BulkGet; 不能享受到 Spark SQL 内置的 catalyst 引擎的优化。 项目地址 
Catalog 对于每个表,必须提供一个目录,其中包括行键和具有预定义列族的数据类型的列,并定义hbase列与表模式之间的映射。目录是用户定义的json格式。
数据类型转换 支持Java基本类型。将来,将支持其他数据类型,这些数据类型依赖于用户指定的Serdes。SHC支持三种内部Serdes:Avro,Phoenix和PrimitiveType。用户可以通过在目录中定义“ tableCoder”来指定要使用的serde。为此,请参考示例和单元测试。以Avro为例。用户定义的Serdes将负责将字节数组转换为Avro对象,而连接器将负责将Avro对象转换为催化剂支持的数据类型。用户定义新Serde时,需要使其“实现”特征’SHCDataType’。
数据局部性 当Spark Worker节点与hbase区域服务器位于同一位置时,通过标识区域服务器位置并将执行程序与区域服务器一起定位来实现数据局部性。每个执行程序将仅对位于同一主机上的同一部分数据执行Scan / BulkGet。
谓词下推 该库使用HBase提供的现有标准HBase过滤器,并且不能在协处理器上运行。
分区修剪 通过从谓词中提取行键,我们将scan / BulkGet划分为多个非重叠区域,只有具有请求数据的区域服务器才会执行scan / BulkGet。当前,分区修剪是在行键的第一维上执行的。请注意,需要仔细定义WHERE条件。否则,结果扫描可能会包含一个比用户预期大的区域。例如,以下条件将导致完全扫描(rowkey1是行键的第一维,而column是常规的hbase列)。其中rowkey1>“ abc” OR列=“ xyz”
扫描和批量获取 都通过指定WHERE子句向用户公开,例如,其中column> x和column <y用于扫描,而column = x用于get。所有操作都在执行程序中执行,而驱动程序仅构造这些操作。在内部,我们将它们转换为扫描或获取或两者结合,从而将Iterator [Row]返回至催化剂引擎。
可创建的数据源 该库支持从HBase读取/向HBase写入。
应用用途 下面说明了如何使用连接器的基本步骤。有关更多详细信息和高级用例(例如Avro和复合键支持),请参考存储库中的示例。
定义了HBase目录 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def  catalog  s"" "{         |" table":{" namespace":" default", " name":" table1"},         |" rowkey":" key",         |" columns":{           |" col0":{" cf":" rowkey", " col":" key", " type":" string"},           |" col1":{" cf":" cf1", " col":" col1", " type":" boolean"},           |" col2":{" cf":" cf2", " col":" col2", " type":" double"},           |" col3":{" cf":" cf3", " col":" col3", " type":" float"},           |" col4":{" cf":" cf4", " col":" col4", " type":" int"},           |" col5":{" cf":" cf5", " col":" col5", " type":" bigint"},           |" col6":{" cf":" cf6", " col":" col6", " type":" smallint"},           |" col7":{" cf":" cf7", " col":" col7", " type":" string"},           |" col8":{" cf":" cf8", " col":" col8", " type":" tinyint"}         |}       |}" "" .stripMargin
上面定义了一个名称为table1,行键为键,列数为(col1-col8)的HBase表的架构。请注意,行键还必须详细定义为具有特定cf(行键)的列(col0)。
写入HBase表以填充数据 1 2 3 4 sc.parallelize(data).toDF.write.options(   Map (HBaseTableCatalog .tableCatalog -> catalog, HBaseTableCatalog .newTable -> "5" ))   .format("org.apache.spark.sql.execution.datasources.hbase" )   .save() 
给定具有指定架构的DataFrame,上面将创建一个具有5个区域的HBase表并将该DataFrame保存在其中。请注意,如果未指定HBaseTableCatalog.newTable,则必须预先创建表。
在HBase表的顶部执行DataFrame操作 1 2 3 4 5 6 7 def  withCatalog String ): DataFrame  = {  sqlContext   .read   .options(Map (HBaseTableCatalog .tableCatalog->cat))   .format("org.apache.spark.sql.execution.datasources.hbase" )   .load() } 
复杂查询 1 2 3 4 5 6 7 8 9 10 val  df = withCatalog(catalog)val  s = df.filter((($"col0"  <= "row050"  && $"col0"  > "row040" ) ||  $"col0"  === "row005"  ||   $"col0"  === "row020"  ||   $"col0"  ===  "r20"  ||   $"col0"  <= "row005" ) &&   ($"col4"  === 1  ||   $"col4"  === 42 ))   .select("col0" , "col1" , "col4" ) s.show 
SQL支持 1 2 3 4 5 val  df = withCatalog(catalog)df.createOrReplaceTempView("table" ) sqlContext.sql("select count(col1) from table" ).show 
 ## 支持Avro模式 该连接器完全支持所有avro模式。用户可以在其目录中使用完整记录模式或部分字段模式作为数据类型(有关更多详细信息,请参阅[此处](https://github.com/hortonworks-spark/shc/wiki/2.-Native-Avro-Support))。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 val  schema_array = s"" "{" type": " array", " items": [" string"," null"]}" "" .stripMarginval  schema_record =  s"" "{" namespace": " example.avro",       |   " type": " record",      " name": " User ",      |    " fields": [      {" name": " name", " type": " string"},      |      {" name": " favorite_number",  " type": [" int", " null"]},      |        {" name": " favorite_color", " type": [" string", " null"]}      ]    }" "" .stripMarginval  catalog = s"" "{         |" table":{" namespace":" default", " name":" htable"},         |" rowkey":" key1",         |" columns":{           |" col1":{" cf":" rowkey", " col":" key1", " type":" double"},           |" col2":{" cf":" cf1", " col":" col1", " avro":" schema_array"},           |" col3":{" cf":" cf1", " col":" col2", " avro":" schema_record"},           |" col4":{" cf":" cf1", " col":" col3", " type":" double"},           |" col5":{" cf":" cf1", " col":" col4", " type":" string"}         |}       |}" "" .stripMargin val  df = sqlContext.read.options(Map ("schema_array" ->schema_array,"schema_record" ->schema_record, HBaseTableCatalog .tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase" ).load() df.write.options(Map ("schema_array" ->schema_array,"schema_record" ->schema_record, HBaseTableCatalog .tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase" ).save() 
 ####
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 val  complex = s"" "MAP<int, struct<varchar:string>>" "" val  schema =  s"" "{" namespace": " example.avro",       |   " type": " record",      " name": " User ",      |    " fields": [      {" name": " name", " type": " string"},      |      {" name": " favorite_number",  " type": [" int", " null"]},      |        {" name": " favorite_color", " type": [" string", " null"]}      ]    }" "" .stripMarginval  catalog = s"" "{         |" table":{" namespace":" default", " name":" htable"},         |" rowkey":" key1:key2",         |" columns":{           |" col1":{" cf":" rowkey", " col":" key1", " type":" binary"},           |" col2":{" cf":" rowkey", " col":" key2", " type":" double"},           |" col3":{" cf":" cf1", " col":" col1", " avro":" schema1"},           |" col4":{" cf":" cf1", " col":" col2", " type":" string"},           |" col5":{" cf":" cf1", " col":" col3", " type":" double",        " sedes":" org.apache.spark.sql.execution.datasources.hbase.DoubleSedes "},           |" col6":{" cf":" cf1", " col":" col4", " type":" $complex"}         |}       |}" "" .stripMargin    val  df = sqlContext.read.options(Map ("schema1" ->schema, HBaseTableCatalog .tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase" ).load()df.write.options(Map ("schema1" ->schema, HBaseTableCatalog .tableCatalog->catalog)).format("org.apache.spark.sql.execution.datasources.hbase" ).save() 
以上说明了我们的下一步,其中包括复合键支持,复杂数据类型,客户化Serde和Avro的支持。请注意,尽管所有主要部分都包含在当前代码库中,但现在可能无法运行。
SHC查询优化 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 val  catalog = s"" "{   |" table":{" namespace":" default", " name":" iteblog", " tableCoder":" PrimitiveType "},   |" rowkey":" key",   |" columns":{     |" col0":{" cf":" rowkey", " col":" id", " type":" int"},     |" col1":{" cf":" cf1", " col":" col1", " type":" boolean"},     |" col2":{" cf":" cf2", " col":" col2", " type":" double"},     |" col3":{" cf":" cf3", " col":" col3", " type":" float"},     |" col4":{" cf":" cf4", " col":" col4", " type":" int"},     |" col5":{" cf":" cf5", " col":" col5", " type":" bigint"},     |" col6":{" cf":" cf6", " col":" col6", " type":" smallint"},     |" col7":{" cf":" cf7", " col":" col7", " type":" string"},     |" col8":{" cf":" cf8", " col":" col8", " type":" tinyint"}   |} |}" "" .stripMargin
那么如果有类似下面的查询
1 2 3 4 5 val  df = withCatalog(catalog)df.createOrReplaceTempView("iteblog_table" ) sqlContext.sql("select * from iteblog_table where id = 1" ) sqlContext.sql("select * from iteblog_table where id = 1 or id = 2" ) sqlContext.sql("select * from iteblog_table where id in (1, 2)" ) 
因为查询条件直接是针对 RowKey 进行的,所以这种情况直接可以转换成 Get 或者 BulkGet 请求的。第一个 SQL 查询过程类似于下面过程传输到 Spark ,再通过 Spark 里面进行过滤 ,可见 shc 在这种情况下效率是很低下的。
1 2 3 4 5 6 def  or T ](left: HRF [T ],            right: HRF [T ])(implicit  ordering: Ordering [T ]): HRF [T ] = {     val  ranges = ScanRange .or(left.ranges, right.ranges)     val  typeFilter = TypedFilter .or(left.tf, right.tf)     HRF (ranges, typeFilter, left.handled && right.handled) } 
同理,类似于下面的查询在 shc 里面其实都是全表扫描,并且将所有的数据返回到 Spark 层面上再进行一次过滤。
1 2 3 sqlContext.sql("select id, col6, col8 from iteblog_table where id = 1 or col7 <= 'xxx'" ) sqlContext.sql("select id, col6, col8 from iteblog_table where id = 1 or col7 >= 'xxx'" ) sqlContext.sql("select id, col6, col8 from iteblog_table where col7 = 'xxx'" ) 
很显然,这种方式查询效率并不高,一种可行的方案是将算子下推到 HBase 层面,在 HBase 层面通过 SingleColumnValueFilter 过滤一部分数据,然后再返回到 Spark,这样可以节省很多数据的传输。
组合 RowKey 的查询优化 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 def  cat    s"" "{       |" table":{" namespace":" default", " name":" iteblog", " tableCoder":" PrimitiveType "},      |" rowkey":" key1:key2",      |" columns":{      |" col00":{" cf":" rowkey", " col":" key1", " type":" string", " length":" 6 "},      |" col01":{" cf":" rowkey", " col":" key2", " type":" int"},      |" col1":{" cf":" cf1", " col":" col1", " type":" boolean"},      |" col2":{" cf":" cf2", " col":" col2", " type":" double"},      |" col3":{" cf":" cf3", " col":" col3", " type":" float"},      |" col4":{" cf":" cf4", " col":" col4", " type":" int"},      |" col5":{" cf":" cf5", " col":" col5", " type":" bigint"},      |" col6":{" cf":" cf6", " col":" col6", " type":" smallint"},      |" col7":{" cf":" cf7", " col":" col7", " type":" string"},      |" col8":{" cf":" cf8", " col":" col8", " type":" tinyint"}      |}      |}" "" .stripMargin
上面的 col00 和 col01 两列组合成一个 rowkey,并且 col00 排在前面,col01 排在后面。比如 col00 =’row002’,col01 = 2,那么组合的 rowkey 为 row002\x00\x00\x00\x02。那么在组合 Rowkey 的查询 shc 都有哪些优化呢?现在我们有如下查询
1 df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000' and col01 = 0" ).show() 
根据上面的信息,RowKey 其实是由 col00 和 col01 组合而成的,那么上面的查询其实可以将 col00 和 col01 进行拼接,然后组合成一个 RowKey,然后上面的查询其实可以转换成一个 Get 查询。但是在 shc 里面,上面的查询是转换成一个 scan 和一个 get 查询的。scan 的 startRow 为 row000,endRow 为 row000\xff\xff\xff\xff;get 的 rowkey 为 row000\xff\xff\xff\xff,然后再将所有符合条件的数据返回,最后再在 Spark 层面上做一次过滤,得到最后查询的结果。因为 shc 里面组合键查询的代码还没完善,所以当前实现应该不是最终的。
1 df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000'" ).show()df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 = 'row000' and col01 = 0" ).show() 
唯一区别是在 Spark 层面上的过滤。
scan 查询优化 如果我们的查询有 < 或 > 等查询过滤条件,比如下面的查询条件:
1 df.sqlContext.sql("select col00, col01, col1 from iteblog where col00 > 'row000' and col00 < 'row005'" ).show() 
这个在 shc 里面转换成 HBase 的过滤为一条 get 和 一个 scan,具体为 get 的 Rowkey 为 row0005\xff\xff\xff\xff;scan 的 startRow 为 row000,endRow 为 row005\xff\xff\xff\xff,然后将查询的结果返回到 spark 层面上进行过滤。