diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 7e3c4340947b..51a9a07c9a2c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -230,7 +230,8 @@ private enum ScanResumerState { // Notice that, the public methods of this class is supposed to be called by upper layer only, and // package private methods can only be called within the implementation of // AsyncScanSingleRegionRpcRetryingCaller. - private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { + @InterfaceAudience.Private + final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer { // INITIALIZED -> SUSPENDED -> RESUMED // INITIALIZED -> RESUMED @@ -250,6 +251,18 @@ private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanRe @Override public void resume() { + doResume(false); + } + + /** + * This method is used when {@link ScanControllerImpl#suspend} had ever been called to get a + * {@link ScanResumerImpl}, but now user stops scan and does not need any more scan results. + */ + public void terminate() { + doResume(true); + } + + private void doResume(boolean stopScan) { // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we // just return at the first if condition without loading the resp and numValidResuls field. If // resume is called after suspend, then it is also safe to just reference resp and @@ -274,7 +287,11 @@ public void resume() { localResp = this.resp; localNumberOfCompleteRows = this.numberOfCompleteRows; } - completeOrNext(localResp, localNumberOfCompleteRows); + if (stopScan) { + stopScan(localResp); + } else { + completeOrNext(localResp, localNumberOfCompleteRows); + } } private void scheduleRenewLeaseTask() { @@ -536,12 +553,7 @@ private void onComplete(HBaseRpcController controller, ScanResponse resp) { } ScanControllerState state = scanController.destroy(); if (state == ScanControllerState.TERMINATED) { - if (resp.getMoreResultsInRegion()) { - // we have more results in region but user request to stop the scan, so we need to close the - // scanner explicitly. - closeScanner(); - } - completeNoMoreResults(); + stopScan(resp); return; } int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore; @@ -553,6 +565,15 @@ private void onComplete(HBaseRpcController controller, ScanResponse resp) { completeOrNext(resp, numberOfCompleteRows); } + private void stopScan(ScanResponse resp) { + if (resp.getMoreResultsInRegion()) { + // we have more results in region but user request to stop the scan, so we need to close the + // scanner explicitly. + closeScanner(); + } + completeNoMoreResults(); + } + private void call() { // As we have a call sequence for scan, it is useless to have a different rpc timeout which is // less than the scan timeout. If the server does not respond in time(usually this will not diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index 1f9d7497ee0f..7773042434c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -25,6 +25,7 @@ import java.util.ArrayDeque; import java.util.Queue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncScanSingleRegionRpcRetryingCaller.ScanResumerImpl; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -143,6 +144,25 @@ private void resumePrefetch() { resumer = null; } + private void terminateResumerIfPossible() { + if (resumer == null) { + return; + } + // AsyncTableResultScanner.close means we do not need scan results any more, but for + // ScanResumerImpl.resume, it would perform another scan on RegionServer and call + // AsyncTableResultScanner.onNext again when ScanResponse is received. This time + // AsyncTableResultScanner.onNext would do nothing else but just discard ScanResponse + // because AsyncTableResultScanner.closed is true. So here we would better save this + // unnecessary scan on RegionServer and introduce ScanResumerImpl.terminate to close + // scanner directly. + if (resumer instanceof ScanResumerImpl) { + ((ScanResumerImpl) resumer).terminate(); + } else { + resumePrefetch(); + } + resumer = null; + } + @Override public synchronized Result next() throws IOException { while (queue.isEmpty()) { @@ -173,9 +193,7 @@ public synchronized void close() { closed = true; queue.clear(); cacheSize = 0; - if (resumer != null) { - resumePrefetch(); - } + terminateResumerIfPossible(); notifyAll(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java index 4507664715fc..4f52e6c8c13f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScannerCloseWhileSuspending.java @@ -19,7 +19,9 @@ import static org.junit.Assert.assertEquals; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -78,21 +80,41 @@ private int getScannersCount() { @Test public void testCloseScannerWhileSuspending() throws Exception { - try (ResultScanner scanner = TABLE.getScanner(new Scan().setMaxResultSize(1))) { - TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate() { - - @Override - public boolean evaluate() throws Exception { - return ((AsyncTableResultScanner) scanner).isSuspended(); - } - - @Override - public String explainFailure() throws Exception { - return "The given scanner has been suspended in time"; - } - }); - assertEquals(1, getScannersCount()); - } + final AtomicInteger onNextCounter = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + final Scan scan = new Scan().setMaxResultSize(1); + final AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) { + @Override + public void onNext(Result[] results, ScanController controller) { + onNextCounter.incrementAndGet(); + super.onNext(results, controller); + } + + @Override + public void onComplete() { + super.onComplete(); + latch.countDown(); + } + }; + + CONN.getTable(TABLE_NAME).scan(scan, scanner); + + TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return scanner.isSuspended(); + } + + @Override + public String explainFailure() throws Exception { + return "The given scanner has been suspended in time"; + } + }); + assertEquals(1, getScannersCount()); + assertEquals(1, onNextCounter.get()); + + scanner.close(); TEST_UTIL.waitFor(10000, 100, new ExplainingPredicate() { @Override @@ -105,5 +127,7 @@ public String explainFailure() throws Exception { return "Still have " + getScannersCount() + " scanners opened"; } }); + latch.await(); + assertEquals(1, onNextCounter.get()); } }