diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5816b8ff1602..10d0c925a47a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -122,6 +123,8 @@ public class BucketCache implements BlockCache, HeapSize {
   static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
   static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
   static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
+  static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
+    "hbase.bucketcache.persistence.chunksize";
 
   /** Use strong reference for offsetLock or not */
   private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
@@ -145,6 +148,8 @@ public class BucketCache implements BlockCache, HeapSize {
   final static int DEFAULT_WRITER_THREADS = 3;
   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
 
+  final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000000;
+
   // Store/read block data
   transient final IOEngine ioEngine;
 
@@ -273,6 +278,8 @@
    */
   private String algorithm;
 
+  private long persistenceChunkSize;
+
   /* Tracing failed Bucket Cache allocations. */
   private long allocFailLogPrevTs; // time of previous log event for allocation failure.
   private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
@@ -313,6 +320,11 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
     this.queueAdditionWaitTime =
       conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
     this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
+    this.persistenceChunkSize =
+      conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
+    if (this.persistenceChunkSize <= 0) {
+      persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
+    }
 
     sanityCheckConfigs();
 
@@ -1358,8 +1370,8 @@ void persistToFile() throws IOException {
     }
     File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
     try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
-      fos.write(ProtobufMagic.PB_MAGIC);
-      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
+      LOG.debug("Persist in new chunked persistence format.");
+      persistChunkedBackingMap(fos);
     } catch (IOException e) {
       LOG.error("Failed to persist bucket cache to file", e);
       throw e;
@@ -1405,16 +1417,23 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
         throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
           + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
       }
-      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
+      if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
+        LOG.info("Reading old format of persistence.");
+        // The old non-chunked version of backing map persistence.
+        parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
+      } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
+        // The new persistence format of chunked persistence.
+        LOG.info("Reading new chunked format of persistence.");
+        retrieveChunkedBackingMap(in, bucketSizes);
+      } else {
         // In 3.0 we have enough flexibility to dump the old cache data.
         // TODO: In 2.x line, this might need to be filled in to support reading the old format
         throw new IOException(
           "Persistence file does not start with protobuf magic number. " + persistencePath);
       }
-      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
       bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
       blockNumber.add(backingMap.size());
-      LOG.info("Bucket cache retrieved from file successfully");
+      LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size());
     }
   }
 
@@ -1457,6 +1476,75 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String
     }
   }
 
+  private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
+    try {
+      if (proto.hasChecksum()) {
+        ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
+          algorithm);
+      }
+      backingMapValidated.set(true);
+    } catch (IOException e) {
+      LOG.warn("Checksum for cache file failed. "
+        + "We need to validate each cache key in the backing map. "
+        + "This may take some time, so we'll do it in a background thread.");
+
+      Runnable cacheValidator = () -> {
+        while (bucketAllocator == null) {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+        long startTime = EnvironmentEdgeManager.currentTime();
+        int totalKeysOriginally = backingMap.size();
+        for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
+          try {
+            ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
+          } catch (IOException e1) {
+            LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
+            evictBlock(keyEntry.getKey());
+            fileNotFullyCached(keyEntry.getKey().getHfileName());
+          }
+        }
+        backingMapValidated.set(true);
+        LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
+          totalKeysOriginally, backingMap.size(),
+          (EnvironmentEdgeManager.currentTime() - startTime));
+      };
+      Thread t = new Thread(cacheValidator);
+      t.setDaemon(true);
+      t.start();
+    }
+  }
+
+  private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk,
+    List<BucketCacheProtos.BackingMap> chunks) throws IOException {
+    fullyCachedFiles.clear();
+    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
+      BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(),
+        this::createRecycler);
+    backingMap.putAll(pair.getFirst());
+    blocksByHFile.addAll(pair.getSecond());
+    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap()));
+
+    LOG.debug("Number of blocks after first chunk: {}, fullyCachedFiles: {}", backingMap.size(),
+      fullyCachedFiles.size());
+    int i = 1;
+    for (BucketCacheProtos.BackingMap chunk : chunks) {
+      Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
+        BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, this::createRecycler);
+      backingMap.putAll(pair2.getFirst());
+      blocksByHFile.addAll(pair2.getSecond());
+      LOG.debug("Number of blocks after reading chunk {}: {}, fullyCachedFiles: {}", ++i,
+        backingMap.size(), fullyCachedFiles.size());
+    }
+    verifyFileIntegrity(firstChunk);
+    verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
+      firstChunk.getMapClass());
+    updateRegionSizeMapWhileRetrievingFromFile();
+  }
+
   private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
     Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
       BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
@@ -1465,52 +1553,60 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio
         this::createRecycler);
     backingMap = pair.getFirst();
     blocksByHFile = pair.getSecond();
     fullyCachedFiles.clear();
     fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
-    if (proto.hasChecksum()) {
-      try {
-        ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
-          algorithm);
-        backingMapValidated.set(true);
-      } catch (IOException e) {
-        LOG.warn("Checksum for cache file failed. "
-          + "We need to validate each cache key in the backing map. "
-          + "This may take some time, so we'll do it in a background thread,");
-        Runnable cacheValidator = () -> {
-          while (bucketAllocator == null) {
-            try {
-              Thread.sleep(50);
-            } catch (InterruptedException ex) {
-              throw new RuntimeException(ex);
-            }
-          }
-          long startTime = EnvironmentEdgeManager.currentTime();
-          int totalKeysOriginally = backingMap.size();
-          for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
-            try {
-              ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
-            } catch (IOException e1) {
-              LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
-              evictBlock(keyEntry.getKey());
-              fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
-            }
-          }
-          backingMapValidated.set(true);
-          LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
-            totalKeysOriginally, backingMap.size(),
-            (EnvironmentEdgeManager.currentTime() - startTime));
-        };
-        Thread t = new Thread(cacheValidator);
-        t.setDaemon(true);
-        t.start();
-      }
-    } else {
-      // if has not checksum, it means the persistence file is old format
-      LOG.info("Persistent file is old format, it does not support verifying file integrity!");
-      backingMapValidated.set(true);
-    }
+    verifyFileIntegrity(proto);
     updateRegionSizeMapWhileRetrievingFromFile();
     verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
   }
 
