diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 5fd00a5f6ed2..48f004c0a29c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -188,6 +188,7 @@ private void startScan(OpenScannerResponse resp) { private CompletableFuture openScanner(int replicaId) { return conn.callerFactory. single().table(tableName) .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) + .priority(scan.getPriority()) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index 77f71d88b622..53333dfaf5dd 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -21,7 +21,10 @@ import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -35,6 +38,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -92,6 +98,8 @@ public class TestAsyncTableRpcPriority { private ClientService.Interface stub; + private ExecutorService threadPool; + private AsyncConnection conn; @Rule @@ -99,34 +107,9 @@ public class TestAsyncTableRpcPriority { @Before public void setUp() throws IOException { + this.threadPool = Executors.newSingleThreadExecutor(); stub = mock(ClientService.Interface.class); - AtomicInteger scanNextCalled = new AtomicInteger(0); - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - ScanRequest req = invocation.getArgument(1); - RpcCallback done = invocation.getArgument(2); - if (!req.hasScannerId()) { - done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) - .setMoreResultsInRegion(true).setMoreResults(true).build()); - } else { - if (req.hasCloseScanner() && req.getCloseScanner()) { - done.run(ScanResponse.getDefaultInstance()); - } else { - Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put) - .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) - .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) - .setValue(Bytes.toBytes("v")).build(); - Result result = Result.create(Arrays.asList(cell)); - done.run( - ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true) - .setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build()); - } - } - return null; - } - }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); doAnswer(new Answer() { @Override @@ -489,69 +472,123 @@ public void testCheckAndMutateMetaTable() throws IOException { any(ClientProtos.MultiRequest.class), any()); } + private void mockScan(int scanPriority) { + int scannerId = 1; + AtomicInteger scanNextCalled = new AtomicInteger(0); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.submit(() ->{ + ScanRequest req = invocation.getArgument(1); + RpcCallback done = invocation.getArgument(2); + if (!req.hasScannerId()) { + done.run( + ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).build()); + } else { + assertFalse("close scanner should not come in with scan priority " + scanPriority, + req.hasCloseScanner() && req.getCloseScanner()); + + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")).setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + done.run( + ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true).setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build()); + } + }); + return null; + } + }).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any()); + + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + threadPool.submit(() ->{ + ScanRequest req = invocation.getArgument(1); + RpcCallback done = invocation.getArgument(2); + assertTrue("close request should have scannerId", req.hasScannerId()); + assertEquals("close request's scannerId should match", scannerId, req.getScannerId()); + assertTrue("close request should have closerScanner set", req.hasCloseScanner() && req.getCloseScanner()); + + done.run(ScanResponse.getDefaultInstance()); + }); + return null; + } + }).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + } + @Test public void testScan() throws IOException, InterruptedException { + mockScan(19); try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19).setLimit(10))) { - for (Result result : scanner) { - assertNotNull(result); - Thread.sleep(1000); - } + .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) { + assertNotNull(scanner.next()); + Thread.sleep(1000); } - Thread.sleep(1000); + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking // open, next, then several renew lease - verify(stub, atLeast(3)).scan(assertPriority(19), any(ScanRequest.class), any()); - // close should use high qos - verify(stub, times(1)).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test public void testScanNormalTable() throws IOException, InterruptedException { + mockScan(NORMAL_QOS); try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setLimit(10))) { - for (Result result : scanner) { - assertNotNull(result); - Thread.sleep(1000); - } + .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { + assertNotNull(scanner.next()); + Thread.sleep(1000); } - Thread.sleep(1000); + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking // open, next, then several renew lease - verify(stub, atLeast(3)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any()); - // close should use high qos - verify(stub, times(1)).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test public void testScanSystemTable() throws IOException, InterruptedException { + mockScan(SYSTEMTABLE_QOS); try (ResultScanner scanner = conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) - .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setLimit(10))) { - for (Result result : scanner) { - assertNotNull(result); - Thread.sleep(1000); - } + .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { + assertNotNull(scanner.next()); + Thread.sleep(1000); } - Thread.sleep(1000); + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking // open, next, then several renew lease - verify(stub, atLeast(3)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any()); - // close should use high qos - verify(stub, times(1)).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test public void testScanMetaTable() throws IOException, InterruptedException { + mockScan(SYSTEMTABLE_QOS); try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME) .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { - for (Result result : scanner) { - assertNotNull(result); - Thread.sleep(1000); - } + assertNotNull(scanner.next()); + Thread.sleep(1000); } - Thread.sleep(1000); - // open, next, several renew lease, and then close - verify(stub, atLeast(3)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any()); - // close should use high qos - verify(stub, times(1)).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any()); + // ensures the close thread has time to finish before asserting + threadPool.shutdown(); + threadPool.awaitTermination(5, TimeUnit.SECONDS); + + // just verify that the calls happened. verification of priority occurred in the mocking + // open, next, then several renew lease + verify(stub, atLeast(3)).scan(any(), any(ScanRequest.class), any()); + verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any()); } @Test