diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index fd6cea1b0a19..7c7fa4ef8c36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -235,7 +235,8 @@ static class Header {
    * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
    * formerly known as EXTRA_SERIALIZATION_SPACE).
    */
-  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
+  public static final int BLOCK_METADATA_SPACE =
+    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
 
   /**
    * Each checksum value is an integer that can be stored in 4 bytes.
@@ -1883,8 +1884,7 @@ public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata)
   /**
    * For use by bucketcache. This exposes internals.
    */
-  public ByteBuffer getMetaData() {
-    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
+  public ByteBuffer getMetaData(ByteBuffer bb) {
     bb = addMetaData(bb, true);
     bb.flip();
     return bb;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 25859d4a7169..e05645415fc1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -945,6 +945,7 @@ private void freeSpace(final String why) {
   class WriterThread extends Thread {
     private final BlockingQueue<RAMQueueEntry> inputQueue;
     private volatile boolean writerEnabled = true;
+    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
 
     WriterThread(BlockingQueue<RAMQueueEntry> queue) {
       super("BucketCacheWriterThread");
@@ -970,7 +971,7 @@ public void run() {
             break;
           }
         }
-        doDrain(entries);
+        doDrain(entries, metaBuff);
       } catch (Exception ioe) {
         LOG.error("WriterThread encountered error", ioe);
       }
@@ -1046,7 +1047,7 @@ private static String getAllocationFailWarningMessage(final BucketAllocatorExcep
    * @param entries Presumes list passed in here will be processed by this invocation only. No
    *                interference expected.
    */
-  void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
     if (entries.isEmpty()) {
       return;
     }
@@ -1074,9 +1075,14 @@ void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
         if (ramCache.containsKey(cacheKey)) {
           blocksByHFile.add(cacheKey);
         }
-
+        // Reset the position for reuse.
+        // It must be guaranteed that the data in metaBuff has already been transferred to the
+        // ioEngine at this point; otherwise the reuse is unsafe. All current IOEngines consume
+        // the buffer before write() returns, so this holds today. Take care when adding new
+        // kinds of IOEngine in the future.
+        metaBuff.clear();
         BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
-          this::createRecycler);
+          this::createRecycler, metaBuff);
         // Successfully added. Up index and add bucketEntry. Clear io exceptions.
         bucketEntries[index] = bucketEntry;
         if (ioErrorStartTime > 0) {
@@ -1504,8 +1510,8 @@ private ByteBuffAllocator getByteBuffAllocator() {
     }
 
     public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
-      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
-      throws IOException {
+      final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
+      ByteBuffer metaBuff) throws IOException {
       int len = data.getSerializedLength();
       // This cacheable thing can't be serialized
       if (len == 0) {
@@ -1522,9 +1528,9 @@ public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator a
         // If an instance of HFileBlock, save on some allocations.
         HFileBlock block = (HFileBlock) data;
         ByteBuff sliceBuf = block.getBufferReadOnly();
-        ByteBuffer metadata = block.getMetaData();
+        block.getMetaData(metaBuff);
         ioEngine.write(sliceBuf, offset);
-        ioEngine.write(metadata, offset + len - metadata.limit());
+        ioEngine.write(metaBuff, offset + len - metaBuff.limit());
       } else {
         // Only used for testing.
         ByteBuffer bb = ByteBuffer.allocate(len);
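Review note: the getMetaData() change moves the BLOCK_METADATA_SPACE allocation from callee to caller, so each WriterThread can reuse a single buffer across doDrain() calls instead of allocating a fresh one per cached block. A minimal standalone sketch of that caller-supplied-buffer shape follows; the class and method bodies are hypothetical stand-ins, not HBase code.

import java.nio.ByteBuffer;

// Sketch of the API change: allocation moves from callee to caller so a
// long-lived writer thread can reuse one buffer. Names are hypothetical.
public class CallerSuppliedBufferSketch {
  static final int BLOCK_METADATA_SPACE = 1 + 8 + 4; // byte + long + int, as in HFileBlock

  // Before: a fresh heap buffer on every call.
  static ByteBuffer getMetaDataAllocating() {
    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
    bb.put((byte) 1).putLong(0L).putInt(0); // stand-in for addMetaData(bb, true)
    bb.flip();
    return bb;
  }

  // After: fill and flip a buffer the caller owns and may reuse.
  static ByteBuffer getMetaData(ByteBuffer bb) {
    bb.put((byte) 1).putLong(0L).putInt(0);
    bb.flip();
    return bb;
  }

  public static void main(String[] args) {
    ByteBuffer reused = ByteBuffer.allocate(BLOCK_METADATA_SPACE); // one per writer thread
    for (int i = 0; i < 3; i++) {
      reused.clear();      // caller's responsibility before each reuse
      getMetaData(reused); // fills and flips, no allocation
    }
    System.out.println("3 calls, 1 allocation (vs 3 with the old shape)");
  }
}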
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 22b48dc91632..45120a73b9e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -686,7 +686,8 @@ public void testFreeBlockWhenIOEngineWriteFailure() throws IOException {
     Assert.assertEquals(0, allocator.getUsedSize());
 
     try {
-      re.writeToCache(ioEngine, allocator, null, null);
+      re.writeToCache(ioEngine, allocator, null, null,
+        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
       Assert.fail();
     } catch (Exception e) {
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index fd083fd3c898..e9c6e5e2cc63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -627,8 +627,8 @@ protected void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
       }
 
       @Override
-      void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
-        super.doDrain(entries);
+      void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
+        super.doDrain(entries, metaBuff);
         if (entries.size() > 0) {
           /**
            * Caching Block completed, release {@link #GET_BLOCK_THREAD_NAME} and
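Review note: the rewritten write path in writeToCache() above depends on getMetaData() having flipped the buffer, so that metaBuff.limit() equals exactly the number of metadata bytes written and offset + len - metaBuff.limit() addresses the metadata tail of the block's serialized region. A toy java.nio demonstration of that flip()/limit() invariant, with made-up values and no HBase types:

import java.nio.ByteBuffer;

// Demonstrates that after flip(), limit() reports exactly the bytes written,
// which is what makes "offset + len - metaBuff.limit()" point at the metadata
// tail of the serialized block region.
public class FlipLimitDemo {
  public static void main(String[] args) {
    int len = 64;                                  // pretend serialized block length
    ByteBuffer metaBuff = ByteBuffer.allocate(13); // BLOCK_METADATA_SPACE = 1 + 8 + 4
    metaBuff.clear();
    metaBuff.put((byte) 0).putLong(123L).putInt(7);
    metaBuff.flip();                               // limit = 13, position = 0
    long offset = 4096;                            // pretend bucket offset
    long metaOffset = offset + len - metaBuff.limit();
    System.out.println("limit after flip = " + metaBuff.limit()); // 13
    System.out.println("metadata goes at " + metaOffset);         // 4096 + 64 - 13
  }
}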
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index 0ba7dea48081..1d4f1f3d7425 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -24,6 +24,7 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -31,6 +32,7 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -140,7 +142,7 @@ public void testIOE() throws IOException, InterruptedException {
     RAMQueueEntry rqe = q.remove();
     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
     Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
-      writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+      writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
     // Cache disabled when ioes w/o ever healing.
@@ -162,7 +164,8 @@ public void testCacheFullException()
     BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
     Mockito.doThrow(cfe).
       doReturn(mockedBucketEntry).
-      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+      when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+        Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
   }
@@ -171,7 +174,7 @@ private static void doDrainOfOneEntry(final BucketCache bc, final BucketCache.Wr
     final BlockingQueue<RAMQueueEntry> q) throws InterruptedException {
     List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
-    bc.doDrain(rqes);
+    bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
     assertTrue(q.isEmpty());
     assertTrue(bc.ramCache.isEmpty());
     assertEquals(0, bc.heapSize());
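Review note: the tests above sidestep reuse by allocating a fresh buffer per doDrain() call, but production WriterThreads share one buffer across entries, which is why the metaBuff.clear() added in doDrain() matters. A toy check of the hazard, not an HBase test: once an IOEngine-style consumer has drained the shared buffer, the next fill throws unless clear() resets it.

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

// Toy check of the reuse hazard the doDrain() comment warns about: after the
// consumer drains the buffer, position == limit, so a second fill without
// clear() throws BufferOverflowException.
public class ReuseHazardDemo {
  static void fillAndDrain(ByteBuffer bb) {
    bb.put((byte) 1).putLong(2L).putInt(3); // 13 bytes = BLOCK_METADATA_SPACE
    bb.flip();
    while (bb.hasRemaining()) {
      bb.get(); // stands in for ioEngine.write() consuming the buffer
    }
  }

  public static void main(String[] args) {
    ByteBuffer metaBuff = ByteBuffer.allocate(13);
    fillAndDrain(metaBuff); // first entry in the drain loop: fine
    try {
      fillAndDrain(metaBuff); // no clear(): put() has no room left
      System.out.println("unexpected: second fill succeeded");
    } catch (BufferOverflowException expected) {
      metaBuff.clear(); // what doDrain() now does before every entry
      fillAndDrain(metaBuff);
      System.out.println("clear() restored the buffer for reuse");
    }
  }
}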