Skip to content

Commit

Permalink
HBASE-27646 Should not use pread when prefetching in HFilePreadReader (
Browse files Browse the repository at this point in the history
…apache#5063)

Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
  • Loading branch information
sunhelly committed Mar 20, 2023
1 parent 7e1f243 commit 2933be3
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ private static class ReadStatistics {
private Boolean instanceOfCanUnbuffer = null;
private CanUnbuffer unbuffer = null;

protected Path readerPath;

public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, path, false, -1L);
}
Expand Down Expand Up @@ -127,6 +129,9 @@ private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path, boolea
// Initially we are going to read the tail block. Open the reader w/FS checksum.
this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
this.readerPath = this.stream.getWrappedStream() instanceof FileLink.FileLinkInputStream
? ((FileLink.FileLinkInputStream) this.stream.getWrappedStream()).getCurrentPath()
: path;
setStreamOptions(stream);
}

Expand Down Expand Up @@ -342,4 +347,8 @@ public void unbuffer() {
}
}
}

public Path getReaderPath() {
return readerPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class FileLink {
* FileLink InputStream that handles the switch between the original path and the alternative
* locations, when the file is moved.
*/
private static class FileLinkInputStream extends InputStream
protected static class FileLinkInputStream extends InputStream
implements Seekable, PositionedReadable, CanSetDropBehind, CanSetReadahead, CanUnbuffer {
private FSDataInputStream in = null;
private Path currentPath = null;
Expand Down Expand Up @@ -282,6 +282,10 @@ public void setReadahead(Long readahead) throws IOException, UnsupportedOperatio
public void setDropBehind(Boolean dropCache) throws IOException, UnsupportedOperationException {
in.setDropBehind(dropCache);
}

public Path getCurrentPath() {
return currentPath;
}
}

private Path[] locations = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,7 +42,15 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
public void run() {
long offset = 0;
long end = 0;
HFile.Reader prefetchStreamReader = null;
try {
ReaderContext streamReaderContext = ReaderContextBuilder.newBuilder(context)
.withReaderType(ReaderContext.ReaderType.STREAM)
.withInputStreamWrapper(new FSDataInputStreamWrapper(context.getFileSystem(),
context.getInputStreamWrapper().getReaderPath()))
.build();
prefetchStreamReader =
new HFileStreamReader(streamReaderContext, fileInfo, cacheConf, conf);
end = getTrailer().getLoadOnOpenDataOffset();
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
Expand All @@ -56,8 +65,8 @@ public void run() {
// the internal-to-hfileblock thread local which holds the overread that gets the
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
/* pread= */true, false, false, null, null, true);
HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
/* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
Expand All @@ -77,6 +86,13 @@ public void run() {
// Other exceptions are interesting
LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
} finally {
if (prefetchStreamReader != null) {
try {
prefetchStreamReader.close(false);
} catch (IOException e) {
LOG.warn("Close prefetch stream reader failed, path: " + path, e);
}
}
PrefetchExecutor.complete(path);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ public class ReaderContextBuilder {
public ReaderContextBuilder() {
}

public static ReaderContextBuilder newBuilder(ReaderContext readerContext) {
return new ReaderContextBuilder(readerContext);
}

private ReaderContextBuilder(ReaderContext readerContext) {
this.filePath = readerContext.getFilePath();
this.fsdis = readerContext.getInputStreamWrapper();
this.fileSize = readerContext.getFileSize();
this.hfs = readerContext.getFileSystem();
this.type = readerContext.getReaderType();
}

public ReaderContextBuilder withFilePath(Path filePath) {
this.filePath = filePath;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,33 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -190,6 +201,60 @@ public void testPrefetchCompressed() throws Exception {

}

@Test
public void testPrefetchDoesntSkipHFileLink() throws Exception {
testPrefetchWhenHFileLink(c -> {
boolean isCached = c != null;
assertTrue(isCached);
});
}

private void testPrefetchWhenHFileLink(Consumer<Cacheable> test) throws Exception {
cacheConf = new CacheConfig(conf, blockCache);
HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
Path testDir = TEST_UTIL.getDataTestDir("testPrefetchWhenHFileLink");
final RegionInfo hri =
RegionInfoBuilder.newBuilder(TableName.valueOf("testPrefetchWhenHFileLink")).build();
// force temp data in hbase/target/test-data instead of /tmp/hbase-xxxx/
Configuration testConf = new Configuration(this.conf);
CommonFSUtils.setRootDir(testConf, testDir);
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(testConf, fs,
CommonFSUtils.getTableDir(testDir, hri.getTable()), hri);

// Make a store file and write data to it.
StoreFileWriter writer = new StoreFileWriter.Builder(conf, cacheConf, this.fs)
.withFilePath(regionFs.createTempName()).withFileContext(context).build();
TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("testPrefetchWhenHFileLink"),
Bytes.toBytes("testPrefetchWhenHFileLink"));

Path storeFilePath = regionFs.commitStoreFile("cf", writer.getPath());
Path dstPath = new Path(regionFs.getTableDir(), new Path("test-region", "cf"));
HFileLink.create(testConf, this.fs, dstPath, hri, storeFilePath.getName());
Path linkFilePath =
new Path(dstPath, HFileLink.createHFileLinkName(hri, storeFilePath.getName()));

// Try to open store file from link
StoreFileInfo storeFileInfo = new StoreFileInfo(testConf, this.fs, linkFilePath, true);
HStoreFile hsf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
assertTrue(storeFileInfo.isLink());

hsf.initReader();
HFile.Reader reader = hsf.getReader().getHFileReader();
while (!reader.prefetchComplete()) {
// Sleep for a bit
Thread.sleep(1000);
}
long offset = 0;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null, true);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
if (block.getBlockType() == BlockType.DATA) {
test.accept(blockCache.getBlock(blockCacheKey, true, false, true));
}
offset += block.getOnDiskSizeWithHeader();
}
}

private Path writeStoreFile(String fname) throws IOException {
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
return writeStoreFile(fname, meta);
Expand Down Expand Up @@ -227,5 +292,4 @@ public static KeyValue.Type generateKeyType(Random rand) {
return keyType;
}
}

}

0 comments on commit 2933be3

Please sign in to comment.