Skip to content

Commit

Permalink
HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCach…
Browse files Browse the repository at this point in the history
…e could be reused. (#4026)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
YutSean authored and Apache9 committed Feb 18, 2022
1 parent af338d2 commit 932bf59
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ static class Header {
* (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
* formerly known as EXTRA_SERIALIZATION_SPACE).
*/
static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
public static final int BLOCK_METADATA_SPACE =
Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;

/**
* Each checksum value is an integer that can be stored in 4 bytes.
Expand Down Expand Up @@ -1884,8 +1885,7 @@ public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata)
/**
* For use by bucketcache. This exposes internals.
*/
public ByteBuffer getMetaData() {
ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
public ByteBuffer getMetaData(ByteBuffer bb) {
bb = addMetaData(bb, true);
bb.flip();
return bb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ private void freeSpace(final String why) {
class WriterThread extends Thread {
private final BlockingQueue<RAMQueueEntry> inputQueue;
private volatile boolean writerEnabled = true;
private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);

WriterThread(BlockingQueue<RAMQueueEntry> queue) {
super("BucketCacheWriterThread");
Expand All @@ -959,7 +960,7 @@ public void run() {
break;
}
}
doDrain(entries);
doDrain(entries, metaBuff);
} catch (Exception ioe) {
LOG.error("WriterThread encountered error", ioe);
}
Expand Down Expand Up @@ -1035,7 +1036,7 @@ private static String getAllocationFailWarningMessage(final BucketAllocatorExcep
* @param entries Presumes list passed in here will be processed by this invocation only. No
* interference expected.
*/
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
if (entries.isEmpty()) {
return;
}
Expand Down Expand Up @@ -1063,9 +1064,14 @@ void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
if (ramCache.containsKey(cacheKey)) {
blocksByHFile.add(cacheKey);
}

// Reset the position for reuse.
// It should be guaranteed that the data in the metaBuff has been transferred to the
// ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
// transferred with our current IOEngines. Should take care, when we have new kinds of
// IOEngine in the future.
metaBuff.clear();
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
this::createRecycler);
this::createRecycler, metaBuff);
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
Expand Down Expand Up @@ -1489,8 +1495,8 @@ private ByteBuffAllocator getByteBuffAllocator() {
}

public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
throws IOException {
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
ByteBuffer metaBuff) throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
Expand All @@ -1507,9 +1513,9 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a
// If an instance of HFileBlock, save on some allocations.
HFileBlock block = (HFileBlock) data;
ByteBuff sliceBuf = block.getBufferReadOnly();
ByteBuffer metadata = block.getMetaData();
block.getMetaData(metaBuff);
ioEngine.write(sliceBuf, offset);
ioEngine.write(metadata, offset + len - metadata.limit());
ioEngine.write(metaBuff, offset + len - metaBuff.limit());
} else {
// Only used for testing.
ByteBuffer bb = ByteBuffer.allocate(len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {

Assert.assertEquals(0, allocator.getUsedSize());
try {
re.writeToCache(ioEngine, allocator, null, null);
re.writeToCache(ioEngine, allocator, null, null,
ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
Assert.fail();
} catch (Exception e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
}

@Override
void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
super.doDrain(entries);
void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
super.doDrain(entries, metaBuff);
if (entries.size() > 0) {
/**
* Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
Expand Down Expand Up @@ -140,7 +142,7 @@ public void testIOE() throws IOException, InterruptedException {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
Expand All @@ -162,7 +164,8 @@ public void testCacheFullException()
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
Expand All @@ -171,7 +174,7 @@ private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.Wr
final BlockingQueue<RAMQueueEntry> q)
throws InterruptedException {
List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
bc.doDrain(rqes);
bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
assertTrue(q.isEmpty());
assertTrue(bc.ramCache.isEmpty());
assertEquals(0, bc.heapSize());
Expand Down

0 comments on commit 932bf59

Please sign in to comment.