From 3d7152990e33123957e1ec8c06148edfd4370f65 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Wed, 14 Feb 2024 15:35:36 +0000 Subject: [PATCH 01/13] HBASE-28596 Optimise BucketCache usage upon regions splits/merges. --- .../hadoop/hbase/io/HalfStoreFileReader.java | 32 +++++ .../hadoop/hbase/io/hfile/BlockCache.java | 5 + .../hadoop/hbase/io/hfile/BlockCacheUtil.java | 34 +++++ .../hadoop/hbase/io/hfile/CacheConfig.java | 3 + .../hbase/io/hfile/CombinedBlockCache.java | 7 +- .../hadoop/hbase/io/hfile/HFileBlock.java | 2 +- .../hbase/io/hfile/HFilePreadReader.java | 2 +- .../hbase/io/hfile/HFileReaderImpl.java | 28 ++-- .../hbase/io/hfile/bucket/BucketCache.java | 79 +++++++---- .../TransitRegionStateProcedure.java | 10 +- .../hbase/regionserver/StoreFileReader.java | 2 +- .../handler/UnassignRegionHandler.java | 5 +- .../hadoop/hbase/TestSplitWithCache.java | 129 ++++++++++++++++++ .../hadoop/hbase/io/hfile/TestPrefetch.java | 8 -- .../io/hfile/TestPrefetchWithBucketCache.java | 70 +++++++++- 15 files changed, 353 insertions(+), 63 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 2119a3e7cbef..f075050d2f1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntConsumer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileInfo; +import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; @@ -64,6 +66,8 @@ public class HalfStoreFileReader extends StoreFileReader { private boolean firstKeySeeked = false; + private AtomicBoolean closed = new AtomicBoolean(false); + /** * Creates a half file reader for a hfile referred to by an hfilelink. * @param context Reader context info @@ -349,4 +353,32 @@ public long getFilterEntries() { // Estimate the number of entries as half the original file; this may be wildly inaccurate. return super.getFilterEntries() / 2; } + @Override + public void close(boolean evictOnClose) throws IOException { + if(closed.compareAndSet(false, true)) { + if (evictOnClose) { + final HFileReaderImpl.HFileScannerImpl s = + (HFileReaderImpl.HFileScannerImpl) super.getScanner(false, true, false); + final String reference = this.reader.getHFileInfo().getHFileContext().getHFileName(); + final String referred = StoreFileInfo.getReferredToRegionAndFile(reference).getSecond(); + s.seekTo(splitCell); + long offset = s.getCurBlock().getOffset(); + LOG.trace("Seeking to split cell in reader: {} for file: {} top: {}, split offset: {}", + this, reference, top, offset); + ((HFileReaderImpl) reader).getCacheConf().getBlockCache().ifPresent(cache -> { + int numEvictedReferred = top ? 
+ cache.evictBlocksRangeByHfileName(referred, offset, Long.MAX_VALUE) : + cache.evictBlocksRangeByHfileName(referred, 0, offset); + int numEvictedReference = cache.evictBlocksByHfileName(reference); + LOG.trace( + "Closing reference: {}; referred file: {}; was top? {}; evicted for referred: {};" + + "evicted for reference: {}", reference, referred, top, numEvictedReferred, + numEvictedReference); + }); + reader.close(false); + } else { + reader.close(evictOnClose); + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index bed0194b1fab..0b75861753e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; +import java.util.function.Predicate; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -250,4 +251,8 @@ default Optional>> getFullyCachedFiles() { default Optional> getRegionCachedInfo() { return Optional.empty(); } + + default int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { + return 0; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index e6a4b609bc7d..b90c4930ff76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -25,7 +25,9 @@ import java.util.concurrent.ConcurrentSkipListSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.metrics.impl.FastLongHistogram; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.GsonUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -35,6 +37,7 @@ import org.apache.hbase.thirdparty.com.google.gson.TypeAdapter; import org.apache.hbase.thirdparty.com.google.gson.stream.JsonReader; import org.apache.hbase.thirdparty.com.google.gson.stream.JsonWriter; +import static org.apache.hadoop.hbase.io.hfile.HFileBlock.FILL_HEADER; /** * Utilty for aggregating counts in CachedBlocks and toString/toJSON CachedBlocks and BlockCaches. @@ -238,12 +241,43 @@ public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache, } } + + private static final int DEFAULT_MAX = 1000000; public static int getMaxCachedBlocksByFile(Configuration conf) { return conf == null ? 
DEFAULT_MAX : conf.getInt("hbase.ui.blockcache.by.file.max", DEFAULT_MAX); } + public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock block) { + HFileContext newContext = new HFileContextBuilder().withBlockSize(block.getHFileContext().getBlocksize()) + .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data + .withCompression(block.getHFileContext().getCompression()) + .withDataBlockEncoding(block.getHFileContext().getDataBlockEncoding()) + .withHBaseCheckSum(block.getHFileContext().isUseHBaseChecksum()) + .withCompressTags(block.getHFileContext().isCompressTags()) + .withIncludesMvcc(block.getHFileContext().isIncludesMvcc()) + .withIncludesTags(block.getHFileContext().isIncludesTags()) + .withColumnFamily(block.getHFileContext().getColumnFamily()).withTableName(block.getHFileContext().getTableName()) + .build(); + // Build the HFileBlock. + HFileBlockBuilder builder = new HFileBlockBuilder(); + ByteBuff buff = block.getBufferReadOnly(); + // Calculate how many bytes we need for checksum on the tail of the block. + int numBytes = (int) ChecksumUtil.numBytes(block.getOnDiskDataSizeWithHeader(), + block.getHFileContext().getBytesPerChecksum()); + return builder.withBlockType(block.getBlockType()) + .withOnDiskSizeWithoutHeader(block.getOnDiskSizeWithoutHeader()) + .withUncompressedSizeWithoutHeader(block.getUncompressedSizeWithoutHeader()) + .withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff) + .withFillHeader(FILL_HEADER) + .withOffset(block.getOffset()).withNextBlockOnDiskSize(-1) + .withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes) + .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) + .withShared(!buff.hasArray()).build(); + } + + /** * Use one of these to keep a running account of cached blocks by file. Throw it away when done. * This is different than metrics in that it is stats on current state of a cache. See diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 7fb1f1ec85bd..78f62bfc77ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -72,6 +72,8 @@ public class CacheConfig implements ConfigurationObserver { */ public static final String EVICT_BLOCKS_ON_CLOSE_KEY = "hbase.rs.evictblocksonclose"; + public static final String EVICT_BLOCKS_ON_SPLIT_KEY = "hbase.rs.evictblocksonsplit"; + /** * Configuration key to prefetch all blocks of a given file into the block cache when the file is * opened. 
@@ -113,6 +115,7 @@ public class CacheConfig implements ConfigurationObserver { public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false; public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false; + public static final boolean DEFAULT_EVICT_ON_SPLIT = true; public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false; public static final boolean DEFAULT_PREFETCH_ON_OPEN = false; public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index d6692d2e2bf1..3b90037ad67a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -109,8 +109,6 @@ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repea } } else { if (existInL1) { - LOG.warn("Cache key {} had block type {}, but was found in L1 cache.", cacheKey, - cacheKey.getBlockType()); updateBlockMetrics(block, cacheKey, l1Cache, caching); } else { updateBlockMetrics(block, cacheKey, l2Cache, caching); @@ -504,4 +502,9 @@ public Optional getBlockSize(BlockCacheKey key) { return l1Result.isPresent() ? l1Result : l2Cache.getBlockSize(key); } + @Override + public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { + return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset) + + l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 47c20b691b4a..08a45dde23ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -697,7 +697,7 @@ public boolean isUnpacked() { * when block is returned to the cache. 
* @return the offset of this block in the file it was read from */ - long getOffset() { + public long getOffset() { if (offset < 0) { throw new IllegalStateException("HFile block offset not initialized properly"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java index 6063ffe68891..e6b79cc55cca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java @@ -45,7 +45,7 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c }); // Prefetch file blocks upon open if requested - if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() && shouldCache.booleanValue()) { + if (cacheConf.shouldPrefetchOnOpen() && shouldCache.booleanValue()) { PrefetchExecutor.request(path, new Runnable() { @Override public void run() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 9c9b38c4906b..2fd9ebd83285 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -159,6 +159,10 @@ public BlockIndexNotLoadedException(Path path) { } } + public CacheConfig getCacheConf() { + return cacheConf; + } + private Optional toStringFirstKey() { return getFirstKey().map(CellUtil::getCellKeyAsString); } @@ -307,7 +311,7 @@ public NotSeekedException(Path path) { } } - protected static class HFileScannerImpl implements HFileScanner { + public static class HFileScannerImpl implements HFileScanner { private ByteBuff blockBuffer; protected final boolean cacheBlocks; protected final boolean pread; @@ -331,6 +335,7 @@ protected static class HFileScannerImpl implements HFileScanner { * loaded yet. */ protected Cell nextIndexedKey; + // Current block being used. NOTICE: DON't release curBlock separately except in shipped() or // close() methods. Because the shipped() or close() will do the release finally, even if any // exception occur the curBlock will be released by the close() method (see @@ -340,6 +345,10 @@ protected static class HFileScannerImpl implements HFileScanner { // Whether we returned a result for curBlock's size in recordBlockSize(). // gets reset whenever curBlock is changed. 
private boolean providedCurrentBlockSize = false; + + public HFileBlock getCurBlock() { + return curBlock; + } // Previous blocks that were used in the course of the read protected final ArrayList prevBlocks = new ArrayList<>(); @@ -1293,8 +1302,6 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo new BlockCacheKey(path, dataBlockOffset, this.isPrimaryReplicaReader(), expectedBlockType); Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, cacheKey.toString()); - boolean cacheable = cacheBlock && cacheIfCompactionsOff(); - boolean useLock = false; IdLock.Entry lockEntry = null; final Span span = Span.current(); @@ -1336,7 +1343,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo return cachedBlock; } - if (!useLock && cacheable && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) { + if (!useLock && cacheBlock && cacheConf.shouldLockOnCacheMiss(expectedBlockType)) { // check cache again with lock useLock = true; continue; @@ -1347,7 +1354,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo span.addEvent("block cache miss", attributes); // Load block from filesystem. HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, - !isCompaction, shouldUseHeap(expectedBlockType, cacheable)); + !isCompaction, shouldUseHeap(expectedBlockType, cacheBlock)); try { validateBlockType(hfileBlock, expectedBlockType); } catch (IOException e) { @@ -1363,7 +1370,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo cacheConf.getBlockCache().ifPresent(cache -> { LOG.debug("Skipping decompression of block {} in prefetch", cacheKey); // Cache the block if necessary - if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) { + if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly); } }); @@ -1376,9 +1383,9 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { - if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) { + if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { // Using the wait on cache during compaction and prefetching. - cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, + cache.cacheBlock(cacheKey, BlockCacheUtil.getBlockForCaching(cacheConf, cacheCompressed ? 
hfileBlock : unpacked), cacheConf.isInMemory(), cacheOnly); } }); @@ -1712,9 +1719,4 @@ public int getMajorVersion() { public void unbufferStream() { fsBlockReader.unbufferStream(); } - - protected boolean cacheIfCompactionsOff() { - return (!StoreFileInfo.isReference(name) && !HFileLink.isHFileLink(name)) - || !conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 71bfc757e51e..c86d0edfa6f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -51,6 +51,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Predicate; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.RefCnt; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.IdReadWriteLock; @@ -215,6 +217,8 @@ public class BucketCache implements BlockCache, HeapSize { // reset after a successful read/write. private volatile long ioErrorStartTime = -1; + private Configuration conf; + /** * A ReentrantReadWriteLock to lock on a particular block identified by offset. The purpose of * this is to avoid freeing the block which is being read. @@ -291,6 +295,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck } else { this.offsetLock = new IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT); } + this.conf = conf; this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM); this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath); this.writerThreads = new WriterThread[writerThreadNum]; @@ -560,6 +565,20 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach } } + public BucketEntry getBlockForReference(BlockCacheKey key) { + BucketEntry foundEntry = null; + String referredFileName = null; + if (StoreFileInfo.isReference(key.getHfileName())) { + referredFileName = StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond(); + } + if (referredFileName != null) { + BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset()); + foundEntry = backingMap.get(convertedCacheKey); + LOG.info("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", + key.getHfileName(), convertedCacheKey, foundEntry); + } + return foundEntry; + } /** * Get the buffer of the block with the specified key. 
* @param key block's cache key @@ -583,6 +602,9 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, return re.getData(); } BucketEntry bucketEntry = backingMap.get(key); + if (bucketEntry == null) { + bucketEntry = getBlockForReference(key); + } if (bucketEntry != null) { long start = System.nanoTime(); ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset()); @@ -591,7 +613,7 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, // We can not read here even if backingMap does contain the given key because its offset // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check // existence here. - if (bucketEntry.equals(backingMap.get(key))) { + if (bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key))) { // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to // the same BucketEntry, then all of the three will share the same refCnt. @@ -1658,8 +1680,13 @@ protected String getAlgorithm() { */ @Override public int evictBlocksByHfileName(String hfileName) { + return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE); + } + @Override + public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { fileNotFullyCached(hfileName); - Set keySet = getAllCacheKeysForFile(hfileName); + Set keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset); + LOG.info("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), hfileName, initOffset, endOffset); int numEvicted = 0; for (BlockCacheKey key : keySet) { if (evictBlock(key)) { @@ -1668,10 +1695,9 @@ public int evictBlocksByHfileName(String hfileName) { } return numEvicted; } - - private Set getAllCacheKeysForFile(String hfileName) { - return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), true, - new BlockCacheKey(hfileName, Long.MAX_VALUE), true); + private Set getAllCacheKeysForFile(String hfileName, long init, long end) { + return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true, + new BlockCacheKey(hfileName, end), true); } /** @@ -2081,25 +2107,20 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d try { final MutableInt count = new MutableInt(); LOG.debug("iterating over {} entries in the backing map", backingMap.size()); - backingMap.entrySet().stream().forEach(entry -> { - if ( - entry.getKey().getHfileName().equals(fileName.getName()) - && entry.getKey().getBlockType().equals(BlockType.DATA) - ) { - long offsetToLock = entry.getValue().offset(); - LOG.debug("found block {} in the backing map. 
Acquiring read lock for offset {}", - entry.getKey(), offsetToLock); - ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock); - lock.readLock().lock(); - locks.add(lock); - // rechecks the given key is still there (no eviction happened before the lock acquired) - if (backingMap.containsKey(entry.getKey())) { - count.increment(); - } else { - lock.readLock().unlock(); - locks.remove(lock); - LOG.debug("found block {}, but when locked and tried to count, it was gone."); - } + Set result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE); + if(result.isEmpty() && StoreFileInfo.isReference(fileName)) { + result = getAllCacheKeysForFile( + StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), + 0, Long.MAX_VALUE); + } + result.stream().forEach(entry -> { + LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}", + fileName.getName(), entry.getOffset()); + ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset()); + lock.readLock().lock(); + locks.add(lock); + if (backingMap.containsKey(entry)) { + count.increment(); } }); int metaCount = totalBlockCount - dataBlockCount; @@ -2122,7 +2143,7 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d Thread.sleep(100); notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); } else - if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount) == dataBlockCount) { + if ((getAllCacheKeysForFile(fileName.getName(),0, Long.MAX_VALUE).size() - metaCount) == dataBlockCount) { LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in " + "the cache write queue but we now found that total cached blocks for file {} " + "is equal to data block count.", count, dataBlockCount, fileName.getName()); @@ -2157,14 +2178,16 @@ public Optional shouldCacheFile(String fileName) { @Override public Optional isAlreadyCached(BlockCacheKey key) { - return Optional.of(getBackingMap().containsKey(key)); + boolean foundKey = backingMap.containsKey(key); + return Optional.of(foundKey ? true : getBlockForReference(key) != null); } @Override public Optional getBlockSize(BlockCacheKey key) { BucketEntry entry = backingMap.get(key); if (entry == null) { - return Optional.empty(); + entry = getBlockForReference(key); + return entry == null ? 
Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader()); } else { return Optional.of(entry.getOnDiskSizeWithHeader()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java index 8dfc08a5de89..3306d1d34d4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.master.assignment; import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_SPLIT; import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_SPLIT_KEY; import static org.apache.hadoop.hbase.master.LoadBalancer.BOGUS_SERVER_NAME; import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT; @@ -369,10 +371,10 @@ private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode) } } - private void closeRegionAfterUpdatingMeta(RegionStateNode regionNode) { + private void closeRegionAfterUpdatingMeta(MasterProcedureEnv env, RegionStateNode regionNode) { CloseRegionProcedure closeProc = isSplit ? new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), assignCandidate, - true) + env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_SPLIT_KEY, DEFAULT_EVICT_ON_SPLIT)) : new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), assignCandidate, evictCache); addChildProcedure(closeProc); @@ -383,7 +385,7 @@ private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) throws IOException, ProcedureSuspendedException { if ( ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, - () -> closeRegionAfterUpdatingMeta(regionNode)) + () -> closeRegionAfterUpdatingMeta(env, regionNode)) ) { return; } @@ -391,7 +393,7 @@ private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) // this is the normal case ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, env.getAssignmentManager().regionClosing(regionNode), env, - () -> closeRegionAfterUpdatingMeta(regionNode)); + () -> closeRegionAfterUpdatingMeta(env, regionNode)); } else { forceNewPlan = true; regionNode.setRegionLocation(null); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index 09c379227bda..e241bf0a5d34 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -68,7 +68,7 @@ public class StoreFileReader { protected BloomFilter deleteFamilyBloomFilter = null; private BloomFilterMetrics bloomFilterMetrics = null; protected BloomType bloomFilterType; - private final HFile.Reader reader; + protected final HFile.Reader reader; protected long sequenceID = -1; protected TimeRange timeRange = null; private byte[] lastBloomKey; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java index 217f2ebbd45a..2a9fb38dc53c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java @@ -126,9 +126,8 @@ public void process() throws IOException { // This should be true only in the case of splits/merges closing the parent regions, as // there's no point on keep blocks for those region files. As hbase.rs.evictblocksonclose is // false by default we don't bother overriding it if evictCache is false. - if (evictCache) { - region.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(true)); - } + region.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(evictCache)); + if (region.close(abort) == null) { // XXX: Is this still possible? The old comment says about split, but now split is done at // master side, so... diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java new file mode 100644 index 000000000000..27aef8d56d30 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_SPLIT_KEY; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY; +import static org.junit.Assert.assertTrue; + +@Category({ MiscTests.class, MediumTests.class }) +public class TestSplitWithCache { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSplitWithCache.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestSplitWithCache.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000); + UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + UTIL.getConfiguration().setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true); + UTIL.getConfiguration().setBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, true); + UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, "offheap"); + UTIL.getConfiguration().setInt(BUCKET_CACHE_SIZE_KEY, 200); + } + + @Test + public void testEvictOnSplit() throws Exception { + doTest("testEvictOnSplit", true, + (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f)!=null), + (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f)==null)); + } + + @Test + public void testDoesntEvictOnSplit() throws Exception { + doTest("testDoesntEvictOnSplit", false, + (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f)!=null), + (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f)!=null)); + } + + private void doTest(String table, boolean evictOnSplit, + BiConsumer>> predicateBeforeSplit, + BiConsumer>> predicateAfterSplit) throws Exception { + UTIL.getConfiguration().setBoolean(EVICT_BLOCKS_ON_SPLIT_KEY, evictOnSplit); + UTIL.startMiniCluster(1); + try { + TableName tableName = TableName.valueOf(table); + byte[] family = Bytes.toBytes("CF"); + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName). 
+ setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + UTIL.getAdmin().createTable(td); + UTIL.waitTableAvailable(tableName); + Table tbl = UTIL.getConnection().getTable(tableName); + List puts = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + Put p = new Put(Bytes.toBytes("row-" + i)); + p.addColumn(family, Bytes.toBytes(1), Bytes.toBytes("val-" + i)); + puts.add(p); + } + tbl.put(puts); + UTIL.getAdmin().flush(tableName); + Collection files = UTIL.getMiniHBaseCluster(). + getRegions(tableName).get(0).getStores().get(0).getStorefiles(); + checkCacheForBlocks(tableName, files, predicateBeforeSplit); + UTIL.getAdmin().split(tableName, Bytes.toBytes("row-500")); + Waiter.waitFor(UTIL.getConfiguration(), 30000, + () -> UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 2); + UTIL.waitUntilNoRegionsInTransition(); + checkCacheForBlocks(tableName, files, predicateAfterSplit); + } finally { + UTIL.shutdownMiniCluster(); + } + + } + + private void checkCacheForBlocks(TableName tableName, Collection files, + BiConsumer>> checker){ + files.forEach(f -> { + UTIL.getMiniHBaseCluster().getRegionServer(0).getBlockCache().ifPresent(cache -> { + cache.getFullyCachedFiles().ifPresent(m -> { + checker.accept(f.getPath().getName(), m); + }); + assertTrue(cache.getFullyCachedFiles().isPresent()); + }); + }); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java index 6083d872c826..b172202c8d4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java @@ -323,14 +323,6 @@ public void testPrefetchCompressed() throws Exception { conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false); } - @Test - public void testPrefetchSkipsRefs() throws Exception { - testPrefetchWhenRefs(true, c -> { - boolean isCached = c != null; - assertFalse(isCached); - }); - } - @Test public void testPrefetchDoesntSkipRefs() throws Exception { testPrefetchWhenRefs(false, c -> { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java index 2d0a85962ef9..be0ae8f75bbf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -39,13 +40,20 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import 
org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -135,6 +143,56 @@ public void testPrefetchDoesntOverwork() throws Exception { assertTrue(snapshot.get(key).getCachedTime() < bc.getBackingMap().get(key).getCachedTime()); } + @Test + public void testPrefetchRefsAfterSplit() throws Exception { + conf.setLong(BUCKET_CACHE_SIZE_KEY, 200); + blockCache = BlockCacheFactory.createBlockCache(conf); + cacheConf = new CacheConfig(conf, blockCache); + + Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit"); + RegionInfo region = + RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); + Path regionDir = new Path(tableDir, region.getEncodedName()); + Path cfDir = new Path(regionDir, "cf"); + HRegionFileSystem regionFS = + HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region); + Path storeFile = writeStoreFile(100, cfDir); + + // Prefetches the file blocks + LOG.debug("First read should prefetch the blocks."); + readStoreFile(storeFile); + BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get(); + // Our file should have 6 DATA blocks. We should wait for all of them to be cached + Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); + + //split the file and return references to the original file + Random rand = ThreadLocalRandom.current(); + byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50); + HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true); + Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false, + new ConstantSizeRegionSplitPolicy()); + HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true); + //starts reader for the ref. The ref should resolve to the original file blocks + // and not duplicate blocks in the cache. + refHsf.initReader(); + HFile.Reader reader = refHsf.getReader().getHFileReader(); + while (!reader.prefetchComplete()) { + // Sleep for a bit + Thread.sleep(1000); + } + //the ref file blocks keys should actually resolve to the referred file blocks, + //so we should not see additional blocks in the cache. 
+ Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); + + BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0); + Cacheable result = bc.getBlock(refCacheKey, true, false,true); + assertNotNull(result); + BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0); + assertEquals(result, bc.getBlock(fileCacheKey, true, false, true)); + assertNull(bc.getBackingMap().get(refCacheKey)); + assertNotNull(bc.getBlockForReference(refCacheKey)); + } + @Test public void testPrefetchInterruptOnCapacity() throws Exception { conf.setLong(BUCKET_CACHE_SIZE_KEY, 1); @@ -270,10 +328,18 @@ private Path writeStoreFile(String fname, int numKVs) throws IOException { return writeStoreFile(fname, meta, numKVs); } + private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException { + HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build(); + return writeStoreFile(meta, numKVs, regionCFDir); + } + private Path writeStoreFile(String fname, HFileContext context, int numKVs) throws IOException { - Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname); + return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname)); + } + + private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir) throws IOException { StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) - .withOutputDir(storeFileParentDir).withFileContext(context).build(); + .withOutputDir(regionCFDir).withFileContext(context).build(); Random rand = ThreadLocalRandom.current(); final int rowLen = 32; for (int i = 0; i < numKVs; ++i) { From 7bf567f1dc465667b539705414466f9880b1445f Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Thu, 16 May 2024 11:49:30 -0400 Subject: [PATCH 02/13] UT and spotless fixes Change-Id: I510be0cb8041fead191f965178fd7336b6274352 --- .../hadoop/hbase/io/HalfStoreFileReader.java | 15 ++++--- .../hadoop/hbase/io/hfile/BlockCache.java | 1 - .../hadoop/hbase/io/hfile/BlockCacheUtil.java | 32 +++++++------ .../hbase/io/hfile/CombinedBlockCache.java | 4 +- .../hbase/io/hfile/HFileReaderImpl.java | 7 ++- .../hbase/io/hfile/bucket/BucketCache.java | 45 +++++++++++-------- .../TransitRegionStateProcedure.java | 13 +++--- .../hadoop/hbase/TestSplitWithCache.java | 45 ++++++++++--------- .../io/hfile/TestPrefetchWithBucketCache.java | 16 +++---- 9 files changed, 93 insertions(+), 85 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index f075050d2f1e..76812ffc3922 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -353,10 +353,11 @@ public long getFilterEntries() { // Estimate the number of entries as half the original file; this may be wildly inaccurate. 
return super.getFilterEntries() / 2; } + @Override public void close(boolean evictOnClose) throws IOException { - if(closed.compareAndSet(false, true)) { - if (evictOnClose) { + if (closed.compareAndSet(false, true)) { + if (evictOnClose && StoreFileInfo.isReference(this.reader.getPath())) { final HFileReaderImpl.HFileScannerImpl s = (HFileReaderImpl.HFileScannerImpl) super.getScanner(false, true, false); final String reference = this.reader.getHFileInfo().getHFileContext().getHFileName(); @@ -366,14 +367,14 @@ public void close(boolean evictOnClose) throws IOException { LOG.trace("Seeking to split cell in reader: {} for file: {} top: {}, split offset: {}", this, reference, top, offset); ((HFileReaderImpl) reader).getCacheConf().getBlockCache().ifPresent(cache -> { - int numEvictedReferred = top ? - cache.evictBlocksRangeByHfileName(referred, offset, Long.MAX_VALUE) : - cache.evictBlocksRangeByHfileName(referred, 0, offset); + int numEvictedReferred = top + ? cache.evictBlocksRangeByHfileName(referred, offset, Long.MAX_VALUE) + : cache.evictBlocksRangeByHfileName(referred, 0, offset); int numEvictedReference = cache.evictBlocksByHfileName(reference); LOG.trace( "Closing reference: {}; referred file: {}; was top? {}; evicted for referred: {};" - + "evicted for reference: {}", reference, referred, top, numEvictedReferred, - numEvictedReference); + + "evicted for reference: {}", + reference, referred, top, numEvictedReferred, numEvictedReference); }); reader.close(false); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 0b75861753e8..6fe4e50aeecf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -20,7 +20,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Optional; -import java.util.function.Predicate; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index b90c4930ff76..629ccfb73207 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.io.hfile.HFileBlock.FILL_HEADER; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.NavigableMap; @@ -37,7 +39,6 @@ import org.apache.hbase.thirdparty.com.google.gson.TypeAdapter; import org.apache.hbase.thirdparty.com.google.gson.stream.JsonReader; import org.apache.hbase.thirdparty.com.google.gson.stream.JsonWriter; -import static org.apache.hadoop.hbase.io.hfile.HFileBlock.FILL_HEADER; /** * Utilty for aggregating counts in CachedBlocks and toString/toJSON CachedBlocks and BlockCaches. @@ -241,25 +242,24 @@ public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache, } } - - private static final int DEFAULT_MAX = 1000000; public static int getMaxCachedBlocksByFile(Configuration conf) { return conf == null ? 
DEFAULT_MAX : conf.getInt("hbase.ui.blockcache.by.file.max", DEFAULT_MAX); } - public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock block) { - HFileContext newContext = new HFileContextBuilder().withBlockSize(block.getHFileContext().getBlocksize()) - .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data - .withCompression(block.getHFileContext().getCompression()) - .withDataBlockEncoding(block.getHFileContext().getDataBlockEncoding()) - .withHBaseCheckSum(block.getHFileContext().isUseHBaseChecksum()) - .withCompressTags(block.getHFileContext().isCompressTags()) - .withIncludesMvcc(block.getHFileContext().isIncludesMvcc()) - .withIncludesTags(block.getHFileContext().isIncludesTags()) - .withColumnFamily(block.getHFileContext().getColumnFamily()).withTableName(block.getHFileContext().getTableName()) - .build(); + public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock block) { + HFileContext newContext = + new HFileContextBuilder().withBlockSize(block.getHFileContext().getBlocksize()) + .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data + .withCompression(block.getHFileContext().getCompression()) + .withDataBlockEncoding(block.getHFileContext().getDataBlockEncoding()) + .withHBaseCheckSum(block.getHFileContext().isUseHBaseChecksum()) + .withCompressTags(block.getHFileContext().isCompressTags()) + .withIncludesMvcc(block.getHFileContext().isIncludesMvcc()) + .withIncludesTags(block.getHFileContext().isIncludesTags()) + .withColumnFamily(block.getHFileContext().getColumnFamily()) + .withTableName(block.getHFileContext().getTableName()).build(); // Build the HFileBlock. HFileBlockBuilder builder = new HFileBlockBuilder(); ByteBuff buff = block.getBufferReadOnly(); @@ -270,14 +270,12 @@ public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock b .withOnDiskSizeWithoutHeader(block.getOnDiskSizeWithoutHeader()) .withUncompressedSizeWithoutHeader(block.getUncompressedSizeWithoutHeader()) .withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff) - .withFillHeader(FILL_HEADER) - .withOffset(block.getOffset()).withNextBlockOnDiskSize(-1) + .withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1) .withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes) .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) .withShared(!buff.hasArray()).build(); } - /** * Use one of these to keep a running account of cached blocks by file. Throw it away when done. * This is different than metrics in that it is stats on current state of a cache. 
See diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java index 3b90037ad67a..06bf2a76f756 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java @@ -504,7 +504,7 @@ public Optional getBlockSize(BlockCacheKey key) { @Override public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { - return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset) + - l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset); + return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset) + + l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 2fd9ebd83285..6a9fc9657fd9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY; import io.opentelemetry.api.common.Attributes; @@ -42,14 +41,12 @@ import org.apache.hadoop.hbase.SizeCachedKeyValue; import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue; import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue; -import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.IdLock; @@ -349,6 +346,7 @@ public static class HFileScannerImpl implements HFileScanner { public HFileBlock getCurBlock() { return curBlock; } + // Previous blocks that were used in the course of the read protected final ArrayList prevBlocks = new ArrayList<>(); @@ -1385,7 +1383,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { // Using the wait on cache during compaction and prefetching. - cache.cacheBlock(cacheKey, BlockCacheUtil.getBlockForCaching(cacheConf, cacheCompressed ? hfileBlock : unpacked), + cache.cacheBlock(cacheKey, + BlockCacheUtil.getBlockForCaching(cacheConf, cacheCompressed ? 
hfileBlock : unpacked), cacheConf.isInMemory(), cacheOnly); } }); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index c86d0edfa6f1..d457cf196b0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -51,7 +51,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Predicate; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -574,11 +573,12 @@ public BucketEntry getBlockForReference(BlockCacheKey key) { if (referredFileName != null) { BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset()); foundEntry = backingMap.get(convertedCacheKey); - LOG.info("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", - key.getHfileName(), convertedCacheKey, foundEntry); + LOG.info("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(), + convertedCacheKey, foundEntry); } return foundEntry; } + /** * Get the buffer of the block with the specified key. * @param key block's cache key @@ -613,7 +613,9 @@ public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat, // We can not read here even if backingMap does contain the given key because its offset // maybe changed. If we lock BlockCacheKey instead of offset, then we can only check // existence here. - if (bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key))) { + if ( + bucketEntry.equals(backingMap.get(key)) || bucketEntry.equals(getBlockForReference(key)) + ) { // Read the block from IOEngine based on the bucketEntry's offset and length, NOTICE: the // block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to // the same BucketEntry, then all of the three will share the same refCnt. 
@@ -1682,11 +1684,13 @@ protected String getAlgorithm() { public int evictBlocksByHfileName(String hfileName) { return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE); } + @Override public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { fileNotFullyCached(hfileName); Set keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset); - LOG.info("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), hfileName, initOffset, endOffset); + LOG.info("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), + hfileName, initOffset, endOffset); int numEvicted = 0; for (BlockCacheKey key : keySet) { if (evictBlock(key)) { @@ -1695,6 +1699,7 @@ public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long e } return numEvicted; } + private Set getAllCacheKeysForFile(String hfileName, long init, long end) { return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true, new BlockCacheKey(hfileName, end), true); @@ -2108,10 +2113,10 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d final MutableInt count = new MutableInt(); LOG.debug("iterating over {} entries in the backing map", backingMap.size()); Set result = getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE); - if(result.isEmpty() && StoreFileInfo.isReference(fileName)) { + if (result.isEmpty() && StoreFileInfo.isReference(fileName)) { result = getAllCacheKeysForFile( - StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), - 0, Long.MAX_VALUE); + StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0, + Long.MAX_VALUE); } result.stream().forEach(entry -> { LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}", @@ -2142,17 +2147,19 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d + "and try the verification again.", fileName); Thread.sleep(100); notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size); - } else - if ((getAllCacheKeysForFile(fileName.getName(),0, Long.MAX_VALUE).size() - metaCount) == dataBlockCount) { - LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in " - + "the cache write queue but we now found that total cached blocks for file {} " - + "is equal to data block count.", count, dataBlockCount, fileName.getName()); - fileCacheCompleted(fileName, size); - } else { - LOG.info("We found only {} data blocks cached from a total of {} for file {}, " - + "but no blocks pending caching. Maybe cache is full or evictions " - + "happened concurrently to cache prefetch.", count, dataBlockCount, fileName); - } + } else if ( + (getAllCacheKeysForFile(fileName.getName(), 0, Long.MAX_VALUE).size() - metaCount) + == dataBlockCount + ) { + LOG.debug("We counted {} data blocks, expected was {}, there was no more pending in " + + "the cache write queue but we now found that total cached blocks for file {} " + + "is equal to data block count.", count, dataBlockCount, fileName.getName()); + fileCacheCompleted(fileName, size); + } else { + LOG.info("We found only {} data blocks cached from a total of {} for file {}, " + + "but no blocks pending caching. 
Maybe cache is full or evictions " + + "happened concurrently to cache prefetch.", count, dataBlockCount, fileName); + } } } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java index 3306d1d34d4b..0ed740c7853e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java @@ -372,11 +372,14 @@ private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode) } private void closeRegionAfterUpdatingMeta(MasterProcedureEnv env, RegionStateNode regionNode) { - CloseRegionProcedure closeProc = isSplit - ? new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), assignCandidate, - env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_SPLIT_KEY, DEFAULT_EVICT_ON_SPLIT)) - : new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), assignCandidate, - evictCache); + CloseRegionProcedure closeProc = + isSplit + ? new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), + assignCandidate, + env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_SPLIT_KEY, + DEFAULT_EVICT_ON_SPLIT)) + : new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), + assignCandidate, evictCache); addChildProcedure(closeProc); setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java index 27aef8d56d30..c308d5f6d832 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java @@ -17,6 +17,18 @@ */ package org.apache.hadoop.hbase; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_SPLIT_KEY; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; @@ -33,17 +45,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; -import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; -import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; -import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY; -import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_SPLIT_KEY; -import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY; -import 
static org.junit.Assert.assertTrue; @Category({ MiscTests.class, MediumTests.class }) public class TestSplitWithCache { @@ -69,27 +70,27 @@ public static void setUp() throws Exception { @Test public void testEvictOnSplit() throws Exception { doTest("testEvictOnSplit", true, - (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f)!=null), - (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f)==null)); + (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f) != null), + (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f) == null)); } @Test public void testDoesntEvictOnSplit() throws Exception { doTest("testDoesntEvictOnSplit", false, - (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f)!=null), - (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f)!=null)); + (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f) != null), + (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f) != null)); } private void doTest(String table, boolean evictOnSplit, - BiConsumer>> predicateBeforeSplit, - BiConsumer>> predicateAfterSplit) throws Exception { + BiConsumer>> predicateBeforeSplit, + BiConsumer>> predicateAfterSplit) throws Exception { UTIL.getConfiguration().setBoolean(EVICT_BLOCKS_ON_SPLIT_KEY, evictOnSplit); UTIL.startMiniCluster(1); try { TableName tableName = TableName.valueOf(table); byte[] family = Bytes.toBytes("CF"); - TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName). - setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); UTIL.getAdmin().createTable(td); UTIL.waitTableAvailable(tableName); Table tbl = UTIL.getConnection().getTable(tableName); @@ -101,8 +102,8 @@ private void doTest(String table, boolean evictOnSplit, } tbl.put(puts); UTIL.getAdmin().flush(tableName); - Collection files = UTIL.getMiniHBaseCluster(). 
- getRegions(tableName).get(0).getStores().get(0).getStorefiles(); + Collection files = + UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getStores().get(0).getStorefiles(); checkCacheForBlocks(tableName, files, predicateBeforeSplit); UTIL.getAdmin().split(tableName, Bytes.toBytes("row-500")); Waiter.waitFor(UTIL.getConfiguration(), 30000, @@ -116,7 +117,7 @@ private void doTest(String table, boolean evictOnSplit, } private void checkCacheForBlocks(TableName tableName, Collection files, - BiConsumer>> checker){ + BiConsumer>> checker) { files.forEach(f -> { UTIL.getMiniHBaseCluster().getRegionServer(0).getBlockCache().ifPresent(cache -> { cache.getFullyCachedFiles().ifPresent(m -> { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java index be0ae8f75bbf..581d1893c17d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java @@ -150,8 +150,7 @@ public void testPrefetchRefsAfterSplit() throws Exception { cacheConf = new CacheConfig(conf, blockCache); Path tableDir = new Path(TEST_UTIL.getDataTestDir(), "testPrefetchRefsAfterSplit"); - RegionInfo region = - RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); + RegionInfo region = RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build(); Path regionDir = new Path(tableDir, region.getEncodedName()); Path cfDir = new Path(regionDir, "cf"); HRegionFileSystem regionFS = @@ -165,14 +164,14 @@ public void testPrefetchRefsAfterSplit() throws Exception { // Our file should have 6 DATA blocks. We should wait for all of them to be cached Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); - //split the file and return references to the original file + // split the file and return references to the original file Random rand = ThreadLocalRandom.current(); byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50); HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, BloomType.NONE, true); Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false, new ConstantSizeRegionSplitPolicy()); HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, BloomType.NONE, true); - //starts reader for the ref. The ref should resolve to the original file blocks + // starts reader for the ref. The ref should resolve to the original file blocks // and not duplicate blocks in the cache. refHsf.initReader(); HFile.Reader reader = refHsf.getReader().getHFileReader(); @@ -180,12 +179,12 @@ public void testPrefetchRefsAfterSplit() throws Exception { // Sleep for a bit Thread.sleep(1000); } - //the ref file blocks keys should actually resolve to the referred file blocks, - //so we should not see additional blocks in the cache. + // the ref file blocks keys should actually resolve to the referred file blocks, + // so we should not see additional blocks in the cache. 
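(Aside, not part of the patch: the comment above boils down to a single check, namely that a lookup through the daughter's reference key and a lookup through the parent file's key return the same cached block. A distilled sketch, reusing the test's own bc, ref and file variables; the locals viaRef and viaParent are illustrative.)

    // Both keys address offset 0 of the same data: one is named by the reference file written
    // at split time, the other by the original (referred) store file.
    BlockCacheKey refKey = new BlockCacheKey(ref.getName(), 0);
    BlockCacheKey parentKey = new BlockCacheKey(file.getPath().getName(), 0);
    Cacheable viaRef = bc.getBlock(refKey, true, false, true);
    Cacheable viaParent = bc.getBlock(parentKey, true, false, true);
    assertEquals(viaParent, viaRef); // the same BucketEntry serves both keys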
Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6); BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0); - Cacheable result = bc.getBlock(refCacheKey, true, false,true); + Cacheable result = bc.getBlock(refCacheKey, true, false, true); assertNotNull(result); BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 0); assertEquals(result, bc.getBlock(fileCacheKey, true, false, true)); @@ -337,7 +336,8 @@ private Path writeStoreFile(String fname, HFileContext context, int numKVs) thro return writeStoreFile(context, numKVs, new Path(TEST_UTIL.getDataTestDir(), fname)); } - private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir) throws IOException { + private Path writeStoreFile(HFileContext context, int numKVs, Path regionCFDir) + throws IOException { StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs) .withOutputDir(regionCFDir).withFileContext(context).build(); Random rand = ThreadLocalRandom.current(); From 1f9d91f091fdda5988d451596d35fd565dce59fe Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Fri, 17 May 2024 16:36:35 -0400 Subject: [PATCH 03/13] more UT fixes Change-Id: Ib4454ed5de44ce35271e53a88caf7fce3b2d05ad --- .../hadoop/hbase/io/HalfStoreFileReader.java | 28 ++++++++++--------- .../hbase/io/hfile/HFileReaderImpl.java | 8 +++--- .../hbase/io/hfile/bucket/BucketCache.java | 2 +- .../bucket/TestBucketCachePersister.java | 6 ++++ 4 files changed, 26 insertions(+), 18 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 76812ffc3922..92a41fc75c4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -363,19 +363,21 @@ public void close(boolean evictOnClose) throws IOException { final String reference = this.reader.getHFileInfo().getHFileContext().getHFileName(); final String referred = StoreFileInfo.getReferredToRegionAndFile(reference).getSecond(); s.seekTo(splitCell); - long offset = s.getCurBlock().getOffset(); - LOG.trace("Seeking to split cell in reader: {} for file: {} top: {}, split offset: {}", - this, reference, top, offset); - ((HFileReaderImpl) reader).getCacheConf().getBlockCache().ifPresent(cache -> { - int numEvictedReferred = top - ? cache.evictBlocksRangeByHfileName(referred, offset, Long.MAX_VALUE) - : cache.evictBlocksRangeByHfileName(referred, 0, offset); - int numEvictedReference = cache.evictBlocksByHfileName(reference); - LOG.trace( - "Closing reference: {}; referred file: {}; was top? {}; evicted for referred: {};" - + "evicted for reference: {}", - reference, referred, top, numEvictedReferred, numEvictedReference); - }); + if (s.getCurBlock() != null) { + long offset = s.getCurBlock().getOffset(); + LOG.trace("Seeking to split cell in reader: {} for file: {} top: {}, split offset: {}", + this, reference, top, offset); + ((HFileReaderImpl) reader).getCacheConf().getBlockCache().ifPresent(cache -> { + int numEvictedReferred = top + ? cache.evictBlocksRangeByHfileName(referred, offset, Long.MAX_VALUE) + : cache.evictBlocksRangeByHfileName(referred, 0, offset); + int numEvictedReference = cache.evictBlocksByHfileName(reference); + LOG.trace( + "Closing reference: {}; referred file: {}; was top? 
{}; evicted for referred: {};" + + "evicted for reference: {}", + reference, referred, top, numEvictedReferred, numEvictedReference); + }); + } reader.close(false); } else { reader.close(evictOnClose); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 6a9fc9657fd9..71d28f58b084 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1351,8 +1351,9 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo span.addEvent("block cache miss", attributes); // Load block from filesystem. - HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, - !isCompaction, shouldUseHeap(expectedBlockType, cacheBlock)); + HFileBlock hfileBlock = + BlockCacheUtil.getBlockForCaching(cacheConf, fsBlockReader.readBlockData(dataBlockOffset, + onDiskBlockSize, pread, !isCompaction, shouldUseHeap(expectedBlockType, cacheBlock))); try { validateBlockType(hfileBlock, expectedBlockType); } catch (IOException e) { @@ -1383,8 +1384,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { // Using the wait on cache during compaction and prefetching. - cache.cacheBlock(cacheKey, - BlockCacheUtil.getBlockForCaching(cacheConf, cacheCompressed ? hfileBlock : unpacked), + cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheConf.isInMemory(), cacheOnly); } }); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index d457cf196b0a..591dfdd5ff18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -2124,7 +2124,7 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset()); lock.readLock().lock(); locks.add(lock); - if (backingMap.containsKey(entry)) { + if (backingMap.containsKey(entry) && entry.getBlockType() == BlockType.DATA) { count.increment(); } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java index d60d2c53ef6d..b3ac553582b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java @@ -49,6 +49,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Category({ IOTests.class, MediumTests.class }) public class TestBucketCachePersister { @@ -61,6 +63,8 @@ public class TestBucketCachePersister { public int constructedBlockSize = 16 * 1024; + private static final Logger LOG = LoggerFactory.getLogger(TestBucketCachePersister.class); + public int[] constructedBlockSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 28 * 1024 + 1024, 32 * 
1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024 }; @@ -166,6 +170,7 @@ public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception { HFile.createReader(fs, storeFile, cacheConf, true, conf); boolean evicted = false; while (!PrefetchExecutor.isCompleted(storeFile)) { + LOG.debug("Entered loop as prefetch for {} is still running.", storeFile); if (bucketCache.backingMap.size() > 0 && !evicted) { Iterator> it = bucketCache.backingMap.entrySet().iterator(); @@ -174,6 +179,7 @@ public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception { while (it.hasNext() && !evicted) { if (entry.getKey().getBlockType().equals(BlockType.DATA)) { evicted = bucketCache.evictBlock(it.next().getKey()); + LOG.debug("Attempted eviction for {}. Succeeded? {}", storeFile, evicted); } } } From 61a12fcb88a62777b9e60b77a5c9c8f8e7129ef6 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 20 May 2024 11:59:53 -0400 Subject: [PATCH 04/13] Fixing latest UT failure Change-Id: I970b6179aa4ccfe89dcd8eb7bb8eedbfc77037d5 --- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 71d28f58b084..7dd2de397274 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1351,9 +1351,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo span.addEvent("block cache miss", attributes); // Load block from filesystem. - HFileBlock hfileBlock = - BlockCacheUtil.getBlockForCaching(cacheConf, fsBlockReader.readBlockData(dataBlockOffset, - onDiskBlockSize, pread, !isCompaction, shouldUseHeap(expectedBlockType, cacheBlock))); + HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, + !isCompaction, shouldUseHeap(expectedBlockType, cacheBlock)); try { validateBlockType(hfileBlock, expectedBlockType); } catch (IOException e) { @@ -1370,7 +1369,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo LOG.debug("Skipping decompression of block {} in prefetch", cacheKey); // Cache the block if necessary if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly); + cache.cacheBlock(cacheKey, BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock), + cacheConf.isInMemory(), cacheOnly); } }); @@ -1384,7 +1384,10 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { // Using the wait on cache during compaction and prefetching. - cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, + cache.cacheBlock(cacheKey, + cacheCompressed + ? 
BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock) + : BlockCacheUtil.getBlockForCaching(cacheConf, unpacked), cacheConf.isInMemory(), cacheOnly); } }); From 8bb75074f59ea5fa7589574834c72f7e0f458ca0 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 20 May 2024 13:38:12 -0400 Subject: [PATCH 05/13] adding some comments for better understanding Change-Id: I6dd46a801a44d9da8a824d10d57b88fb3c8cf0da --- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 4 ++++ .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 15 ++++++++++++++- .../handler/UnassignRegionHandler.java | 3 +-- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 7dd2de397274..330ddaf15b2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1369,6 +1369,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo LOG.debug("Skipping decompression of block {} in prefetch", cacheKey); // Cache the block if necessary if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + // When caching on write, we create a block without checksum + // (see HFileBlock.Writer.getBlockForCaching). We need to do the same here. cache.cacheBlock(cacheKey, BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock), cacheConf.isInMemory(), cacheOnly); } @@ -1384,6 +1386,8 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { // Using the wait on cache during compaction and prefetching. + // When caching on write, we create a block without checksum + // (see HFileBlock.Writer.getBlockForCaching). We need to do the same here. cache.cacheBlock(cacheKey, cacheCompressed ? BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 591dfdd5ff18..e95c0ed4dfce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -564,6 +564,15 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach } } + /** + * If the passed cache key relates to a reference (.), this method looks + * for the block from the referred file, in the cache. If present in the cache, the block for the + * referred file is returned, otherwise, this method returns null. It will also return null if the + * passed cache key doesn't relate to a reference. + * @param key the BlockCacheKey instance to look for in the cache. + * @return the cached block from the referred file, null if there's no such block in the cache or + * the passed key doesn't relate to a reference. 
+ */ public BucketEntry getBlockForReference(BlockCacheKey key) { BucketEntry foundEntry = null; String referredFileName = null; @@ -573,7 +582,7 @@ public BucketEntry getBlockForReference(BlockCacheKey key) { if (referredFileName != null) { BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, key.getOffset()); foundEntry = backingMap.get(convertedCacheKey); - LOG.info("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(), + LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", key.getHfileName(), convertedCacheKey, foundEntry); } return foundEntry; @@ -2186,6 +2195,8 @@ public Optional shouldCacheFile(String fileName) { @Override public Optional isAlreadyCached(BlockCacheKey key) { boolean foundKey = backingMap.containsKey(key); + // if there's no entry for the key itself, we need to check if this key is for a reference, + // and if so, look for a block from the referenced file using this getBlockForReference method. return Optional.of(foundKey ? true : getBlockForReference(key) != null); } @@ -2193,6 +2204,8 @@ public Optional isAlreadyCached(BlockCacheKey key) { public Optional getBlockSize(BlockCacheKey key) { BucketEntry entry = backingMap.get(key); if (entry == null) { + // the key might be for a reference tha we had found the block from the referenced file in + // the cache when we first tried to cache it. entry = getBlockForReference(key); return entry == null ? Optional.empty() : Optional.of(entry.getOnDiskSizeWithHeader()); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java index 2a9fb38dc53c..a360759aea15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java @@ -124,8 +124,7 @@ public void process() throws IOException { region.getCoprocessorHost().preClose(abort); } // This should be true only in the case of splits/merges closing the parent regions, as - // there's no point on keep blocks for those region files. As hbase.rs.evictblocksonclose is - // false by default we don't bother overriding it if evictCache is false. + // there's no point on keep blocks for those region files. region.getStores().forEach(s -> s.getCacheConfig().setEvictOnClose(evictCache)); if (region.close(abort) == null) { From ca03c502139db859879a64127083f91439bf1238 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 20 May 2024 13:52:32 -0400 Subject: [PATCH 06/13] some extra javadocs Change-Id: Ie893a0ef25bedc252a475b0f80f13beb28861e62 --- .../org/apache/hadoop/hbase/io/HalfStoreFileReader.java | 8 ++++++++ .../org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 92a41fc75c4c..fa7f44b83655 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -354,6 +354,14 @@ public long getFilterEntries() { return super.getFilterEntries() / 2; } + /** + * Overrides close method to handle cache evictions for the referred file. 
If evictionOnClose is + * true, we will seek to the block containing the splitCell and evict all blocks from offset 0 + * up to that block offset if this is a bottom half reader, or the from the split block offset up + * to the end of the file if this is a top half reader. + * @param evictOnClose true if it should evict the file blocks from the cache. + * @throws IOException + */ @Override public void close(boolean evictOnClose) throws IOException { if (closed.compareAndSet(false, true)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index 629ccfb73207..3bf3575511ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -248,6 +248,15 @@ public static int getMaxCachedBlocksByFile(Configuration conf) { return conf == null ? DEFAULT_MAX : conf.getInt("hbase.ui.blockcache.by.file.max", DEFAULT_MAX); } + /** + * Similarly to HFileBlock.Writer.getBlockForCaching(), creates a HFileBlock instance without + * checksum for caching. This is needed for when we cache blocks via readers (either prefetch + * or client read), otherwise we may fail equality comparison when checking against same block + * that may already have been cached at write time. + * @param cacheConf the related CacheConfig object. + * @param block the HFileBlock instance to be converted. + * @return the resulting HFileBlock instance without checksum. + */ public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock block) { HFileContext newContext = new HFileContextBuilder().withBlockSize(block.getHFileContext().getBlocksize()) From 96ebf978ee2daa9e53c31f10b39486ca6b48fe72 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Mon, 20 May 2024 13:54:01 -0400 Subject: [PATCH 07/13] more spotless Change-Id: I8c88c9dd0eeefbb276c6d5f29b7c485df5482f3c --- .../org/apache/hadoop/hbase/io/HalfStoreFileReader.java | 7 +++---- .../org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java | 8 ++++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index fa7f44b83655..0de90bf57ba6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -356,11 +356,10 @@ public long getFilterEntries() { /** * Overrides close method to handle cache evictions for the referred file. If evictionOnClose is - * true, we will seek to the block containing the splitCell and evict all blocks from offset 0 - * up to that block offset if this is a bottom half reader, or the from the split block offset up - * to the end of the file if this is a top half reader. + * true, we will seek to the block containing the splitCell and evict all blocks from offset 0 up + * to that block offset if this is a bottom half reader, or the from the split block offset up to + * the end of the file if this is a top half reader. * @param evictOnClose true if it should evict the file blocks from the cache. 
- * @throws IOException */ @Override public void close(boolean evictOnClose) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index 3bf3575511ba..d26df261ebae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -250,11 +250,11 @@ public static int getMaxCachedBlocksByFile(Configuration conf) { /** * Similarly to HFileBlock.Writer.getBlockForCaching(), creates a HFileBlock instance without - * checksum for caching. This is needed for when we cache blocks via readers (either prefetch - * or client read), otherwise we may fail equality comparison when checking against same block - * that may already have been cached at write time. + * checksum for caching. This is needed for when we cache blocks via readers (either prefetch or + * client read), otherwise we may fail equality comparison when checking against same block that + * may already have been cached at write time. * @param cacheConf the related CacheConfig object. - * @param block the HFileBlock instance to be converted. + * @param block the HFileBlock instance to be converted. * @return the resulting HFileBlock instance without checksum. */ public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock block) { From 3c87622122116f3edd1cc6881d58b656e3151068 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Tue, 21 May 2024 14:32:14 -0400 Subject: [PATCH 08/13] Changed the way we handle checksum when reading from cache Change-Id: I779423940f1ab0b4c40e50fd0525c6f9d1e3e4fe --- .../apache/hadoop/hbase/io/hfile/BlockCache.java | 7 +++++++ .../hadoop/hbase/io/hfile/BlockCacheUtil.java | 6 ++++-- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 11 +++++++++-- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 15 ++++++--------- 4 files changed, 26 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java index 6fe4e50aeecf..028a80075b5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java @@ -251,6 +251,13 @@ default Optional> getRegionCachedInfo() { return Optional.empty(); } + /** + * Evict all blocks for the given file name between the passed offset values. + * @param hfileName The file for which blocks should be evicted. + * @param initOffset the initial offset for the range of blocks to be evicted. + * @param endOffset the end offset for the range of blocks to be evicted. + * @return number of blocks evicted. 
+ */ default int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { return 0; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index d26df261ebae..c3a3c8e412f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -273,8 +273,10 @@ public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock bl HFileBlockBuilder builder = new HFileBlockBuilder(); ByteBuff buff = block.getBufferReadOnly(); // Calculate how many bytes we need for checksum on the tail of the block. - int numBytes = (int) ChecksumUtil.numBytes(block.getOnDiskDataSizeWithHeader(), - block.getHFileContext().getBytesPerChecksum()); + int numBytes = cacheConf.shouldCacheCompressed(block.getBlockType().getCategory()) + ? 0 + : (int) ChecksumUtil.numBytes(block.getOnDiskDataSizeWithHeader(), + block.getHFileContext().getBytesPerChecksum()); return builder.withBlockType(block.getBlockType()) .withOnDiskSizeWithoutHeader(block.getOnDiskSizeWithoutHeader()) .withUncompressedSizeWithoutHeader(block.getUncompressedSizeWithoutHeader()) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 08a45dde23ab..fd1e4d143f0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1205,8 +1205,14 @@ void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { * being wholesome (ECC memory or if file-backed, it does checksumming). */ HFileBlock getBlockForCaching(CacheConfig cacheConf) { + boolean cacheWithChecksum = cacheConf.getBlockCache().get().cacheWithChecksum(blockType); HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize()) - .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data + .withBytesPerCheckSum(cacheWithChecksum ? fileContext.getBytesPerChecksum() : 0) + .withChecksumType(cacheWithChecksum ? fileContext.getChecksumType() : ChecksumType.NULL) // no + // checksums + // in + // cached + // data .withCompression(fileContext.getCompression()) .withDataBlockEncoding(fileContext.getDataBlockEncoding()) .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) @@ -1228,7 +1234,8 @@ HFileBlock getBlockForCaching(CacheConfig cacheConf) { .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader()) .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER) .withOffset(startOffset).withNextBlockOnDiskSize(UNSET) - .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length) + .withOnDiskDataSizeWithHeader( + onDiskBlockBytesWithHeader.size() + (cacheWithChecksum ? 
0 : onDiskChecksum.length)) .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) .withShared(!buff.hasArray()).build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 330ddaf15b2f..e37536b03097 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1365,33 +1365,30 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo // Don't need the unpacked block back and we're storing the block in the cache compressed if (cacheOnly && cacheCompressed && cacheOnRead) { + HFileBlock blockNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock); cacheConf.getBlockCache().ifPresent(cache -> { LOG.debug("Skipping decompression of block {} in prefetch", cacheKey); // Cache the block if necessary if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { - // When caching on write, we create a block without checksum - // (see HFileBlock.Writer.getBlockForCaching). We need to do the same here. - cache.cacheBlock(cacheKey, BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock), - cacheConf.isInMemory(), cacheOnly); + cache.cacheBlock(cacheKey, blockNoChecksum, cacheConf.isInMemory(), cacheOnly); } }); if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { HFile.DATABLOCK_READ_COUNT.increment(); } - return hfileBlock; + return blockNoChecksum; } HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); + HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked); // Cache the block if necessary cacheConf.getBlockCache().ifPresent(cache -> { if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { // Using the wait on cache during compaction and prefetching. - // When caching on write, we create a block without checksum - // (see HFileBlock.Writer.getBlockForCaching). We need to do the same here. cache.cacheBlock(cacheKey, cacheCompressed ? BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock) - : BlockCacheUtil.getBlockForCaching(cacheConf, unpacked), + : unpackedNoChecksum, cacheConf.isInMemory(), cacheOnly); } }); @@ -1403,7 +1400,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo HFile.DATABLOCK_READ_COUNT.increment(); } - return unpacked; + return unpackedNoChecksum; } } finally { if (lockEntry != null) { From 458771344ba544f22c03d18daf6e5fcd52c6b83b Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Tue, 21 May 2024 15:14:47 -0400 Subject: [PATCH 09/13] fixed compile error Change-Id: I89a2cc6fd70b67f1f8f77382b1c61bfffc9f39a1 --- .../org/apache/hadoop/hbase/io/hfile/HFileBlock.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index fd1e4d143f0a..26f4df808724 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1205,14 +1205,9 @@ void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { * being wholesome (ECC memory or if file-backed, it does checksumming). 
*/ HFileBlock getBlockForCaching(CacheConfig cacheConf) { - boolean cacheWithChecksum = cacheConf.getBlockCache().get().cacheWithChecksum(blockType); HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize()) - .withBytesPerCheckSum(cacheWithChecksum ? fileContext.getBytesPerChecksum() : 0) - .withChecksumType(cacheWithChecksum ? fileContext.getChecksumType() : ChecksumType.NULL) // no - // checksums - // in - // cached - // data + .withBytesPerCheckSum(0) + .withChecksumType(ChecksumType.NULL) .withCompression(fileContext.getCompression()) .withDataBlockEncoding(fileContext.getDataBlockEncoding()) .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) @@ -1234,8 +1229,7 @@ HFileBlock getBlockForCaching(CacheConfig cacheConf) { .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader()) .withPrevBlockOffset(prevOffset).withByteBuff(buff).withFillHeader(FILL_HEADER) .withOffset(startOffset).withNextBlockOnDiskSize(UNSET) - .withOnDiskDataSizeWithHeader( - onDiskBlockBytesWithHeader.size() + (cacheWithChecksum ? 0 : onDiskChecksum.length)) + .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length) .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) .withShared(!buff.hasArray()).build(); } From 64b085536fdbb448e3d6f2c95b723a302ae4d2c3 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Wed, 22 May 2024 11:17:55 -0400 Subject: [PATCH 10/13] spotless Change-Id: I5f7bf8c5056c20911f9c86b333f1592d3dd45d79 --- .../hadoop/hbase/io/hfile/HFileBlock.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 26f4df808724..0897b6f6910a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1205,17 +1205,16 @@ void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { * being wholesome (ECC memory or if file-backed, it does checksumming). */ HFileBlock getBlockForCaching(CacheConfig cacheConf) { - HFileContext newContext = new HFileContextBuilder().withBlockSize(fileContext.getBlocksize()) - .withBytesPerCheckSum(0) - .withChecksumType(ChecksumType.NULL) - .withCompression(fileContext.getCompression()) - .withDataBlockEncoding(fileContext.getDataBlockEncoding()) - .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) - .withCompressTags(fileContext.isCompressTags()) - .withIncludesMvcc(fileContext.isIncludesMvcc()) - .withIncludesTags(fileContext.isIncludesTags()) - .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName()) - .build(); + HFileContext newContext = + new HFileContextBuilder().withBlockSize(fileContext.getBlocksize()).withBytesPerCheckSum(0) + .withChecksumType(ChecksumType.NULL).withCompression(fileContext.getCompression()) + .withDataBlockEncoding(fileContext.getDataBlockEncoding()) + .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) + .withCompressTags(fileContext.isCompressTags()) + .withIncludesMvcc(fileContext.isIncludesMvcc()) + .withIncludesTags(fileContext.isIncludesTags()) + .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName()) + .build(); // Build the HFileBlock. 
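(Aside, not part of the patch: the two fixups above walk back the attempt to keep checksums in cached copies. getBlockForCaching stays checksum-less, with ChecksumType.NULL and zero bytes per checksum, so a block cached from the read path compares equal to the same block cached at write time. A minimal sketch of that invariant; writer and blockReadFromFs are assumed placeholders for an HFileBlock.Writer and a freshly read block.)

    // Illustrative only: both cached copies are built without checksum data; otherwise their
    // on-disk sizes and buffers would differ and equality checks against the cache would fail.
    HFileBlock writeTimeCopy = writer.getBlockForCaching(cacheConf); // write path
    HFileBlock readTimeCopy = BlockCacheUtil.getBlockForCaching(cacheConf, blockReadFromFs); // read path
    assert writeTimeCopy.getHFileContext().getChecksumType() == ChecksumType.NULL;
    assert readTimeCopy.getHFileContext().getChecksumType() == ChecksumType.NULL;
    assert writeTimeCopy.getHFileContext().getBytesPerChecksum() == 0;
    assert readTimeCopy.getHFileContext().getBytesPerChecksum() == 0;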
HFileBlockBuilder builder = new HFileBlockBuilder(); ByteBuff buff; From 243df0456cf88fedc3700e345a4a307c47c509c2 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Fri, 7 Jun 2024 10:37:34 +0100 Subject: [PATCH 11/13] reducing log level Change-Id: I94f27b3e5f6787a1bc159693805c0c8929a8fcea --- .../org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index e95c0ed4dfce..7ee7a03ba647 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1698,7 +1698,7 @@ public int evictBlocksByHfileName(String hfileName) { public int evictBlocksRangeByHfileName(String hfileName, long initOffset, long endOffset) { fileNotFullyCached(hfileName); Set keySet = getAllCacheKeysForFile(hfileName, initOffset, endOffset); - LOG.info("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), + LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: {}", keySet.size(), hfileName, initOffset, endOffset); int numEvicted = 0; for (BlockCacheKey key : keySet) { From 7dba9bf12a9d6e2a7c14d368575b85517b0866a8 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Wed, 12 Jun 2024 22:07:16 +0100 Subject: [PATCH 12/13] addressing Duo's comments Change-Id: If712a4310a8f651b2395dd233fec7f8430362da5 --- .../hadoop/hbase/io/HalfStoreFileReader.java | 2 +- .../hadoop/hbase/io/hfile/BlockCacheUtil.java | 31 +++++++++---------- .../hadoop/hbase/io/hfile/HFileBlock.java | 11 +------ 3 files changed, 17 insertions(+), 27 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 0de90bf57ba6..3a4b0437bfca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -364,7 +364,7 @@ public long getFilterEntries() { @Override public void close(boolean evictOnClose) throws IOException { if (closed.compareAndSet(false, true)) { - if (evictOnClose && StoreFileInfo.isReference(this.reader.getPath())) { + if (evictOnClose) { final HFileReaderImpl.HFileScannerImpl s = (HFileReaderImpl.HFileScannerImpl) super.getScanner(false, true, false); final String reference = this.reader.getHFileInfo().getHFileContext().getHFileName(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java index c3a3c8e412f9..3d4698b0047e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java @@ -258,33 +258,32 @@ public static int getMaxCachedBlocksByFile(Configuration conf) { * @return the resulting HFileBlock instance without checksum. 
*/ public static HFileBlock getBlockForCaching(CacheConfig cacheConf, HFileBlock block) { - HFileContext newContext = - new HFileContextBuilder().withBlockSize(block.getHFileContext().getBlocksize()) - .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data - .withCompression(block.getHFileContext().getCompression()) - .withDataBlockEncoding(block.getHFileContext().getDataBlockEncoding()) - .withHBaseCheckSum(block.getHFileContext().isUseHBaseChecksum()) - .withCompressTags(block.getHFileContext().isCompressTags()) - .withIncludesMvcc(block.getHFileContext().isIncludesMvcc()) - .withIncludesTags(block.getHFileContext().isIncludesTags()) - .withColumnFamily(block.getHFileContext().getColumnFamily()) - .withTableName(block.getHFileContext().getTableName()).build(); - // Build the HFileBlock. - HFileBlockBuilder builder = new HFileBlockBuilder(); - ByteBuff buff = block.getBufferReadOnly(); // Calculate how many bytes we need for checksum on the tail of the block. int numBytes = cacheConf.shouldCacheCompressed(block.getBlockType().getCategory()) ? 0 : (int) ChecksumUtil.numBytes(block.getOnDiskDataSizeWithHeader(), block.getHFileContext().getBytesPerChecksum()); + ByteBuff buff = block.getBufferReadOnly(); + HFileBlockBuilder builder = new HFileBlockBuilder(); return builder.withBlockType(block.getBlockType()) .withOnDiskSizeWithoutHeader(block.getOnDiskSizeWithoutHeader()) .withUncompressedSizeWithoutHeader(block.getUncompressedSizeWithoutHeader()) .withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff) .withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1) .withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes) - .withHFileContext(newContext).withByteBuffAllocator(cacheConf.getByteBuffAllocator()) - .withShared(!buff.hasArray()).build(); + .withHFileContext(cloneContext(block.getHFileContext())) + .withByteBuffAllocator(cacheConf.getByteBuffAllocator()).withShared(!buff.hasArray()).build(); + } + + public static HFileContext cloneContext(HFileContext context) { + HFileContext newContext = new HFileContextBuilder().withBlockSize(context.getBlocksize()) + .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no checksums in cached data + .withCompression(context.getCompression()) + .withDataBlockEncoding(context.getDataBlockEncoding()) + .withHBaseCheckSum(context.isUseHBaseChecksum()).withCompressTags(context.isCompressTags()) + .withIncludesMvcc(context.isIncludesMvcc()).withIncludesTags(context.isIncludesTags()) + .withColumnFamily(context.getColumnFamily()).withTableName(context.getTableName()).build(); + return newContext; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 0897b6f6910a..b24976707c33 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1205,16 +1205,7 @@ void writeBlock(BlockWritable bw, FSDataOutputStream out) throws IOException { * being wholesome (ECC memory or if file-backed, it does checksumming). 
*/ HFileBlock getBlockForCaching(CacheConfig cacheConf) { - HFileContext newContext = - new HFileContextBuilder().withBlockSize(fileContext.getBlocksize()).withBytesPerCheckSum(0) - .withChecksumType(ChecksumType.NULL).withCompression(fileContext.getCompression()) - .withDataBlockEncoding(fileContext.getDataBlockEncoding()) - .withHBaseCheckSum(fileContext.isUseHBaseChecksum()) - .withCompressTags(fileContext.isCompressTags()) - .withIncludesMvcc(fileContext.isIncludesMvcc()) - .withIncludesTags(fileContext.isIncludesTags()) - .withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName()) - .build(); + HFileContext newContext = BlockCacheUtil.cloneContext(fileContext); // Build the HFileBlock. HFileBlockBuilder builder = new HFileBlockBuilder(); ByteBuff buff; From a23ca87bcc7a40507069e504bf368b9d8b7c3fc2 Mon Sep 17 00:00:00 2001 From: Wellington Ramos Chevreuil Date: Thu, 13 Jun 2024 19:51:36 +0100 Subject: [PATCH 13/13] fixing TestHalfStoreFileReader Change-Id: I480a6a4f062d1d57d8a432d93d9f9c4e62c9c826 --- .../hbase/io/TestHalfStoreFileReader.java | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java index 7dd4cbe44f93..74addb186143 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.ReaderContext; import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -82,15 +84,19 @@ public static void tearDownAfterClass() throws Exception { */ @Test public void testHalfScanAndReseek() throws IOException { - String root_dir = TEST_UTIL.getDataTestDir().toString(); - Path p = new Path(root_dir, "test"); - Configuration conf = TEST_UTIL.getConfiguration(); FileSystem fs = FileSystem.get(conf); + String root_dir = TEST_UTIL.getDataTestDir().toString(); + Path parentPath = new Path(new Path(root_dir, "parent"), "CF"); + fs.mkdirs(parentPath); + Path splitAPath = new Path(new Path(root_dir, "splita"), "CF"); + Path splitBPath = new Path(new Path(root_dir, "splitb"), "CF"); + Path filePath = StoreFileWriter.getUniqueFile(fs, parentPath); + CacheConfig cacheConf = new CacheConfig(conf); HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build(); HFile.Writer w = - HFile.getWriterFactory(conf, cacheConf).withPath(fs, p).withFileContext(meta).create(); + HFile.getWriterFactory(conf, cacheConf).withPath(fs, filePath).withFileContext(meta).create(); // write some things. 
List items = genSomeKeys(); @@ -99,26 +105,35 @@ public void testHalfScanAndReseek() throws IOException { } w.close(); - HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf); + HFile.Reader r = HFile.createReader(fs, filePath, cacheConf, true, conf); Cell midKV = r.midKey().get(); byte[] midkey = CellUtil.cloneRow(midKV); - // System.out.println("midkey: " + midKV + " or: " + Bytes.toStringBinary(midkey)); + Path splitFileA = new Path(splitAPath, filePath.getName() + ".parent"); + Path splitFileB = new Path(splitBPath, filePath.getName() + ".parent"); Reference bottom = new Reference(midkey, Reference.Range.bottom); - doTestOfScanAndReseek(p, fs, bottom, cacheConf); + bottom.write(fs, splitFileA); + doTestOfScanAndReseek(splitFileA, fs, bottom, cacheConf); Reference top = new Reference(midkey, Reference.Range.top); - doTestOfScanAndReseek(p, fs, top, cacheConf); + top.write(fs, splitFileB); + doTestOfScanAndReseek(splitFileB, fs, top, cacheConf); r.close(); } private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, CacheConfig cacheConf) throws IOException { - ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, p).build(); - StoreFileInfo storeFileInfo = - new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), bottom); + Path referencePath = StoreFileInfo.getReferredToFile(p); + FSDataInputStreamWrapper in = new FSDataInputStreamWrapper(fs, referencePath, false, 0); + FileStatus status = fs.getFileStatus(referencePath); + long length = status.getLen(); + ReaderContextBuilder contextBuilder = + new ReaderContextBuilder().withInputStreamWrapper(in).withFileSize(length) + .withReaderType(ReaderContext.ReaderType.PREAD).withFileSystem(fs).withFilePath(p); + ReaderContext context = contextBuilder.build(); + StoreFileInfo storeFileInfo = new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, p, true); storeFileInfo.initHFileInfo(context); final HalfStoreFileReader halfreader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);