Skip to content

Commit

Permalink
HBASE-28303 Interrupt cache prefetch thread when a heap usage thresho…
Browse files Browse the repository at this point in the history
…ld is reached (#5615)

Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
Signed-off-by: Peter Somogyi <psomogyi@apache.org>
  • Loading branch information
wchevreuil authored Jan 26, 2024
1 parent 1c5b5ff commit 73cb0dd
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public class CacheConfig {
public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
"hbase.bucketcache.persist.intervalinmillis";

/**
* Configuration key to set the heap usage threshold limit once prefetch threads should be
* interrupted.
*/
public static final String PREFETCH_HEAP_USAGE_THRESHOLD = "hbase.rs.prefetchheapusage";

// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
Expand All @@ -111,6 +117,7 @@ public class CacheConfig {
public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = Long.MAX_VALUE;
public static final double DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD = 1d;

/**
* Whether blocks should be cached on read (default is on if there is a cache but this can be
Expand Down Expand Up @@ -157,6 +164,8 @@ public class CacheConfig {

private final ByteBuffAllocator byteBuffAllocator;

private final double heapUsageThreshold;

/**
* Create a cache configuration using the specified configuration object and defaults for family
* level settings. Only use if no column family context.
Expand Down Expand Up @@ -201,6 +210,8 @@ public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache
this.cacheCompactedDataOnWrite =
conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
this.cacheCompactedDataOnWriteThreshold = getCacheCompactedBlocksOnWriteThreshold(conf);
this.heapUsageThreshold =
conf.getDouble(PREFETCH_HEAP_USAGE_THRESHOLD, DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD);
this.blockCache = blockCache;
this.byteBuffAllocator = byteBuffAllocator;
}
Expand All @@ -222,6 +233,7 @@ public CacheConfig(CacheConfig cacheConf) {
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
this.blockCache = cacheConf.blockCache;
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
this.heapUsageThreshold = cacheConf.heapUsageThreshold;
}

private CacheConfig() {
Expand All @@ -237,6 +249,7 @@ private CacheConfig() {
this.dropBehindCompaction = false;
this.blockCache = null;
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
this.heapUsageThreshold = DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD;
}

/**
Expand Down Expand Up @@ -386,6 +399,17 @@ public boolean shouldReadBlockFromCache(BlockType blockType) {
return false;
}

/**
* Checks if the current heap usage is below the threshold configured by
* "hbase.rs.prefetchheapusage" (0.8 by default).
*/
public boolean isHeapUsageBelowThreshold() {
double total = Runtime.getRuntime().maxMemory();
double available = Runtime.getRuntime().freeMemory();
double usedRatio = 1d - (available / total);
return heapUsageThreshold > usedRatio;
}

/**
* If we make sure the block could not be cached, we will not acquire the lock otherwise we will
* acquire lock
Expand Down Expand Up @@ -413,6 +437,10 @@ public ByteBuffAllocator getByteBuffAllocator() {
return this.byteBuffAllocator;
}

public double getHeapUsageThreshold() {
return heapUsageThreshold;
}

private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) {
long cacheCompactedBlocksOnWriteThreshold =
conf.getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,23 @@ public void run() {
HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
/* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
try {
if (!cacheConf.isInMemory() && !cache.blockFitsIntoTheCache(block).orElse(true)) {
LOG.warn(
"Interrupting prefetch for file {} because block {} of size {} "
+ "doesn't fit in the available cache space.",
path, cacheKey, block.getOnDiskSizeWithHeader());
interrupted = true;
break;
if (!cacheConf.isInMemory()) {
if (!cache.blockFitsIntoTheCache(block).orElse(true)) {
LOG.warn(
"Interrupting prefetch for file {} because block {} of size {} "
+ "doesn't fit in the available cache space.",
path, cacheKey, block.getOnDiskSizeWithHeader());
interrupted = true;
break;
}
if (!cacheConf.isHeapUsageBelowThreshold()) {
LOG.warn(
"Interrupting prefetch because heap usage is above the threshold: {} "
+ "configured via {}",
cacheConf.getHeapUsageThreshold(), CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD);
interrupted = true;
break;
}
}
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -156,6 +157,43 @@ public void testPrefetchBlockCacheDisabled() throws Exception {
poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
}

@Test
public void testPrefetchHeapUsageAboveThreshold() throws Exception {
ColumnFamilyDescriptor columnFamilyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
.setBlockCacheEnabled(true).build();
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
Configuration newConf = new Configuration(conf);
newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1);
CacheConfig cacheConfig =
new CacheConfig(newConf, columnFamilyDescriptor, blockCache, ByteBuffAllocator.HEAP);
Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", meta, cacheConfig);
MutableInt cachedCount = new MutableInt(0);
MutableInt unCachedCount = new MutableInt(0);
readStoreFile(storeFile, (r, o) -> {
HFileBlock block = null;
try {
block = r.readBlock(o, -1, false, true, false, true, null, null);
} catch (IOException e) {
fail(e.getMessage());
}
return block;
}, (key, block) -> {
boolean isCached = blockCache.getBlock(key, true, false, true) != null;
if (
block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
|| block.getBlockType() == BlockType.INTERMEDIATE_INDEX
) {
if (isCached) {
cachedCount.increment();
} else {
unCachedCount.increment();
}
}
}, cacheConfig);
assertTrue(unCachedCount.compareTo(cachedCount) > 0);
}

@Test
public void testPrefetch() throws Exception {
TraceUtil.trace(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) thr
return conf;
}

public BucketCache setupBucketCache(Configuration conf) throws IOException {
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
public BucketCache setupBucketCache(Configuration conf, String persistentCacheFile)
throws IOException {
BucketCache bucketCache = new BucketCache("file:" + testDir + "/" + persistentCacheFile,
capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
return bucketCache;
}
Expand All @@ -103,7 +104,7 @@ public void cleanupBucketCache(BucketCache bucketCache) throws IOException {
public void testPrefetchPersistenceCrash() throws Exception {
long bucketCachePersistInterval = 3000;
Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
BucketCache bucketCache = setupBucketCache(conf);
BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrash");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Cache
Expand All @@ -121,7 +122,7 @@ public void testPrefetchPersistenceCrash() throws Exception {
public void testPrefetchPersistenceCrashNegative() throws Exception {
long bucketCachePersistInterval = Long.MAX_VALUE;
Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
BucketCache bucketCache = setupBucketCache(conf);
BucketCache bucketCache = setupBucketCache(conf, "testPrefetchPersistenceCrashNegative");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Cache
Expand All @@ -134,7 +135,7 @@ public void testPrefetchPersistenceCrashNegative() throws Exception {
@Test
public void testPrefetchListUponBlockEviction() throws Exception {
Configuration conf = setupBucketCacheConfig(200);
BucketCache bucketCache = setupBucketCache(conf);
BucketCache bucketCache = setupBucketCache(conf, "testPrefetchListUponBlockEviction");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Blocks in cache
Expand All @@ -156,7 +157,8 @@ public void testPrefetchListUponBlockEviction() throws Exception {
@Test
public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception {
Configuration conf = setupBucketCacheConfig(200);
BucketCache bucketCache = setupBucketCache(conf);
BucketCache bucketCache =
setupBucketCache(conf, "testPrefetchBlockEvictionWhilePrefetchRunning");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Blocks in cache
Expand Down

0 comments on commit 73cb0dd

Please sign in to comment.