diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index c34f87279495..58aa602591ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -129,7 +129,7 @@ protected static List getTableAndScanCreatorParams() { protected abstract Scan createScan(); - protected abstract List doScan(Scan scan) throws Exception; + protected abstract List doScan(Scan scan, int closeAfter) throws Exception; protected final List convertFromBatchResult(List results) { assertTrue(results.size() % 2 == 0); @@ -145,7 +145,7 @@ protected final List convertFromBatchResult(List results) { @Test public void testScanAll() throws Exception { - List results = doScan(createScan()); + List results = doScan(createScan(), -1); // make sure all scanners are closed at RS side TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) .forEach( @@ -169,7 +169,7 @@ private void assertResultEquals(Result result, int i) { @Test public void testReversedScanAll() throws Exception { - List results = doScan(createScan().setReversed(true)); + List results = doScan(createScan().setReversed(true), -1); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); } @@ -178,7 +178,7 @@ public void testReversedScanAll() throws Exception { public void testScanNoStopKey() throws Exception { int start = 345; List results = - doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)))); + doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1); assertEquals(COUNT - start, results.size()); IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); } @@ -187,7 +187,7 @@ public void testScanNoStopKey() throws Exception { public void testReverseScanNoStopKey() throws Exception { int start = 765; List results = doScan( - createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true)); + createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1); assertEquals(start + 1, results.size()); IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); } @@ -195,7 +195,7 @@ public void testReverseScanNoStopKey() throws Exception { @Test public void testScanWrongColumnFamily() throws Exception { try { - doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily"))); + doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1); } catch (Exception e) { assertTrue(e instanceof NoSuchColumnFamilyException || e.getCause() instanceof NoSuchColumnFamilyException); @@ -203,20 +203,28 @@ public void testScanWrongColumnFamily() throws Exception { } private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, - int limit) throws Exception { + int limit) throws Exception { + testScan(start, startInclusive, stop, stopInclusive, limit, -1); + } + + private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, + int limit, int closeAfter) throws Exception { Scan scan = createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); if (limit > 0) { scan.setLimit(limit); } - List results = doScan(scan); + List results = doScan(scan, closeAfter); int actualStart = startInclusive ? start : start + 1; int actualStop = stopInclusive ? stop + 1 : stop; int count = actualStop - actualStart; if (limit > 0) { count = Math.min(count, limit); } + if (closeAfter > 0) { + count = Math.min(count, closeAfter); + } assertEquals(count, results.size()); IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i)); } @@ -229,12 +237,15 @@ private void testReversedScan(int start, boolean startInclusive, int stop, boole if (limit > 0) { scan.setLimit(limit); } - List results = doScan(scan); + List results = doScan(scan, -1); int actualStart = startInclusive ? start : start - 1; int actualStop = stopInclusive ? stop - 1 : stop; int count = actualStart - actualStop; if (limit > 0) { count = Math.min(count, limit); + } + if (scan.getBatch() > 0) { + } assertEquals(count, results.size()); IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i)); @@ -309,4 +320,13 @@ public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { testReversedScan(765, false, 543, true, 200); testReversedScan(876, false, 654, false, 200); } + + @Test + public void testScanEndingEarly() throws Exception { + testScan(1, true, 998, false, 0, 900); // from first region to last region + testScan(123, true, 234, true, 0, 100); + testScan(234, true, 456, false, 0, 100); + testScan(345, false, 567, true, 0, 100); + testScan(456, false, 678, false, 0, 100); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index 42d2c38376d6..04aaba1deacb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -17,12 +17,14 @@ */ package org.apache.hadoop.hbase.client; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -55,16 +57,74 @@ protected Scan createScan() { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); - SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); - table.scan(scan, consumer); - List results = consumer.getAll(); + List results; + if (closeAfter > 0) { + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } + LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter); + table.scan(scan, consumer); + results = consumer.getAll(); + } else { + SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + table.scan(scan, consumer); + results = consumer.getAll(); + } if (scan.getBatch() > 0) { results = convertFromBatchResult(results); } return results; } + private static class LimitedScanResultConsumer implements ScanResultConsumer { + + private final int limit; + + public LimitedScanResultConsumer(int limit) { + this.limit = limit; + } + + private final List results = new ArrayList<>(); + + private Throwable error; + + private boolean finished = false; + + @Override + public synchronized boolean onNext(Result result) { + results.add(result); + return results.size() < limit; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + finished = true; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + public synchronized List getAll() throws Exception { + while (!finished) { + wait(); + } + if (error != null) { + Throwables.propagateIfPossible(error, Exception.class); + throw new Exception(error); + } + return results; + } + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java index d9a53952ab8c..96c2d40138ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -60,11 +60,16 @@ protected Scan createScan() { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { List results = getTable.get().scanAll(scan).get(); if (scan.getBatch() > 0) { results = convertFromBatchResult(results); } + // we can't really close the scan early for scanAll, but to keep the assertions + // simple in AbstractTestAsyncTableScan we'll just sublist here instead. + if (closeAfter > 0 && closeAfter < results.size()) { + results = results.subList(0, closeAfter); + } return results; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index f832cfd759a3..2e990f763da0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -62,12 +62,21 @@ protected Scan createScan() { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results = new ArrayList<>(); + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (closeAfter > 0 && scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } try (ResultScanner scanner = table.getScanner(scan)) { for (Result result; (result = scanner.next()) != null;) { results.add(result); + if (closeAfter > 0 && results.size() >= closeAfter) { + break; + } } } if (scan.getBatch() > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 78a54edd24b3..26c201e19865 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -55,12 +55,21 @@ protected Scan createScan() { } @Override - protected List doScan(Scan scan) throws Exception { + protected List doScan(Scan scan, int closeAfter) throws Exception { BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer(); ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); + // these tests batch settings with the sample data result in each result being + // split in two. so we must allow twice the expected results in order to reach + // our true limit. see convertFromBatchResult for details. + if (closeAfter > 0 && scan.getBatch() > 0) { + closeAfter = closeAfter * 2; + } for (Result result; (result = scanConsumer.take()) != null;) { results.add(result); + if (closeAfter > 0 && results.size() >= closeAfter) { + break; + } } if (scan.getBatch() > 0) { results = convertFromBatchResult(results);