Bloom filter最优的大小计算 Bloom过滤器对插入到其中的元素的数量非常敏感。对于HBase来说,条目的数量取决于存储在列中的数据的大小。当前默认区域大小为256MB,因此条目计数~=256MB/(列的平均值大小)。尽管有这个经验法则,但是由于压缩,我们并没有有效的方法来计算压缩后的条目计数。因此,通常使用动态bloom过滤器来添加额外的空间,而不是允许错误率增长。 Bloom filter最优的大小计算公示为:bloom size m = -(n * ln(err) / (ln(2)^2) ~= n * ln(err) / ln(0.6185)
m表示Bloom filter中的位数(bitSize) n表示插入bloomfilter中的元素数(maxKeys) k表示使用的哈希函数数(nbHash) e表示Bloom所需的误报率(err) 但且仅当k=m/n ln(2)时,误报概率最小。 Hbase中Bloom filter的设置是在创建列族时通过setBloomFilterType方法设定,Hbase支持ROW、ROWCOL、ROWPREFIX_FIXED_LENGTH三种类型的Bloom filter,创建列族时默认设置为ROW,对所有插入数据的rowkey写入到Bloom filter中。**
HBase中布隆过滤器的实现
Bloom filter接口实现 Hbase的布隆过滤器由父接口BloomFilterBase类定义,包含2个子继承接口BloomFilter和BloomFilterWriter。 BloomFilter负责读取、判断,BloomFilterWriter负责将数据写入布隆过滤器。
read类 BloomFilter负责数据的读取判断其中定义了三个方法
1 2 3 4 5 6 //检查所定义的keyCell是否包含 boolean contains(Cell keyCell, ByteBuff bloom, BloomType type); boolean contains(byte[] buf, int offset, int length, ByteBuff bloom); //是否允许Bloom filter自动load数据,默认实现为true boolean supportsAutoLoading();
BloomFilter最终的实现类是CompoundBloomFilter类,CompoundBloomFilter的核心方法是contains方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public boolean contains(Cell keyCell, ByteBuff bloom, BloomType type) { //如果根索引不包含keyCell,返回false,根索引在Hfile创建时构建,不是对所有rowkey int block = index.rootBlockContainingKey(keyCell); if (block < 0) { return false; // This key is not in the file. } boolean result; //获得Bloom的Block HFileBlock bloomBlock = getBloomBlock(block); try { ByteBuff bloomBuf = bloomBlock.getBufferReadOnly(); //通过BloomFilterUtil的contains方法判断 result = BloomFilterUtil.contains(keyCell, bloomBuf, bloomBlock.headerSize(), bloomBlock.getUncompressedSizeWithoutHeader(), hash, hashCount, type); } finally { // After the use return back the block if it was served from a cache. reader.returnBlock(bloomBlock); } if (numPositivesPerChunk != null && result) { // Update statistics. Only used in unit tests. ++numPositivesPerChunk[block]; } return result; }
BloomFilterUtil.contains方法中,通过不同的 BloomType,构建不同的BloomHashKey,然后读取bloomBuf中的bitvals,计算cell 对应类型的HashKey,判断在bitvals中是否为1.
1 2 3 4 5 6 7 8 public static boolean contains(Cell cell, ByteBuff bloomBuf, int bloomOffset, int bloomSize, Hash hash, int hashCount, BloomType type) { HashKey<Cell> hashKey = type == BloomType.ROWCOL ? new RowColBloomHashKey(cell) : new RowBloomHashKey(cell); //最终的判断方法,实现还是比较简单 return contains(bloomBuf, bloomOffset, bloomSize, hash, hashCount, hashKey); }
write类 BloomFilterWriter类定义了一些写入方法
1 2 3 4 5 6 7 8 9 10 11 12 //在数据写入磁盘之前,压缩Bloom filter void compactBloom(); //获得一个meta data 的Writer,写入Bloom TYpe 、数据大小等 Writable getMetaWriter(); //获取一个 Bloom bits 的Writer Writable getDataWriter(); // previous cell written Cell getPrevCell();
BloomFilterWriter的最终实现类为CompoundBloomFilterWriter。子类CompoundBloomFilterWriter的核心方法是append方法,负责将添加的数据写入到BloomFilter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public void append(Cell cell) throws IOException { if (cell == null) throw new NullPointerException(); enqueueReadyChunk(false); if (chunk == null) { if (firstKeyInChunk != null) { throw new IllegalStateException("First key in chunk already set: " + Bytes.toStringBinary(firstKeyInChunk)); } //第一添加时需要allocateNewChunk,Chunk动态添加,完成hash等 // This will be done only once per chunk if (bloomType == BloomType.ROWCOL) { firstKeyInChunk = PrivateCellUtil .getCellKeySerializedAsKeyValueKey(PrivateCellUtil.createFirstOnRowCol(cell)); } else { firstKeyInChunk = CellUtil.copyRow(cell); } allocateNewChunk(); } //Chunk 实现具体的hash计算,和bit置位 chunk.add(cell); this.prevCell = cell; ++totalKeyCount; }
BloomFilterChunk继承自BloomFilterBase,实现了BloomFilter的读写,其中add方法实现写入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void add(Cell cell) { /* * For faster hashing, use combinatorial generation * http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf */ int hash1; int hash2; HashKey<Cell> hashKey; // 计算2次hash 写入位 if (this.bloomType == BloomType.ROWCOL) { hashKey = new RowColBloomHashKey(cell); hash1 = this.hash.hash(hashKey, 0); hash2 = this.hash.hash(hashKey, hash1); } else { hashKey = new RowBloomHashKey(cell); hash1 = this.hash.hash(hashKey, 0); hash2 = this.hash.hash(hashKey, hash1); } setHashLoc(hash1, hash2); }
get方法完成数据查询,和之前BloomFilterUtil.contains方法一致
1 2 3 4 5 6 7 8 9 10 static boolean get(int pos, ByteBuffer bloomBuf, int bloomOffset) { //实现位查找 int bytePos = pos >> 3; //pos / 8 int bitPos = pos & 0x7; //pos % 8 // TODO access this via Util API which can do Unsafe access if possible(?) byte curByte = bloomBuf.get(bloomOffset + bytePos); curByte &= BloomFilterUtil.bitvals[bitPos]; return (curByte != 0); }
布隆过滤器的使用 对于之前创建的布隆过滤器的使用,hbase中体现在两个地方,一个是构建scannner时,判断scanner的是否包含所需要的数据列或者列族 在构建StoreFileScanner时,会通过shouldUseScanner方法判断,时都用到当前Scanner,其中用到了reader.passesBloomFilter的方法。
1 2 3 4 5 6 7 8 9 10 11 12 public boolean shouldUseScanner(Scan scan, HStore store, long oldestUnexpiredTS) { // if the file has no entries, no need to validate or create a scanner. byte[] cf = store.getColumnFamilyDescriptor().getName(); TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf); if (timeRange == null) { timeRange = scan.getTimeRange(); } //从时间范围、startkey和endkey范围、bloomfilter判断 return reader.passesTimerangeFilter(timeRange, oldestUnexpiredTS) && reader .passesKeyRangeFilter(scan) && reader.passesBloomFilter(scan, scan.getFamilyMap().get(cf)); }
StoreFileReader在创建StoreFileScanner的时候创建,主要用来读取hfile文件。 passesBloomFilter方法当前Hfile的bloomFilter的类型,构建具体的bloomFilter。bloomFilter的类型是创建表时,列族中定义的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) { byte[] row = scan.getStartRow(); switch (this.bloomFilterType) { case ROW: if (!scan.isGetScan()) { return true; } return passesGeneralRowBloomFilter(row, 0, row.length); case ROWCOL: if (!scan.isGetScan()) { return true; } if (columns != null && columns.size() == 1) { byte[] column = columns.first(); // create the required fake key Cell kvKey = PrivateCellUtil.createFirstOnRow(row, HConstants.EMPTY_BYTE_ARRAY, column); return passesGeneralRowColBloomFilter(kvKey); } // For multi-column queries the Bloom filter is checked from the // seekExact operation. return true; case ROWPREFIX_FIXED_LENGTH: return passesGeneralRowPrefixBloomFilter(scan); default: return true; } }
passesGeneralRowBloomFilter方法中this.generalBloomFilter是创建reader时构建的BloomFilter。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private boolean passesGeneralRowBloomFilter(byte[] row, int rowOffset, int rowLen) { BloomFilter bloomFilter = this.generalBloomFilter; if (bloomFilter == null) { return true; } // Used in ROW bloom byte[] key = null; if (rowOffset != 0 || rowLen != row.length) { throw new AssertionError( "For row-only Bloom filters the row must occupy the whole array"); } key = row; //判断row是否在本hfile中 return checkGeneralBloomFilter(key, null, bloomFilter); }
checkGeneralBloomFilter方法中调用contains完成最终的判断。