+  private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
+    long numChunks = backingMap.size() / persistenceChunkSize;
+    if (backingMap.size() % persistenceChunkSize != 0) {
+      numChunks += 1;
+    }
+
+    LOG.debug(
+      "persistToFile: before persisting backing map size: {}, "
+        + "fullyCachedFiles size: {}, chunkSize: {}, numberOfChunks: {}",
+      backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks);
+
+    BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks);
+
+    LOG.debug(
+      "persistToFile: after persisting backing map size: {}, "
+        + "fullyCachedFiles size: {}, numChunksPersisted: {}",
+      backingMap.size(), fullyCachedFiles.size(), numChunks);
+  }
+
+  private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException {
+    byte[] bytes = new byte[Long.BYTES];
+    int readSize = in.read(bytes);
+    if (readSize != Long.BYTES) {
+      throw new IOException("Invalid size of chunk-size read from persistence: " + readSize);
+    }
+    long batchSize = Bytes.toLong(bytes, 0);
+
+    readSize = in.read(bytes);
+    if (readSize != Long.BYTES) {
+      throw new IOException("Invalid size for number of chunks read from persistence: " + readSize);
+    }
+    long numChunks = Bytes.toLong(bytes, 0);
+
+    LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);
+
+    ArrayList<BucketCacheProtos.BackingMap> bucketCacheMaps = new ArrayList<>();
+    // Read the first chunk that has all the details.
+    BucketCacheProtos.BucketCacheEntry firstChunk =
+      BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
+
+    // Subsequent chunks have the backingMap entries.
+    for (int i = 1; i < numChunks; i++) {
+      LOG.info("Reading chunk no: {}", i + 1);
+      bucketCacheMaps.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in));
+      LOG.info("Retrieved chunk: {}", i + 1);
+    }
+    parsePB(firstChunk, bucketCacheMaps);
+  }
+
   /**
    * Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
    * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index 4b42414fb9c5..4618200325c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -41,29 +43,55 @@
 @InterfaceAudience.Private
 final class BucketProtoUtils {
+
+  final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };
+
   private BucketProtoUtils() {
   }
 
-  static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
+  static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
+    BucketCacheProtos.BackingMap backingMap) {
     return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
       .setIoClass(cache.ioEngine.getClass().getName())
       .setMapClass(cache.backingMap.getClass().getName())
       .putAllDeserializers(CacheableDeserializerIdManager.save())
-      .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
-      .setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
+      .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)).setBackingMap(backingMap)
       .setChecksum(ByteString
         .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
       .build();
   }
 
-  private static BucketCacheProtos.BackingMap toPB(Map<BlockCacheKey, BucketEntry> backingMap) {
+  public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize,
+    long numChunks) throws IOException {
+    int blockCount = 0;
+    int chunkCount = 0;
+    int backingMapSize = cache.backingMap.size();
     BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
-    for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
-      builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder().setKey(toPB(entry.getKey()))
-        .setValue(toPB(entry.getValue())).build());
+
+    fos.write(PB_MAGIC_V2);
+    fos.write(Bytes.toBytes(chunkSize));
+    fos.write(Bytes.toBytes(numChunks));
+
+    for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
+      blockCount++;
+      builder.addEntry(
+        BucketCacheProtos.BackingMapEntry.newBuilder().setKey(BucketProtoUtils.toPB(entry.getKey()))
+          .setValue(BucketProtoUtils.toPB(entry.getValue())).build());
+      if (blockCount % chunkSize == 0 || (blockCount == backingMapSize)) {
+        chunkCount++;
+        if (chunkCount == 1) {
+          // Persist all details along with the first chunk into BucketCacheEntry
+          BucketProtoUtils.toPB(cache, builder.build()).writeDelimitedTo(fos);
+        } else {
+          // Directly persist subsequent backing-map chunks.
+          builder.build().writeDelimitedTo(fos);
+        }
+        if (blockCount < backingMapSize) {
+          builder = BucketCacheProtos.BackingMap.newBuilder();
+        }
+      }
+    }
-    return builder.build();
   }
 
   private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index 0d33fb079bcd..b49a2b1db8d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
 import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -350,6 +351,52 @@ public void testBucketCacheRecovery() throws Exception {
     TEST_UTIL.cleanupTestDir();
   }
 
+  @Test
+  public void testSingleChunk() throws Exception {
+    testChunkedBackingMapRecovery(5, 5);
+  }
+
+  @Test
+  public void testMultipleChunks() throws Exception {
+    testChunkedBackingMapRecovery(5, 10);
+  }
+
+  private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
+    HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);
+
+    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+    CacheTestUtils.HFileBlockPair[] blocks =
+      CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
+
+    for (int i = 0; i < numBlocks; i++) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), blocks[i].getBlock());
+    }
+
+    // Saves the current state of the cache.
+    bucketCache.persistToFile();
+
+    // Create a new bucket cache which reads from the persistence file.
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+    assertEquals(numBlocks, newBucketCache.backingMap.size());
+
+    for (int i = 0; i < numBlocks; i++) {
+      assertEquals(blocks[i].getBlock(),
+        newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
+    }
+    TEST_UTIL.cleanupTestDir();
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
     throws InterruptedException {
     while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {