Skip to content

Commit

Permalink
HBASE-28596 Optimise BucketCache usage upon regions splits/merges. (#…
Browse files Browse the repository at this point in the history
…5906)

Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
Reviewed-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
wchevreuil committed Jun 19, 2024
1 parent 885ae0c commit 05e09b9
Show file tree
Hide file tree
Showing 17 changed files with 457 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -29,6 +30,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
Expand Down Expand Up @@ -64,6 +66,8 @@ public class HalfStoreFileReader extends StoreFileReader {

private boolean firstKeySeeked = false;

private AtomicBoolean closed = new AtomicBoolean(false);

/**
* Creates a half file reader for a hfile referred to by an hfilelink.
* @param context Reader context info
Expand Down Expand Up @@ -335,4 +339,42 @@ public long getFilterEntries() {
// Estimate the number of entries as half the original file; this may be wildly inaccurate.
return super.getFilterEntries() / 2;
}

/**
* Overrides close method to handle cache evictions for the referred file. If evictionOnClose is
* true, we will seek to the block containing the splitCell and evict all blocks from offset 0 up
* to that block offset if this is a bottom half reader, or the from the split block offset up to
* the end of the file if this is a top half reader.
* @param evictOnClose true if it should evict the file blocks from the cache.
*/
@Override
public void close(boolean evictOnClose) throws IOException {
if (closed.compareAndSet(false, true)) {
if (evictOnClose) {
final HFileReaderImpl.HFileScannerImpl s =
(HFileReaderImpl.HFileScannerImpl) super.getScanner(false, true, false);
final String reference = this.reader.getHFileInfo().getHFileContext().getHFileName();
final String referred = StoreFileInfo.getReferredToRegionAndFile(reference).getSecond();
s.seekTo(splitCell);
if (s.getCurBlock() != null) {
long offset = s.getCurBlock().getOffset();
LOG.trace("Seeking to split cell in reader: {} for file: {} top: {}, split offset: {}",
this, reference, top, offset);
((HFileReaderImpl) reader).getCacheConf().getBlockCache().ifPresent(cache -> {
int numEvictedReferred = top
? cache.evictBlocksRangeByHfileName(referred, offset, Long.MAX_VALUE)
: cache.evictBlocksRangeByHfileName(referred, 0, offset);
int numEvictedReference = cache.evictBlocksByHfileName(reference);
LOG.trace(
"Closing reference: {}; referred file: {}; was top? {}; evicted for referred: {};"
+ "evicted for reference: {}",
reference, referred, top, numEvictedReferred, numEvictedReference);
});
}
reader.close(false);
} else {
reader.close(evictOnClose);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,15 @@ default Optional<Map<String, Pair<String, Long>>> getFullyCachedFiles() {
default Optional<Map<String, Long>> getRegionCachedInfo() {
return Optional.empty();
}

/**
* Evict all blocks for the given file name between the passed offset values.
* @param hfileName The file for which blocks should be evicted.
* @param initOffset the initial offset for the range of blocks to be evicted.
* @param endOffset the end offset for the range of blocks to be evicted.
* @return number of blocks evicted.
*/
default int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.hfile.HFileBlock.FILL_HEADER;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.NavigableMap;
Expand All @@ -25,7 +27,9 @@
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.metrics.impl.FastLongHistogram;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -244,6 +248,44 @@ public static int getMaxCachedBlocksByFile(Configuration conf) {
return conf == null ? DEFAULT_MAX : conf.getInt("hbase.ui.blockcache.by.file.max", DEFAULT_MAX);
}

/**
* Similarly to HFileBlock.Writer.getBlockForCaching(), creates a HFileBlock instance without
* checksum for caching. This is needed for when we cache blocks via readers (either prefetch or
* client read), otherwise we may fail equality comparison when checking against same block that
* may already have been cached at write time.
* @param cacheConf the related CacheConfig object.
* @param block the HFileBlock instance to be converted.
* @return the resulting HFileBlock instance without checksum.
*/
public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock block) {
// Calculate how many bytes we need for checksum on the tail of the block.
int numBytes = cacheConf.shouldCacheCompressed(block.getBlockType().getCategory())
? 0
: (int) ChecksumUtil.numBytes(block.getOnDiskDataSizeWithHeader(),
block.getHFileContext().getBytesPerChecksum());
ByteBuff buff = block.getBufferReadOnly();
HFileBlockBuilder builder = new HFileBlockBuilder();
return builder.withBlockType(block.getBlockType())
.withOnDiskSizeWithoutHeader(block.getOnDiskSizeWithoutHeader())
.withUncompressedSizeWithoutHeader(block.getUncompressedSizeWithoutHeader())
.withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff)
.withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1)
.withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes)
.withHFileContext(cloneContext(block.getHFileContext()))
.withByteBuffAllocator(cacheConf.getByteBuffAllocator()).withShared(!buff.hasArray()).build();
}

public static HFileContext cloneContext(HFileContext context) {
HFileContext newContext = new HFileContextBuilder().withBlockSize(context.getBlocksize())
.withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data
.withCompression(context.getCompression())
.withDataBlockEncoding(context.getDataBlockEncoding())
.withHBaseCheckSum(context.isUseHBaseChecksum()).withCompressTags(context.isCompressTags())
.withIncludesMvcc(context.isIncludesMvcc()).withIncludesTags(context.isIncludesTags())
.withColumnFamily(context.getColumnFamily()).withTableName(context.getTableName()).build();
return newContext;
}

/**
* Use one of these to keep a running account of cached blocks by file. Throw it away when done.
* This is different than metrics in that it is stats on current state of a cache. See
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class CacheConfig implements ConfigurationObserver {
*/
public static final String EVICT_BLOCKS_ON_CLOSE_KEY = "hbase.rs.evictblocksonclose";

public static final String EVICT_BLOCKS_ON_SPLIT_KEY = "hbase.rs.evictblocksonsplit";

/**
* Configuration key to prefetch all blocks of a given file into the block cache when the file is
* opened.
Expand Down Expand Up @@ -113,6 +115,7 @@ public class CacheConfig implements ConfigurationObserver {
public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false;
public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false;
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
public static final boolean DEFAULT_EVICT_ON_SPLIT = true;
public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repea
}
} else {
if (existInL1) {
LOG.warn("Cache key {} had block type {}, but was found in L1 cache.", cacheKey,
cacheKey.getBlockType());
updateBlockMetrics(block, cacheKey, l1Cache, caching);
} else {
updateBlockMetrics(block, cacheKey, l2Cache, caching);
Expand Down Expand Up @@ -504,4 +502,9 @@ public Optional<Integer> getBlockSize(BlockCacheKey key) {
return l1Result.isPresent() ? l1Result : l2Cache.getBlockSize(key);
}

@Override
public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) {
return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset)
+ l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ public boolean isUnpacked() {
* when block is returned to the cache.
* @return the offset of this block in the file it was read from
*/
long getOffset() {
public long getOffset() {
if (offset < 0) {
throw new IllegalStateException("HFile block offset not initialized properly");
}
Expand Down Expand Up @@ -1205,16 +1205,7 @@ void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException {
* being wholesome (ECC memory or if file-backed, it does checksumming).
*/
HFileBlock getBlockForCaching(CacheConfig cacheConf) {
HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize())
.withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data
.withCompression(fileContext.getCompression())
.withDataBlockEncoding(fileContext.getDataBlockEncoding())
.withHBaseCheckSum(fileContext.isUseHBaseChecksum())
.withCompressTags(fileContext.isCompressTags())
.withIncludesMvcc(fileContext.isIncludesMvcc())
.withIncludesTags(fileContext.isIncludesTags())
.withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName())
.build();
HFileContext newContext = BlockCacheUtil.cloneContext(fileContext);
// Build the HFileBlock.
HFileBlockBuilder builder = new HFileBlockBuilder();
ByteBuff buff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
});

// Prefetch file blocks upon open if requested
if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() && shouldCache.booleanValue()) {
if (cacheConf.shouldPrefetchOnOpen() && shouldCache.booleanValue()) {
PrefetchExecutor.request(path, new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY;

import io.opentelemetry.api.common.Attributes;
Expand All @@ -42,14 +41,12 @@
import org.apache.hadoop.hbase.SizeCachedKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue;
import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
Expand Down Expand Up @@ -159,6 +156,10 @@ public BlockIndexNotLoadedException(Path path) {
}
}

public CacheConfig getCacheConf() {
return cacheConf;
}

private Optional<String> toStringFirstKey() {
return getFirstKey().map(CellUtil::getCellKeyAsString);
}
Expand Down Expand Up @@ -307,7 +308,7 @@ public NotSeekedException(Path path) {
}
}

protected static class HFileScannerImpl implements HFileScanner {
public static class HFileScannerImpl implements HFileScanner {
private ByteBuff blockBuffer;
protected final boolean cacheBlocks;
protected final boolean pread;
Expand All @@ -331,6 +332,7 @@ protected static class HFileScannerImpl implements HFileScanner {
* loaded yet.
*/
protected Cell nextIndexedKey;

// Current block being used. NOTICE: DON't release curBlock separately except in shipped() or
// close() methods. Because the shipped() or close() will do the release finally, even if any
// exception occur the curBlock will be released by the close() method (see
Expand All @@ -340,6 +342,11 @@ protected static class HFileScannerImpl implements HFileScanner {
// Whether we returned a result for curBlock's size in recordBlockSize().
// gets reset whenever curBlock is changed.
private boolean providedCurrentBlockSize = false;

public HFileBlock getCurBlock() {
return curBlock;
}

// Previous blocks that were used in the course of the read
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();

Expand Down Expand Up @@ -1283,8 +1290,6 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
new BlockCacheKey(path, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType);
Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString());

boolean cacheable = cacheBlock && cacheIfCompactionsOff();

boolean useLock = false;
IdLock.Entry lockEntry = null;
final Span span = Span.current();
Expand Down Expand Up @@ -1326,7 +1331,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
return cachedBlock;
}

if (!useLock && cacheable && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
// check cache again with lock
useLock = true;
continue;
Expand All @@ -1337,7 +1342,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
span.addEvent("block cache miss", attributes);
// Load block from filesystem.
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType, cacheable));
!isCompaction, shouldUseHeap(expectedBlockType, cacheBlock));
try {
validateBlockType(hfileBlock, expectedBlockType);
} catch (IOException e) {
Expand All @@ -1350,25 +1355,30 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo

// Don't need the unpacked block back and we're storing the block in the cache compressed
if (cacheOnly && cacheCompressed && cacheOnRead) {
HFileBlock blockNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock);
cacheConf.getBlockCache().ifPresent(cache -> {
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
// Cache the block if necessary
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
cache.cacheBlock(cacheKey, blockNoChecksum, cacheConf.isInMemory(), cacheOnly);
}
});

if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
HFile.DATABLOCK_READ_COUNT.increment();
}
return hfileBlock;
return blockNoChecksum;
}
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cache.cacheBlock(cacheKey,
cacheCompressed
? BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock)
: unpackedNoChecksum,
cacheConf.isInMemory(), cacheOnly);
}
});
Expand All @@ -1380,7 +1390,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
HFile.DATABLOCK_READ_COUNT.increment();
}

return unpacked;
return unpackedNoChecksum;
}
} finally {
if (lockEntry != null) {
Expand Down Expand Up @@ -1691,9 +1701,4 @@ public int getMajorVersion() {
public void unbufferStream() {
fsBlockReader.unbufferStream();
}

protected boolean cacheIfCompactionsOff() {
return (!StoreFileInfo.isReference(name) && !HFileLink.isHFileLink(name))
|| !conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
}
}
Loading

0 comments on commit 05e09b9

Please sign in to comment.