From c8b8bc4b56abe4d9b7ef305b32f1e18cf2a20769 Mon Sep 17 00:00:00 2001
From: "Tak Lon (Stephen) Wu"
Date: Thu, 5 May 2022 17:48:11 -0700
Subject: [PATCH] HBASE-27013 Introduce read all bytes when using pread for Prefetch

---
 .../org/apache/hadoop/hbase/HConstants.java   |   7 ++
 .../hadoop/hbase/io/util/BlockIOUtils.java    |  35 +++++-
 .../hadoop/hbase/io/hfile/HFileBlock.java     |   7 +-
 .../hadoop/hbase/regionserver/HStore.java     |  12 +-
 .../hbase/regionserver/StoreEngine.java       |   9 +-
 .../regionserver/ReplicationSource.java       |   3 +-
 .../replication/TestBadReplicationPeer.java   |  10 +-
 .../hbase/io/hfile/TestBlockIOUtils.java      | 112 ++++++++++++++++++
 8 files changed, 170 insertions(+), 25 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ce7fd1ccf0ab..f4d43a2da291 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1011,6 +1011,13 @@ public enum OperationStatusCode {
   public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT =
     32 * 1024 * 1024L;
 
+  /**
+   * Configuration key for setting pread must read both necessaryLen and extraLen, default is
+   * disabled. This is an optimized flag for reading HFile from blob storage.
+   */
+  public static final String HFILE_PREAD_ALL_BYTES_ENABLED_KEY = "hfile.pread.all.bytes.enabled";
+  public static final boolean HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT = false;
+
   /*
    * Minimum percentage of free heap necessary for a successful cluster startup.
    */
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
index 418d09c38af1..1720cae2300c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
@@ -228,21 +228,43 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
    */
   public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
     int necessaryLen, int extraLen) throws IOException {
+    return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
+  }
+
+  /**
+   * Read from an input stream at least necessaryLen and if possible,
+   * extraLen also if available. Analogous to
+   * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
+   * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
+   * read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
+   * directly, and does not allocate a temporary byte array.
+   * @param buff ByteBuff to read into.
+   * @param dis the input stream to read from
+   * @param position the position within the stream from which to start reading
+   * @param necessaryLen the number of bytes that are absolutely necessary to read
+   * @param extraLen the number of extra bytes that would be nice to read
+   * @param readAllBytes whether we must read the necessaryLen and extraLen
+   * @return true if and only if extraLen is > 0 and reading those extra bytes was successful
+   * @throws IOException if failed to read the necessary bytes
+   */
+  public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
+    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
     boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");
 
     if (preadbytebuffer) {
-      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen);
+      return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
     } else {
-      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen);
+      return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
     }
   }
 
   private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
