Skip to content

Commit

Permalink
HBASE-28509 ScanResumer.resume would perform unnecessary scan when cl… (
Browse files Browse the repository at this point in the history
  • Loading branch information
comnetwork authored Apr 20, 2024
1 parent a34b4bc commit bc37ce8
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ private enum ScanResumerState {
// Note that the public methods of this class are supposed to be called by the upper layer only,
// and package private methods can only be called within the implementation of
// AsyncScanSingleRegionRpcRetryingCaller.
private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
@InterfaceAudience.Private
final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {

// INITIALIZED -> SUSPENDED -> RESUMED
// INITIALIZED -> RESUMED
Expand All @@ -250,6 +251,18 @@ private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanRe

/**
 * Resumes the scan by delegating to {@code doResume(false)}, i.e. without asking for the scan to
 * be stopped.
 */
@Override
public void resume() {
doResume(false);
}

/**
 * Stops the scan instead of resuming it, by delegating to {@code doResume(true)}.
 * <p>
 * This method is used when {@link ScanControllerImpl#suspend} has been called to obtain a
 * {@link ScanResumerImpl}, but the user has since stopped the scan and does not need any more
 * scan results.
 */
public void terminate() {
doResume(true);
}

private void doResume(boolean stopScan) {
// just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
// just return at the first if condition without loading the resp and numberOfCompleteRows
// fields. If resume is called after suspend, then it is also safe to just reference resp and
Expand All @@ -274,7 +287,11 @@ public void resume() {
localResp = this.resp;
localNumberOfCompleteRows = this.numberOfCompleteRows;
}
completeOrNext(localResp, localNumberOfCompleteRows);
if (stopScan) {
stopScan(localResp);
} else {
completeOrNext(localResp, localNumberOfCompleteRows);
}
}

private void scheduleRenewLeaseTask() {
Expand Down Expand Up @@ -536,12 +553,7 @@ private void onComplete(HBaseRpcController controller, ScanResponse resp) {
}
ScanControllerState state = scanController.destroy();
if (state == ScanControllerState.TERMINATED) {
if (resp.getMoreResultsInRegion()) {
// we have more results in region but user request to stop the scan, so we need to close the
// scanner explicitly.
closeScanner();
}
completeNoMoreResults();
stopScan(resp);
return;
}
int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
Expand All @@ -553,6 +565,15 @@ private void onComplete(HBaseRpcController controller, ScanResponse resp) {
completeOrNext(resp, numberOfCompleteRows);
}

/**
 * Ends the scan after the user has requested that it stop.
 * <p>
 * If {@code resp} reports that the region still has more results, the scanner is closed
 * explicitly first; in every case {@code completeNoMoreResults()} is then invoked.
 * @param resp the scan response whose "more results in region" flag is consulted
 */
private void stopScan(ScanResponse resp) {
  final boolean regionHasMoreResults = resp.getMoreResultsInRegion();
  if (regionHasMoreResults) {
    // The user asked to stop while the region still has results, so the scanner must be closed
    // explicitly.
    closeScanner();
  }
  completeNoMoreResults();
}

private void call() {
// As we have a call sequence for scan, it is useless to have a different rpc timeout which is
// less than the scan timeout. If the server does not respond in time(usually this will not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -143,6 +144,25 @@ private void resumePrefetch() {
resumer = null;
}

/**
 * Disposes of a pending resumer, if there is one, and clears the {@code resumer} field.
 * <p>
 * When the resumer is a {@link ScanResumerImpl} it is terminated; any other resumer type is
 * handed to {@code resumePrefetch()} instead.
 */
private void terminateResumerIfPossible() {
  if (resumer != null) {
    final boolean canTerminate = resumer instanceof ScanResumerImpl;
    if (!canTerminate) {
      // Unknown resumer implementation: fall back to the plain resume path.
      resumePrefetch();
    } else {
      // AsyncTableResultScanner.close means no more scan results are needed. Calling
      // ScanResumerImpl.resume here would perform another scan on the RegionServer and invoke
      // AsyncTableResultScanner.onNext again once the ScanResponse arrives, only for onNext to
      // discard that response because AsyncTableResultScanner.closed is true. Calling
      // ScanResumerImpl.terminate saves that unnecessary scan and closes the scanner directly.
      ((ScanResumerImpl) resumer).terminate();
    }
    resumer = null;
  }
}

@Override
public synchronized Result next() throws IOException {
while (queue.isEmpty()) {
Expand Down Expand Up @@ -173,9 +193,7 @@ public synchronized void close() {
closed = true;
queue.clear();
cacheSize = 0;
if (resumer != null) {
resumePrefetch();
}
terminateResumerIfPossible();
notifyAll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import static org.junit.Assert.assertEquals;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -78,21 +80,41 @@ private int getScannersCount() {

@Test
public void testCloseScannerWhileSuspending() throws Exception {
try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) {
TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
// scanner is declared as ResultScanner, so cast to AsyncTableResultScanner to query isSuspended().
return ((AsyncTableResultScanner) scanner).isSuspended();
}

@Override
public String explainFailure() throws Exception {
  // Describes the failed wait: the predicate never saw the scanner become suspended.
  return "The given scanner has not been suspended in time";
}
});
assertEquals(1, getScannersCount());
}
final AtomicInteger onNextCounter = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(1);
final Scan scan = new Scan().setMaxResultSize(1);
final AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
@Override
public void onNext(Result[] results, ScanController controller) {
// Count every onNext invocation; the test asserts on this counter both before and after close().
onNextCounter.incrementAndGet();
super.onNext(results, controller);
}

@Override
public void onComplete() {
super.onComplete();
// Release the latch that the test awaits before making its final onNextCounter assertion.
latch.countDown();
}
};

CONN.getTable(TABLE_NAME).scan(scan, scanner);

TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
// Condition polled by waitFor: true once the scanner reports that it is suspended.
return scanner.isSuspended();
}

@Override
public String explainFailure() throws Exception {
  // Describes the failed wait: the predicate never saw the scanner become suspended.
  return "The given scanner has not been suspended in time";
}
});
assertEquals(1, getScannersCount());
assertEquals(1, onNextCounter.get());

scanner.close();
TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate<Exception>() {

@Override
Expand All @@ -105,5 +127,7 @@ public String explainFailure() throws Exception {
return "Still have " + getScannersCount() + " scanners opened";
}
});
latch.await();
assertEquals(1, onNextCounter.get());
}
}

0 comments on commit bc37ce8

Please sign in to comment.