Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28535: Add a region-server wide key to enable data-tiering. #5856

Merged
merged 2 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,10 @@ void freeSpace(final String why) {

// Check the list of files to determine the cold files which can be readily evicted.
Map<String, String> coldFiles = null;
try {
DataTieringManager dataTieringManager = DataTieringManager.getInstance();

DataTieringManager dataTieringManager = DataTieringManager.getInstance();
if (dataTieringManager != null) {
coldFiles = dataTieringManager.getColdFilesList();
} catch (IllegalStateException e) {
LOG.warn("Data Tiering Manager is not set. Ignore time-based block evictions.");
}
// Scan entire map putting bucket entry into appropriate bucket entry
// group
Expand Down Expand Up @@ -2195,16 +2194,11 @@ public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
@Override
public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
String fileName = hFileInfo.getHFileContext().getHFileName();
try {
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
if (!dataTieringManager.isHotData(hFileInfo, conf)) {
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", fileName);
return Optional.of(false);
}
} catch (IllegalStateException e) {
LOG.error("Error while getting DataTieringManager instance: {}", e.getMessage());
DataTieringManager dataTieringManager = DataTieringManager.getInstance();
if (dataTieringManager != null && !dataTieringManager.isHotData(hFileInfo, conf)) {
LOG.debug("Data tiering is enabled for file: '{}' and it is not hot data", fileName);
return Optional.of(false);
}

// if we don't have the file in fullyCachedFiles, we should cache it
return Optional.of(!fullyCachedFiles.containsKey(fileName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
@InterfaceAudience.Private
public class DataTieringManager {
private static final Logger LOG = LoggerFactory.getLogger(DataTieringManager.class);
public static final String GLOBAL_DATA_TIERING_ENABLED_KEY =
"hbase.regionserver.datatiering.enable";
public static final boolean DEFAULT_GLOBAL_DATA_TIERING_ENABLED = false; // disabled by default
public static final String DATATIERING_KEY = "hbase.hstore.datatiering.type";
public static final String DATATIERING_HOT_DATA_AGE_KEY =
"hbase.hstore.datatiering.hot.age.millis";
Expand All @@ -58,28 +61,29 @@ private DataTieringManager(Map<String, HRegion> onlineRegions) {
}

/**
* Initializes the DataTieringManager instance with the provided map of online regions.
* Initializes the DataTieringManager instance with the provided map of online regions, only if
* the configuration "hbase.regionserver.datatiering.enable" is enabled.
* @param conf Configuration object.
* @param onlineRegions A map containing online regions.
* @return True if the instance is instantiated successfully, false otherwise.
*/
public static synchronized void instantiate(Map<String, HRegion> onlineRegions) {
if (instance == null) {
public static synchronized boolean instantiate(Configuration conf,
Map<String, HRegion> onlineRegions) {
if (isDataTieringFeatureEnabled(conf) && instance == null) {
instance = new DataTieringManager(onlineRegions);
LOG.info("DataTieringManager instantiated successfully.");
return true;
} else {
LOG.warn("DataTieringManager is already instantiated.");
}
return false;
}

/**
* Retrieves the instance of DataTieringManager.
* @return The instance of DataTieringManager.
* @throws IllegalStateException if DataTieringManager has not been instantiated.
* @return The instance of DataTieringManager, if instantiated, null otherwise.
*/
public static synchronized DataTieringManager getInstance() {
if (instance == null) {
throw new IllegalStateException(
"DataTieringManager has not been instantiated. Call instantiate() first.");
}
return instance;
}

Expand Down Expand Up @@ -308,4 +312,14 @@ public Map<String, String> getColdFilesList() {
}
return coldFiles;
}

/**
 * Reads the region-server wide data-tiering switch from the supplied configuration.
 * @param conf Configuration object to look the switch up in.
 * @return The boolean obtained for {@link #GLOBAL_DATA_TIERING_ENABLED_KEY}, with
 *         {@link #DEFAULT_GLOBAL_DATA_TIERING_ENABLED} passed as the fallback value.
 */
private static boolean isDataTieringFeatureEnabled(Configuration conf) {
  final String featureKey = DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY;
  final boolean fallback = DataTieringManager.DEFAULT_GLOBAL_DATA_TIERING_ENABLED;
  final boolean featureEnabled = conf.getBoolean(featureKey, fallback);
  return featureEnabled;
}

/**
 * Resets the singleton instance to null. To be used only for testing.
 * <p>
 * Declared {@code synchronized} so that the write to {@code instance} is guarded by the same
 * class-level lock that {@code instantiate} and {@code getInstance} hold when they access it.
 */
public static synchronized void resetForTestingOnly() {
  instance = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,10 @@ public HRegionServer(final Configuration conf) throws IOException {
regionServerAccounting = new RegionServerAccounting(conf);

blockCache = BlockCacheFactory.createBlockCache(conf);
DataTieringManager.instantiate(onlineRegions);
// The call below instantiates the DataTieringManager only when
// the configuration "hbase.regionserver.datatiering.enable" is set to true.
DataTieringManager.instantiate(conf, onlineRegions);

mobFileCache = new MobFileCache(conf);

rsSnapshotVerifier = new RSSnapshotVerifier(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -64,6 +66,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used to test the functionality of the DataTieringManager.
Expand Down Expand Up @@ -91,6 +95,7 @@ public class TestDataTieringManager {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDataTieringManager.class);

private static final Logger LOG = LoggerFactory.getLogger(TestDataTieringManager.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static Configuration defaultConf;
private static FileSystem fs;
Expand All @@ -109,10 +114,11 @@ public static void setupBeforeClass() throws Exception {
defaultConf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
defaultConf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
defaultConf.setLong(BUCKET_CACHE_SIZE_KEY, 32);
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true);
fs = HFileSystem.get(defaultConf);
blockCache = BlockCacheFactory.createBlockCache(defaultConf);
cacheConf = new CacheConfig(defaultConf, blockCache);
DataTieringManager.instantiate(testOnlineRegions);
assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
setupOnlineRegions();
dataTieringManager = DataTieringManager.getInstance();
}
Expand Down Expand Up @@ -268,9 +274,9 @@ public void testBlockEvictions() throws Exception {
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with cold data files and a block with hot data.
// hStoreFiles.get(3) is a cold data file, while hStoreFiles.get(0) is a hot file.
Expand Down Expand Up @@ -318,9 +324,9 @@ public void testBlockEvictionsAllColdBlocks() throws Exception {
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with three cold data blocks.
// hStoreFiles.get(3) is a cold data file.
Expand Down Expand Up @@ -365,9 +371,9 @@ public void testBlockEvictionsHotBlocks() throws Exception {
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with two hot data blocks and one cold data block
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
Expand Down Expand Up @@ -402,11 +408,74 @@ public void testBlockEvictionsHotBlocks() throws Exception {
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 2, 0);
}

@Test
public void testFeatureKeyDisabled() throws Exception {
DataTieringManager.resetForTestingOnly();
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, false);
try {
assertFalse(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
// Verify that the DataTieringManager instance is not instantiated in the
// instantiate call above.
assertNull(DataTieringManager.getInstance());

// Also validate that data temperature is not honoured.
long capacitySize = 40 * 1024;
int writeThreads = 3;
int writerQLen = 64;
int[] bucketSizes = new int[] { 8 * 1024 + 1024 };

// Setup: Create a bucket cache with lower capacity
BucketCache bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 8192, bucketSizes,
writeThreads, writerQLen, null, DEFAULT_ERROR_TOLERATION_DURATION, defaultConf);

// Create three Cache keys with two hot data blocks and one cold data block
// hStoreFiles.get(0) is a hot data file and hStoreFiles.get(3) is a cold data file.
List<BlockCacheKey> cacheKeys = new ArrayList<>();
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 0, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(0).getPath(), 8192, true, BlockType.DATA));
cacheKeys.add(new BlockCacheKey(hStoreFiles.get(3).getPath(), 0, true, BlockType.DATA));

// Create dummy data to be cached and fill the cache completely.
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 3);

int blocksIter = 0;
for (BlockCacheKey key : cacheKeys) {
LOG.info("Adding {}", key);
Copy link
Contributor Author

@jhungund jhungund Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test seems to work with the latest fixes. However, I am keeping these logs handy until the flakiness is fixed.

bucketCache.cacheBlock(key, blocks[blocksIter++].getBlock());
// Ensure that the block is persisted to the file.
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(key)));
}

// Verify that the bucket cache contains 3 blocks.
assertEquals(3, bucketCache.getBackingMap().keySet().size());

// Add an additional hot block, which triggers eviction.
BlockCacheKey newKey =
new BlockCacheKey(hStoreFiles.get(2).getPath(), 0, true, BlockType.DATA);
CacheTestUtils.HFileBlockPair[] newBlock = CacheTestUtils.generateHFileBlocks(8192, 1);

bucketCache.cacheBlock(newKey, newBlock[0].getBlock());
Waiter.waitFor(defaultConf, 10000, 100,
() -> (bucketCache.getBackingMap().containsKey(newKey)));

// Verify that the bucket still contains the only cold block and one newly added hot block.
// The older hot blocks are evicted and data-tiering mechanism does not kick in to evict
// the cold block.
validateBlocks(bucketCache.getBackingMap().keySet(), 2, 1, 1);
} finally {
DataTieringManager.resetForTestingOnly();
defaultConf.setBoolean(DataTieringManager.GLOBAL_DATA_TIERING_ENABLED_KEY, true);
assertTrue(DataTieringManager.instantiate(defaultConf, testOnlineRegions));
}
}

private void validateBlocks(Set<BlockCacheKey> keys, int expectedTotalKeys, int expectedHotBlocks,
int expectedColdBlocks) {
int numHotBlocks = 0, numColdBlocks = 0;

assertEquals(expectedTotalKeys, keys.size());
Waiter.waitFor(defaultConf, 10000, 100, () -> (expectedTotalKeys == keys.size()));
int iter = 0;
for (BlockCacheKey key : keys) {
try {
Expand Down