-    int necessaryLen, int extraLen) throws IOException {
+    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
     int remain = necessaryLen + extraLen;
     byte[] buf = new byte[remain];
     int bytesRead = 0;
-    while (bytesRead < necessaryLen) {
+    int lengthMustRead = readAllBytes ? remain : necessaryLen;
+    while (bytesRead < lengthMustRead) {
       int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
       if (ret < 0) {
         throw new IOException("Premature EOF from inputStream (positional read returned " + ret
@@ -257,11 +279,12 @@ private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis
   }
 
   private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
-    int necessaryLen, int extraLen) throws IOException {
+    int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
     int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
     ByteBuffer[] buffers = buff.nioByteBuffers();
     ByteBuffer cur = buffers[idx];
-    while (bytesRead < necessaryLen) {
+    int lengthMustRead = readAllBytes ? remain : necessaryLen;
+    while (bytesRead < lengthMustRead) {
       int ret;
       while (!cur.hasRemaining()) {
         if (++idx >= buffers.length) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 99a952b52ee0..86a8e8e04628 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1453,7 +1453,12 @@ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int siz
     } else {
       // Positional read. Better for random reads; or when the streamLock is already locked.
       int extraSize = peekIntoNextBlock ? hdrSize : 0;
-      if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
+      boolean readAllBytes =
+        hfs.getConf().getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
+          HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT);
+      if (
+        !BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, readAllBytes)
+      ) {
         // did not read the next block header.
        return false;
      }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index d4482dd44ba4..020009c7f5ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1232,13 +1232,11 @@ private void writeCompactionWalRecord(Collection filesCompacted,
     allowedOnPath = ".*/(HStore|TestHStore).java")
   void replaceStoreFiles(Collection compactedFiles, Collection result,
     boolean writeCompactionMarker) throws IOException {
-    storeEngine.replaceStoreFiles(compactedFiles, result,
-      () -> {
-        if (writeCompactionMarker) {
-          writeCompactionWalRecord(compactedFiles, result);
-        }
-      },
-      () -> {
+    storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
+      if (writeCompactionMarker) {
+        writeCompactionWalRecord(compactedFiles, result);
+      }
+    }, () -> {
       synchronized (filesCompacting) {
         filesCompacting.removeAll(compactedFiles);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index a9863ea84511..ab1f56fb772e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -34,7 +34,6 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
@@ -408,7 +407,9 @@ private void refreshStoreFilesInternal(Collection newFiles) throw
     List openedFiles = openStoreFiles(toBeAddedFiles, false);
 
     // propogate the file changes to the underlying store file manager
-    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {}, () -> {}); // won't throw an exception
+    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
+    }, () -> {
+    }); // won't throw an exception
   }
 
   /**
@@ -491,8 +492,8 @@ public void addStoreFiles(Collection storeFiles,
   }
 
   public void replaceStoreFiles(Collection compactedFiles,
-    Collection newFiles, IOExceptionRunnable walMarkerWriter,
-    Runnable actionUnderLock) throws IOException {
+    Collection newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
+    throws IOException {
     storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
       StoreUtils.toStoreFileInfo(newFiles));
     walMarkerWriter.run();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0b69c7c674e3..2373751afbb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -538,7 +538,6 @@ private void initialize() {
       }
     }
 
-
     if (!this.isSourceActive()) {
       setSourceStartupStatus(false);
       if (Thread.currentThread().isInterrupted()) {
@@ -569,7 +568,7 @@ private void initialize() {
       }
     }
 
-    if(!this.isSourceActive()) {
+    if (!this.isSourceActive()) {
       setSourceStartupStatus(false);
       if (Thread.currentThread().isInterrupted()) {
         // If source is not running and thread is interrupted this means someone has tried to
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestBadReplicationPeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestBadReplicationPeer.java
index cf70ab705c6b..04093355af35 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestBadReplicationPeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestBadReplicationPeer.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -39,7 +39,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Category({ MediumTests.class, ClientTests.class})
+@Category({ MediumTests.class, ClientTests.class })
 public class TestBadReplicationPeer {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
@@ -65,15 +65,15 @@ public static void tearDownAfterClass() throws Exception {
   }
 
   /*
-   Add dummy peer and make sure that we are able to remove that peer.
+   * Add dummy peer and make sure that we are able to remove that peer.
   */
   @Test
   public void testRemovePeerSucceeds() throws IOException {
     String peerId = "dummypeer_1";
     try (Connection connection = ConnectionFactory.createConnection(conf);
-      Admin admin = connection.getAdmin()){
+      Admin admin = connection.getAdmin()) {
       ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
-      String quorum = TEST_UTIL.getHBaseCluster().getMaster().getZooKeeper().getQuorum();
+      String quorum = TEST_UTIL.getHBaseCluster().getMaster().getZooKeeper().getQuorum();
       rpcBuilder.setClusterKey(quorum + ":/1");
       ReplicationPeerConfig rpc = rpcBuilder.build();
       admin.addReplicationPeer(peerId, rpc);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index 638b0b5f1dbe..50f176f9f508 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -28,15 +29,23 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.util.BlockIOUtils;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
@@ -49,6 +58,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TestName;
 
 @Category({ IOTests.class, SmallTests.class })
 public class TestBlockIOUtils {
@@ -57,11 +67,17 @@ public class TestBlockIOUtils {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBlockIOUtils.class);
 
+  @Rule
+  public TestName testName = new TestName();
+
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
 
+  private static final int NUM_TEST_BLOCKS = 2;
+  private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;
+
   @Test
   public void testIsByteBufferReadable() throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();
@@ -92,6 +108,102 @@ public void testReadFully() throws IOException {
     assertArrayEquals(Bytes.toBytes(s), heapBuf);
   }
 
+  @Test
+  public void testPreadWithReadFullBytes() throws IOException {
+    testPreadReadFullBytesInternal(true);
+  }
+
+  @Test
+  public void testPreadWithoutReadFullBytes() throws IOException {
+    testPreadReadFullBytesInternal(false);
+  }
+
+  private void testPreadReadFullBytesInternal(boolean readAllBytes) throws IOException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName());
+    // give a fixed seed such that we can see failures easily.
+    Random rand = new Random(5685632);
+    long totalDataBlockBytes =
+      writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
+    readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
+  }
+
+  private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
+    Path path) throws IOException {
+    FileSystem fs = HFileSystem.get(conf);
+    FSDataOutputStream os = fs.create(path);
+    HFileContext meta =
+      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
+    HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
+    long totalDataBlockBytes = 0;
+    for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
+      int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
+      if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
+        blockTypeOrdinal = BlockType.DATA.ordinal();
+      }
+      BlockType bt = BlockType.values()[blockTypeOrdinal];
+      DataOutputStream dos = hbw.startWriting(bt);
+      int size = rand.nextInt(500);
+      for (int j = 0; j < size; ++j) {
+        dos.writeShort(i + 1);
+        dos.writeInt(j + 1);
+      }
+
+      hbw.writeHeaderAndData(os);
+      totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
+    }
+    // append a dummy trailer; in an actual HFile it should have more data.
+    FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
+    trailer.setFirstDataBlockOffset(0);
+    trailer.setLastDataBlockOffset(totalDataBlockBytes);
+    trailer.setComparatorClass(meta.getCellComparator().getClass());
+    trailer.setDataIndexCount(NUM_TEST_BLOCKS);
+    trailer.setCompressionCodec(compressAlgo);
+    trailer.serialize(os);
+    // close the stream
+    os.close();
+    return totalDataBlockBytes;
+  }
+
+  private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo,
+    long totalDataBlockBytes) throws IOException {
+    FSDataInputStream is = fs.open(path);
+    HFileContext fileContext =
+      new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
+    ReaderContext context =
+      new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
+        .withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
+        .withFilePath(path).withFileSystem(fs).build();
+    HFileBlock.FSReader hbr =
+      new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf());
+
+    long onDiskSizeOfNextBlock = -1;
+    long offset = 0;
+    int numOfReadBlock = 0;
+    // offset and totalBytes share the same logic in the HFilePreadReader
+    while (offset < totalDataBlockBytes) {
+      HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
+      numOfReadBlock++;
+      try {
+        onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
+        offset += block.getOnDiskSizeWithHeader();
+      } finally {
+        block.release();
+      }
+    }
+    assertEquals(totalDataBlockBytes, offset);
+    assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
+    deleteFile(fs, path);
+  }
+
+  private void deleteFile(FileSystem fs, Path path) throws IOException {
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+  }
+
   @Test
   public void testReadWithExtra() throws IOException {
     FileSystem fs = TEST_UTIL.getTestFileSystem();