Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. #4026

Merged
merged 4 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ static class Header {
* (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
* formerly known as EXTRA_SERIALIZATION_SPACE).
*/
static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
public static final int BLOCK_METADATA_SPACE =
Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;

/**
* Each checksum value is an integer that can be stored in 4 bytes.
Expand Down Expand Up @@ -1883,8 +1884,7 @@ public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata)
/**
* For use by bucketcache. This exposes internals.
*/
public ByteBuffer getMetaData() {
ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
public ByteBuffer getMetaData(ByteBuffer bb) {
bb = addMetaData(bb, true);
bb.flip();
return bb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ private void freeSpace(final String why) {
class WriterThread extends Thread {
private final BlockingQueue<RAMQueueEntry> inputQueue;
private volatile boolean writerEnabled = true;
private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);

WriterThread(BlockingQueue<RAMQueueEntry> queue) {
super("BucketCacheWriterThread");
Expand All @@ -970,7 +971,7 @@ public void run() {
break;
}
}
doDrain(entries);
doDrain(entries, metaBuff);
} catch (Exception ioe) {
LOG.error("WriterThread encountered error", ioe);
}
Expand Down Expand Up @@ -1046,7 +1047,7 @@ private static String getAllocationFailWarningMessage(final BucketAllocatorExcep
* @param entries Presumes list passed in here will be processed by this invocation only. No
* interference expected.
*/
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
if (entries.isEmpty()) {
return;
}
Expand Down Expand Up @@ -1076,7 +1077,7 @@ void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
}

BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
this::createRecycler);
this::createRecycler, metaBuff);
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
Expand Down Expand Up @@ -1504,8 +1505,8 @@ private ByteBuffAllocator getByteBuffAllocator() {
}

public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
throws IOException {
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
ByteBuffer metaBuff) throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
Expand All @@ -1522,9 +1523,15 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a
// If an instance of HFileBlock, save on some allocations.
HFileBlock block = (HFileBlock) data;
ByteBuff sliceBuf = block.getBufferReadOnly();
ByteBuffer metadata = block.getMetaData();
block.getMetaData(metaBuff);
ioEngine.write(sliceBuf, offset);
ioEngine.write(metadata, offset + len - metadata.limit());
ioEngine.write(metaBuff, offset + len - metaBuff.limit());
// Reset the position for reuse.
// The data in metaBuff should be guaranteed that has been transferred to the ioEngine
// safely. Otherwise, this reuse is problematic. Fortunately, the data is already
// transferred with our current IOEngines. Should take care, when we have new types of
// IOEngine in the future.
metaBuff.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the metaBuff is passed in from upper layer, I prefer we also do this in upper layer? And what if the above ioEngine.write throws IOException out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have made the metaBuff cleared before invoking of #writeToCache. I think this can keep the metaBuff clean every time we writeToCache even if the IOEngine had exception and failed the last block write.

} else {
// Only used for testing.
ByteBuffer bb = ByteBuffer.allocate(len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,8 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {

Assert.assertEquals(0, allocator.getUsedSize());
try {
re.writeToCache(ioEngine, allocator, null, null);
re.writeToCache(ioEngine, allocator, null, null,
ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
Assert.fail();
} catch (Exception e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
}

@Override
void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
super.doDrain(entries);
void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
super.doDrain(entries, metaBuff);
if (entries.size() > 0) {
/**
* Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
Expand Down Expand Up @@ -140,7 +142,7 @@ public void testIOE() throws IOException, InterruptedException {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
Expand All @@ -162,7 +164,8 @@ public void testCacheFullException()
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
Expand All @@ -171,7 +174,7 @@ private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.Wr
final BlockingQueue<RAMQueueEntry> q)
throws InterruptedException {
List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
bc.doDrain(rqes);
bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
assertTrue(q.isEmpty());
assertTrue(bc.ramCache.isEmpty());
assertEquals(0, bc.heapSize());
Expand Down