HBASE-28900: Avoid resetting the bucket cache during recovery from persistence. (#6360)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
jhungund authored and wchevreuil committed Oct 14, 2024
1 parent 6725566 commit 008fb37
Showing 3 changed files with 77 additions and 8 deletions.
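Summary of the change: during recovery from persistence, a duplicated entry in the persisted backing map (two entries claiming the same cache offset) used to make Bucket.addAllocation throw a BucketAllocatorException, and the failed recovery reset the whole bucket cache. The diff below turns that hard failure into a warning plus a skipped entry. A minimal, self-contained sketch of the skip-don't-throw pattern (illustrative names only, not the HBase classes):

import java.util.HashSet;
import java.util.Set;

public class RecoverySketch {
  private final Set<Long> allocatedOffsets = new HashSet<>();

  // Mirrors the new contract of Bucket.addAllocation: report a duplicate
  // instead of throwing, so one bad entry cannot abort the whole recovery.
  boolean addAllocation(long offset) {
    if (!allocatedOffsets.add(offset)) {
      System.out.printf("Skipping duplicate entry at cache offset %d%n", offset);
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    RecoverySketch allocator = new RecoverySketch();
    // Five persisted entries for 9216-byte buckets, the fifth colliding with
    // the first, as in the new test at the bottom of this commit.
    long[] persistedOffsets = { 0, 9216, 18432, 27648, 0 };
    int recovered = 0;
    for (long offset : persistedOffsets) {
      if (allocator.addAllocation(offset)) {
        recovered++; // size accounting happens only for entries that really own a slot
      }
    }
    System.out.println("Recovered " + recovered + " of " + persistedOffsets.length + " entries");
  }
}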
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -126,7 +126,7 @@ public long allocate() {
       return offset;
     }

-    public void addAllocation(long offset) throws BucketAllocatorException {
+    public boolean addAllocation(long offset) throws BucketAllocatorException {
       offset -= baseOffset;
       if (offset < 0 || offset % itemAllocationSize != 0)
         throw new BucketAllocatorException("Attempt to add allocation for bad offset: " + offset
@@ -137,10 +137,14 @@ public void addAllocation(long offset) throws BucketAllocatorException {
         if (matchFound) freeList[i - 1] = freeList[i];
         else if (freeList[i] == idx) matchFound = true;
       }
-      if (!matchFound) throw new BucketAllocatorException(
-        "Couldn't find match for index " + idx + " in free list");
+      if (!matchFound) {
+        LOG.warn("We found more entries for bucket starting at offset {} for blocks of {} size. "
+          + "Skipping entry at cache offset {}", baseOffset, itemAllocationSize, offset);
+        return false;
+      }
       ++usedCount;
       --freeCount;
+      return true;
     }

     private void free(long offset) {
@@ -402,10 +406,11 @@ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
         bsi.instantiateBucket(b);
         reconfigured[bucketNo] = true;
       }
-      realCacheSize.add(foundLen);
-      buckets[bucketNo].addAllocation(foundOffset);
-      usedSize += buckets[bucketNo].getItemAllocationSize();
-      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      if (buckets[bucketNo].addAllocation(foundOffset)) {
+        realCacheSize.add(foundLen);
+        usedSize += buckets[bucketNo].getItemAllocationSize();
+        bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      }
     }

     if (sizeNotMatchedCount > 0) {
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -854,7 +854,7 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    * it is {@link ByteBuffAllocator#putbackBuffer}.
    * </pre>
    */
-  private Recycler createRecycler(final BucketEntry bucketEntry) {
+  public Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
       freeBucketEntry(bucketEntry);
       return;
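createRecycler is widened from private to public so that code outside BucketCache, such as the new test below, can hand the cache's recycler factory to a hand-built BucketEntry as a method reference (bucketCache::createRecycler). A small self-contained sketch of that factory-by-method-reference shape (assumed demo names, not the HBase types):

import java.util.function.Function;

public class RecyclerSketch {
  interface Recycler {
    void free();
  }

  static class Cache {
    // Stands in for BucketCache.createRecycler(BucketEntry): the returned
    // Recycler knows how to release one entry's backing storage.
    Recycler createRecycler(Entry entry) {
      return () -> System.out.println("freeing entry at offset " + entry.offset);
    }
  }

  static class Entry {
    final long offset;
    final Recycler recycler;

    // The entry obtains its own recycler from the factory, mirroring how the
    // test passes bucketCache::createRecycler into the BucketEntry constructor.
    Entry(long offset, Function<Entry, Recycler> recyclerFactory) {
      this.offset = offset;
      this.recycler = recyclerFactory.apply(this);
    }
  }

  public static void main(String[] args) {
    Cache cache = new Cache();
    Entry entry = new Entry(9216, cache::createRecycler);
    entry.recycler.free(); // prints: freeing entry at offset 9216
  }
}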
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;

 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -199,6 +202,67 @@ public void testValidateCacheInitialization() throws Exception {
     TEST_UTIL.cleanupTestDir();
   }

+  @Test
+  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
+    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
+      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    assertTrue(bucketCache.waitForCacheInitialization(1000));
+    assertTrue(
+      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+    // creates an entry for a 5th block with the same cache offset as the 1st block. Just add it
+    // straight to the backingMap, bypassing caching, in order to fabricate an inconsistency
+    BucketEntry bucketEntry =
+      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
+        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
+        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
+    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
+    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);
+
+    // saves the current state of the cache: 5 blocks in the map, but we have only cached 4. The
+    // 5th block has the same cache offset as the first
+    bucketCache.persistToFile();
+
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    while (!newBucketCache.getBackingMapValidated().get()) {
+      Thread.sleep(10);
+    }
+
+    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or to
+    // null, depending on the ordering of the keys in the backing map.
+    // Hence, skipping the check for that key.
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    assertEquals(blocks[3].getBlock(),
+      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertEquals(4, newBucketCache.backingMap.size());
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     Waiter.waitFor(HBaseConfiguration.create(), 12000,
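Recovery validation runs asynchronously, which is why the test above polls getBackingMapValidated() before asserting. A hedged helper in the same spirit; only the flag and the 10 ms poll come from the test, the deadline handling is an assumption added here:

import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;

final class RecoveryWait {
  // Polls the validation flag the same way the new test does, but with an
  // explicit deadline instead of an unbounded loop.
  static void waitForRecovery(BucketCache cache, long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!cache.getBackingMapValidated().get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException("backing map not validated within " + timeoutMs + " ms");
      }
      Thread.sleep(10); // same polling interval as the test
    }
  }
}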
