From 632377ed15dfc509854d6a4ad35d130cd0d75160 Mon Sep 17 00:00:00 2001
From: Janardhan Hungund
Date: Mon, 7 Oct 2024 10:35:25 +0530
Subject: [PATCH] HBASE-28900: Avoid resetting the bucket cache during recovery from persistence.

When an inconsistency is detected during recovery of the bucket cache from
persistence, the entire cache is currently discarded and rebuilt. This
inconsistency can occur when the region server is abruptly terminated. The
full rebuild can be avoided by skipping the inconsistent backing map entry
and continuing with the cache recovery. The skipped entry is then discarded
during the subsequent bucket cache validation.

Change-Id: I41237bda2189cbd73e22e4eadf4d53d16a3733f6
---
 .../io/hfile/bucket/BucketAllocator.java      | 19 +++---
 .../hbase/io/hfile/bucket/BucketCache.java    |  2 +-
 .../TestRecoveryPersistentBucketCache.java    | 62 +++++++++++++++++++
 3 files changed, 75 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index 0b03656d7010..32098246bee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -126,7 +126,7 @@ public long allocate() {
       return offset;
     }
 
-    public void addAllocation(long offset) throws BucketAllocatorException {
+    public boolean addAllocation(long offset) throws BucketAllocatorException {
       offset -= baseOffset;
       if (offset < 0 || offset % itemAllocationSize != 0)
         throw new BucketAllocatorException("Attempt to add allocation for bad offset: " + offset
@@ -137,10 +137,14 @@ public void addAllocation(long offset) throws BucketAllocatorException {
         if (matchFound) freeList[i - 1] = freeList[i];
         else if (freeList[i] == idx) matchFound = true;
       }
-      if (!matchFound) throw new BucketAllocatorException(
-        "Couldn't find match for index " + idx + " in free list");
+      if (!matchFound) {
+        LOG.warn("Found more entries for the bucket than its capacity. "
+          + "Skipping entry at cache offset {}", offset);
+        return false;
+      }
       ++usedCount;
       --freeCount;
+      return true;
     }
 
     private void free(long offset) {
@@ -402,10 +406,11 @@ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
         bsi.instantiateBucket(b);
         reconfigured[bucketNo] = true;
       }
-      realCacheSize.add(foundLen);
-      buckets[bucketNo].addAllocation(foundOffset);
-      usedSize += buckets[bucketNo].getItemAllocationSize();
-      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      if (buckets[bucketNo].addAllocation(foundOffset)) {
+        realCacheSize.add(foundLen);
+        usedSize += buckets[bucketNo].getItemAllocationSize();
+        bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      }
     }
 
     if (sizeNotMatchedCount > 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index eb612635dda2..47e9f457f694 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -853,7 +853,7 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    * it is {@link ByteBuffAllocator#putbackBuffer}.
   *
   */
-  private Recycler createRecycler(final BucketEntry bucketEntry) {
+  public Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
       freeBucketEntry(bucketEntry);
       return;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
index 3a4af295dc84..8879e67f812e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -199,6 +202,65 @@ public void testValidateCacheInitialization() throws Exception {
     TEST_UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
+    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disable the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    assertTrue(bucketCache.waitForCacheInitialization(1000));
+    assertTrue(
+      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+    // Create an entry for a 5th block with the same cache offset as the 1st block. Add it
+    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency.
+    BucketEntry bucketEntry = new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
+      blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(), 0, false,
+      bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
+    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
+    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);
+
+    // Save the current state of the cache: 5 blocks in the map, but only 4 have been cached.
+    // The 5th block has the same cache offset as the first.
+    bucketCache.persistToFile();
+
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    while (!newBucketCache.getBackingMapValidated().get()) {
+      Thread.sleep(10);
+    }
+
+    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+    assertEquals(blocks[0].getBlock(),
+      newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    assertEquals(blocks[3].getBlock(),
+      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertEquals(4, newBucketCache.backingMap.size());
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     Waiter.waitFor(HBaseConfiguration.create(), 12000,
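
For readers who want the recovery behavior in isolation: the following is a minimal, self-contained Java sketch of the skip-instead-of-reset pattern this patch introduces, not part of the patch itself. The Bucket class, offsets, and sizes below are hypothetical stand-ins for BucketAllocator's internals; only the control flow (return false and skip accounting rather than throw and reset the cache) mirrors the change above.

import java.util.HashSet;
import java.util.Set;

public class SkipInconsistentAllocationDemo {

  /** Simplified stand-in for a bucket with a fixed set of free slots. */
  static final class Bucket {
    private final Set<Long> freeOffsets = new HashSet<>();
    private int usedCount;

    Bucket(long baseOffset, long itemSize, int itemCount) {
      for (int i = 0; i < itemCount; i++) {
        freeOffsets.add(baseOffset + i * itemSize);
      }
    }

    // Mirrors the patched addAllocation(): report failure for an offset that
    // cannot be matched to a free slot instead of throwing, so the caller can
    // skip the inconsistent entry and keep rebuilding the rest of the cache.
    boolean addAllocation(long offset) {
      if (!freeOffsets.remove(offset)) {
        System.out.println("Skipping inconsistent entry at cache offset " + offset);
        return false;
      }
      usedCount++;
      return true;
    }

    int getUsedCount() {
      return usedCount;
    }
  }

  public static void main(String[] args) {
    Bucket bucket = new Bucket(0L, 8192L, 4);
    // Offsets recovered from a persisted backing map; offset 0 appears twice,
    // emulating the duplicate entry the new test fabricates for its 5th block.
    long[] recoveredOffsets = { 0L, 8192L, 16384L, 24576L, 0L };
    int accepted = 0;
    for (long offset : recoveredOffsets) {
      // Only update accounting (sizes, counters) when the allocation was
      // accepted, matching the guarded block added in BucketAllocator.
      if (bucket.addAllocation(offset)) {
        accepted++;
      }
    }
    // Four valid entries survive; the duplicate is dropped with a warning and
    // no cache-wide reset is needed.
    System.out.println("Recovered " + accepted + " of " + recoveredOffsets.length + " entries");
  }
}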