diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index d23c2e3ecf39..f136a4d195c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -133,16 +133,6 @@ public static void cancel(Path path) {
     }
   }
 
-  public static void interrupt(Path path) {
-    Future<?> future = prefetchFutures.get(path);
-    if (future != null) {
-      prefetchFutures.remove(path);
-      // ok to race with other cancellation attempts
-      future.cancel(true);
-      LOG.debug("Prefetch cancelled for {}", path);
-    }
-  }
-
   private PrefetchExecutor() {
   }
 
@@ -200,10 +190,18 @@ public static void loadConfiguration(Configuration conf) {
       prefetchFutures.forEach((k, v) -> {
         ScheduledFuture<?> sf = (ScheduledFuture<?>) prefetchFutures.get(k);
         if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) {
-          // the thread is still pending delay expiration and has not started to run yet, so can be
-          // re-scheduled at no cost.
-          interrupt(k);
-          request(k, prefetchRunnable.get(k));
+          Runnable runnable = prefetchRunnable.get(k);
+          Future<?> future = prefetchFutures.get(k);
+          if (future != null) {
+            prefetchFutures.remove(k);
+            // ok to race with other cancellation attempts
+            boolean canceled = future.cancel(true);
+            LOG.debug("Prefetch {} for {}",
+              canceled ? "cancelled" : "cancel attempted but it was already finished", k);
+          }
+          if (runnable != null && future != null && future.isCancelled()) {
+            request(k, runnable);
+          }
         }
         LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k,
           prefetchDelayMillis, prefetchDelayVariation);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index ea8b8a959d38..f972de4b3318 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -594,6 +594,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
       successfulAddition = bq.offer(re);
     }
     if (!successfulAddition) {
+      LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey);
      ramCache.remove(cacheKey);
       cacheStats.failInsert();
     } else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index f7223510bb67..5f0f956eaec0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -70,6 +70,8 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
@@ -82,6 +84,8 @@
 @Category({ IOTests.class, LargeTests.class })
 public class TestBucketCache {
 
+  private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);
+
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBucketCache.class);
@@ -911,8 +915,16 @@ public void testNotifyFileCachingCompletedSuccess() throws Exception {
     try {
       Path filePath =
         new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
-      bucketCache = testNotifyFileCachingCompleted(filePath, 10);
-      assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
+      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10);
+      if (bucketCache.getStats().getFailedInserts() > 0) {
+        LOG.info("There were {} fail inserts, "
+          + "will assert if total blocks in backingMap equals (10 - failInserts) "
+          + "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
+        assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
+        assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
+      } else {
+        assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
+      }
     } finally {
       if (bucketCache != null) {
         bucketCache.shutdown();
@@ -929,7 +941,7 @@ public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
         "testNotifyFileCachingCompletedNotAllCached");
       // Deliberately passing more blocks than we have created to test that
       // notifyFileCachingCompleted will not consider the file fully cached
-      bucketCache = testNotifyFileCachingCompleted(filePath, 12);
+      bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12);
       assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
     } finally {
       if (bucketCache != null) {
@@ -939,8 +951,8 @@ public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
     }
   }
 
-  private BucketCache testNotifyFileCachingCompleted(Path filePath, int totalBlocks)
-    throws Exception {
+  private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
+    int totalBlocksToCheck) throws Exception {
     final Path dataTestDir = createAndGetTestDir();
     String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
     BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
@@ -954,8 +966,8 @@ private BucketCache testNotifyFileCachingCompleted(Path filePath, int totalBlock
     for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
       bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
     }
-    bucketCache.notifyFileCachingCompleted(filePath, totalBlocks, totalBlocks,
-      totalBlocks * constructedBlockSize);
+    bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
+      totalBlocksToCheck * constructedBlockSize);
     return bucketCache;
   }
 }
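
Note on the PrefetchExecutor change above: the removed interrupt(Path) helper is folded into the reset path, which now re-schedules a prefetch only when Future.cancel(true) actually stopped it (cancel returning false means the task had already finished, so re-running it would be wasted work). Below is a minimal standalone sketch of that cancel-then-reschedule pattern, not the HBase code itself; all names are hypothetical and only mirror the shape of the prefetchFutures/prefetchRunnable maps.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the cancel-then-reschedule pattern from the patch.
public class CancelThenReschedule {
  private static final ScheduledExecutorService POOL = Executors.newScheduledThreadPool(2);
  // Mirrors the shape of PrefetchExecutor's prefetchFutures/prefetchRunnable maps.
  private static final Map<String, Future<?>> futures = new ConcurrentHashMap<>();
  private static final Map<String, Runnable> runnables = new ConcurrentHashMap<>();

  static void schedule(String key, Runnable task, long delayMs) {
    runnables.put(key, task);
    futures.put(key, POOL.schedule(task, delayMs, TimeUnit.MILLISECONDS));
  }

  static void reset(String key, long newDelayMs) {
    Runnable runnable = runnables.get(key);
    Future<?> future = futures.remove(key);
    if (future != null) {
      // Ok to race with other cancellation attempts: cancel(true) returns false
      // when the task has already completed, in which case there is nothing to
      // re-schedule.
      boolean canceled = future.cancel(true);
      if (runnable != null && canceled) {
        futures.put(key, POOL.schedule(runnable, newDelayMs, TimeUnit.MILLISECONDS));
      }
    }
  }

  public static void main(String[] args) throws Exception {
    schedule("hfile-1", () -> System.out.println("prefetching hfile-1"), 500);
    reset("hfile-1", 1000); // pushes the still-pending prefetch out by a second
    Thread.sleep(1500);
    POOL.shutdown();
  }
}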