Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-23066 Allow cache on write during compactions when prefetching is #919

Merged
merged 1 commit into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ public class CacheConfig {
*/
public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen";

/**
* Configuration key to cache blocks when a compacted file is written
*/
public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
"hbase.rs.cachecompactedblocksonwrite";

public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
"hbase.hfile.drop.behind.compaction";

Expand All @@ -93,6 +99,7 @@ public class CacheConfig {
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;

/**
Expand Down Expand Up @@ -124,6 +131,11 @@ public class CacheConfig {
/** Whether data blocks should be prefetched into the cache */
private final boolean prefetchOnOpen;

/**
* Whether data blocks should be cached when compacted file is written
*/
private final boolean cacheCompactedDataOnWrite;

private final boolean dropBehindCompaction;

// Local reference to the block cache
Expand Down Expand Up @@ -174,6 +186,8 @@ public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache
(family == null ? false : family.isEvictBlocksOnClose());
this.prefetchOnOpen = conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) ||
(family == null ? false : family.isPrefetchBlocksOnOpen());
this.cacheCompactedDataOnWrite = conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
this.blockCache = blockCache;
this.byteBuffAllocator = byteBuffAllocator;
LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
Expand All @@ -193,6 +207,7 @@ public CacheConfig(CacheConfig cacheConf) {
this.evictOnClose = cacheConf.evictOnClose;
this.cacheDataCompressed = cacheConf.cacheDataCompressed;
this.prefetchOnOpen = cacheConf.prefetchOnOpen;
this.cacheCompactedDataOnWrite = cacheConf.cacheCompactedDataOnWrite;
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
this.blockCache = cacheConf.blockCache;
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
Expand All @@ -207,6 +222,7 @@ private CacheConfig() {
this.evictOnClose = false;
this.cacheDataCompressed = false;
this.prefetchOnOpen = false;
this.cacheCompactedDataOnWrite = false;
this.dropBehindCompaction = false;
this.blockCache = null;
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
Expand Down Expand Up @@ -319,6 +335,13 @@ public boolean shouldPrefetchOnOpen() {
return this.prefetchOnOpen;
}

/**
* @return true if blocks should be cached while writing during compaction, false if not
*/
public boolean shouldCacheCompactedBlocksOnWrite() {
return this.cacheCompactedDataOnWrite;
}

/**
* Return true if we may find this type of block in block cache.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,9 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
boolean shouldDropBehind) throws IOException {
final CacheConfig writerCacheConf;
if (isCompaction) {
// Don't cache data on write on compactions.
// Don't cache data on write on compactions, unless specifically configured to do so
writerCacheConf = new CacheConfig(cacheConf);
writerCacheConf.setCacheDataOnWrite(false);
writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite());
ramkrish86 marked this conversation as resolved.
Show resolved Hide resolved
} else {
writerCacheConf = cacheConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
Expand Down Expand Up @@ -405,59 +407,82 @@ private void writeStoreFile(boolean useTags) throws IOException {
storeFilePath = sfw.getPath();
}

private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
private void testCachingDataBlocksDuringCompactionInternals(boolean useTags, boolean cacheBlocksOnCompaction)
throws IOException, InterruptedException {
// TODO: need to change this test if we add a cache size threshold for
// compactions, or if we implement some other kind of intelligent logic for
// deciding what blocks to cache-on-write on compaction.
final String table = "CompactionCacheOnWrite";
final String cf = "myCF";
final byte[] cfBytes = Bytes.toBytes(cf);
final int maxVersions = 3;
ColumnFamilyDescriptor cfd =
ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress)
.setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
.setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
int rowIdx = 0;
long ts = EnvironmentEdgeManager.currentTime();
for (int iFile = 0; iFile < 5; ++iFile) {
for (int iRow = 0; iRow < 500; ++iRow) {
String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
iRow;
Put p = new Put(Bytes.toBytes(rowStr));
++rowIdx;
for (int iCol = 0; iCol < 10; ++iCol) {
String qualStr = "col" + iCol;
String valueStr = "value_" + rowStr + "_" + qualStr;
for (int iTS = 0; iTS < 5; ++iTS) {
if (useTags) {
Tag t = new ArrayBackedTag((byte) 1, "visibility");
Tag[] tags = new Tag[1];
tags[0] = t;
KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
p.add(kv);
} else {
p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
// create a localConf
Configuration localConf = new Configuration(conf);
try {
// Set the conf if testing caching compacted blocks on write
conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, cacheBlocksOnCompaction);

// TODO: need to change this test if we add a cache size threshold for
// compactions, or if we implement some other kind of intelligent logic for
// deciding what blocks to cache-on-write on compaction.
final String table = "CompactionCacheOnWrite";
final String cf = "myCF";
final byte[] cfBytes = Bytes.toBytes(cf);
final int maxVersions = 3;
ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cfBytes)
.setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
.setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
int rowIdx = 0;
long ts = EnvironmentEdgeManager.currentTime();
for (int iFile = 0; iFile < 5; ++iFile) {
for (int iRow = 0; iRow < 500; ++iRow) {
String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
Put p = new Put(Bytes.toBytes(rowStr));
++rowIdx;
for (int iCol = 0; iCol < 10; ++iCol) {
String qualStr = "col" + iCol;
String valueStr = "value_" + rowStr + "_" + qualStr;
for (int iTS = 0; iTS < 5; ++iTS) {
if (useTags) {
Tag t = new ArrayBackedTag((byte) 1, "visibility");
Tag[] tags = new Tag[1];
tags[0] = t;
KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
p.add(kv);
} else {
p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
}
}
}
p.setDurability(Durability.ASYNC_WAL);
region.put(p);
}
region.flush(true);
}

clearBlockCache(blockCache);
assertEquals(0, blockCache.getBlockCount());

region.compact(false);
LOG.debug("compactStores() returned");

boolean dataBlockCached = false;
for (CachedBlock block : blockCache) {
if (BlockType.ENCODED_DATA.equals(block.getBlockType())
|| BlockType.DATA.equals(block.getBlockType())) {
dataBlockCached = true;
break;
}
p.setDurability(Durability.ASYNC_WAL);
region.put(p);
}
region.flush(true);
}
clearBlockCache(blockCache);
assertEquals(0, blockCache.getBlockCount());
region.compact(false);
LOG.debug("compactStores() returned");

for (CachedBlock block: blockCache) {
assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
assertNotEquals(BlockType.DATA, block.getBlockType());
// Data blocks should be cached in instances where we are caching blocks on write. In the case
// of testing
// BucketCache, we cannot verify block type as it is not stored in the cache.
assertTrue(
"\nTest description: " + testDescription + "\nprefetchCompactedBlocksOnWrite: "
+ cacheBlocksOnCompaction + "\n",
(cacheBlocksOnCompaction && !(blockCache instanceof BucketCache)) == dataBlockCached);

region.close();
} finally {
// reset back
conf = new Configuration(localConf);
}
region.close();
}

@Test
Expand All @@ -467,8 +492,8 @@ public void testStoreFileCacheOnWrite() throws IOException {
}

@Test
public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
testNotCachingDataBlocksDuringCompactionInternals(false);
testNotCachingDataBlocksDuringCompactionInternals(true);
public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
testCachingDataBlocksDuringCompactionInternals(false, false);
testCachingDataBlocksDuringCompactionInternals(true, true);
}
}