Skip to content

Commit

Permalink
HBASE-26784 Use HIGH_QOS for ResultScanner.close requests (#4146)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
  • Loading branch information
bbeaudreault authored Mar 8, 2022
1 parent bcd9a9a commit 39ecaa1
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ private void startScan(OpenScannerResponse resp) {
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
Expand Down Expand Up @@ -347,7 +348,7 @@ private long remainingTimeNs() {

private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs, priority);
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
if (controller.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand All @@ -33,8 +37,13 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
Expand All @@ -59,9 +68,7 @@
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
Expand Down Expand Up @@ -91,41 +98,18 @@ public class TestAsyncTableRpcPriority {

private ClientService.Interface stub;

private ExecutorService threadPool;

private AsyncConnection conn;

@Rule
public TestName name = new TestName();

@Before
public void setUp() throws IOException {
this.threadPool = Executors.newSingleThreadExecutor();
stub = mock(ClientService.Interface.class);
AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
.setMoreResultsInRegion(true).setMoreResults(true).build());
} else {
if (req.hasCloseScanner() && req.getCloseScanner()) {
done.run(ScanResponse.getDefaultInstance());
} else {
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
.setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
done.run(
ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
.setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build());
}
}
return null;
}
}).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
doAnswer(new Answer<Void>() {

@Override
Expand Down Expand Up @@ -218,6 +202,16 @@ public boolean matches(HBaseRpcController controller) {
});
}

private ScanRequest assertScannerCloseRequest() {
return argThat(new ArgumentMatcher<ScanRequest>() {

@Override
public boolean matches(ScanRequest request) {
return request.hasCloseScanner() && request.getCloseScanner();
}
});
}

@Test
public void testGet() {
conn.getTable(TableName.valueOf(name.getMethodName()))
Expand Down Expand Up @@ -478,53 +472,113 @@ public void testCheckAndMutateMetaTable() throws IOException {
any(ClientProtos.MultiRequest.class), any());
}

private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) {
int scannerId = 1;
CompletableFuture<Void> future = new CompletableFuture<>();
AtomicInteger scanNextCalled = new AtomicInteger(0);
doAnswer(new Answer<Void>() {

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
threadPool.submit(() -> {
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
if (!req.hasScannerId()) {
done.run(ScanResponse.newBuilder()
.setScannerId(scannerId).setTtl(800)
.setMoreResultsInRegion(true).setMoreResults(true)
.build());
} else {
if (req.hasRenew() && req.getRenew()) {
future.complete(null);
}

assertFalse("close scanner should not come in with scan priority " + scanPriority,
req.hasCloseScanner() && req.getCloseScanner());

Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
.setValue(Bytes.toBytes("v")).build();
Result result = Result.create(Arrays.asList(cell));
done.run(
ScanResponse.newBuilder()
.setScannerId(scannerId).setTtl(800).setMoreResultsInRegion(true)
.setMoreResults(true).addResults(ProtobufUtil.toResult(result))
.build());
}
});
return null;
}
}).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any());

doAnswer(new Answer<Void>() {

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
threadPool.submit(() ->{
ScanRequest req = invocation.getArgument(1);
RpcCallback<ScanResponse> done = invocation.getArgument(2);
assertTrue("close request should have scannerId", req.hasScannerId());
assertEquals("close request's scannerId should match", scannerId, req.getScannerId());
assertTrue("close request should have closerScanner set",
req.hasCloseScanner() && req.getCloseScanner());

done.run(ScanResponse.getDefaultInstance());
});
return null;
}
}).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
return future;
}

@Test
public void testScan() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any());
public void testScan() throws Exception {
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19);
testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19));
}

@Test
public void testScanNormalTable() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any());
public void testScanNormalTable() throws Exception {
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS);
testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS));
}

@Test
public void testScanSystemTable() throws IOException, InterruptedException {
try (ResultScanner scanner =
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
assertNotNull(scanner.next());
Thread.sleep(1000);
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
public void testScanSystemTable() throws Exception {
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()),
renewFuture, Optional.empty());
}

