Skip to content

Commit

Permalink
fix tests and fix priority of openScanner call
Browse files Browse the repository at this point in the history
  • Loading branch information
bbeaudreault committed Mar 3, 2022
1 parent a5ab3ce commit 0f6b831
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private void startScan(OpenScannerResponse resp) {
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand All @@ -35,6 +38,9 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -92,41 +98,18 @@ public class TestAsyncTableRpcPriority {

private ClientService.Interface stub;

private ExecutorService threadPool;

private AsyncConnection conn;

@Rule
public TestName name = new TestName();

@Before
public void setUp() throws IOException {
this.threadPool = Executors.newSingleThreadExecutor();
stub = mock(ClientService.Interface.class);
AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
.setMoreResultsInRegion(true).setMoreResults(true).build());
} else {
if (req.hasCloseScanner() && req.getCloseScanner()) {
done.run(ScanResponse.getDefaultInstance());
} else {
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
.setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
done.run(
ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
.setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build());
}
}
return null;
}
}).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
doAnswer(new Answer<Void>() {

@Override
Expand Down Expand Up @@ -489,69 +472,123 @@ public void testCheckAndMutateMetaTable() throws IOException {
any(ClientProtos.MultiRequest.class), any());
}

private void mockScan(int scanPriority) {
int scannerId = 1;
AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
threadPool.submit(() ->{
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(
ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).build());
} else {
assertFalse("close scanner should not come in with scan priority " + scanPriority,
req.hasCloseScanner() && req.getCloseScanner());

Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
done.run(
ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build());
}
});
return null;
}
}).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any());

doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
threadPool.submit(() ->{
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
assertTrue("close request should have scannerId", req.hasScannerId());
assertEquals("close request's scannerId should match", scannerId, req.getScannerId());
assertTrue("close request should have closerScanner set", req.hasCloseScanner() && req.getCloseScanner());

done.run(ScanResponse.getDefaultInstance());
});
return null;
}
}).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
}

@Test
public void testScan() throws IOException, InterruptedException {
mockScan(19);
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19).setLimit(10))) {
for (Result result : scanner) {
assertNotNull(result);
Thread.sleep(1000);
}
.getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// ensures the close thread has time to finish before asserting
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.SECONDS);

// just verify that the calls happened. verification of priority occurred in the mocking
// open, next, then several renew lease
verify(stub, atLeast(3)).scan(assertPriority(19), any(ScanRequest.class), any());
// close should use high qos
verify(stub, times(1)).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any());
verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
}

@Test
public void testScanNormalTable() throws IOException, InterruptedException {
mockScan(NORMAL_QOS);
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1).setLimit(10))) {
for (Result result : scanner) {
assertNotNull(result);
Thread.sleep(1000);
}
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// ensures the close thread has time to finish before asserting
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.SECONDS);

// just verify that the calls happened. verification of priority occurred in the mocking
// open, next, then several renew lease
verify(stub, atLeast(3)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any());
// close should use high qos
verify(stub, times(1)).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any());
verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
}

@Test
public void testScanSystemTable() throws IOException, InterruptedException {
mockScan(SYSTEMTABLE_QOS);
try (ResultScanner scanner =
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1).setLimit(10))) {
for (Result result : scanner) {
assertNotNull(result);
Thread.sleep(1000);
}
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// ensures the close thread has time to finish before asserting
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.SECONDS);

// just verify that the calls happened. verification of priority occurred in the mocking
// open, next, then several renew lease
verify(stub, atLeast(3)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
// close should use high qos
verify(stub, times(1)).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any());
verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
}

@Test
public void testScanMetaTable() throws IOException, InterruptedException {
mockScan(SYSTEMTABLE_QOS);
try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME)
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
for (Result result : scanner) {
assertNotNull(result);
Thread.sleep(1000);
}
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(3)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
// close should use high qos
verify(stub, times(1)).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
// ensures the close thread has time to finish before asserting
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.SECONDS);

// just verify that the calls happened. verification of priority occurred in the mocking
// open, next, then several renew lease
verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any());
verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
}

@Test
Expand Down

0 comments on commit 0f6b831

Please sign in to comment.