HBASE-28468: Integrate the data-tiering logic into cache evictions. (#5829)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
jhungund authored and wchevreuil committed May 20, 2024
1 parent 84b3f19 commit c1bebbb
Showing 4 changed files with 263 additions and 6 deletions.
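At a high level, this change adds a cold-data pass to BucketCache.freeSpace(): blocks that belong to files reported as cold by DataTieringManager are evicted first, and the existing single/multi/memory priority-bucket eviction only runs for whatever space still needs to be freed. The following is a condensed, hedged sketch of the new flow, not the exact committed code; addToPriorityBucket stands in for the existing switch over block priority shown in the diff below.

// Sketch only: condensed from the freeSpace() changes shown below.
Map<String, String> coldFiles = null;
try {
  coldFiles = DataTieringManager.getInstance().getColdFilesList();
} catch (IllegalStateException e) {
  // DataTieringManager not initialised; skip time-based eviction.
}
long bytesFreed = 0;
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
  if (coldFiles != null && coldFiles.containsKey(entry.getKey().getHfileName())) {
    if (evictBlockIfNoRpcReferenced(entry.getKey())) {
      bytesFreed += entry.getValue().getLength(); // phase 1: evict cold-file blocks first
    }
    if (bytesFreed >= bytesToFreeWithExtra) {
      return; // enough space reclaimed from cold blocks alone
    }
    continue;
  }
  addToPriorityBucket(entry); // phase 2 candidate: regular priority-bucket eviction
}
// Phase 2: the existing single/multi/memory bucket eviction frees the remainder.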
@@ -974,6 +974,7 @@ void freeSpace(final String why) {
long bytesToFreeWithExtra =
(long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));

long bytesFreed = 0;
// Instantiate priority buckets
BucketEntryGroup bucketSingle =
new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(singleFactor));
@@ -982,9 +983,36 @@ void freeSpace(final String why) {
BucketEntryGroup bucketMemory =
new BucketEntryGroup(bytesToFreeWithExtra, blockSize, getPartitionSize(memoryFactor));

// Check the list of files to determine the cold files which can be readily evicted.
Map<String, String> coldFiles = null;
try {
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
coldFiles = dataTieringManager.getColdFilesList();
} catch (IllegalStateException e) {
LOG.warn("Data Tiering Manager is not set. Ignore time-based block evictions.");
}
// Scan entire map putting bucket entry into appropriate bucket entry
// group
for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey : backingMap.entrySet()) {
if (
coldFiles != null && coldFiles.containsKey(bucketEntryWithKey.getKey().getHfileName())
) {
int freedBlockSize = bucketEntryWithKey.getValue().getLength();
if (evictBlockIfNoRpcReferenced(bucketEntryWithKey.getKey())) {
bytesFreed += freedBlockSize;
}
if (bytesFreed >= bytesToFreeWithExtra) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Bucket cache free space completed; required: {} freed: {} from cold data blocks.",
bytesToFreeWithExtra, StringUtils.byteDesc(bytesFreed));
}
// Sufficient bytes have been freed.
return;
}
continue;
}

switch (bucketEntryWithKey.getValue().getPriority()) {
case SINGLE: {
bucketSingle.add(bucketEntryWithKey);
@@ -1001,6 +1029,21 @@
}
}

// Check if the cold file eviction is sufficient to create enough space.
bytesToFreeWithExtra -= bytesFreed;
if (bytesToFreeWithExtra <= 0) {
LOG.debug("Bucket cache free space completed; freed space : {} bytes of cold data blocks.",
StringUtils.byteDesc(bytesFreed));
return;
}

if (LOG.isDebugEnabled()) {
LOG.debug(
"Bucket cache freed {} of cold data blocks; {} more bytes still need to be freed.",
StringUtils.byteDesc(bytesFreed), bytesToFreeWithExtra);
}

PriorityQueue<BucketEntryGroup> bucketQueue =
new PriorityQueue<>(3, Comparator.comparingLong(BucketEntryGroup::overflow));

@@ -1009,8 +1052,6 @@
bucketQueue.add(bucketMemory);

int remainingBuckets = bucketQueue.size();
long bytesFreed = 0;

BucketEntryGroup bucketGroup;
while ((bucketGroup = bucketQueue.poll()) != null) {
long overflow = bucketGroup.overflow();
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
@@ -173,12 +174,12 @@ private boolean hotDataValidator(long maxTimestamp, long hotDataAge) {
private long getMaxTimestamp(Path hFilePath) throws DataTieringException {
HStoreFile hStoreFile = getHStoreFile(hFilePath);
if (hStoreFile == null) {
LOG.error("HStoreFile corresponding to " + hFilePath + " doesn't exist");
LOG.error("HStoreFile corresponding to {} doesn't exist", hFilePath);
return Long.MAX_VALUE;
}
OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
if (!maxTimestamp.isPresent()) {
LOG.error("Maximum timestamp not present for " + hFilePath);
LOG.error("Maximum timestamp not present for {}", hFilePath);
return Long.MAX_VALUE;
}
return maxTimestamp.getAsLong();
@@ -270,4 +271,41 @@ private long getDataTieringHotDataAge(Configuration conf) {
return Long.parseLong(
conf.get(DATATIERING_HOT_DATA_AGE_KEY, String.valueOf(DEFAULT_DATATIERING_HOT_DATA_AGE)));
}

/**
* This API traverses the list of online regions and returns the names of the store files that
* hold cold data as per the data-tiering logic.
* @return Map keyed by the names of files with cold data; the values are unused.
*/
public Map<String, String> getColdFilesList() {
Map<String, String> coldFiles = new HashMap<>();
for (HRegion r : this.onlineRegions.values()) {
for (HStore hStore : r.getStores()) {
Configuration conf = hStore.getReadOnlyConfiguration();
if (getDataTieringType(conf) != DataTieringType.TIME_RANGE) {
// Data-Tiering not enabled for the store. Just skip it.
continue;
}
Long hotDataAge = getDataTieringHotDataAge(conf);

for (HStoreFile hStoreFile : hStore.getStorefiles()) {
String hFileName =
hStoreFile.getFileInfo().getHFileInfo().getHFileContext().getHFileName();
OptionalLong maxTimestamp = hStoreFile.getMaximumTimestamp();
if (!maxTimestamp.isPresent()) {
LOG.warn("maxTimestamp missing for file: {}",
hStoreFile.getFileInfo().getActiveFileName());
continue;
}
long currentTimestamp = EnvironmentEdgeManager.getDelegate().currentTime();
long fileAge = currentTimestamp - maxTimestamp.getAsLong();
if (fileAge > hotDataAge) {
// Values do not matter.
coldFiles.put(hFileName, null);
}
}
}
}
return coldFiles;
}
}
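For context, getColdFilesList() only considers stores whose data-tiering type is TIME_RANGE, and a file counts as cold once its maximum cell timestamp is older than the configured hot-data age. Below is a minimal, hedged sketch of how a column family might opt in; the constant DATATIERING_KEY is an assumed name (only DATATIERING_HOT_DATA_AGE_KEY and DataTieringType.TIME_RANGE appear in this diff), and imports are omitted.

// Hedged sketch: opt a column family into time-range data tiering.
// DATATIERING_KEY is an assumed constant name; the hot-data age is in milliseconds.
ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
  .setConfiguration(DataTieringManager.DATATIERING_KEY, DataTieringType.TIME_RANGE.name())
  .setConfiguration(DataTieringManager.DATATIERING_HOT_DATA_AGE_KEY,
    String.valueOf(7L * 24 * 60 * 60 * 1000)) // treat data older than 7 days as cold
  .build();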
@@ -373,8 +373,8 @@ public void testPrefetchWithDelay() throws Exception {
Thread.sleep(20000);
assertFalse("Prefetch threads should not be running at this point", reader.prefetchStarted());
while (!reader.prefetchStarted()) {
assertTrue("Prefetch delay has not been expired yet",
getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
// Wait until the prefetch is triggered.
Thread.sleep(500);
}
if (reader.prefetchStarted()) {
// Added some delay as we have started the timer a bit late.
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -51,7 +52,9 @@
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -245,6 +248,181 @@ public void testColdDataFiles() {
}
}

@Test
public void testPickColdDataFiles() {
Map<String, String> coldDataFiles = dataTieringManager.getColdFilesList();
assertEquals(1, coldDataFiles.size());
// hStoreFiles[3] is the cold file.
assertTrue(coldDataFiles.containsKey(hStoreFiles.get(3).getFileInfo().getActiveFileName()));
}

/*
* Verify that both cold blocks are evicted when the bucket reaches its capacity, while the hot
* file remains in the cache.
*/
@Test
public void testBlockEvictions() throws Exception {
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create two cache keys for cold data blocks and one for a hot data block.
// hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));

// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}

// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());

// Add an additional block into cache with hot data which should trigger the eviction
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));

// Verify that the bucket cache now contains 2 hot blocks only.
// Both 8KB cold blocks will be evicted to make room for one 8KB block plus some additional
// space.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
}

/*
* Verify that two of the three cold blocks are evicted when the bucket reaches its capacity,
* while one cold block remains in the cache since the required space has already been freed.
*/
@Test
public void testBlockEvictionsAllColdBlocks() throws Exception {
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with three cold data blocks.
// hStoreFiles.get(3) is a cold data file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 16384, true, BlockType.DATA));

// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}

// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());

// Add an additional block into cache with hot data which should trigger the eviction
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));

// Verify that the bucket cache now contains 1 cold block and a newly added hot block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
}

/*
* Verify that a hot block is evicted along with the cold block when the bucket reaches its
* capacity.
*/
@Test
public void testBlockEvictionsHotBlocks() throws Exception {
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with two hot data blocks and one cold data block
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
Set<BlockCacheKey> cacheKeys = new HashSet<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));

// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100, () -> (bucketCache.getBackingMap().containsKey(key)));
}

// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());

// Add an additional block, which should evict the only cold block along with one of the hot blocks.
BlockCacheKey newKey = new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));

// Verify that the bucket cache now contains 2 hot blocks.
// Only one of the older hot blocks is retained; the other one is the newly added hot block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
}

private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
int expectedColdBlocks) {
int numHotBlocks = 0, numColdBlocks = 0;

assertEquals(expectedTotalKeys, keys.size());
for (BlockCacheKey key : keys) {
try {
if (dataTieringManager.isHotData(key)) {
numHotBlocks++;
} else {
numColdBlocks++;
}
} catch (Exception e) {
fail("Unexpected exception!");
}
}
assertEquals(expectedHotBlocks, numHotBlocks);
assertEquals(expectedColdBlocks, numColdBlocks);
}

private void testDataTieringMethodWithPath(DataTieringMethodCallerWithPath caller, Path path,
boolean expectedResult, DataTieringException exception) {
try {