@Test
public void testScanMetaTable() throws IOException, InterruptedException {
try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME)
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
public void testScanMetaTable() throws Exception {
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty());
}

private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture,
Optional<Integer> priority) throws Exception {
Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
priority.ifPresent(scan::setPriority);

try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
assertNotNull(scanner.next());
Thread.sleep(1000);
// wait for at least one renew to come in before closing
renewFuture.join();
}
Thread.sleep(1000);
// open, next, several renew lease, and then close
verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());

// ensures the close thread has time to finish before asserting
threadPool.shutdown();
threadPool.awaitTermination(5, TimeUnit.SECONDS);

// just verify that the calls happened. verification of priority occurred in the mocking
// open, next, then one or more lease renewals, then close
verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any());
// additionally, explicitly check for a close request
verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ protected static List<Object[]> getTableAndScanCreatorParams() {

protected abstract Scan createScan();

protected abstract List<Result> doScan(Scan scan) throws Exception;
protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;

protected final List<Result> convertFromBatchResult(List<Result> results) {
assertTrue(results.size() % 2 == 0);
Expand All @@ -145,7 +145,7 @@ protected final List<Result> convertFromBatchResult(List<Result> results) {

@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan());
List<Result> results = doScan(createScan(), -1);
// make sure all scanners are closed at RS side
TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
.forEach(
Expand All @@ -169,7 +169,7 @@ private void assertResultEquals(Result result, int i) {

@Test
public void testReversedScanAll() throws Exception {
List<Result> results = doScan(createScan().setReversed(true));
List<Result> results = doScan(createScan().setReversed(true), -1);
assertEquals(COUNT, results.size());
IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
}
Expand All @@ -178,7 +178,7 @@ public void testReversedScanAll() throws Exception {
public void testScanNoStopKey() throws Exception {
int start = 345;
List<Result> results =
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))));
doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1);
assertEquals(COUNT - start, results.size());
IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
}
Expand All @@ -187,36 +187,44 @@ public void testScanNoStopKey() throws Exception {
public void testReverseScanNoStopKey() throws Exception {
int start = 765;
List<Result> results = doScan(
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1);
assertEquals(start + 1, results.size());
IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
}

@Test
public void testScanWrongColumnFamily() throws Exception {
try {
doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")));
doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1);
} catch (Exception e) {
assertTrue(e instanceof NoSuchColumnFamilyException ||
e.getCause() instanceof NoSuchColumnFamilyException);
}
}

private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit) throws Exception {
int limit) throws Exception {
testScan(start, startInclusive, stop, stopInclusive, limit, -1);
}

private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
int limit, int closeAfter) throws Exception {
Scan scan =
createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
.withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
List<Result> results = doScan(scan, closeAfter);
int actualStart = startInclusive ? start : start + 1;
int actualStop = stopInclusive ? stop + 1 : stop;
int count = actualStop - actualStart;
if (limit > 0) {
count = Math.min(count, limit);
}
if (closeAfter > 0) {
count = Math.min(count, closeAfter);
}
assertEquals(count, results.size());
IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
}
Expand All @@ -229,7 +237,7 @@ private void testReversedScan(int start, boolean startInclusive, int stop, boole
if (limit > 0) {
scan.setLimit(limit);
}
List<Result> results = doScan(scan);
List<Result> results = doScan(scan, -1);
int actualStart = startInclusive ? start : start - 1;
int actualStop = stopInclusive ? stop - 1 : stop;
int count = actualStart - actualStop;
Expand Down Expand Up @@ -309,4 +317,13 @@ public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
testReversedScan(765, false, 543, true, 200);
testReversedScan(876, false, 654, false, 200);
}

@Test
public void testScanEndingEarly() throws Exception {
testScan(1, true, 998, false, 0, 900); // from first region to last region
testScan(123, true, 234, true, 0, 100);
testScan(234, true, 456, false, 0, 100);
testScan(345, false, 567, true, 0, 100);
testScan(456, false, 678, false, 0, 100);
}
}
Loading

0 comments on commit 39ecaa1

Please sign in to comment.