HBASE-28450 BucketCache.evictBlocksByHfileName won't work after a cache recovery from file (#5769)
wchevreuil authored Mar 27, 2024
1 parent 78923b7 commit 298c550
Showing 3 changed files with 50 additions and 13 deletions.
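
Context for the change: BucketCache keeps a secondary index, blocksByHFile, ordered by hfile name and then offset, and evictBlocksByHfileName uses it to locate every cached block of a given file. Before this fix, recovering the cache from the persistence file repopulated only backingMap, so the index stayed empty and per-file eviction found nothing to evict. Below is a minimal, self-contained sketch of that mechanism, not the actual HBase code: Key stands in for BlockCacheKey, and the subSet range query is an assumption about how such a per-file lookup is typically expressed.

import java.util.Comparator;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

public class EvictByHfileSketch {

  // Stand-in for BlockCacheKey: an hfile name plus a block offset.
  record Key(String hfileName, long offset) {}

  public static void main(String[] args) {
    // Two views of the same entries: the backing map, and an ordered index
    // keyed by (hfileName, offset) that supports per-file range queries.
    ConcurrentHashMap<Key, String> backingMap = new ConcurrentHashMap<>();
    NavigableSet<Key> blocksByHFile = new ConcurrentSkipListSet<>(
        Comparator.comparing(Key::hfileName).thenComparingLong(Key::offset));

    for (long offset : new long[] { 0, 8192, 16384 }) {
      Key key = new Key("hfile-a", offset);
      backingMap.put(key, "block@" + offset);
      blocksByHFile.add(key); // skipping this step during recovery is the essence of the bug
    }

    // Evicting by hfile name is a range query over the ordered index.
    Set<Key> toEvict = blocksByHFile.subSet(
        new Key("hfile-a", Long.MIN_VALUE), true,
        new Key("hfile-a", Long.MAX_VALUE), true);
    toEvict.forEach(key -> {
      backingMap.remove(key);
      blocksByHFile.remove(key); // keep the index in sync with the map
    });

    System.out.println(backingMap.size()); // 0: every block of "hfile-a" was evicted

    // If recovery repopulates only backingMap and leaves blocksByHFile empty,
    // the same range query matches nothing and no block is ever evicted.
  }
}

The patch addresses this by making BucketProtoUtils.fromPB return both the backing map and a rebuilt index, and by assigning blocksByHFile from that result in parsePB, as shown in the diff below.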
BucketCache.java
@@ -222,13 +222,8 @@ public class BucketCache implements BlockCache, HeapSize {
*/
transient final IdReadWriteLock<Long> offsetLock;

final NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>((a, b) -> {
int nameComparison = a.getHfileName().compareTo(b.getHfileName());
if (nameComparison != 0) {
return nameComparison;
}
return Long.compare(a.getOffset(), b.getOffset());
});
NavigableSet<BlockCacheKey> blocksByHFile = new ConcurrentSkipListSet<>(
Comparator.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));

/** Statistics thread schedule pool (for heavy debugging, could remove) */
private transient final ScheduledExecutorService scheduleThreadPool =
@@ -1471,8 +1466,11 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String
}

private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
this::createRecycler);
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
this::createRecycler);
backingMap = pair.getFirst();
blocksByHFile = pair.getSecond();
fullyCachedFiles.clear();
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
if (proto.hasChecksum()) {
BucketProtoUtils.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -121,10 +124,12 @@ private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
}
}

static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(Map<Integer, String> deserializers,
BucketCacheProtos.BackingMap backingMap, Function<BucketEntry, Recycler> createRecycler)
throws IOException {
static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB(
Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
Function<BucketEntry, Recycler> createRecycler) throws IOException {
ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
.comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
@@ -153,8 +158,9 @@ static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(Map<Integer, String>
throw new IOException("Unknown deserializer class found: " + deserializerClass);
}
result.put(key, value);
resultSet.add(key);
}
return result;
return new Pair<>(result, resultSet);
}

private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
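
The fromPB change above rebuilds the ordered index in the same pass that deserializes the backing-map entries, so both structures come out of recovery consistent with each other. A hypothetical, self-contained alternative, not HBase code, would be a second pass over the recovered map's keys; the one-pass approach in the patch avoids iterating the entries twice:

import java.util.Comparator;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

public class RebuildIndexSketch {

  // Stand-in for BlockCacheKey, as in the earlier sketch.
  record Key(String hfileName, long offset) {}

  public static void main(String[] args) {
    // Pretend this map was just deserialized from the persistence file.
    ConcurrentHashMap<Key, String> recovered = new ConcurrentHashMap<>();
    recovered.put(new Key("hfile-a", 0L), "entry");
    recovered.put(new Key("hfile-a", 8192L), "entry");

    // Second-pass rebuild: derive the ordered index from the recovered keys.
    NavigableSet<Key> index = new ConcurrentSkipListSet<>(
        Comparator.comparing(Key::hfileName).thenComparingLong(Key::offset));
    index.addAll(recovered.keySet());

    System.out.println(index.size()); // 2 -- index now mirrors the recovered map
  }
}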
Test class for persistent BucketCache recovery
@@ -108,6 +108,39 @@ public void testBucketCacheRecovery() throws Exception {
TEST_UTIL.cleanupTestDir();
}

@Test
public void testBucketCacheEvictByHFileAfterRecovery() throws Exception {
HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
Configuration conf = HBaseConfiguration.create();
// Disables the persister thread by setting its interval to MAX_VALUE
conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, conf);

CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);

// Add four blocks
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
// saves the current state of the cache
bucketCache.persistToFile();

BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, conf);
Thread.sleep(100);
assertEquals(4, newBucketCache.backingMap.size());
newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName());
assertEquals(3, newBucketCache.backingMap.size());
TEST_UTIL.cleanupTestDir();
}

private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
