Skip to content

Commit

Permalink
HBASE-28458 BucketCache.notifyFileCachingCompleted may incorrectly co…
Browse files Browse the repository at this point in the history
…nsider a file fully cached (#5777)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
wchevreuil committed Apr 2, 2024
1 parent 9ac23ad commit c4ac2df
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2073,25 +2073,29 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
// so we need to count all blocks for this file in the backing map under
// a read lock for the block offset
final List<ReentrantReadWriteLock> locks = new ArrayList<>();
LOG.debug("Notifying caching completed for file {}, with total blocks {}", fileName,
dataBlockCount);
LOG.debug("Notifying caching completed for file {}, with total blocks {}, and data blocks {}",
fileName, totalBlockCount, dataBlockCount);
try {
final MutableInt count = new MutableInt();
LOG.debug("iterating over {} entries in the backing map", backingMap.size());
backingMap.entrySet().stream().forEach(entry -> {
if (entry.getKey().getHfileName().equals(fileName.getName())) {
if (
entry.getKey().getHfileName().equals(fileName.getName())
&& entry.getKey().getBlockType().equals(BlockType.DATA)
) {
LOG.debug("found block for file {} in the backing map. Acquiring read lock for offset {}",
fileName, entry.getKey().getOffset());
ReentrantReadWriteLock lock = offsetLock.getLock(entry.getKey().getOffset());
lock.readLock().lock();
locks.add(lock);
// rechecks the given key is still there (no eviction happened before the lock acquired)
if (backingMap.containsKey(entry.getKey())) {
count.increment();
}
}
});
// We may either place only data blocks on the BucketCache or all type of blocks
if (dataBlockCount == count.getValue() || totalBlockCount == count.getValue()) {
// BucketCache would only have data blocks
if (dataBlockCount == count.getValue()) {
LOG.debug("File {} has now been fully cached.", fileName);
fileCacheCompleted(fileName, size);
} else {
Expand All @@ -2100,15 +2104,17 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
+ "Total blocks for file: {}. Checking for blocks pending cache in cache writer queue.",
fileName, count.getValue(), dataBlockCount);
if (ramCache.hasBlocksForFile(fileName.getName())) {
for (ReentrantReadWriteLock lock : locks) {
lock.readLock().unlock();
}
LOG.debug("There are still blocks pending caching for file {}. Will sleep 100ms "
+ "and try the verification again.", fileName);
Thread.sleep(100);
notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
} else {
LOG.info(
"We found only {} blocks cached from a total of {} for file {}, "
+ "but no blocks pending caching. Maybe cache is full?",
count, dataBlockCount, fileName);
LOG.info("We found only {} blocks cached from a total of {} for file {}, "
+ "but no blocks pending caching. Maybe cache is full or evictions "
+ "happened concurrently to cache prefetch.", count, totalBlockCount, fileName);
}
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void setup() throws Exception {
@Test
public void testBlockEvictionOnRegionMove() throws Exception {
// Write to table and flush
TableName tableRegionMove = writeDataToTable();
TableName tableRegionMove = writeDataToTable("testBlockEvictionOnRegionMove");

HRegionServer regionServingRS =
cluster.getRegionServer(1).getRegions(tableRegionMove).size() == 1
Expand All @@ -115,7 +116,7 @@ public void testBlockEvictionOnRegionMove() throws Exception {
@Test
public void testBlockEvictionOnGracefulStop() throws Exception {
// Write to table and flush
TableName tableRegionClose = writeDataToTable();
TableName tableRegionClose = writeDataToTable("testBlockEvictionOnGracefulStop");

HRegionServer regionServingRS =
cluster.getRegionServer(1).getRegions(tableRegionClose).size() == 1
Expand All @@ -138,8 +139,8 @@ public void testBlockEvictionOnGracefulStop() throws Exception {
assertNotEquals(0, regionServingRS.getBlockCache().get().getBlockCaches()[1].getBlockCount());
}

public TableName writeDataToTable() throws IOException, InterruptedException {
TableName tableName = TableName.valueOf("table1");
public TableName writeDataToTable(String testName) throws IOException, InterruptedException {
TableName tableName = TableName.valueOf(testName + EnvironmentEdgeManager.currentTime());
byte[] row0 = Bytes.toBytes("row1");
byte[] row1 = Bytes.toBytes("row2");
byte[] family = Bytes.toBytes("family");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,21 @@ public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception {
// Load Blocks in cache
Path storeFile = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
HFile.createReader(fs, storeFile, cacheConf, true, conf);
while (bucketCache.backingMap.size() == 0) {
boolean evicted = false;
while (!PrefetchExecutor.isCompleted(storeFile)) {
if (bucketCache.backingMap.size() > 0 && !evicted) {
Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
bucketCache.backingMap.entrySet().iterator();
// Evict a data block from cache
Map.Entry<BlockCacheKey, BucketEntry> entry = it.next();
while (it.hasNext() && !evicted) {
if (entry.getKey().getBlockType().equals(BlockType.DATA)) {
evicted = bucketCache.evictBlock(it.next().getKey());
}
}
}
Thread.sleep(10);
}
Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
bucketCache.backingMap.entrySet().iterator();
// Evict Blocks from cache
bucketCache.evictBlock(it.next().getKey());
bucketCache.evictBlock(it.next().getKey());
int retries = 0;
while (!PrefetchExecutor.isCompleted(storeFile) && retries < 5) {
Thread.sleep(500);
retries++;
}
assertTrue(retries < 5);
assertFalse(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
cleanupBucketCache(bucketCache);
}
Expand Down

0 comments on commit c4ac2df

Please sign in to comment.