Skip to content

Commit

Permalink
HBASE-27013 Introduce read all bytes when using pread for Prefetch
Browse files Browse the repository at this point in the history
  • Loading branch information
taklwu committed May 11, 2022
1 parent f6e9d3e commit c8b8bc4
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,13 @@ public enum OperationStatusCode {
public static final long HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT =
32 * 1024 * 1024L;

/**
* Configuration key for setting pread must read both necessaryLen and extraLen, default is
* disabled. This is an optimized flag for reading HFile from blob storage.
*/
public static final String HFILE_PREAD_ALL_BYTES_ENABLED_KEY = "hfile.pread.all.bytes.enabled";
public static final boolean HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT = false;

/*
* Minimum percentage of free heap necessary for a successful cluster startup.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,21 +228,43 @@ public static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int nec
*/
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
return preadWithExtra(buff, dis, position, necessaryLen, extraLen, false);
}

/**
* Read from an input stream at least <code>necessaryLen</code> and if possible,
* <code>extraLen</code> also if available. Analogous to
* {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and
* specifies a number of "extra" bytes that would be desirable but not absolutely necessary to
* read. If the input stream supports ByteBufferPositionedReadable, it reads to the byte buffer
* directly, and does not allocate a temporary byte array.
* @param buff ByteBuff to read into.
* @param dis the input stream to read from
* @param position the position within the stream from which to start reading
* @param necessaryLen the number of bytes that are absolutely necessary to read
* @param extraLen the number of extra bytes that would be nice to read
* @param readAllBytes whether we must read the necessaryLen and extraLen
* @return true if and only if extraLen is > 0 and reading those extra bytes was successful
* @throws IOException if failed to read the necessary bytes
*/
public static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
boolean preadbytebuffer = dis.hasCapability("in:preadbytebuffer");

if (preadbytebuffer) {
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen);
return preadWithExtraDirectly(buff, dis, position, necessaryLen, extraLen, readAllBytes);
} else {
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen);
return preadWithExtraOnHeap(buff, dis, position, necessaryLen, extraLen, readAllBytes);
}
}

