diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index 0b03656d7010..e25d52698bab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -126,7 +126,7 @@ public long allocate() {
       return offset;
     }
 
-    public void addAllocation(long offset) throws BucketAllocatorException {
+    public boolean addAllocation(long offset) throws BucketAllocatorException {
       offset -= baseOffset;
       if (offset < 0 || offset % itemAllocationSize != 0)
         throw new BucketAllocatorException("Attempt to add allocation for bad offset: " + offset
@@ -137,10 +137,14 @@ public void addAllocation(long offset) throws BucketAllocatorException {
         if (matchFound) freeList[i - 1] = freeList[i];
         else if (freeList[i] == idx) matchFound = true;
       }
-      if (!matchFound) throw new BucketAllocatorException(
-        "Couldn't find match for index " + idx + " in free list");
+      if (!matchFound) {
+        LOG.warn("Found more entries than expected for bucket starting at offset {}, for blocks "
+          + "of size {}. Skipping entry at cache offset {}", baseOffset, itemAllocationSize, offset);
+        return false;
+      }
       ++usedCount;
       --freeCount;
+      return true;
     }
 
     private void free(long offset) {
@@ -402,10 +406,11 @@ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
         bsi.instantiateBucket(b);
         reconfigured[bucketNo] = true;
       }
-      realCacheSize.add(foundLen);
-      buckets[bucketNo].addAllocation(foundOffset);
-      usedSize += buckets[bucketNo].getItemAllocationSize();
-      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      if (buckets[bucketNo].addAllocation(foundOffset)) {
+        realCacheSize.add(foundLen);
+        usedSize += buckets[bucketNo].getItemAllocationSize();
+        bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+      }
     }
 
     if (sizeNotMatchedCount > 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 6dc65c632de0..bc7bc955864c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -854,7 +854,7 @@ private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
    * it is {@link ByteBuffAllocator#putbackBuffer}.
   * </pre>
   */
-  private Recycler createRecycler(final BucketEntry bucketEntry) {
+  public Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
       freeBucketEntry(bucketEntry);
       return;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
index 3a4af295dc84..b8fc1be986ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -199,6 +202,67 @@ public void testValidateCacheInitialization() throws Exception {
     TEST_UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testBucketCacheRecoveryWithAllocationInconsistencies() throws Exception {
+    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disable the background persister thread by setting its interval to Long.MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    conf.setDouble(MIN_FACTOR_CONFIG_NAME, 0.99);
+    conf.setDouble(ACCEPT_FACTOR_CONFIG_NAME, 1);
+    conf.setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.01);
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024, 8192,
+      bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    assertTrue(bucketCache.waitForCacheInitialization(1000));
+    assertTrue(
+      bucketCache.isCacheInitialized("testBucketCacheRecovery") && bucketCache.isCacheEnabled());
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 5);
+
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+
+    // Creates an entry for a 5th block with the same cache offset as the 1st block.
+    // Just add it straight to the backingMap, bypassing caching, to fabricate an inconsistency.
+    BucketEntry bucketEntry =
+      new BucketEntry(bucketCache.backingMap.get(blocks[0].getBlockName()).offset(),
+        blocks[4].getBlock().getSerializedLength(), blocks[4].getBlock().getOnDiskSizeWithHeader(),
+        0, false, bucketCache::createRecycler, blocks[4].getBlock().getByteBuffAllocator());
+    bucketEntry.setDeserializerReference(blocks[4].getBlock().getDeserializer());
+    bucketCache.getBackingMap().put(blocks[4].getBlockName(), bucketEntry);
+
+    // Saves the current state of the cache: 5 blocks in the map, but only 4 were actually cached.
+    // The 5th block has the same cache offset as the first.
+    bucketCache.persistToFile();
+
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 36 * 1024,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    while (!newBucketCache.getBackingMapValidated().get()) {
+      Thread.sleep(10);
+    }
+
+    assertNull(newBucketCache.getBlock(blocks[4].getBlockName(), false, false, false));
+    // The backing map entry with key blocks[0].getBlockName() may point to a valid entry or to
+    // null, depending on the ordering of the keys in the backing map.
+    // Hence, we skip the check for that key.
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    assertEquals(blocks[3].getBlock(),
+      newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertEquals(4, newBucketCache.backingMap.size());
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     Waiter.waitFor(HBaseConfiguration.create(), 12000,
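
For context, the recovery contract the patch establishes can be shown as a minimal standalone sketch. This is not part of the patch: SimpleBucket and its Deque-based free list are hypothetical simplifications of BucketAllocator.Bucket, but the addAllocation() semantics mirror the change above, i.e. a duplicate offset replayed from the persisted map is logged and skipped (returns false) instead of aborting recovery with a BucketAllocatorException, and the caller updates its accounting only for accepted entries.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified stand-in for BucketAllocator.Bucket, illustrating the
// new addAllocation() contract: report a slot that is already taken by returning
// false, so recovery can skip inconsistent persisted entries.
public class SimpleBucket {
  private final long baseOffset;
  private final int itemAllocationSize;
  private final Deque<Long> freeList = new ArrayDeque<>();
  private int usedCount = 0;

  SimpleBucket(long baseOffset, int itemAllocationSize, int itemCount) {
    this.baseOffset = baseOffset;
    this.itemAllocationSize = itemAllocationSize;
    for (int i = 0; i < itemCount; i++) {
      freeList.add(baseOffset + (long) i * itemAllocationSize);
    }
  }

  // True if the offset was free and is now marked used; false if another
  // persisted entry already claimed the same slot.
  boolean addAllocation(long offset) {
    if (offset < baseOffset || (offset - baseOffset) % itemAllocationSize != 0) {
      throw new IllegalArgumentException("Attempt to add allocation for bad offset: " + offset);
    }
    if (!freeList.remove(offset)) {
      System.out.println("Skipping entry at cache offset " + offset);
      return false;
    }
    ++usedCount;
    return true;
  }

  public static void main(String[] args) {
    SimpleBucket bucket = new SimpleBucket(0L, 8192, 4);
    long usedSize = 0;
    // Replay offsets as recovery would; 8192 appears twice, mimicking the
    // duplicate-offset inconsistency fabricated by the test above.
    for (long offset : new long[] { 0L, 8192L, 16384L, 8192L }) {
      if (bucket.addAllocation(offset)) {
        usedSize += 8192; // accounting is updated only for accepted entries
      }
    }
    // Prints usedCount=3 usedSize=24576: the duplicate was skipped, totals stay consistent.
    System.out.println("usedCount=" + bucket.usedCount + " usedSize=" + usedSize);
  }
}

Because the rebuild loop now guards realCacheSize, usedSize and blockAllocated() behind the boolean result, a stale or duplicated persisted entry degrades into a skipped block rather than leaving the allocator's counters out of sync with its free lists.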