Skip to content

Commit

Permalink
Add test case to AbstractTestAsyncTableScan suite for testing closing…
Browse files Browse the repository at this point in the history
… of a scan prematurely
  • Loading branch information
bbeaudreault committed Mar 3, 2022
1 parent 1635eb6 commit a5ab3ce
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected static List<Object[]> getTableAndScanCreatorParams() {

protected abstract Scan createScan();

protected abstract List<Result> doScan(Scan scan) throws Exception;
protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;

protected final List<Result> convertFromBatchResult(List<Result> results) {
assertTrue(results.size() % 2 == 0);
Expand All @@ -145,7 +145,7 @@ protected final List<Result> convertFromBatchResult(List<Result> results) {

@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan());
List<Result> results = doScan(createScan(), -1);
// make sure all scanners are closed at RS side
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.forEach(
Expand All @@ -169,7 +169,7 @@ private void assertResultEquals(Result result, int i) {

@Test
public void testReversedScanAll() throws Exception {
List<Result> results = doScan(createScan().setReversed(true));
List<Result> results = doScan(createScan().setReversed(true), -1);
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
}
Expand All @@ -178,7 +178,7 @@ public void testReversedScanAll() throws Exception {
public void testScanNoStopKey() throws Exception {
int start = 345;
List<Result> results =
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))));
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1);
assertEquals(COUNT - start, results.size());
IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
}
Expand All @@ -187,36 +187,44 @@ public void testScanNoStopKey() throws Exception {
public void testReverseScanNoStopKey() throws Exception {
int start = 765;
List<Result> results = doScan(
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1);
assertEquals(start + 1, results.size());
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
}

@Test
public void testScanWrongColumnFamily() throws Exception {
try {
doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")));
doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1);
} catch (Exception e) {
assertTrue(e instanceof NoSuchColumnFamilyException ||
e.getCause() instanceof NoSuchColumnFamilyException);
}
}

private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
int limit) throws Exception {
testScan(start, startInclusive, stop, stopInclusive, limit, -1);
}

private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit, int closeAfter) throws Exception {
Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
List<Result> results = doScan(scan, closeAfter);
int actualStart = startInclusive ? start : start + 1;
int actualStop = stopInclusive ? stop + 1 : stop;
int count = actualStop - actualStart;
if (limit > 0) {
count = Math.min(count, limit);
}
if (closeAfter > 0) {
count = Math.min(count, closeAfter);
}
assertEquals(count, results.size());
IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
}
Expand All @@ -229,12 +237,15 @@ private void testReversedScan(int start, boolean startInclusive, int stop, boole
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
List<Result> results = doScan(scan, -1);
int actualStart = startInclusive ? start : start - 1;
int actualStop = stopInclusive ? stop - 1 : stop;
int count = actualStart - actualStop;
if (limit > 0) {
count = Math.min(count, limit);
}
if (scan.getBatch() > 0) {

}
assertEquals(count, results.size());
IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
Expand Down Expand Up @@ -309,4 +320,13 @@ public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
testReversedScan(765, false, 543, true, 200);
testReversedScan(876, false, 654, false, 200);
}

@Test
public void testScanEndingEarly() throws Exception {
testScan(1, true, 998, false, 0, 900); // from first region to last region
testScan(123, true, 234, true, 0, 100);
testScan(234, true, 456, false, 0, 100);
testScan(345, false, 567, true, 0, 100);
testScan(456, false, 678, false, 0, 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -55,16 +57,74 @@ protected Scan createScan() {
}

@Override
protected List<Result> doScan(Scan scan) throws Exception {
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<ScanResultConsumer> table =
ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
table.scan(scan, consumer);
List<Result> results = consumer.getAll();
List<Result> results;
if (closeAfter > 0) {
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
// our true limit. see convertFromBatchResult for details.
if (scan.getBatch() > 0) {
closeAfter = closeAfter * 2;
}
LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter);
table.scan(scan, consumer);
results = consumer.getAll();
} else {
SimpleScanResultConsumer consumer = new SimpleScanResultConsumer();
table.scan(scan, consumer);
results = consumer.getAll();
}
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);
}
return results;
}

private static class LimitedScanResultConsumer implements ScanResultConsumer {

private final int limit;

public LimitedScanResultConsumer(int limit) {
this.limit = limit;
}

private final List<Result> results = new ArrayList<>();

private Throwable error;

private boolean finished = false;

@Override
public synchronized boolean onNext(Result result) {
results.add(result);
return results.size() < limit;
}

@Override
public synchronized void onError(Throwable error) {
this.error = error;
finished = true;
notifyAll();
}

@Override
public synchronized void onComplete() {
finished = true;
notifyAll();
}

public synchronized List<Result> getAll() throws Exception {
while (!finished) {
wait();
}
if (error != null) {
Throwables.propagateIfPossible(error, Exception.class);
throw new Exception(error);
}
return results;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,16 @@ protected Scan createScan() {
}

@Override
protected List<Result> doScan(Scan scan) throws Exception {
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
List<Result> results = getTable.get().scanAll(scan).get();
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);
}
// we can't really close the scan early for scanAll, but to keep the assertions
// simple in AbstractTestAsyncTableScan we'll just sublist here instead.
if (closeAfter > 0 && closeAfter < results.size()) {
results = results.subList(0, closeAfter);
}
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,21 @@ protected Scan createScan() {
}

@Override
protected List<Result> doScan(Scan scan) throws Exception {
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
// our true limit. see convertFromBatchResult for details.
if (closeAfter > 0 && scan.getBatch() > 0) {
closeAfter = closeAfter * 2;
}
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result result; (result = scanner.next()) != null;) {
results.add(result);
if (closeAfter > 0 && results.size() >= closeAfter) {
break;
}
}
}
if (scan.getBatch() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,21 @@ protected Scan createScan() {
}

@Override
protected List<Result> doScan(Scan scan) throws Exception {
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer();
ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer);
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
// our true limit. see convertFromBatchResult for details.
if (closeAfter > 0 && scan.getBatch() > 0) {
closeAfter = closeAfter * 2;
}
for (Result result; (result = scanConsumer.take()) != null;) {
results.add(result);
if (closeAfter > 0 && results.size() >= closeAfter) {
break;
}
}
if (scan.getBatch() > 0) {
results = convertFromBatchResult(results);
Expand Down

0 comments on commit a5ab3ce

Please sign in to comment.