Skip to content

Commit

Permalink
HBASE-27227 Long running heavily filtered scans hold up too many Byte…
Browse files Browse the repository at this point in the history
…BuffAllocator buffers
  • Loading branch information
bbeaudreault committed Jan 5, 2023
1 parent 7ef63b6 commit c2ede4b
Show file tree
Hide file tree
Showing 14 changed files with 811 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ public void close() {
public void shipped() throws IOException {
this.delegate.shipped();
}

@Override
public void checkpoint(State state) {
  // Pass the checkpoint through unchanged to the delegate.
  delegate.checkpoint(state);
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2070,7 +2070,8 @@ private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
return createBuilder(blk, newBuf).build();
}

static HFileBlock deepCloneOnHeap(HFileBlock blk) {
// Publicly visible for access in tests
public static HFileBlock deepCloneOnHeap(HFileBlock blk) {
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
return createBuilder(blk, deepCloned).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,13 @@ protected static class HFileScannerImpl implements HFileScanner {
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
// unreferenced block please.
protected HFileBlock curBlock;
// Previous blocks that were used in the course of the read

// Updated to the current prevBlocks size when checkpoint is called. Used to eagerly release
// any blocks accumulated in the fetching of a row, if that row is thrown away due to filterRow.
private int lastCheckpointIndex = -1;

// Previous blocks that were used in the course of the read, to be released at close,
// checkpoint, or shipped
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();

public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks,
Expand Down Expand Up @@ -366,8 +372,15 @@ void reset() {
}

private void returnBlocks(boolean returnAll) {
this.prevBlocks.forEach(HFileBlock::release);
this.prevBlocks.forEach((block) -> {
if (block != null) {
block.release();
}
});
this.prevBlocks.clear();
if (lastCheckpointIndex > 0) {
this.lastCheckpointIndex = 0;
}
if (returnAll && this.curBlock != null) {
this.curBlock.release();
this.curBlock = null;
Expand Down Expand Up @@ -1047,6 +1060,23 @@ public int compareKey(CellComparator comparator, Cell key) {
public void shipped() throws IOException {
this.returnBlocks(false);
}

/**
 * Sets the last checkpoint index to the current prevBlocks size. If called with
 * {@link State#FILTERED}, first releases and nulls out any prevBlocks entries which were added
 * since the last checkpoint. Nulls out instead of removing to avoid unnecessary resizing of the
 * list.
 * <p>
 * A {@link State#FILTERED} call made before any checkpoint has been recorded (the index is still
 * negative) fails the assertion when assertions are enabled. When they are disabled, nothing is
 * released, because there is no recorded boundary telling us which blocks belong to the filtered
 * row; those blocks stay in prevBlocks until a later release.
 * @param state the kind of checkpoint being signalled
 */
@Override
public void checkpoint(State state) {
  if (state == State.FILTERED) {
    assert lastCheckpointIndex >= 0 : "checkpoint(FILTERED) called before any checkpoint";
    // Assertions are off by default at runtime, so also guard explicitly: starting the loop at a
    // negative index would make prevBlocks.get(i) throw IndexOutOfBoundsException.
    if (lastCheckpointIndex >= 0) {
      final int end = prevBlocks.size();
      for (int i = lastCheckpointIndex; i < end; i++) {
        prevBlocks.get(i).release();
        prevBlocks.set(i, null);
      }
    }
  }
  lastCheckpointIndex = prevBlocks.size();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,24 @@ public void shipped() throws IOException {
}
}
}

/**
 * Propagates the checkpoint to every scanner this heap knows about: the current scanner, the
 * scanners still in the heap, and the scanners waiting for delayed close. Any of the three that
 * is null is skipped.
 */
@Override
public void checkpoint(State state) {
  if (current != null) {
    current.checkpoint(state);
  }

  if (heap != null) {
    for (KeyValueScanner queued : heap) {
      queued.checkpoint(state);
    }
  }

  // Exhausted scanners parked for delayed close are checkpointed too: they may still hold blocks
  // that were totally filtered during a request, and the checkpoint will release them.
  if (scannersForDelayedClose != null) {
    for (KeyValueScanner pendingClose : scannersForDelayedClose) {
      pendingClose.checkpoint(state);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ public Cell getNextIndexedKey() {
public void shipped() throws IOException {
// do nothing
}

@Override
public void checkpoint(State state) {
  // Intentionally empty: this scanner takes no action on a checkpoint.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// Used to check time limit
LimitScope limitScope = LimitScope.BETWEEN_CELLS;

checkpoint(State.START);

// The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result.
// Then we loop and try again. Otherwise, we must get out on the first iteration via return,
Expand Down Expand Up @@ -501,6 +503,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
checkpoint(State.FILTERED);

// Read nothing as the rowkey was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
Expand Down Expand Up @@ -553,6 +556,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear();
checkpoint(State.FILTERED);
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
Expand Down Expand Up @@ -602,6 +606,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
checkpoint(State.FILTERED);
incrementCountOfRowsFilteredMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
Expand Down Expand Up @@ -783,6 +788,16 @@ public void shipped() throws IOException {
}
}

/**
 * Forwards the checkpoint to the store heap and then to the joined heap, skipping whichever of
 * the two is null.
 */
@Override
public void checkpoint(State state) {
  // Store heap first, if present.
  if (storeHeap != null) {
    storeHeap.checkpoint(state);
  }

  // Then the joined heap, if present.
  if (joinedHeap != null) {
    joinedHeap.checkpoint(state);
  }
}

@Override
public void run() throws IOException {
// This is the RPC callback method executed. We do the close of the scanner in this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ public void shipped() throws IOException {
// do nothing
}

@Override
public void checkpoint(State state) {
  // Intentionally empty: this scanner takes no action on a checkpoint.
}

// debug method
@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
/**
* This interface denotes a scanner as one which can ship cells. A scan operation makes many RPC
* requests to the server, fetching N rows per RPC; these are then shipped to the client. At the
* end of every such batch
* {@link #shipped()} will get called.
* {@link #shipped()} will get called. <br>
* Scans of large numbers of fully filtered blocks (due to Filter, or sparse columns, etc.) can
* cause excess memory to be held while waiting for {@link #shipped()} to be called. Therefore,
* there's a checkpoint mechanism via {@link #checkpoint(State)}. This enables fully filtered
* blocks to be eagerly released, since they are not referenced by cells being returned to
* clients.
*/
@InterfaceAudience.Private
public interface Shipper {
Expand All @@ -33,4 +37,19 @@ public interface Shipper {
* can be done here.
*/
void shipped() throws IOException;

/** The kinds of checkpoint that can be passed to {@link #checkpoint(State)}. */
enum State {
// Marks the beginning of a request for a row; sets up the state needed to handle FILTERED.
START,
// Signals that blocks processed since the last checkpoint call may be released.
FILTERED
}

/**
 * Called during processing of a batch of scanned rows, before returning to the client. Allows
 * releasing of blocks which have been totally skipped in the result set due to filters. <br>
 * Should be called with {@link State#START} at the beginning of a request for a row. This will
 * set state necessary to handle {@link State#FILTERED}. Calling with {@link State#FILTERED} will
 * release any blocks which have been fully processed since the last call to
 * {@link #checkpoint(State)}. Calling again with {@link State#START} will reset the pointers.
 * @param state {@link State#START} to mark the beginning of a request for a row, or
 *              {@link State#FILTERED} to release blocks processed since the last checkpoint
 */
void checkpoint(State state);
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,4 +558,9 @@ public Cell getNextIndexedKey() {
public void shipped() throws IOException {
this.hfs.shipped();
}

@Override
public void checkpoint(State state) {
  // Hand the checkpoint straight to the underlying hfs scanner.
  hfs.checkpoint(state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final long readPt;
private boolean topChanged = false;

// when creating new scanners, i.e. in flush or switching to stream read, we want
// to checkpoint the new scanners iff we've received a checkpoint call ourselves.
// this keeps the new scanners in sync with the old in terms of enabling eager release
// of unneeded blocks.
private boolean checkpointed = false;

/** An internal constructor. */
private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
boolean cacheBlocks, ScanType scanType) {
Expand Down Expand Up @@ -250,8 +256,8 @@ public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byt
// key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete
// family marker.
seekScanners(scanners, matcher.getStartKey(), explicitColumnQuery && lazySeekEnabledGlobally,
parallelSeekEnabled);
seekScannersWithCheckpoint(scanners, matcher.getStartKey(),
explicitColumnQuery && lazySeekEnabledGlobally, parallelSeekEnabled);

// set storeLimit
this.storeLimit = scan.getMaxResultsPerColumnFamily();
Expand Down Expand Up @@ -317,7 +323,7 @@ private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueSca
scanners = selectScannersFrom(store, scanners);

// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
seekScannersWithCheckpoint(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, comparator);
Expand All @@ -326,7 +332,7 @@ private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueSca
/**
 * Seeks every given scanner to the matcher's start key, registers them as current scanners and
 * rebuilds the heap over them.
 * @param scanInfo not read by this method
 * @param scanners the scanners to seek and combine
 * @throws IOException if seeking the scanners or rebuilding the heap fails
 */
private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
  throws IOException {
  // Seek all scanners to the initial key. Only the checkpoint-aware seek is used: it already
  // delegates to seekScanners, so calling seekScanners as well would seek every scanner twice.
  seekScannersWithCheckpoint(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
  addCurrentScanners(scanners);
  // Combine all seeked scanners with a heap
  resetKVHeap(scanners, comparator);
}
Expand Down Expand Up @@ -359,9 +365,15 @@ public StoreScanner(ScanInfo scanInfo, ScanType scanType,

// Used to instantiate a scanner for user scan in test. Convenience overload for callers that
// have no store: delegates to the five-argument overload, passing null as its first argument.
StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
List<? extends KeyValueScanner> scanners) throws IOException {
this(null, scan, scanInfo, columns, scanners);
}

// Used to instantiate a scanner for user scan in test
StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
List<? extends KeyValueScanner> scanners) throws IOException {
// 0 is passed as readpoint because the test bypasses Store
this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
this(store, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(),
ScanType.USER_SCAN);
this.matcher =
UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
Expand All @@ -383,6 +395,25 @@ boolean isScanUsePread() {
return this.scanUsePread;
}

/**
 * Seeks the given scanners to {@code seekKey} by delegating to
 * {@link #seekScanners(List, Cell, boolean, boolean)}. If this StoreScanner has already received
 * a checkpoint call, each scanner is then checkpointed with {@link State#START}; otherwise the
 * scanners are left as seeked.
 * @param scanners       the scanners to seek
 * @param seekKey        the key to seek to
 * @param isLazy         true if lazy seek
 * @param isParallelSeek true if using parallel seek
 * @throws IOException if the underlying seek fails
 */
private void seekScannersWithCheckpoint(List<? extends KeyValueScanner> scanners, Cell seekKey,
  boolean isLazy, boolean isParallelSeek) throws IOException {
  seekScanners(scanners, seekKey, isLazy, isParallelSeek);

  // Nothing more to do until this StoreScanner itself has been checkpointed.
  if (!checkpointed) {
    return;
  }

  for (KeyValueScanner seeked : scanners) {
    seeked.checkpoint(State.START);
  }
}

/**
* Seek the specified scanners with the given key
* @param isLazy true if using lazy seek
Expand Down Expand Up @@ -764,6 +795,11 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
if (count > 0 && matcher.isUserScan()) {
// if true increment memstore metrics, if not the mixed one
updateMetricsStore(onlyFromMemstore);
} else if (count == 0 && checkpointed) {
// If we returned nothing, it means the row has been filtered for this store. If we've
// previously checkpointed, we can call checkpoint again here to release any blocks we may
// have scanned in reaching this point.
checkpoint(State.FILTERED);
}
}
}
Expand Down Expand Up @@ -1011,7 +1047,7 @@ protected final boolean reopenAfterFlush() throws IOException {
}

// Seek the new scanners to the last key
seekScanners(scanners, lastTop, false, parallelSeekEnabled);
seekScannersWithCheckpoint(scanners, lastTop, false, parallelSeekEnabled);
// remove the older memstore scanner
for (int i = currentScanners.size() - 1; i >= 0; i--) {
if (!currentScanners.get(i).isFileScanner()) {
Expand Down Expand Up @@ -1117,7 +1153,7 @@ void trySwitchToStreamRead() {
if (fileScanners == null) {
return;
}
seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
seekScannersWithCheckpoint(fileScanners, lastTop, false, parallelSeekEnabled);
newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
newCurrentScanners.addAll(fileScanners);
newCurrentScanners.addAll(memstoreScanners);
Expand Down Expand Up @@ -1241,4 +1277,18 @@ public void shipped() throws IOException {
trySwitchToStreamRead();
}
}

/**
 * Records that this StoreScanner has been checkpointed, then forwards the checkpoint to the heap
 * (when one exists) and to every scanner waiting for delayed close.
 */
@Override
public void checkpoint(State state) {
  // Remember the call; seekScannersWithCheckpoint consults this flag for scanners seeked later.
  checkpointed = true;

  if (heap != null) {
    heap.checkpoint(state);
  }

  // Exhausted scanners parked for delayed close are checkpointed too: they may still hold blocks
  // that were totally filtered during a request, and the checkpoint will release them.
  for (KeyValueScanner pendingClose : scannersForDelayedClose) {
    pendingClose.checkpoint(state);
  }
}
}
Loading

0 comments on commit c2ede4b

Please sign in to comment.