HBASE-28805: Chunked persistence of backing map for persistent bucket cache. #6183

Merged · 3 commits · Sep 3, 2024
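In short: instead of dumping the whole backing map as a single delimited protobuf message, persistToFile() now writes the map in fixed-size chunks behind a new magic prefix, so no single protobuf message has to hold the entire map, and retrieveFromFile() recognizes both the old and the new layout. A rough sketch of the V2 file layout as implied by the code in this diff — illustrative only, not an authoritative spec:

// Sketch of the chunked (V2) persistence layout, inferred from persistChunkedBackingMap()
// and BucketProtoUtils.serializeAsPB() below; illustrative, not a normative spec.
//
//   PB_MAGIC_V2                     4 bytes: { 'V', '2', 'U', 'F' }
//   chunkSize                       8 bytes, Bytes.toBytes(long)
//   numChunks                       8 bytes, Bytes.toBytes(long)
//   BucketCacheEntry                delimited protobuf: deserializers, cached-files map, checksum,
//                                   capacity/class metadata, plus the FIRST backing-map chunk
//   BackingMap x (numChunks - 1)    delimited protobuf messages carrying the remaining chunks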
org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -122,6 +123,8 @@ public class BucketCache implements BlockCache, HeapSize {
static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
"hbase.bucketcache.persistence.chunksize";

/** Use strong reference for offsetLock or not */
private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
@@ -145,6 +148,8 @@ public class BucketCache implements BlockCache, HeapSize {
final static int DEFAULT_WRITER_THREADS = 3;
final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;

final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000000;

// Store/read block data
transient final IOEngine ioEngine;

@@ -273,6 +278,8 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private String algorithm;

private long persistenceChunkSize;

/* Tracing failed Bucket Cache allocations. */
private long allocFailLogPrevTs; // time of previous log event for allocation failure.
private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
@@ -313,6 +320,11 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
this.persistenceChunkSize =
conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
if (this.persistenceChunkSize <= 0) {
persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
}

sanityCheckConfigs();

@@ -1358,8 +1370,8 @@ void persistToFile() throws IOException {
}
File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
fos.write(ProtobufMagic.PB_MAGIC);
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
LOG.debug("Persist in new chunked persistence format.");
persistChunkedBackingMap(fos);
} catch (IOException e) {
LOG.error("Failed to persist bucket cache to file", e);
throw e;
@@ -1405,16 +1417,23 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
throw new IOException("Incorrect number of bytes read while checking for protobuf magic "
+ "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath);
}
if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
LOG.info("Reading old format of persistence.");
// The old non-chunked version of backing map persistence.
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
} else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
// The new chunked format of backing map persistence.
LOG.info("Reading new chunked format of persistence.");
retrieveChunkedBackingMap(in, bucketSizes);
} else {
// In 3.0 we have enough flexibility to dump the old cache data.
// TODO: In 2.x line, this might need to be filled in to support reading the old format
throw new IOException(
"Persistence file does not start with protobuf magic number. " + persistencePath);
}
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
blockNumber.add(backingMap.size());
LOG.info("Bucket cache retrieved from file successfully");
LOG.info("Bucket cache retrieved from file successfully with size: {}", backingMap.size());
}
}

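For backward compatibility the reader dispatches on the 4-byte prefix at the start of the persistence file. A condensed view of that dispatch; the "PBUF" value of ProtobufMagic.PB_MAGIC is stated here as background knowledge rather than something shown in this diff:

// Prefix dispatch in retrieveFromFile(), simplified for illustration:
//   "PBUF" (ProtobufMagic.PB_MAGIC)       -> parsePB(BucketCacheEntry.parseDelimitedFrom(in))  // old single-message format
//   "V2UF" (BucketProtoUtils.PB_MAGIC_V2) -> retrieveChunkedBackingMap(in, bucketSizes)        // new chunked format
//   anything else                         -> IOException("Persistence file does not start with protobuf magic number.")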
@@ -1457,6 +1476,75 @@ private void verifyCapacityAndClasses(long capacitySize, String ioclass, String
}
}

private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
try {
if (proto.hasChecksum()) {
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
algorithm);
}
backingMapValidated.set(true);
} catch (IOException e) {
LOG.warn("Checksum for cache file failed. "
+ "We need to validate each cache key in the backing map. "
+ "This may take some time, so we'll do it in a background thread,");

Runnable cacheValidator = () -> {
while (bucketAllocator == null) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
long startTime = EnvironmentEdgeManager.currentTime();
int totalKeysOriginally = backingMap.size();
for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
try {
((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
} catch (IOException e1) {
LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
evictBlock(keyEntry.getKey());
fileNotFullyCached(keyEntry.getKey().getHfileName());
}
}
backingMapValidated.set(true);
LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
totalKeysOriginally, backingMap.size(),
(EnvironmentEdgeManager.currentTime() - startTime));
};
Thread t = new Thread(cacheValidator);
t.setDaemon(true);
t.start();
}
}

private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk,
List<BucketCacheProtos.BackingMap> chunks) throws IOException {
fullyCachedFiles.clear();
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(),
this::createRecycler);
backingMap.putAll(pair.getFirst());
blocksByHFile.addAll(pair.getSecond());
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap()));

LOG.debug("Number of blocks after first chunk: {}, blocksByHFile: {}", backingMap.size(),
fullyCachedFiles.size());
int i = 1;
for (BucketCacheProtos.BackingMap chunk : chunks) {
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, this::createRecycler);
backingMap.putAll(pair2.getFirst());
blocksByHFile.addAll(pair2.getSecond());
LOG.debug("Number of blocks after {} reading chunk: {}, blocksByHFile: {}", ++i,
backingMap.size(), fullyCachedFiles.size());
}
verifyFileIntegrity(firstChunk);
verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
firstChunk.getMapClass());
updateRegionSizeMapWhileRetrievingFromFile();
}

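// Editor's note, not part of the patch: only the first chunk carries the deserializer map,
// cached-files map, checksum and capacity/class metadata; subsequent chunks contribute map
// entries only, which is why fromPB() is always called with firstChunk.getDeserializersMap().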
private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
@@ -1465,52 +1553,60 @@ private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOExceptio
blocksByHFile = pair.getSecond();
fullyCachedFiles.clear();
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
if (proto.hasChecksum()) {
try {
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
algorithm);
backingMapValidated.set(true);
} catch (IOException e) {
LOG.warn("Checksum for cache file failed. "
+ "We need to validate each cache key in the backing map. "
+ "This may take some time, so we'll do it in a background thread,");
Runnable cacheValidator = () -> {
while (bucketAllocator == null) {
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
long startTime = EnvironmentEdgeManager.currentTime();
int totalKeysOriginally = backingMap.size();
for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
try {
((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
} catch (IOException e1) {
LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
evictBlock(keyEntry.getKey());
fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
}
}
backingMapValidated.set(true);
LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
totalKeysOriginally, backingMap.size(),
(EnvironmentEdgeManager.currentTime() - startTime));
};
Thread t = new Thread(cacheValidator);
t.setDaemon(true);
t.start();
}
} else {
// if has not checksum, it means the persistence file is old format
LOG.info("Persistent file is old format, it does not support verifying file integrity!");
backingMapValidated.set(true);
}
verifyFileIntegrity(proto);
updateRegionSizeMapWhileRetrievingFromFile();
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
}

private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
long numChunks = backingMap.size() / persistenceChunkSize;
if (backingMap.size() % persistenceChunkSize != 0) {
numChunks += 1;
}

LOG.debug(
"persistToFile: before persisting backing map size: {}, "
+ "fullyCachedFiles size: {}, chunkSize: {}, numberOfChunks: {}",
backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks);

BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks);

LOG.debug(
"persistToFile: after persisting backing map size: {}, "
+ "fullyCachedFiles size: {}, numChunksPersisted: {}",
backingMap.size(), fullyCachedFiles.size(), numChunks);
}

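// Editor's note, not part of the patch: the chunk count above is a ceiling division.
// Worked example with the default chunk size: a backing map of 25,000,000 blocks and
// persistenceChunkSize = 10,000,000 gives 25,000,000 / 10,000,000 = 2 full chunks, and the
// non-zero remainder of 5,000,000 adds one more, so numChunks == 3.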
private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException {
byte[] bytes = new byte[Long.BYTES];
int readSize = in.read(bytes);
if (readSize != Long.BYTES) {
throw new IOException("Invalid size of chunk-size read from persistence: " + readSize);
}
long batchSize = Bytes.toLong(bytes, 0);

readSize = in.read(bytes);
if (readSize != Long.BYTES) {
throw new IOException("Invalid size for number of chunks read from persistence: " + readSize);
}
long numChunks = Bytes.toLong(bytes, 0);

LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);

ArrayList<BucketCacheProtos.BackingMap> bucketCacheMaps = new ArrayList<>();
// Read the first chunk that has all the details.
BucketCacheProtos.BucketCacheEntry firstChunk =
BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);

// Subsequent chunks have the backingMap entries.
for (int i = 1; i < numChunks; i++) {
LOG.info("Reading chunk no: {}", i + 1);
bucketCacheMaps.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in));
LOG.info("Retrieved chunk: {}", i + 1);
}
parsePB(firstChunk, bucketCacheMaps);
}

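// Editor's note, not part of the patch: the header's numChunks counts the first chunk that is
// embedded in the BucketCacheEntry parsed above, which is why the loop only reads
// numChunks - 1 bare BackingMap messages. For a hypothetical three-chunk file:
//   header: numChunks == 3
//   parseDelimitedFrom(in) -> BucketCacheEntry        (metadata + chunk 1)
//   loop i = 1, 2          -> two BackingMap messages (chunks 2 and 3)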
/**
* Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
* exceeds ioErrorsDurationTimeTolerated, we will disable the cache
org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -32,6 +33,7 @@
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

@@ -41,29 +43,55 @@

@InterfaceAudience.Private
final class BucketProtoUtils {

final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };

private BucketProtoUtils() {

}

static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
BucketCacheProtos.BackingMap backingMap) {
return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
.setIoClass(cache.ioEngine.getClass().getName())
.setMapClass(cache.backingMap.getClass().getName())
.putAllDeserializers(CacheableDeserializerIdManager.save())
.putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
.setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
.putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)).setBackingMap(backingMap)
.setChecksum(ByteString
.copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
.build();
}

private static BucketCacheProtos.BackingMap toPB(Map<BlockCacheKey, BucketEntry> backingMap) {
public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize,
long numChunks) throws IOException {
int blockCount = 0;
int chunkCount = 0;
int backingMapSize = cache.backingMap.size();
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder().setKey(toPB(entry.getKey()))
.setValue(toPB(entry.getValue())).build());

fos.write(PB_MAGIC_V2);
fos.write(Bytes.toBytes(chunkSize));
fos.write(Bytes.toBytes(numChunks));

for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
blockCount++;
builder.addEntry(
BucketCacheProtos.BackingMapEntry.newBuilder().setKey(BucketProtoUtils.toPB(entry.getKey()))
.setValue(BucketProtoUtils.toPB(entry.getValue())).build());
if (blockCount % chunkSize == 0 || (blockCount == backingMapSize)) {
chunkCount++;
if (chunkCount == 1) {
// Persist all details along with the first chunk into BucketCacheEntry
BucketProtoUtils.toPB(cache, builder.build()).writeDelimitedTo(fos);
} else {
// Directly persist subsequent backing-map chunks.
builder.build().writeDelimitedTo(fos);
}
if (blockCount < backingMapSize) {
builder = BucketCacheProtos.BackingMap.newBuilder();
}
}
}
return builder.build();
}

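// Editor's note, not part of the patch: the builder is flushed whenever a chunk fills up or the
// last entry has been appended, and only the first flush goes through toPB(cache, ...), so the
// metadata (deserializers, cached-files map, checksum) is written exactly once. Hypothetical
// trace for chunkSize = 10 and backingMapSize = 25 (numbers invented for illustration):
//   blockCount 10 -> chunk 1 written inside a BucketCacheEntry
//   blockCount 20 -> chunk 2 written as a bare BackingMap message
//   blockCount 25 -> chunk 3 written as a bare BackingMap message (remainder; blockCount == backingMapSize)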
private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {