From ce65bb0a08fda892e01038bb25156a0c6665857e Mon Sep 17 00:00:00 2001 From: Yutong Sean Date: Thu, 13 Jan 2022 08:45:06 +0800 Subject: [PATCH 1/4] HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. --- .../hadoop/hbase/io/hfile/HFileBlock.java | 5 ++--- .../hbase/io/hfile/bucket/BucketCache.java | 17 ++++++++++------- .../hbase/io/hfile/bucket/TestBucketCache.java | 4 +++- .../io/hfile/bucket/TestBucketCacheRefCnt.java | 4 ++-- .../io/hfile/bucket/TestBucketWriterThread.java | 10 +++++++--- 5 files changed, 24 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index fd6cea1b0a19..0ae5edb5df1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -235,7 +235,7 @@ static class Header { * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was * formerly known as EXTRA_SERIALIZATION_SPACE). */ - static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; + public static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; /** * Each checksum value is an integer that can be stored in 4 bytes. @@ -1883,8 +1883,7 @@ public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) /** * For use by bucketcache. This exposes internals. */ - public ByteBuffer getMetaData() { - ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE); + public ByteBuffer getMetaData(ByteBuffer bb) { bb = addMetaData(bb, true); bb.flip(); return bb; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 25859d4a7169..bb26f74b5bb6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -945,6 +945,7 @@ private void freeSpace(final String why) { class WriterThread extends Thread { private final BlockingQueue inputQueue; private volatile boolean writerEnabled = true; + private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE); WriterThread(BlockingQueue queue) { super("BucketCacheWriterThread"); @@ -970,7 +971,7 @@ public void run() { break; } } - doDrain(entries); + doDrain(entries, metaBuff); } catch (Exception ioe) { LOG.error("WriterThread encountered error", ioe); } @@ -1046,7 +1047,7 @@ private static String getAllocationFailWarningMessage(final BucketAllocatorExcep * @param entries Presumes list passed in here will be processed by this invocation only. No * interference expected. */ - void doDrain(final List entries) throws InterruptedException { + void doDrain(final List entries, ByteBuffer metaBuff) throws InterruptedException { if (entries.isEmpty()) { return; } @@ -1076,7 +1077,7 @@ void doDrain(final List entries) throws InterruptedException { } BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize, - this::createRecycler); + this::createRecycler, metaBuff); // Successfully added. Up index and add bucketEntry. Clear io exceptions. bucketEntries[index] = bucketEntry; if (ioErrorStartTime > 0) { @@ -1504,8 +1505,8 @@ private ByteBuffAllocator getByteBuffAllocator() { } public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc, - final LongAdder realCacheSize, Function createRecycler) - throws IOException { + final LongAdder realCacheSize, Function createRecycler, + ByteBuffer metaBuff) throws IOException { int len = data.getSerializedLength(); // This cacheable thing can't be serialized if (len == 0) { @@ -1522,9 +1523,11 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a // If an instance of HFileBlock, save on some allocations. HFileBlock block = (HFileBlock) data; ByteBuff sliceBuf = block.getBufferReadOnly(); - ByteBuffer metadata = block.getMetaData(); + block.getMetaData(metaBuff); ioEngine.write(sliceBuf, offset); - ioEngine.write(metadata, offset + len - metadata.limit()); + ioEngine.write(metaBuff, offset + len - metaBuff.limit()); + // Reset the position for reuse. + metaBuff.clear(); } else { // Only used for testing. ByteBuffer bb = ByteBuffer.allocate(len); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 22b48dc91632..a65226e4ba27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.After; @@ -686,7 +687,8 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException { Assert.assertEquals(0, allocator.getUsedSize()); try { - re.writeToCache(ioEngine, allocator, null, null); + re.writeToCache(ioEngine, allocator, null, null, + ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); Assert.fail(); } catch (Exception e) { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java index fd083fd3c898..e9c6e5e2cc63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java @@ -627,8 +627,8 @@ protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) { } @Override - void doDrain(List entries) throws InterruptedException { - super.doDrain(entries); + void doDrain(List entries, ByteBuffer metaBuff) throws InterruptedException { + super.doDrain(entries, metaBuff); if (entries.size() > 0) { /** * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index 0ba7dea48081..244412c5a0b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -24,6 +24,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -31,6 +32,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.Cacheable; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -140,7 +143,7 @@ public void testIOE() throws IOException, InterruptedException { RAMQueueEntry rqe = q.remove(); RAMQueueEntry spiedRqe = Mockito.spy(rqe); Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe). - writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); // Cache disabled when ioes w/o ever healing. @@ -162,7 +165,8 @@ public void testCacheFullException() BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); Mockito.doThrow(cfe). doReturn(mockedBucketEntry). - when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); } @@ -171,7 +175,7 @@ private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.Wr final BlockingQueue q) throws InterruptedException { List rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1)); - bc.doDrain(rqes); + bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE)); assertTrue(q.isEmpty()); assertTrue(bc.ramCache.isEmpty()); assertEquals(0, bc.heapSize()); From ed5d2547706424bd8e5444f6f4548b5e7cf791de Mon Sep 17 00:00:00 2001 From: Yutong Sean Date: Thu, 13 Jan 2022 09:42:15 +0800 Subject: [PATCH 2/4] Checkstyle fix --- .../main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java | 3 ++- .../apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java | 1 - .../hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 0ae5edb5df1b..7c7fa4ef8c36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -235,7 +235,8 @@ static class Header { * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was * formerly known as EXTRA_SERIALIZATION_SPACE). */ - public static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; + public static final int BLOCK_METADATA_SPACE = + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT; /** * Each checksum value is an integer that can be stored in 4 bytes. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index a65226e4ba27..45120a73b9e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; -import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.After; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index 244412c5a0b9..1d4f1f3d7425 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.Cacheable; -import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; import org.apache.hadoop.hbase.testclassification.IOTests; From bbf3de6a9999db5b663b523168715b03c5f1ed65 Mon Sep 17 00:00:00 2001 From: Yutong Sean Date: Fri, 14 Jan 2022 10:05:54 +0800 Subject: [PATCH 3/4] Added comment of the buffer reuse --- .../org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index bb26f74b5bb6..603505f64483 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1527,6 +1527,10 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a ioEngine.write(sliceBuf, offset); ioEngine.write(metaBuff, offset + len - metaBuff.limit()); // Reset the position for reuse. + // The data in metaBuff should be guaranteed that has been transferred to the ioEngine + // safely. Otherwise, this reuse is problematic. Fortunately, the data is already + // transferred with our current IOEngines. Should take care, when we have new types of + // IOEngine in the future. metaBuff.clear(); } else { // Only used for testing. From a1cd62f0ccfe86f876cfefa4dcf7a2c9754b29de Mon Sep 17 00:00:00 2001 From: Yutong Sean Date: Thu, 20 Jan 2022 09:15:28 +0800 Subject: [PATCH 4/4] Move metaBuff clear upper layer --- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 603505f64483..e05645415fc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -1075,7 +1075,12 @@ void doDrain(final List entries, ByteBuffer metaBuff) throws Inte if (ramCache.containsKey(cacheKey)) { blocksByHFile.add(cacheKey); } - + // Reset the position for reuse. + // It should be guaranteed that the data in the metaBuff has been transferred to the + // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already + // transferred with our current IOEngines. Should take care, when we have new kinds of + // IOEngine in the future. + metaBuff.clear(); BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff); // Successfully added. Up index and add bucketEntry. Clear io exceptions. @@ -1526,12 +1531,6 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a block.getMetaData(metaBuff); ioEngine.write(sliceBuf, offset); ioEngine.write(metaBuff, offset + len - metaBuff.limit()); - // Reset the position for reuse. - // The data in metaBuff should be guaranteed that has been transferred to the ioEngine - // safely. Otherwise, this reuse is problematic. Fortunately, the data is already - // transferred with our current IOEngines. Should take care, when we have new types of - // IOEngine in the future. - metaBuff.clear(); } else { // Only used for testing. ByteBuffer bb = ByteBuffer.allocate(len);