private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
int remain = necessaryLen + extraLen;
byte[] buf = new byte[remain];
int bytesRead = 0;
while (bytesRead < necessaryLen) {
int lengthMustRead = readAllBytes ? remain : necessaryLen;
while (bytesRead < lengthMustRead) {
int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream (positional read returned " + ret
Expand All @@ -257,11 +279,12 @@ private static boolean preadWithExtraOnHeap(ByteBuff buff, FSDataInputStream dis
}

private static boolean preadWithExtraDirectly(ByteBuff buff, FSDataInputStream dis, long position,
int necessaryLen, int extraLen) throws IOException {
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
ByteBuffer[] buffers = buff.nioByteBuffers();
ByteBuffer cur = buffers[idx];
while (bytesRead < necessaryLen) {
int lengthMustRead = readAllBytes ? remain : necessaryLen;
while (bytesRead < lengthMustRead) {
int ret;
while (!cur.hasRemaining()) {
if (++idx >= buffers.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,12 @@ protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int siz
} else {
// Positional read. Better for random reads; or when the streamLock is already locked.
int extraSize = peekIntoNextBlock ? hdrSize : 0;
if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) {
boolean readAllBytes =
hfs.getConf().getBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY,
HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_DEFAULT);
if (
!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize, readAllBytes)
) {
// did not read the next block header.
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1232,13 +1232,11 @@ private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted,
allowedOnPath = ".*/(HStore|TestHStore).java")
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
boolean writeCompactionMarker) throws IOException {
storeEngine.replaceStoreFiles(compactedFiles, result,
() -> {
if (writeCompactionMarker) {
writeCompactionWalRecord(compactedFiles, result);
}
},
() -> {
storeEngine.replaceStoreFiles(compactedFiles, result, () -> {
if (writeCompactionMarker) {
writeCompactionWalRecord(compactedFiles, result);
}
}, () -> {
synchronized (filesCompacting) {
filesCompacting.removeAll(compactedFiles);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
Expand Down Expand Up @@ -408,7 +407,9 @@ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throw
List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);

// propogate the file changes to the underlying store file manager
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {}, () -> {}); // won't throw an exception
replaceStoreFiles(toBeRemovedStoreFiles, openedFiles, () -> {
}, () -> {
}); // won't throw an exception
}

/**
Expand Down Expand Up @@ -491,8 +492,8 @@ public void addStoreFiles(Collection<HStoreFile> storeFiles,
}

public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter,
Runnable actionUnderLock) throws IOException {
Collection<HStoreFile> newFiles, IOExceptionRunnable walMarkerWriter, Runnable actionUnderLock)
throws IOException {
storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
StoreUtils.toStoreFileInfo(newFiles));
walMarkerWriter.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,6 @@ private void initialize() {
}
}


if (!this.isSourceActive()) {
setSourceStartupStatus(false);
if (Thread.currentThread().isInterrupted()) {
Expand Down Expand Up @@ -569,7 +568,7 @@ private void initialize() {
}
}

if(!this.isSourceActive()) {
if (!this.isSourceActive()) {
setSourceStartupStatus(false);
if (Thread.currentThread().isInterrupted()) {
// If source is not running and thread is interrupted this means someone has tried to
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -39,7 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ MediumTests.class, ClientTests.class})
@Category({ MediumTests.class, ClientTests.class })
public class TestBadReplicationPeer {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
Expand All @@ -65,15 +65,15 @@ public static void tearDownAfterClass() throws Exception {
}

/*
Add dummy peer and make sure that we are able to remove that peer.
* Add dummy peer and make sure that we are able to remove that peer.
*/
@Test
public void testRemovePeerSucceeds() throws IOException {
String peerId = "dummypeer_1";
try (Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin()){
Admin admin = connection.getAdmin()) {
ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
String quorum = TEST_UTIL.getHBaseCluster().getMaster().getZooKeeper().getQuorum();
String quorum = TEST_UTIL.getHBaseCluster().getMaster().getZooKeeper().getQuorum();
rpcBuilder.setClusterKey(quorum + ":/1");
ReplicationPeerConfig rpc = rpcBuilder.build();
admin.addReplicationPeer(peerId, rpc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -28,15 +29,23 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
Expand All @@ -49,6 +58,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;

@Category({ IOTests.class, SmallTests.class })
public class TestBlockIOUtils {
Expand All @@ -57,11 +67,17 @@ public class TestBlockIOUtils {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBlockIOUtils.class);

@Rule
public TestName testName = new TestName();

@Rule
public ExpectedException exception = ExpectedException.none();

private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

private static final int NUM_TEST_BLOCKS = 2;
private static final Compression.Algorithm COMPRESSION_ALGO = Compression.Algorithm.GZ;

@Test
public void testIsByteBufferReadable() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Expand Down Expand Up @@ -92,6 +108,102 @@ public void testReadFully() throws IOException {
assertArrayEquals(Bytes.toBytes(s), heapBuf);
}

@Test
public void testPreadWithReadFullBytes() throws IOException {
testPreadReadFullBytesInternal(true);
}

@Test
public void testPreadWithoutReadFullBytes() throws IOException {
testPreadReadFullBytesInternal(false);
}

private void testPreadReadFullBytesInternal(boolean readAllBytes) throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean(HConstants.HFILE_PREAD_ALL_BYTES_ENABLED_KEY, readAllBytes);
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path path = new Path(TEST_UTIL.getDataTestDirOnTestFS(), testName.getMethodName());
// give a fixed seed such we can see failure easily.
Random rand = new Random(5685632);
long totalDataBlockBytes =
writeBlocks(TEST_UTIL.getConfiguration(), rand, COMPRESSION_ALGO, path);
readDataBlocksAndVerify(fs, path, COMPRESSION_ALGO, totalDataBlockBytes);
}

private long writeBlocks(Configuration conf, Random rand, Compression.Algorithm compressAlgo,
Path path) throws IOException {
FileSystem fs = HFileSystem.get(conf);
FSDataOutputStream os = fs.create(path);
HFileContext meta =
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
HFileBlock.Writer hbw = new HFileBlock.Writer(conf, null, meta);
long totalDataBlockBytes = 0;
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
int blockTypeOrdinal = rand.nextInt(BlockType.values().length);
if (blockTypeOrdinal == BlockType.ENCODED_DATA.ordinal()) {
blockTypeOrdinal = BlockType.DATA.ordinal();
}
BlockType bt = BlockType.values()[blockTypeOrdinal];
DataOutputStream dos = hbw.startWriting(bt);
int size = rand.nextInt(500);
for (int j = 0; j < size; ++j) {
dos.writeShort(i + 1);
dos.writeInt(j + 1);
}

hbw.writeHeaderAndData(os);
totalDataBlockBytes += hbw.getOnDiskSizeWithHeader();
}
// append a dummy trailer and in a actual HFile it should have more data.
FixedFileTrailer trailer = new FixedFileTrailer(3, 3);
trailer.setFirstDataBlockOffset(0);
trailer.setLastDataBlockOffset(totalDataBlockBytes);
trailer.setComparatorClass(meta.getCellComparator().getClass());
trailer.setDataIndexCount(NUM_TEST_BLOCKS);
trailer.setCompressionCodec(compressAlgo);
trailer.serialize(os);
// close the stream
os.close();
return totalDataBlockBytes;
}

private void readDataBlocksAndVerify(FileSystem fs, Path path, Compression.Algorithm compressAlgo,
long totalDataBlockBytes) throws IOException {
FSDataInputStream is = fs.open(path);
HFileContext fileContext =
new HFileContextBuilder().withHBaseCheckSum(true).withCompression(compressAlgo).build();
ReaderContext context =
new ReaderContextBuilder().withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withReaderType(ReaderContext.ReaderType.PREAD).withFileSize(totalDataBlockBytes)
.withFilePath(path).withFileSystem(fs).build();
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(context, fileContext, ByteBuffAllocator.HEAP, fs.getConf());

long onDiskSizeOfNextBlock = -1;
long offset = 0;
int numOfReadBlock = 0;
// offset and totalBytes shares the same logic in the HFilePreadReader
while (offset < totalDataBlockBytes) {
HFileBlock block = hbr.readBlockData(offset, onDiskSizeOfNextBlock, true, false, false);
numOfReadBlock++;
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
} finally {
block.release();
}
}
assertEquals(totalDataBlockBytes, offset);
assertEquals(NUM_TEST_BLOCKS, numOfReadBlock);
deleteFile(fs, path);
}

private void deleteFile(FileSystem fs, Path path) throws IOException {
if (fs.exists(path)) {
fs.delete(path, true);
}
}

@Test
public void testReadWithExtra() throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Expand Down

0 comments on commit c8b8bc4

Please sign in to comment.