HBASE-28805: Chunked persistence of backing map for persistent bucket cache. (apache#6183)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Change-Id: I72efe71040a8970f87a8b2a08c4c4ac1ceef74bc
jhungund committed Sep 6, 2024
1 parent fb9d89d commit 566b18c
Showing 3 changed files with 227 additions and 55 deletions.
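
At a glance, the chunked (V2) persistence layout introduced by this change is: a 4-byte magic ('V', '2', 'U', 'F'), an 8-byte chunk size, an 8-byte chunk count, then one delimited BucketCacheEntry protobuf message carrying the cache metadata plus the first backing-map chunk, followed by the remaining chunks as bare delimited BackingMap messages. Capping each chunk at chunkSize entries bounds the size of any single protobuf message for very large caches. A minimal, hypothetical reader sketch of that layout (the class name, the shaded protobuf package path, and getEntryCount() are assumptions for illustration, not part of the patch):

// Hypothetical reader for the chunked (V2) persistence file, mirroring the logic of
// retrieveChunkedBackingMap()/serializeAsPB() in the diff below. Class name, protobuf
// package path, and getEntryCount() are assumptions for illustration only.
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
import org.apache.hadoop.hbase.util.Bytes;

public class ChunkedPersistenceReaderSketch {
  // Same 4-byte magic the patch writes first.
  static final byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };

  public static void main(String[] args) throws IOException {
    try (FileInputStream in = new FileInputStream(args[0])) {
      byte[] magic = new byte[PB_MAGIC_V2.length];
      if (in.read(magic) != magic.length || !Arrays.equals(magic, PB_MAGIC_V2)) {
        throw new IOException("Not a chunked (V2) bucket cache persistence file");
      }
      byte[] buf = new byte[Long.BYTES];
      if (in.read(buf) != Long.BYTES) {
        throw new IOException("Truncated chunk size");
      }
      long chunkSize = Bytes.toLong(buf, 0); // backing-map entries per chunk
      if (in.read(buf) != Long.BYTES) {
        throw new IOException("Truncated chunk count");
      }
      long numChunks = Bytes.toLong(buf, 0); // total chunks, first one included

      // First chunk: a full BucketCacheEntry carrying capacity, IO/map classes,
      // deserializers, cached files, checksum, and the first backing-map chunk.
      BucketCacheProtos.BucketCacheEntry first =
        BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
      long entries = first.getBackingMap().getEntryCount();

      // Remaining chunks: bare BackingMap messages, each up to chunkSize entries.
      for (long i = 1; i < numChunks; i++) {
        entries += BucketCacheProtos.BackingMap.parseDelimitedFrom(in).getEntryCount();
      }
      System.out.println("chunkSize=" + chunkSize + ", chunks=" + numChunks
        + ", backing map entries=" + entries);
    }
  }
}
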
BucketCache.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -121,6 +122,8 @@ public class BucketCache implements BlockCache, HeapSize {
static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
"hbase.bucketcache.persistence.chunksize";

/** Priority buckets */
static final float DEFAULT_SINGLE_FACTOR = 0.25f;
@@ -140,6 +143,8 @@ public class BucketCache implements BlockCache, HeapSize {
final static int DEFAULT_WRITER_THREADS = 3;
final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000000;

// Store/read block data
transient final IOEngine ioEngine;

@@ -269,6 +274,8 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private String algorithm;

private long persistenceChunkSize;

/* Tracing failed Bucket Cache allocations. */
private long allocFailLogPrevTs; // time of previous log event for allocation failure.
private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
@@ -302,6 +309,11 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
this.persistenceChunkSize =
conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
if (this.persistenceChunkSize <= 0) {
persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
}

sanityCheckConfigs();
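
The chunk size read above is an ordinary configuration key; a minimal, hypothetical tuning sketch (the value 5,000,000 is arbitrary, and the key string is the BACKING_MAP_PERSISTENCE_CHUNK_SIZE constant added in this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ChunkSizeConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // "hbase.bucketcache.persistence.chunksize" is the key added in this patch;
    // non-positive values fall back to the default of 10,000,000 entries per chunk.
    conf.setLong("hbase.bucketcache.persistence.chunksize", 5_000_000L);
    System.out.println(conf.getLong("hbase.bucketcache.persistence.chunksize", -1));
  }
}
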

@@ -1346,8 +1358,8 @@ void persistToFile() throws IOException {
}
File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
fos.write(ProtobufMagic.PB_MAGIC);
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
LOG.debug("Persist in new chunked persistence format.");
persistChunkedBackingMap(fos);
} catch (IOException e) {
LOG.error("Failed to persist bucket cache to file", e);
throw e;
@@ -1389,16 +1401,24 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
int pblen = ProtobufMagic.lengthOfPBMagic();
byte[] pbuf = new byte[pblen];
IOUtils.readFully(in, pbuf, 0, pblen);
if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {

if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
LOG.info("Reading old format of persistence.");
// The old non-chunked version of backing map persistence.
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
} else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
// The new persistence format of chunked persistence.
LOG.info("Reading new chunked format of persistence.");
retrieveChunkedBackingMap(in, bucketSizes);
} else {
// In 3.0 we have enough flexibility to dump the old cache data.
// TODO: In 2.x line, this might need to be filled in to support reading the old format
throw new IOException(
"Persistence file does not start with protobuf magic number. " + persistencePath);
}
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
blockNumber.add(backingMap.size());
LOG.info("Bucket cache retrieved from file successfully");
LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size());
}
}

@@ -1441,6 +1461,75 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String
}
}

private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
try {
if (proto.hasChecksum()) {
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
algorithm);
}
backingMapValidated.set(true);
} catch (IOException e) {
LOG.warn("Checksum for cache file failed. "
+ "We need to validate each cache key in the backing map. "
+ "This may take some time, so we'll do it in a background thread,");

Runnable cacheValidator = () -> {
while (bucketAllocator == null) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
long startTime = EnvironmentEdgeManager.currentTime();
int totalKeysOriginally = backingMap.size();
for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
try {
((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
} catch (IOException e1) {
LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
evictBlock(keyEntry.getKey());
fileNotFullyCached(keyEntry.getKey().getHfileName());
}
}
backingMapValidated.set(true);
LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
totalKeysOriginally, backingMap.size(),
(EnvironmentEdgeManager.currentTime() - startTime));
};
Thread t = new Thread(cacheValidator);
t.setDaemon(true);
t.start();
}
}

private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk,
List<BucketCacheProtos.BackingMap> chunks) throws IOException {
fullyCachedFiles.clear();
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(),
this::createRecycler);
backingMap.putAll(pair.getFirst());
blocksByHFile.addAll(pair.getSecond());
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap()));

LOG.debug("Number of blocks after first chunk: {}, blocksByHFile: {}", backingMap.size(),
fullyCachedFiles.size());
int i = 1;
for (BucketCacheProtos.BackingMap chunk : chunks) {
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, this::createRecycler);
backingMap.putAll(pair2.getFirst());
blocksByHFile.addAll(pair2.getSecond());
LOG.debug("Number of blocks after {} reading chunk: {}, blocksByHFile: {}", ++i,
backingMap.size(), fullyCachedFiles.size());
}
verifyFileIntegrity(firstChunk);
verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
firstChunk.getMapClass());
updateRegionSizeMapWhileRetrievingFromFile();
}

private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
@@ -1449,52 +1538,60 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio
blocksByHFile = pair.getSecond();
fullyCachedFiles.clear();
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
if (proto.hasChecksum()) {
try {
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
algorithm);
backingMapValidated.set(true);
} catch (IOException e) {
LOG.warn("Checksum for cache file failed. "
+ "We need to validate each cache key in the backing map. "
+ "This may take some time, so we'll do it in a background thread,");
Runnable cacheValidator = () -> {
while (bucketAllocator == null) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
long startTime = EnvironmentEdgeManager.currentTime();
int totalKeysOriginally = backingMap.size();
for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
try {
((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
} catch (IOException e1) {
LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
evictBlock(keyEntry.getKey());
fileNotFullyCached(keyEntry.getKey().getHfileName());
}
}
backingMapValidated.set(true);
LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
totalKeysOriginally, backingMap.size(),
(EnvironmentEdgeManager.currentTime() - startTime));
};
Thread t = new Thread(cacheValidator);
t.setDaemon(true);
t.start();
}
} else {
// if has not checksum, it means the persistence file is old format
LOG.info("Persistent file is old format, it does not support verifying file integrity!");
backingMapValidated.set(true);
}
verifyFileIntegrity(proto);
updateRegionSizeMapWhileRetrievingFromFile();
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
}

private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
long numChunks = backingMap.size() / persistenceChunkSize;
if (backingMap.size() % persistenceChunkSize != 0) {
numChunks += 1;
}

LOG.debug(
  "persistToFile: before persisting backing map size: {}, "
    + "fullyCachedFiles size: {}, chunkSize: {}, numberOfChunks: {}",
  backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks);

BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks);

LOG.debug(
  "persistToFile: after persisting backing map size: {}, "
    + "fullyCachedFiles size: {}, numChunksPersisted: {}",
  backingMap.size(), fullyCachedFiles.size(), numChunks);
}

private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException {
byte[] bytes = new byte[Long.BYTES];
int readSize = in.read(bytes);
if (readSize != Long.BYTES) {
throw new IOException("Invalid size of chunk-size read from persistence: " + readSize);
}
long batchSize = Bytes.toLong(bytes, 0);

readSize = in.read(bytes);
if (readSize != Long.BYTES) {
throw new IOException("Invalid size for number of chunks read from persistence: " + readSize);
}
long numChunks = Bytes.toLong(bytes, 0);

LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);

ArrayList<BucketCacheProtos.BackingMap> bucketCacheMaps = new ArrayList<>();
// Read the first chunk that has all the details.
BucketCacheProtos.BucketCacheEntry firstChunk =
BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);

// Subsequent chunks have the backingMap entries.
for (int i = 1; i < numChunks; i++) {
LOG.info("Reading chunk no: {}", i + 1);
bucketCacheMaps.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in));
LOG.info("Retrieved chunk: {}", i + 1);
}
parsePB(firstChunk, bucketCacheMaps);
}

/**
* Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
* exceeds ioErrorsDurationTimeTolerated, we will disable the cache
BucketProtoUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
@@ -32,6 +33,7 @@
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@@ -41,29 +43,55 @@

@InterfaceAudience.Private
final class BucketProtoUtils {

final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };

private BucketProtoUtils() {

}

static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
BucketCacheProtos.BackingMap backingMap) {
return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
.setIoClass(cache.ioEngine.getClass().getName())
.setMapClass(cache.backingMap.getClass().getName())
.putAllDeserializers(CacheableDeserializerIdManager.save())
.putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
.setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
.putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)).setBackingMap(backingMap)
.setChecksum(ByteString
.copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
.build();
}

private static BucketCacheProtos.BackingMap toPB(Map<BlockCacheKey, BucketEntry> backingMap) {
public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize,
long numChunks) throws IOException {
int blockCount = 0;
int chunkCount = 0;
int backingMapSize = cache.backingMap.size();
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder().setKey(toPB(entry.getKey()))
.setValue(toPB(entry.getValue())).build());

fos.write(PB_MAGIC_V2);
fos.write(Bytes.toBytes(chunkSize));
fos.write(Bytes.toBytes(numChunks));

for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
blockCount++;
builder.addEntry(
BucketCacheProtos.BackingMapEntry.newBuilder().setKey(BucketProtoUtils.toPB(entry.getKey()))
.setValue(BucketProtoUtils.toPB(entry.getValue())).build());
if (blockCount % chunkSize == 0 || (blockCount == backingMapSize)) {
chunkCount++;
if (chunkCount == 1) {
// Persist all details along with the first chunk into BucketCacheEntry
BucketProtoUtils.toPB(cache, builder.build()).writeDelimitedTo(fos);
} else {
// Directly persist subsequent backing-map chunks.
builder.build().writeDelimitedTo(fos);
}
if (blockCount < backingMapSize) {
builder = BucketCacheProtos.BackingMap.newBuilder();
}
}
}
return builder.build();
}

private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {