HBASE-28900: Avoid resetting the bucket cache during recovery from persistence.

When an inconsistency is detected while recovering the bucket cache from persistence,
we currently discard the entire cache and rebuild it from scratch.
This inconsistency can occur when the region server is abruptly terminated.

This can be avoided by skipping the inconsistent backing map entry and continuing with
the cache recovery. The skipped entry is then discarded during the subsequent
bucket cache validation.
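
To make the approach concrete, here is a minimal, self-contained sketch of the
skip-and-continue recovery pattern (plain Java collections; the class name, block
names, and offsets are hypothetical illustrations, not the actual HBase types):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RecoverySkipSketch {
  public static void main(String[] args) {
    // Hypothetical persisted backing map: block name -> bucket offset.
    // "block-4" claims the same offset as "block-0", mimicking an entry
    // left behind by an abrupt region server termination.
    Map<String, Long> persisted = new HashMap<>();
    persisted.put("block-0", 0L);
    persisted.put("block-1", 9216L);
    persisted.put("block-4", 0L);

    Set<Long> claimedOffsets = new HashSet<>();
    Map<String, Long> recovered = new HashMap<>();
    for (Map.Entry<String, Long> e : persisted.entrySet()) {
      // Set.add returns false when the offset was already claimed, i.e.
      // more entries exist for a bucket slot than it can hold. Whichever
      // duplicate is seen second loses.
      if (!claimedOffsets.add(e.getValue())) {
        // Old behaviour: throw and rebuild the whole cache.
        // New behaviour: warn, skip this entry, keep recovering.
        System.out.println("Skipping entry at cache offset " + e.getValue());
        continue;
      }
      recovered.put(e.getKey(), e.getValue());
    }
    // Only the consistent entries survive; the skipped one is left for
    // the subsequent cache validation to discard.
    System.out.println("Recovered " + recovered.size() + " of " + persisted.size() + " entries");
  }
}

In the actual patch below, the duplicate offset is detected inside BucketAllocator's
addAllocation, which now returns false instead of throwing BucketAllocatorException,
and the caller skips the size accounting for that entry.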

Change-Id: I41237bda2189cbd73e22e4eadf4d53d16a3733f6
jhungund committed Oct 9, 2024
1 parent ef19929 commit 632377e
Showing 3 changed files with 75 additions and 8 deletions.
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -126,7 +126,7 @@ public long allocate() {
       return offset;
     }
 
-    public void addAllocation(long offset) throws BucketAllocatorException {
+    public boolean addAllocation(long offset) throws BucketAllocatorException {
       offset -= baseOffset;
       if (offset < 0 || offset % itemAllocationSize != 0)
         throw new BucketAllocatorException("Attempt to add allocation for bad offset: " + offset
@@ -137,10 +137,14 @@ public void addAllocation(long offset) throws BucketAllocatorException {
         if (matchFound) freeList[i - 1] = freeList[i];
         else if (freeList[i] == idx) matchFound = true;
       }
-      if (!matchFound) throw new BucketAllocatorException(
-        "Couldn't find match for index " + idx + " in free list");
+      if (!matchFound) {
+        LOG.warn("We found more entries for bucket than the bucket capacity. "
+          + "Skipping entry at cache offset {}", offset);
+        return false;
+      }
       ++usedCount;
       --freeCount;
+      return true;
     }
 
     private void free(long offset) {
@@ -402,10 +406,11 @@ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
         bsi.instantiateBucket(b);
         reconfigured[bucketNo] = true;
       }
-      realCacheSize.add(foundLen);
-      buckets[bucketNo].addAllocation(foundOffset);
-      usedSize += buckets[bucketNo].getItemAllocationSize();
-      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      if (buckets[bucketNo].addAllocation(foundOffset)) {
+        realCacheSize.add(foundLen);
+        usedSize += buckets[bucketNo].getItemAllocationSize();
+        bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      }
     }
 
     if (sizeNotMatchedCount > 0) {
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -853,7 +853,7 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    * it is {@link ByteBuffAllocator#putbackBuffer}.
    * </pre>
    */
-  private Recycler createRecycler(final BucketEntry bucketEntry) {
+  public Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
       freeBucketEntry(bucketEntry);
       return;
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -199,6 +202,65 @@ public void testValidateCacheInitialization() throws Exception {
     TEST_UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
+    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    assertTrue(bucketCache.waitForCacheInitialization(1000));
+    assertTrue(
+      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+    // Creates an entry for a 5th block with the same cache offset as the 1st block. Just add it
+    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency.
+    BucketEntry bucketEntry = new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
+      blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(), 0, false,
+      bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
+    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
+    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);
+
+    // Saves the current state of the cache: 5 blocks in the map, but only 4 were cached. The
+    // 5th block has the same cache offset as the first.
+    bucketCache.persistToFile();
+
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    while (!newBucketCache.getBackingMapValidated().get()) {
+      Thread.sleep(10);
+    }
+
+    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+    assertEquals(blocks[0].getBlock(),
+      newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    assertEquals(blocks[3].getBlock(),
+      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertEquals(4, newBucketCache.backingMap.size());
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     Waiter.waitFor(HBaseConfiguration.create(), 12000,
