diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index bb57fbe06b60..7dbf4f8ba2d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -81,6 +81,12 @@ public class CacheConfig {
    */
  public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen";
 
+  /**
+   * Configuration key to cache blocks when a compacted file is written
+   */
+  public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
+      "hbase.rs.cachecompactedblocksonwrite";
+
  public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
      "hbase.hfile.drop.behind.compaction";
 
@@ -93,6 +99,7 @@ public class CacheConfig {
  public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
  public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
  public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
+  public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
  public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
 
  /**
@@ -124,6 +131,11 @@ public class CacheConfig {
  /** Whether data blocks should be prefetched into the cache */
  private final boolean prefetchOnOpen;
 
+  /**
+   * Whether data blocks should be cached when compacted file is written
+   */
+  private final boolean cacheCompactedDataOnWrite;
+
  private final boolean dropBehindCompaction;
 
  // Local reference to the block cache
@@ -174,6 +186,8 @@ public CacheConfig(Configuration conf, ColumnFamilyDescriptor family, BlockCache
        (family == null ? false : family.isEvictBlocksOnClose());
    this.prefetchOnOpen = conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) ||
        (family == null ? false : family.isPrefetchBlocksOnOpen());
+    this.cacheCompactedDataOnWrite = conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
+        DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
    this.blockCache = blockCache;
    this.byteBuffAllocator = byteBuffAllocator;
    LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
@@ -193,6 +207,7 @@ public CacheConfig(CacheConfig cacheConf) {
    this.evictOnClose = cacheConf.evictOnClose;
    this.cacheDataCompressed = cacheConf.cacheDataCompressed;
    this.prefetchOnOpen = cacheConf.prefetchOnOpen;
+    this.cacheCompactedDataOnWrite = cacheConf.cacheCompactedDataOnWrite;
    this.dropBehindCompaction = cacheConf.dropBehindCompaction;
    this.blockCache = cacheConf.blockCache;
    this.byteBuffAllocator = cacheConf.byteBuffAllocator;
@@ -207,6 +222,7 @@ private CacheConfig() {
    this.evictOnClose = false;
    this.cacheDataCompressed = false;
    this.prefetchOnOpen = false;
+    this.cacheCompactedDataOnWrite = false;
    this.dropBehindCompaction = false;
    this.blockCache = null;
    this.byteBuffAllocator = ByteBuffAllocator.HEAP;
@@ -319,6 +335,13 @@ public boolean shouldPrefetchOnOpen() {
    return this.prefetchOnOpen;
  }
 
+  /**
+   * @return true if blocks should be cached while writing during compaction, false if not
+   */
+  public boolean shouldCacheCompactedBlocksOnWrite() {
+    return this.cacheCompactedDataOnWrite;
+  }
+
  /**
   * Return true if we may find this type of block in block cache.
   *
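For context (not part of the patch): a minimal sketch of how the new flag could be enabled and read back through the accessor added above. It assumes HBaseConfiguration.create() and an existing one-argument CacheConfig(Configuration) constructor, neither of which appears in this diff; the key and accessor are the ones this patch introduces.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public class CacheCompactedBlocksOnWriteExample {
      public static void main(String[] args) {
        // The key defaults to false (DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE above),
        // so compactions keep the old no-cache-on-write behavior unless it is enabled.
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, true);
        CacheConfig cacheConf = new CacheConfig(conf); // assumed existing constructor
        System.out.println(cacheConf.shouldCacheCompactedBlocksOnWrite()); // prints: true
      }
    }

On a running cluster the same key, "hbase.rs.cachecompactedblocksonwrite", would normally be set in hbase-site.xml on the region servers rather than in code.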

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c7ecfca9682d..0a3b59f62c66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1118,9 +1118,9 @@ public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm
      boolean shouldDropBehind) throws IOException {
    final CacheConfig writerCacheConf;
    if (isCompaction) {
-      // Don't cache data on write on compactions.
+      // Don't cache data on write on compactions, unless specifically configured to do so
      writerCacheConf = new CacheConfig(cacheConf);
-      writerCacheConf.setCacheDataOnWrite(false);
+      writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite());
    } else {
      writerCacheConf = cacheConf;
    }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 3a769b09ba9f..8196787781c6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -53,6 +53,8 @@
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -405,59 +407,82 @@ private void writeStoreFile(boolean useTags) throws IOException {
    storeFilePath = sfw.getPath();
  }
 
-  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
+  private void testCachingDataBlocksDuringCompactionInternals(boolean useTags, boolean cacheBlocksOnCompaction)
      throws IOException, InterruptedException {
-    // TODO: need to change this test if we add a cache size threshold for
-    // compactions, or if we implement some other kind of intelligent logic for
-    // deciding what blocks to cache-on-write on compaction.
-    final String table = "CompactionCacheOnWrite";
-    final String cf = "myCF";
-    final byte[] cfBytes = Bytes.toBytes(cf);
-    final int maxVersions = 3;
-    ColumnFamilyDescriptor cfd =
-        ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress)
-            .setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
-            .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
-    HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
-    int rowIdx = 0;
-    long ts = EnvironmentEdgeManager.currentTime();
-    for (int iFile = 0; iFile < 5; ++iFile) {
-      for (int iRow = 0; iRow < 500; ++iRow) {
-        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
-            iRow;
-        Put p = new Put(Bytes.toBytes(rowStr));
-        ++rowIdx;
-        for (int iCol = 0; iCol < 10; ++iCol) {
-          String qualStr = "col" + iCol;
-          String valueStr = "value_" + rowStr + "_" + qualStr;
-          for (int iTS = 0; iTS < 5; ++iTS) {
-            if (useTags) {
-              Tag t = new ArrayBackedTag((byte) 1, "visibility");
-              Tag[] tags = new Tag[1];
-              tags[0] = t;
-              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
-                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
-              p.add(kv);
-            } else {
-              p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+    // create a localConf
+    Configuration localConf = new Configuration(conf);
+    try {
+      // Set the conf if testing caching compacted blocks on write
+      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, cacheBlocksOnCompaction);
+
+      // TODO: need to change this test if we add a cache size threshold for
+      // compactions, or if we implement some other kind of intelligent logic for
+      // deciding what blocks to cache-on-write on compaction.
+      final String table = "CompactionCacheOnWrite";
+      final String cf = "myCF";
+      final byte[] cfBytes = Bytes.toBytes(cf);
+      final int maxVersions = 3;
+      ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cfBytes)
+          .setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
+          .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
+      HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
+      int rowIdx = 0;
+      long ts = EnvironmentEdgeManager.currentTime();
+      for (int iFile = 0; iFile < 5; ++iFile) {
+        for (int iRow = 0; iRow < 500; ++iRow) {
+          String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
+          Put p = new Put(Bytes.toBytes(rowStr));
+          ++rowIdx;
+          for (int iCol = 0; iCol < 10; ++iCol) {
+            String qualStr = "col" + iCol;
+            String valueStr = "value_" + rowStr + "_" + qualStr;
+            for (int iTS = 0; iTS < 5; ++iTS) {
+              if (useTags) {
+                Tag t = new ArrayBackedTag((byte) 1, "visibility");
+                Tag[] tags = new Tag[1];
+                tags[0] = t;
+                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
+                    HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
+                p.add(kv);
+              } else {
+                p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+              }
            }
          }
+          p.setDurability(Durability.ASYNC_WAL);
+          region.put(p);
+        }
+        region.flush(true);
+      }
+
+      clearBlockCache(blockCache);
+      assertEquals(0, blockCache.getBlockCount());
+
+      region.compact(false);
+      LOG.debug("compactStores() returned");
+
+      boolean dataBlockCached = false;
+      for (CachedBlock block : blockCache) {
+        if (BlockType.ENCODED_DATA.equals(block.getBlockType())
+            || BlockType.DATA.equals(block.getBlockType())) {
+          dataBlockCached = true;
+          break;
        }
-        p.setDurability(Durability.ASYNC_WAL);
-        region.put(p);
      }
-      region.flush(true);
-    }
-
-    clearBlockCache(blockCache);
-    assertEquals(0, blockCache.getBlockCount());
-
-    region.compact(false);
-    LOG.debug("compactStores() returned");
-
-    for (CachedBlock block: blockCache) {
-      assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
-      assertNotEquals(BlockType.DATA, block.getBlockType());
+      // Data blocks should be cached in instances where we are caching blocks on write. In the
+      // case of testing BucketCache, we cannot verify block type as it is not stored in the cache.
+      assertTrue(
+        "\nTest description: " + testDescription + "\nprefetchCompactedBlocksOnWrite: "
+            + cacheBlocksOnCompaction + "\n",
+        (cacheBlocksOnCompaction && !(blockCache instanceof BucketCache)) == dataBlockCached);
+
+      region.close();
+    } finally {
+      // reset back
+      conf = new Configuration(localConf);
    }
-    region.close();
  }
 
@@ -467,8 +492,8 @@ public void testStoreFileCacheOnWrite() throws IOException {
  }
 
  @Test
-  public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
-    testNotCachingDataBlocksDuringCompactionInternals(false);
-    testNotCachingDataBlocksDuringCompactionInternals(true);
+  public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
+    testCachingDataBlocksDuringCompactionInternals(false, false);
+    testCachingDataBlocksDuringCompactionInternals(true, true);
  }
 }
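Taken together, the write-path decision after this patch reduces to the sketch below; WriterCacheConfHelper and forWriter are hypothetical names for illustration, while the CacheConfig calls are the ones visible in the HStore hunk above.

    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    final class WriterCacheConfHelper {
      // Mirrors HStore.createWriterInTmp(): compaction writers previously always
      // disabled cache-on-write; now the copied CacheConfig takes that setting
      // from the new hbase.rs.cachecompactedblocksonwrite flag.
      static CacheConfig forWriter(CacheConfig cacheConf, boolean isCompaction) {
        if (!isCompaction) {
          return cacheConf; // flush and other writers keep the store's config, as before
        }
        CacheConfig writerCacheConf = new CacheConfig(cacheConf); // copy, as in the patch
        writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite());
        return writerCacheConf;
      }
    }

Note also how the test guards the shared conf: it snapshots it into localConf, sets the flag for the run, and restores it in the finally block ("reset back") so later runs see the original value.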