HBASE-28579 Hide HFileScanner related methods in StoreFileReader #5889

Merged · 2 commits · May 17, 2024
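In short, this change narrows StoreFileReader's raw HFileScanner accessors to protected visibility and migrates callers to the higher-level StoreFileScanner API, whose iteration idiom differs. A sketch of the two idioms, both taken from the diffs below (the variable names are hypothetical; each scanner is assumed to be already obtained from a reader):

// Old cursor style (HFileScanner): next() returns a boolean and
// getCell() reads the value under the cursor.
hfileScanner.seekTo();
do {
  Cell c = hfileScanner.getCell();
  // process c
} while (hfileScanner.next());

// New iterator style (StoreFileScanner): seek to the lowest possible key,
// then next() hands back each Cell in order and returns null at the end.
storeFileScanner.seek(KeyValue.LOWESTKEY);
for (Cell c; (c = storeFileScanner.next()) != null;) {
  // process c
}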
@@ -92,7 +92,7 @@ protected boolean isTop() {
}

@Override
public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
protected HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
return new HFileScanner() {
@@ -297,7 +297,7 @@ public Optional<Cell> getLastKey() {
return super.getLastKey();
}
// Get a scanner that caches the block and that uses pread.
HFileScanner scanner = getScanner(true, true);
HFileScanner scanner = getScanner(true, true, false);
try {
if (scanner.seekBefore(this.splitCell)) {
return Optional.ofNullable(scanner.getKey());
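With the two-argument getScanner overload removed (next diff), internal call sites such as getLastKey() above spell out all three flags. A minimal sketch of that pattern, assuming HFileScanner's Closeable contract (the tests further down rely on it via try-with-resources); the fallback behavior after the if is elided here as it is in the diff:

// caching, pread, non-compaction scanner; seekBefore lands on the last cell
// strictly before splitCell, i.e. the bottom half's last key
HFileScanner scanner = getScanner(true, true, false);
try {
  if (scanner.seekBefore(this.splitCell)) {
    return Optional.ofNullable(scanner.getKey());
  }
} finally {
  scanner.close();
}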
@@ -183,31 +183,9 @@ void readCompleted() {
}

/**
* @deprecated since 2.0.0 and will be removed in 3.0.0. Do not write further code which depends
* on this call. Instead use getStoreFileScanner() which uses the StoreFileScanner
* class/interface which is the preferred way to scan a store with higher level
* concepts.
* @param cacheBlocks should we cache the blocks?
* @param pread use pread (for concurrent small readers)
* @return the underlying HFileScanner
* @see <a href="https://issues.apache.org/jira/browse/HBASE-15296">HBASE-15296</a>
* Will be overridden in HalfStoreFileReader
*/
@Deprecated
public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
return getScanner(cacheBlocks, pread, false);
}

/**
* @deprecated since 2.0.0 and will be removed in 3.0.0. Do not write further code which depends
* on this call. Instead use getStoreFileScanner() which uses the StoreFileScanner
* class/interface which is the preferred way to scan a store with higher level
* concepts. should we cache the blocks? use pread (for concurrent small readers) is
* scanner being used for compaction?
* @return the underlying HFileScanner
* @see <a href="https://issues.apache.org/jira/browse/HBASE-15296">HBASE-15296</a>
*/
@Deprecated
public HFileScanner getScanner(boolean cacheBlocks, boolean pread, boolean isCompaction) {
protected HFileScanner getScanner(boolean cacheBlocks, boolean pread, boolean isCompaction) {
return reader.getScanner(conf, cacheBlocks, pread, isCompaction);
}
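The deprecated javadoc above already pointed at the replacement; now that the raw accessor is protected, outside code has to use it. A hedged sketch of the preferred call, with parameter roles annotated as I read them from the StoreFileScanner API (the argument values mirror the copyHFileHalf diff below):

StoreFileScanner scanner = reader.getStoreFileScanner(
  false,          // cacheBlocks: don't pull blocks into the block cache
  false,          // pread: positional read (for concurrent small readers)
  false,          // isCompaction: not a compaction scan
  Long.MAX_VALUE, // readPt: MVCC read point; MAX_VALUE sees every cell
  0,              // scannerOrder: ordering hint among scanners
  false);         // canOptimizeForNonNullColumn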

@@ -63,6 +63,7 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncAdmin;
@@ -74,7 +75,6 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -83,11 +83,12 @@
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -757,6 +758,41 @@ static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path
copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
}

private static StoreFileWriter initStoreFileWriter(Configuration conf, Cell cell,
HFileContext hFileContext, CacheConfig cacheConf, BloomType bloomFilterType, FileSystem fs,
Path outFile, AsyncTableRegionLocator loc) throws IOException {
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
byte[] rowKey = CellUtil.cloneRow(cell);
HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
InetSocketAddress[] favoredNodes = null;
if (null == hRegionLocation) {
LOG.warn("Failed get region location for rowkey {} , Using writer without favoured nodes.",
Bytes.toString(rowKey));
return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
} else {
LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
InetSocketAddress initialIsa =
new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
if (initialIsa.isUnresolved()) {
LOG.warn("Failed get location for region {} , Using writer without favoured nodes.",
hRegionLocation);
return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
} else {
LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
favoredNodes = new InetSocketAddress[] { initialIsa };
return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build();
}
}
} else {
return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
}
}
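The helper above consolidates writer construction that was previously inlined in copyHFileHalf (removed further down). Its call site creates the writer lazily, because with LOCALITY_SENSITIVE_CONF_KEY enabled the favored node is derived from the first cell's row; a condensed sketch of the usage shown in the diff below:

if (halfWriter == null) {
  // first cell seen: derive favored nodes from this row's region location,
  // then reuse the same writer for the rest of the scan
  halfWriter = initStoreFileWriter(conf, cell, hFileContext, cacheConf,
    bloomFilterType, fs, outFile, loc);
}
halfWriter.append(cell);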

/**
* Copy half of an HFile into a new HFile with favored nodes.
*/
@@ -765,14 +801,14 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
throws IOException {
FileSystem fs = inFile.getFileSystem(conf);
CacheConfig cacheConf = CacheConfig.DISABLED;
HalfStoreFileReader halfReader = null;
StoreFileReader halfReader = null;
StoreFileWriter halfWriter = null;
try {
ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
StoreFileInfo storeFileInfo =
new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
storeFileInfo.initHFileInfo(context);
halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
halfReader = storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

@@ -785,51 +821,22 @@ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
.withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();

HFileScanner scanner = halfReader.getScanner(false, false, false);
scanner.seekTo();
do {
final Cell cell = scanner.getCell();
if (null != halfWriter) {
halfWriter.append(cell);
} else {

// init halfwriter
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
byte[] rowKey = CellUtil.cloneRow(cell);
HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
InetSocketAddress[] favoredNodes = null;
if (null == hRegionLocation) {
LOG.warn(
"Failed get region location for rowkey {} , Using writer without favoured nodes.",
Bytes.toString(rowKey));
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
} else {
LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
InetSocketAddress initialIsa =
new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
if (initialIsa.isUnresolved()) {
LOG.warn("Failed get location for region {} , Using writer without favoured nodes.",
hRegionLocation);
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
} else {
LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
favoredNodes = new InetSocketAddress[] { initialIsa };
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext)
.withFavoredNodes(favoredNodes).build();
}
}
} else {
halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
.withBloomType(bloomFilterType).withFileContext(hFileContext).build();
try (StoreFileScanner scanner =
halfReader.getStoreFileScanner(false, false, false, Long.MAX_VALUE, 0, false)) {
scanner.seek(KeyValue.LOWESTKEY);
for (;;) {
Cell cell = scanner.next();
if (cell == null) {
break;
}
if (halfWriter == null) {
// init halfwriter
halfWriter = initStoreFileWriter(conf, cell, hFileContext, cacheConf, bloomFilterType,
fs, outFile, loc);
}
halfWriter.append(cell);
}

} while (scanner.next());

}
for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
if (shouldCopyHFileMetaKey(entry.getKey())) {
halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
@@ -124,21 +124,22 @@ private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, Cach
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);

scanner.seekTo();
Cell curr;
do {
curr = scanner.getCell();
KeyValue reseekKv = getLastOnCol(curr);
int ret = scanner.reseekTo(reseekKv);
assertTrue("reseek to returned: " + ret, ret > 0);
// System.out.println(curr + ": " + ret);
} while (scanner.next());

int ret = scanner.reseekTo(getLastOnCol(curr));
// System.out.println("Last reseek: " + ret);
assertTrue(ret > 0);
try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {

scanner.seekTo();
Cell curr;
do {
curr = scanner.getCell();
KeyValue reseekKv = getLastOnCol(curr);
int ret = scanner.reseekTo(reseekKv);
assertTrue("reseek to returned: " + ret, ret > 0);
// System.out.println(curr + ": " + ret);
} while (scanner.next());

int ret = scanner.reseekTo(getLastOnCol(curr));
// System.out.println("Last reseek: " + ret);
assertTrue(ret > 0);
}

halfreader.close(true);
}
@@ -222,9 +223,14 @@ private Cell doTestOfSeekBefore(Path p, FileSystem fs, Reference bottom, Cell se
(HalfStoreFileReader) storeFileInfo.createReader(context, cacheConfig);
storeFileInfo.getHFileInfo().initMetaAndIndex(halfreader.getHFileReader());
halfreader.loadFileInfo();
final HFileScanner scanner = halfreader.getScanner(false, false);
scanner.seekBefore(seekBefore);
return scanner.getCell();
try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {
scanner.seekBefore(seekBefore);
if (scanner.getCell() != null) {
return KeyValueUtil.copyToNewKeyValue(scanner.getCell());
} else {
return null;
}
}
}

private KeyValue getLastOnCol(Cell curr) {
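A note on the copy introduced above (my reading; the PR text does not state it): cells handed out by a scanner can be backed by the scanner's block buffers, which may be released once the scanner closes, so the test deep-copies the cell before the try-with-resources block ends. Condensed from the diff above:

// escape the scanner's lifetime with a deep copy
try (HFileScanner scanner = halfreader.getScanner(false, false, false)) {
  scanner.seekBefore(seekBefore);
  Cell c = scanner.getCell();
  return c == null ? null : KeyValueUtil.copyToNewKeyValue(c);
}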
@@ -53,6 +53,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -62,7 +63,6 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
@@ -316,13 +316,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
private int count() throws IOException {
int count = 0;
for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
HFileScanner scanner = f.getReader().getScanner(false, false);
if (!scanner.seekTo()) {
continue;
f.initReader();
try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
scanner.seek(KeyValue.LOWESTKEY);
while (scanner.next() != null) {
count++;
}
}
do {
count++;
} while (scanner.next());
}
return count;
}
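The rewritten count() above uses the pread-scanner variant; note the explicit initReader() before asking the store file for a scanner. A condensed sketch of the same counting pattern (assumes an initialized HStoreFile `f`; argument values mirror the diff above):

f.initReader(); // the reader must be open before creating a scanner
int count = 0;
try (StoreFileScanner scanner = f.getPreadScanner(false, Long.MAX_VALUE, 0, false)) {
  scanner.seek(KeyValue.LOWESTKEY); // position before the first cell
  while (scanner.next() != null) {
    count++;
  }
}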