Skip to content

Commit

Permalink
HBASE-28535: Add a region-server wide key to enable data-tiering. (#5856
Browse files Browse the repository at this point in the history
)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
  • Loading branch information
jhungund authored May 2, 2024
1 parent f890281 commit 4dee532
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,10 @@ void freeSpace(final String why) {

// Check the list of files to determine the cold files which can be readily evicted.
Map<String, String> coldFiles = null;
try {
DataTieringManager dataTieringManager = DataTieringManager.getInstance();

DataTieringManager dataTieringManager = DataTieringManager.getInstance();
if (dataTieringManager != null) {
coldFiles = dataTieringManager.getColdFilesList();
} catch (IllegalStateException e) {
LOG.warn("Data Tiering Manager is not set. Ignore time-based block evictions.");
}
// Scan entire map putting bucket entry into appropriate bucket entry
// group
Expand Down Expand Up @@ -2195,16 +2194,11 @@ public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
@Override
public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
String fileName = hFileInfo.getHFileContext().getHFileName();
try {
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
if (!dataTieringManager.isHotData(hFileInfo, conf)) {
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", fileName);
return Optional.of(false);
}
} catch (IllegalStateException e) {
LOG.error("Error while getting DataTieringManager instance: {}", e.getMessage());
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
if (dataTieringManager != null && !dataTieringManager.isHotData(hFileInfo, conf)) {
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", fileName);
return Optional.of(false);
}

// if we don't have the file in fullyCachedFiles, we should cache it
return Optional.of(!fullyCachedFiles.containsKey(fileName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
@InterfaceAudience.Private
public class DataTieringManager {
private static final Logger LOG = LoggerFactory.getLogger(DataTieringManager.class);
public static final String GLOBAL_DATA_TIERING_ENABLED_KEY =
"hbase.regionserver.datatiering.enable";
public static final boolean DEFAULT_GLOBAL_DATA_TIERING_ENABLED = false; // disabled by default
public static final String DATATIERING_KEY = "hbase.hstore.datatiering.type";
public static final String DATATIERING_HOT_DATA_AGE_KEY =
"hbase.hstore.datatiering.hot.age.millis";
Expand All @@ -58,28 +61,29 @@ private DataTieringManager(Map<String, HRegion> onlineRegions) {
}

/**
* Initializes the DataTieringManager instance with the provided map of online regions.
* Initializes the DataTieringManager instance with the provided map of online regions, only if
* the configuration "hbase.regionserver.datatiering.enable" is enabled.
* @param conf Configuration object.
* @param onlineRegions A map containing online regions.
* @return True if the instance is instantiated successfully, false otherwise.
*/
public static synchronized void instantiate(Map<String, HRegion> onlineRegions) {
if (instance == null) {
public static synchronized boolean instantiate(Configuration conf,
Map<String, HRegion> onlineRegions) {
if (isDataTieringFeatureEnabled(conf) && instance == null) {
instance = new DataTieringManager(onlineRegions);
LOG.info("DataTieringManager instantiated successfully.");
return true;
} else {
LOG.warn("DataTieringManager is already instantiated.");
}
return false;
}

/**
* Retrieves the instance of DataTieringManager.
* @return The instance of DataTieringManager.
* @throws IllegalStateException if DataTieringManager has not been instantiated.
* @return The instance of DataTieringManager, if instantiated, null otherwise.
*/
public static synchronized DataTieringManager getInstance() {
if (instance == null) {
throw new IllegalStateException(
"DataTieringManager has not been instantiated. Call instantiate() first.");
}
return instance;
}

Expand Down Expand Up @@ -308,4 +312,14 @@ public Map<String, String> getColdFilesList() {
}
return coldFiles;
}

private static boolean isDataTieringFeatureEnabled(Configuration conf) {
return conf.getBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY,
DataTieringManager.DEFAULT_GLOBAL_DATA_TIERING_ENABLED);
}

// Resets the instance to null. To be used only for testing.
public static void resetForTestingOnly() {
instance = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,10 @@ public HRegionServer(final Configuration conf) throws IOException {
regionServerAccounting = new RegionServerAccounting(conf);

blockCache = BlockCacheFactory.createBlockCache(conf);
DataTieringManager.instantiate(onlineRegions);
// The call below, instantiates the DataTieringManager only when
// the configuration "hbase.regionserver.datatiering.enable" is set to true.
DataTieringManager.instantiate(conf, onlineRegions);

mobFileCache = new MobFileCache(conf);

rsSnapshotVerifier = new RSSnapshotVerifier(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -64,6 +66,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used to test the functionality of the DataTieringManager.
Expand Down Expand Up @@ -91,6 +95,7 @@ public class TestDataTieringManager {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDataTieringManager.class);

private static final Logger LOG = LoggerFactory.getLogger(TestDataTieringManager.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static Configuration defaultConf;
private static FileSystem fs;
Expand All @@ -109,10 +114,11 @@ public static void setupBeforeClass() throws Exception {
defaultConf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
defaultConf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
defaultConf.setLong(BUCKET_CACHE_SIZE_KEY, 32);
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true);
fs = HFileSystem.get(defaultConf);
blockCache = BlockCacheFactory.createBlockCache(defaultConf);
cacheConf = new CacheConfig(defaultConf, blockCache);
DataTieringManager.instantiate(testOnlineRegions);
assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
setupOnlineRegions();
dataTieringManager = DataTieringManager.getInstance();
}
Expand Down Expand Up @@ -268,9 +274,9 @@ public void testBlockEvictions() throws Exception {
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with cold data files and a block with hot data.
// hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
Expand Down Expand Up @@ -318,9 +324,9 @@ public void testBlockEvictionsAllColdBlocks() throws Exception {
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with three cold data blocks.
// hStoreFiles.get(3) is a cold data file.
Expand Down Expand Up @@ -365,9 +371,9 @@ public void testBlockEvictionsHotBlocks() throws Exception {
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with two hot data blocks and one cold data block
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
Expand Down Expand Up @@ -402,11 +408,74 @@ public void testBlockEvictionsHotBlocks() throws Exception {
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
}

@Test
public void testFeatureKeyDisabled() throws Exception {
DataTieringManager.resetForTestingOnly();
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, false);
try {
assertFalse(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
// Verify that the DataaTieringManager instance is not instantiated in the
// instantiate call above.
assertNull(DataTieringManager.getInstance());

// Also validate that data temperature is not honoured.
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with two hot data blocks and one cold data block
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
List<BlockCacheKey> cacheKeys = new ArrayList<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));

// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
LOG.info("Adding {}", key);
bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(key)));
}

// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());

// Add an additional hot block, which triggers eviction.
BlockCacheKey newKey =
new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));

// Verify that the bucket still contains the only cold block and one newly added hot block.
// The older hot blocks are evicted and data-tiering mechanism does not kick in to evict
// the cold block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
} finally {
DataTieringManager.resetForTestingOnly();
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true);
assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
}
}

private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
int expectedColdBlocks) {
int numHotBlocks = 0, numColdBlocks = 0;

assertEquals(expectedTotalKeys, keys.size());
Waiter.waitFor(defaultConf, 10000, 100, () -> (expectedTotalKeys == keys.size()));
int iter = 0;
for (BlockCacheKey key : keys) {
try {
Expand Down

0 comments on commit 4dee532

Please sign in to comment.