Commit

HBASE-28579 Hide HFileScanner related methods in StoreFileReader (apache#5889)

Signed-off-by: Xin Sun <sunxin@apache.org>
Apache9 authored and vinayakphegde committed May 21, 2024
1 parent 2132ab1 commit bad64ab
Showing 9 changed files with 262 additions and 251 deletions.
HalfStoreFileReader.java
@@ -92,7 +92,7 @@ protected boolean isTop() {
}

@Override
-  public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
+  protected HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
return new HFileScanner() {
@@ -283,7 +283,7 @@ public Optional<Cell> getLastKey() {
return super.getLastKey();
}
// Get a scanner that caches the block and that uses pread.
-    HFileScanner scanner = getScanner(true, true);
+    HFileScanner scanner = getScanner(true, true, false);
try {
if (scanner.seekBefore(this.splitCell)) {
return Optional.ofNullable(scanner.getKey());
StoreFileReader.java
@@ -183,31 +183,9 @@ void readCompleted() {
}

-  /**
-   * @deprecated since 2.0.0 and will be removed in 3.0.0. Do not write further code which depends
-   *             on this call. Instead use getStoreFileScanner() which uses the StoreFileScanner
-   *             class/interface which is the preferred way to scan a store with higher level
-   *             concepts.
-   * @param cacheBlocks should we cache the blocks?
-   * @param pread use pread (for concurrent small readers)
-   * @return the underlying HFileScanner
-   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15296">HBASE-15296</a>
-   *      Will be overridden in HalfStoreFileReader
-   */
-  @Deprecated
-  public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
-    return getScanner(cacheBlocks, pread, false);
-  }
-
-  /**
-   * @deprecated since 2.0.0 and will be removed in 3.0.0. Do not write further code which depends
-   *             on this call. Instead use getStoreFileScanner() which uses the StoreFileScanner
-   *             class/interface which is the preferred way to scan a store with higher level
-   *             concepts. should we cache the blocks? use pread (for concurrent small readers) is
-   *             scanner being used for compaction?
-   * @return the underlying HFileScanner
-   * @see <a href="https://issues.apache.org/jira/browse/HBASE-15296">HBASE-15296</a>
-   */
-  @Deprecated
-  public HFileScanner getScanner(boolean cacheBlocks, boolean pread, boolean isCompaction) {
+  protected HFileScanner getScanner(boolean cacheBlocks, boolean pread, boolean isCompaction) {
return reader.getScanner(conf, cacheBlocks, pread, isCompaction);
}

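With both public getScanner overloads removed, callers outside the regionserver package reach a store file through getStoreFileScanner(), exactly as the deprecation notice above has recommended since HBASE-15296. A minimal migration sketch, assuming a StoreFileReader named reader in scope and using only the calls and argument values that appear at this commit's updated call sites (the trailing Long.MAX_VALUE, 0, false correspond to a read point, scanner order, and a non-null-column optimization flag; those parameter names are our gloss, not confirmed by this diff):

  // Before (removed in this commit): raw HFileScanner, boolean-driven loop.
  //   HFileScanner s = reader.getScanner(false, false);
  //   s.seekTo();
  //   do { Cell c = s.getCell(); /* consume c */ } while (s.next());

  // After: StoreFileScanner, seek to the lowest key, then next() until null.
  try (StoreFileScanner scanner =
    reader.getStoreFileScanner(false, false, false, Long.MAX_VALUE, 0, false)) {
    scanner.seek(KeyValue.LOWESTKEY); // position at the first cell
    for (Cell cell; (cell = scanner.next()) != null;) {
      // consume cell
    }
  }

Note the protocol change: HFileScanner.next() returns a boolean and getCell() reads the current position, while StoreFileScanner.next() hands back the cell itself and returns null at end of file; the scanner is also closeable, hence the try-with-resources used throughout this commit.
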
BulkLoadHFilesTool.java
@@ -63,6 +63,7 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncAdmin;
@@ -74,7 +75,6 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -83,11 +83,12 @@
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileReader;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -757,6 +758,41 @@ static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
}

+  private static StoreFileWriter initStoreFileWriter(Configuration conf, Cell cell,
+    HFileContext hFileContext, CacheConfig cacheConf, BloomType bloomFilterType, FileSystem fs,
+    Path outFile, AsyncTableRegionLocator loc) throws IOException {
+    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+      byte[] rowKey = CellUtil.cloneRow(cell);
+      HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
+      InetSocketAddress[] favoredNodes = null;
+      if (null == hRegionLocation) {
+        LOG.warn("Failed get region location for rowkey {} , Using writer without favoured nodes.",
+          Bytes.toString(rowKey));
+        return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+      } else {
+        LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
+        InetSocketAddress initialIsa =
+          new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
+        if (initialIsa.isUnresolved()) {
+          LOG.warn("Failed get location for region {} , Using writer without favoured nodes.",
+            hRegionLocation);
+          return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+            .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+        } else {
+          LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
+          favoredNodes = new InetSocketAddress[] { initialIsa };
+          return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+            .withBloomType(bloomFilterType).withFileContext(hFileContext)
+            .withFavoredNodes(favoredNodes).build();
+        }
+      }
+    } else {
+      return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+    }
+  }
+
/**
* Copy half of an HFile into a new HFile with favored nodes.
*/
@@ -765,14 +801,14 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
throws IOException {
FileSystem fs = inFile.getFileSystem(conf);
CacheConfig cacheConf = CacheConfig.DISABLED;
-    HalfStoreFileReader halfReader = null;
+    StoreFileReader halfReader = null;
StoreFileWriter halfWriter = null;
try {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
StoreFileInfo storeFileInfo =
new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
storeFileInfo.initHFileInfo(context);
-      halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
+      halfReader = storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

@@ -785,51 +821,22 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
.withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();

-      HFileScanner scanner = halfReader.getScanner(false, false, false);
-      scanner.seekTo();
-      do {
-        final Cell cell = scanner.getCell();
-        if (null != halfWriter) {
-          halfWriter.append(cell);
-        } else {
-
-          // init halfwriter
-          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-            byte[] rowKey = CellUtil.cloneRow(cell);
-            HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
-            InetSocketAddress[] favoredNodes = null;
-            if (null == hRegionLocation) {
-              LOG.warn(
-                "Failed get region location for rowkey {} , Using writer without favoured nodes.",
-                Bytes.toString(rowKey));
-              halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-                .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
-            } else {
-              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
-              InetSocketAddress initialIsa =
-                new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
-              if (initialIsa.isUnresolved()) {
-                LOG.warn("Failed get location for region {} , Using writer without favoured nodes.",
-                  hRegionLocation);
-                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-                  .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
-              } else {
-                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
-                favoredNodes = new InetSocketAddress[] { initialIsa };
-                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-                  .withBloomType(bloomFilterType).withFileContext(hFileContext)
-                  .withFavoredNodes(favoredNodes).build();
-              }
-            }
-          } else {
-            halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
-              .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
-          }
-        }
-      } while (scanner.next());
+      try (StoreFileScanner scanner =
+        halfReader.getStoreFileScanner(false, false, false, Long.MAX_VALUE, 0, false)) {
+        scanner.seek(KeyValue.LOWESTKEY);
+        for (;;) {
+          Cell cell = scanner.next();
+          if (cell == null) {
+            break;
+          }
+          if (halfWriter == null) {
+            // init halfwriter
+            halfWriter = initStoreFileWriter(conf, cell, hFileContext, cacheConf, bloomFilterType,
+              fs, outFile, loc);
+          }
+          halfWriter.append(cell);
+        }
+      }
for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
if (shouldCopyHFileMetaKey(entry.getKey())) {
halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
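The extracted initStoreFileWriter helper keeps the rewritten scan loop flat, and its branching reduces to a single decision about favored nodes: a missing region location or an unresolved address falls back to a plain writer, while a resolved address pins the writer to that node. A condensed sketch of just that decision, using the same builder calls as the hunk above (holding the builder in a local variable is our simplification and assumes the usual return-this builder style):

  // Condensed view of the favored-nodes decision in initStoreFileWriter.
  StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, cacheConf, fs)
    .withFilePath(outFile).withBloomType(bloomFilterType).withFileContext(hFileContext);
  if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
    HRegionLocation location = FutureUtils.get(loc.getRegionLocation(CellUtil.cloneRow(cell)));
    if (location != null) {
      InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
      if (!isa.isUnresolved()) {
        // Only a resolved region location pins the writer to a favored node.
        builder.withFavoredNodes(new InetSocketAddress[] { isa });
      }
    }
  }
  StoreFileWriter halfWriter = builder.build();

The real helper also logs a warning on each fallback; the sketch drops the logging to expose the shape of the decision.
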
TestHalfStoreFileReader.java
@@ -124,21 +124,22 @@ private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, Cach
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
-    final HFileScanner scanner = halfreader.getScanner(false, false);
-
-    scanner.seekTo();
-    Cell curr;
-    do {
-      curr = scanner.getCell();
-      KeyValue reseekKv = getLastOnCol(curr);
-      int ret = scanner.reseekTo(reseekKv);
-      assertTrue("reseek to returned: " + ret, ret > 0);
-      // System.out.println(curr + ": " + ret);
-    } while (scanner.next());
-
-    int ret = scanner.reseekTo(getLastOnCol(curr));
-    // System.out.println("Last reseek: " + ret);
-    assertTrue(ret > 0);
+    try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {
+
+      scanner.seekTo();
+      Cell curr;
+      do {
+        curr = scanner.getCell();
+        KeyValue reseekKv = getLastOnCol(curr);
+        int ret = scanner.reseekTo(reseekKv);
+        assertTrue("reseek to returned: " + ret, ret > 0);
+        // System.out.println(curr + ": " + ret);
+      } while (scanner.next());
+
+      int ret = scanner.reseekTo(getLastOnCol(curr));
+      // System.out.println("Last reseek: " + ret);
+      assertTrue(ret > 0);
+    }

halfreader.close(true);
}
@@ -222,9 +223,14 @@ private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell se
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConfig);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
-    final HFileScanner scanner = halfreader.getScanner(false, false);
-    scanner.seekBefore(seekBefore);
-    return scanner.getCell();
+    try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {
+      scanner.seekBefore(seekBefore);
+      if (scanner.getCell() != null) {
+        return KeyValueUtil.copyToNewKeyValue(scanner.getCell());
+      } else {
+        return null;
+      }
+    }
}

private KeyValue getLastOnCol(Cell curr) {
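One detail worth noting in doTestOfSeekBefore above: the found cell is deep-copied with KeyValueUtil.copyToNewKeyValue(...) before the try-with-resources block closes the scanner. A plausible reading (the commit does not spell it out) is that a Cell obtained from getCell() may be backed by block buffers owned by the scanner, so it must not outlive it; copying onto the heap makes the returned value safe to use after close. A small sketch of the pattern (helper name ours; the test can call the now-protected getScanner because it lives in the same package as HalfStoreFileReader):

  // Hypothetical helper showing the copy-before-close pattern.
  private static Cell seekBeforeAndCopy(HalfStoreFileReader reader, Cell key) throws IOException {
    try (HFileScanner scanner = reader.getScanner(false, false, false)) {
      scanner.seekBefore(key);
      Cell cell = scanner.getCell();
      // Deep-copy while the scanner (and its buffers) is still open.
      return cell == null ? null : KeyValueUtil.copyToNewKeyValue(cell);
    }
  }
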
TestCompaction.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -62,7 +63,6 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
@@ -316,13 +316,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
private int count() throws IOException {
int count = 0;
for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
-      HFileScanner scanner = f.getReader().getScanner(false, false);
-      if (!scanner.seekTo()) {
-        continue;
-      }
-      do {
-        count++;
-      } while (scanner.next());
+      f.initReader();
+      try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
+        scanner.seek(KeyValue.LOWESTKEY);
+        while (scanner.next() != null) {
+          count++;
+        }
+      }
}
return count;
}
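The count() rewrite above is the whole migration recipe for tests in one place: initialize the reader, obtain a pread StoreFileScanner, seek to KeyValue.LOWESTKEY, and consume next() until it returns null. As a self-contained sketch (method name ours; argument values exactly as in the hunk):

  // Count every cell in a store file via the StoreFileScanner API.
  private static int countCells(HStoreFile f) throws IOException {
    f.initReader(); // required before asking the file for a scanner
    try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
      scanner.seek(KeyValue.LOWESTKEY); // position at the first cell
      int count = 0;
      while (scanner.next() != null) {
        count++;
      }
      return count;
    }
  }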
