Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. #4026

Merged
merged 4 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ static class Header {
* (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
* formerly known as EXTRA_SERIALIZATION_SPACE).
*/
static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
public static final int BLOCK_METADATA_SPACE =
Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;

/**
* Each checksum value is an integer that can be stored in 4 bytes.
Expand Down Expand Up @@ -1883,8 +1884,7 @@ public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata)
/**
* For use by bucketcache. This exposes internals.
*/
public ByteBuffer getMetaData() {
ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
public ByteBuffer getMetaData(ByteBuffer bb) {
bb = addMetaData(bb, true);
bb.flip();
return bb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ private void freeSpace(final String why) {
class WriterThread extends Thread {
private final BlockingQueue<RAMQueueEntry> inputQueue;
private volatile boolean writerEnabled = true;
private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);

WriterThread(BlockingQueue<RAMQueueEntry> queue) {
super("BucketCacheWriterThread");
Expand All @@ -970,7 +971,7 @@ public void run() {
break;
}
}
doDrain(entries);
doDrain(entries, metaBuff);
} catch (Exception ioe) {
LOG.error("WriterThread encountered error", ioe);
}
Expand Down Expand Up @@ -1046,7 +1047,7 @@ private static String getAllocationFailWarningMessage(final BucketAllocatorExcep
* @param entries Presumes list passed in here will be processed by this invocation only. No
* interference expected.
*/
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
if (entries.isEmpty()) {
return;
}
Expand Down Expand Up @@ -1074,9 +1075,14 @@ void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
if (ramCache.containsKey(cacheKey)) {
blocksByHFile.add(cacheKey);
}

// Reset the position for reuse.
// It should be guaranteed that the data in the metaBuff has been transferred to the
// ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
// transferred with our current IOEngines. Should take care, when we have new kinds of
// IOEngine in the future.
metaBuff.clear();
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
this::createRecycler);
this::createRecycler, metaBuff);
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
Expand Down Expand Up @@ -1504,8 +1510,8 @@ private ByteBuffAllocator getByteBuffAllocator() {
}

public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
throws IOException {
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
ByteBuffer metaBuff) throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
Expand All @@ -1522,9 +1528,9 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a
// If an instance of HFileBlock, save on some allocations.
HFileBlock block = (HFileBlock) data;
ByteBuff sliceBuf = block.getBufferReadOnly();
ByteBuffer metadata = block.getMetaData();
block.getMetaData(metaBuff);
ioEngine.write(sliceBuf, offset);
ioEngine.write(metadata, offset + len - metadata.limit());
ioEngine.write(metaBuff, offset + len - metaBuff.limit());
} else {
// Only used for testing.
ByteBuffer bb = ByteBuffer.allocate(len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,8 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {

Assert.assertEquals(0, allocator.getUsedSize());
try {
re.writeToCache(ioEngine, allocator, null, null);
re.writeToCache(ioEngine, allocator, null, null,
ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
Assert.fail();
} catch (Exception e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
}

@Override
void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
super.doDrain(entries);
void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
super.doDrain(entries, metaBuff);
if (entries.size() > 0) {
/**
* Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
Expand Down Expand Up @@ -140,7 +142,7 @@ public void testIOE() throws IOException, InterruptedException {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
Expand All @@ -162,7 +164,8 @@ public void testCacheFullException()
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
Expand All @@ -171,7 +174,7 @@ private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.Wr
final BlockingQueue<RAMQueueEntry> q)
throws InterruptedException {
List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
bc.doDrain(rqes);
bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
assertTrue(q.isEmpty());
assertTrue(bc.ramCache.isEmpty());
assertEquals(0, bc.heapSize());
Expand Down