HBase中bloomfilter源码实现

Bloom filter最优的大小计算

Bloom过滤器对插入到其中的元素的数量非常敏感。对于HBase来说,条目的数量取决于存储在列中的数据的大小。当前默认区域大小为256MB,因此条目计数~=256MB/(列的平均值大小)。尽管有这个经验法则,但是由于压缩,我们并没有有效的方法来计算压缩后的条目计数。因此,通常使用动态bloom过滤器来添加额外的空间,而不是允许错误率增长。


Bloom filter最优的大小计算公示为:bloom size m = -(n * ln(err) / (ln(2)^2) ~= n * ln(err) / ln(0.6185)

  • m表示Bloom filter中的位数(bitSize)
  • n表示插入bloomfilter中的元素数(maxKeys)
  • k表示使用的哈希函数数(nbHash)
  • e表示Bloom所需的误报率(err)

但且仅当k=m/n ln(2)时,误报概率最小。

Hbase中Bloom filter的设置是在创建列族时通过setBloomFilterType方法设定,Hbase支持ROW、ROWCOL、ROWPREFIX_FIXED_LENGTH三种类型的Bloom filter,创建列族时默认设置为ROW,对所有插入数据的rowkey写入到Bloom filter中。**

HBase中布隆过滤器的实现

Bloom filter接口实现

Hbase的布隆过滤器由父接口BloomFilterBase类定义,包含2个子继承接口BloomFilter和BloomFilterWriter。
BloomFilter负责读取、判断,BloomFilterWriter负责将数据写入布隆过滤器。

read类

BloomFilter负责数据的读取判断其中定义了三个方法

1
2
3
4
5
6
//检查所定义的keyCell是否包含
boolean contains(Cell keyCell, ByteBuff bloom, BloomType type);
boolean contains(byte[] buf, int offset, int length, ByteBuff bloom);

//是否允许Bloom filter自动load数据,默认实现为true
boolean supportsAutoLoading();


BloomFilter最终的实现类是CompoundBloomFilter类,CompoundBloomFilter的核心方法是contains方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean contains(Cell keyCell, ByteBuff bloom, BloomType type) {

//如果根索引不包含keyCell,返回false,根索引在Hfile创建时构建,不是对所有rowkey
int block = index.rootBlockContainingKey(keyCell);
if (block < 0) {
return false; // This key is not in the file.
}
boolean result;

//获得Bloom的Block
HFileBlock bloomBlock = getBloomBlock(block);
try {
ByteBuff bloomBuf = bloomBlock.getBufferReadOnly();

//通过BloomFilterUtil的contains方法判断

result = BloomFilterUtil.contains(keyCell, bloomBuf, bloomBlock.headerSize(),
bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount, type);
} finally {
// After the use return back the block if it was served from a cache.
reader.returnBlock(bloomBlock);
}
if (numPositivesPerChunk != null && result) {
// Update statistics. Only used in unit tests.
++numPositivesPerChunk[block];
}
return result;
}


BloomFilterUtil.contains方法中,通过不同的 BloomType,构建不同的BloomHashKey,然后读取bloomBuf中的bitvals,计算cell
对应类型的HashKey,判断在bitvals中是否为1.

1
2
3
4
5
6
7
8
public static boolean contains(Cell cell, ByteBuff bloomBuf, int bloomOffset, int bloomSize,
Hash hash, int hashCount, BloomType type) {
HashKey<Cell> hashKey = type == BloomType.ROWCOL ? new RowColBloomHashKey(cell)
: new RowBloomHashKey(cell);

//最终的判断方法,实现还是比较简单
return contains(bloomBuf, bloomOffset, bloomSize, hash, hashCount, hashKey);
}

write类

BloomFilterWriter类定义了一些写入方法

1
2
3
4
5
6
7
8
9
10
11
12
 
//在数据写入磁盘之前,压缩Bloom filter
void compactBloom();

//获得一个meta data 的Writer,写入Bloom TYpe 、数据大小等
Writable getMetaWriter();

//获取一个 Bloom bits 的Writer
Writable getDataWriter();

// previous cell written
Cell getPrevCell();


BloomFilterWriter的最终实现类为CompoundBloomFilterWriter。子类CompoundBloomFilterWriter的核心方法是append方法,负责将添加的数据写入到BloomFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void append(Cell cell) throws IOException {
if (cell == null)
throw new NullPointerException();

enqueueReadyChunk(false);

if (chunk == null) {
if (firstKeyInChunk != null) {
throw new IllegalStateException("First key in chunk already set: "
+ Bytes.toStringBinary(firstKeyInChunk));
}

//第一添加时需要allocateNewChunk,Chunk动态添加,完成hash等
// This will be done only once per chunk
if (bloomType == BloomType.ROWCOL) {
firstKeyInChunk =
PrivateCellUtil
.getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell));
} else {
firstKeyInChunk = CellUtil.copyRow(cell);
}
allocateNewChunk();
}

//Chunk 实现具体的hash计算,和bit置位
chunk.add(cell);
this.prevCell = cell;
++totalKeyCount;
}


BloomFilterChunk继承自BloomFilterBase,实现了BloomFilter的读写,其中add方法实现写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  public void add(Cell cell) {
/*
* For faster hashing, use combinatorial generation
* http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf
*/
int hash1;
int hash2;
HashKey<Cell> hashKey;

// 计算2次hash 写入位

if (this.bloomType == BloomType.ROWCOL) {
hashKey = new RowColBloomHashKey(cell);
hash1 = this.hash.hash(hashKey, 0);
hash2 = this.hash.hash(hashKey, hash1);
} else {
hashKey = new RowBloomHashKey(cell);
hash1 = this.hash.hash(hashKey, 0);
hash2 = this.hash.hash(hashKey, hash1);
}
setHashLoc(hash1, hash2);
}


get方法完成数据查询,和之前BloomFilterUtil.contains方法一致

1
2
3
4
5
6
7
8
9
10
static boolean get(int pos, ByteBuffer bloomBuf, int bloomOffset) {

//实现位查找
int bytePos = pos >> 3; //pos / 8
int bitPos = pos & 0x7; //pos % 8
// TODO access this via Util API which can do Unsafe access if possible(?)
byte curByte = bloomBuf.get(bloomOffset + bytePos);
curByte &= BloomFilterUtil.bitvals[bitPos];
return (curByte != 0);
}

布隆过滤器的使用

对于之前创建的布隆过滤器的使用,hbase中体现在两个地方,一个是构建scannner时,判断scanner的是否包含所需要的数据列或者列族


在构建StoreFileScanner时,会通过shouldUseScanner方法判断,时都用到当前Scanner,其中用到了reader.passesBloomFilter的方法。

1
2
3
4
5
6
7
8
9
10
11
12
public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) {
// if the file has no entries, no need to validate or create a scanner.
byte[] cf = store.getColumnFamilyDescriptor().getName();
TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf);
if (timeRange == null) {
timeRange = scan.getTimeRange();
}

//从时间范围、startkey和endkey范围、bloomfilter判断
return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader
.passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf));
}


StoreFileReader在创建StoreFileScanner的时候创建,主要用来读取hfile文件。 passesBloomFilter方法当前Hfile的bloomFilter的类型,构建具体的bloomFilter。bloomFilter的类型是创建表时,列族中定义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
byte[] row = scan.getStartRow();
switch (this.bloomFilterType) {
case ROW:
if (!scan.isGetScan()) {
return true;
}
return passesGeneralRowBloomFilter(row, 0, row.length);

case ROWCOL:
if (!scan.isGetScan()) {
return true;
}
if (columns != null && columns.size() == 1) {
byte[] column = columns.first();
// create the required fake key
Cell kvKey = PrivateCellUtil.createFirstOnRow(row, HConstants.EMPTY_BYTE_ARRAY, column);
return passesGeneralRowColBloomFilter(kvKey);
}

// For multi-column queries the Bloom filter is checked from the
// seekExact operation.
return true;
case ROWPREFIX_FIXED_LENGTH:
return passesGeneralRowPrefixBloomFilter(scan);
default:
return true;
}
}

passesGeneralRowBloomFilter方法中this.generalBloomFilter是创建reader时构建的BloomFilter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) {
BloomFilter bloomFilter = this.generalBloomFilter;
if (bloomFilter == null) {
return true;
}

// Used in ROW bloom
byte[] key = null;
if (rowOffset != 0 || rowLen != row.length) {
throw new AssertionError(
"For row-only Bloom filters the row must occupy the whole array");
}
key = row;

//判断row是否在本hfile中
return checkGeneralBloomFilter(key, null, bloomFilter);
}

checkGeneralBloomFilter方法中调用contains完成最终的判断。