Skip to content

Commit

Permalink
HBASE-28894 NPE on TestPrefetch.testPrefetchWithDelay (apache#6338)
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
  • Loading branch information
wchevreuil committed Oct 8, 2024
1 parent 7ffba05 commit 182f516
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,6 @@ public static void cancel(Path path) {
}
}

public static void interrupt(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
prefetchFutures.remove(path);
// ok to race with other cancellation attempts
future.cancel(true);
LOG.debug("Prefetch cancelled for {}", path);
}
}

private PrefetchExecutor() {
}

Expand Down Expand Up @@ -200,10 +190,18 @@ public static void loadConfiguration(Configuration conf) {
prefetchFutures.forEach((k, v) -> {
ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
if (!(sf.getDelay(TimeUnit.MILLISECONDS) > 0)) {
// the thread is still pending delay expiration and has not started to run yet, so can be
// re-scheduled at no cost.
interrupt(k);
request(k, prefetchRunnable.get(k));
Runnable runnable = prefetchRunnable.get(k);
Future<?> future = prefetchFutures.get(k);
if (future != null) {
prefetchFutures.remove(k);
// ok to race with other cancellation attempts
boolean canceled = future.cancel(true);
LOG.debug("Prefetch {} for {}",
canceled ? "cancelled" : "cancel attempted but it was already finished", k);
}
if (runnable != null && future != null && future.isCancelled()) {
request(k, runnable);
}
}
LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", k,
prefetchDelayMillis, prefetchDelayVariation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
successfulAddition = bq.offer(re);
}
if (!successfulAddition) {
LOG.debug("Failed to insert block {} into the cache writers queue", cacheKey);
ramCache.remove(cacheKey);
cacheStats.failInsert();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

Expand All @@ -82,6 +84,8 @@
@Category({ IOTests.class, LargeTests.class })
public class TestBucketCache {

private static final Logger LOG = LoggerFactory.getLogger(TestBucketCache.class);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBucketCache.class);
Expand Down Expand Up @@ -911,8 +915,16 @@ public void testNotifyFileCachingCompletedSuccess() throws Exception {
try {
Path filePath =
new Path(HBASE_TESTING_UTILITY.getDataTestDir(), "testNotifyFileCachingCompletedSuccess");
bucketCache = testNotifyFileCachingCompleted(filePath, 10);
assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 10);
if (bucketCache.getStats().getFailedInserts() > 0) {
LOG.info("There were {} fail inserts, "
+ "will assert if total blocks in backingMap equals (10 - failInserts) "
+ "and file isn't listed as fully cached.", bucketCache.getStats().getFailedInserts());
assertEquals(10 - bucketCache.getStats().getFailedInserts(), bucketCache.backingMap.size());
assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
} else {
assertTrue(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
}
} finally {
if (bucketCache != null) {
bucketCache.shutdown();
Expand All @@ -929,7 +941,7 @@ public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
"testNotifyFileCachingCompletedNotAllCached");
// Deliberately passing more blocks than we have created to test that
// notifyFileCachingCompleted will not consider the file fully cached
bucketCache = testNotifyFileCachingCompleted(filePath, 12);
bucketCache = testNotifyFileCachingCompletedForTenBlocks(filePath, 12);
assertFalse(bucketCache.fullyCachedFiles.containsKey(filePath.getName()));
} finally {
if (bucketCache != null) {
Expand All @@ -939,8 +951,8 @@ public void testNotifyFileCachingCompletedNotAllCached() throws Exception {
}
}

private BucketCache testNotifyFileCachingCompleted(Path filePath, int totalBlocks)
throws Exception {
private BucketCache testNotifyFileCachingCompletedForTenBlocks(Path filePath,
int totalBlocksToCheck) throws Exception {
final Path dataTestDir = createAndGetTestDir();
String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
Expand All @@ -954,8 +966,8 @@ private BucketCache testNotifyFileCachingCompleted(Path filePath, int totalBlock
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false, true);
}
bucketCache.notifyFileCachingCompleted(filePath, totalBlocks, totalBlocks,
totalBlocks * constructedBlockSize);
bucketCache.notifyFileCachingCompleted(filePath, totalBlocksToCheck, totalBlocksToCheck,
totalBlocksToCheck * constructedBlockSize);
return bucketCache;
}
}

0 comments on commit 182f516

Please sign in to comment.