diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index f8363b87e70f..063d3df34d0e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.CellUtil.createCellScanner; -import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; -import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; @@ -162,7 +160,8 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, this.actions = new ArrayList<>(actions.size()); this.futures = new ArrayList<>(actions.size()); this.action2Future = new IdentityHashMap<>(actions.size()); - this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded); + this.pauseManager = + new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs); for (int i = 0, n = actions.size(); i < n; i++) { Row rawAction = actions.get(i); Action action; @@ -203,10 +202,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) { return false; } - private long remainingTimeNs() { - return operationTimeoutNs - (System.nanoTime() - startNs); - } - private List removeErrors(Action action) { synchronized (action2Errors) { return action2Errors.remove(action); @@ -366,7 +361,7 @@ private void onComplete(Map actionsByRegion, int tries, private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) { long remainingNs; if (operationTimeoutNs > 0) { - remainingNs = remainingTimeNs(); + remainingNs = pauseManager.remainingTimeNs(startNs); if (remainingNs <= 0) { failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries); @@ -473,27 +468,13 @@ private void tryResubmit(Stream actions, int tries, boolean immediately, groupAndSend(actions, tries); return; } - long delayNs; - boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error); - OptionalLong maybePauseNsToUse = - pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS); + OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs); if (!maybePauseNsToUse.isPresent()) { failAll(actions, tries); return; } - long pauseNsToUse = maybePauseNsToUse.getAsLong(); - - if (operationTimeoutNs > 0) { - long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; - if (maxDelayNs <= 0) { - failAll(actions, tries); - return; - } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); - } else { - delayNs = getPauseTime(pauseNsToUse, tries - 1); - } - if (isServerOverloaded) { + long delayNs = maybePauseNsToUse.getAsLong(); + if (HBaseServerException.isServerOverloaded(error)) { Optional metrics = conn.getConnectionMetrics(); metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS)); } @@ -503,7 +484,7 @@ private void tryResubmit(Stream actions, int tries, boolean immediately, private void groupAndSend(Stream actions, int tries) { long locateTimeoutNs; if (operationTimeoutNs > 0) { - locateTimeoutNs = remainingTimeNs(); + locateTimeoutNs = pauseManager.remainingTimeNs(startNs); if (locateTimeoutNs <= 0) { failAll(actions, tries); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 7ded355cfd9e..f89aef214391 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; -import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; @@ -93,7 +91,8 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr this.controller.setPriority(priority); this.exceptions = new ArrayList<>(); this.startNs = System.nanoTime(); - this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded); + this.pauseManager = + new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs); } private long elapsedMs() { @@ -101,7 +100,7 @@ private long elapsedMs() { } protected final long remainingTimeNs() { - return operationTimeoutNs - (System.nanoTime() - startNs); + return pauseManager.remainingTimeNs(startNs); } protected final void completeExceptionally() { @@ -124,25 +123,12 @@ protected final void resetCallTimeout() { } private void tryScheduleRetry(Throwable error) { - OptionalLong maybePauseNsToUse = - pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS); + OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs); if (!maybePauseNsToUse.isPresent()) { completeExceptionally(); return; } - long pauseNsToUse = maybePauseNsToUse.getAsLong(); - - long delayNs; - if (operationTimeoutNs > 0) { - long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; - if (maxDelayNs <= 0) { - completeExceptionally(); - return; - } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); - } else { - delayNs = getPauseTime(pauseNsToUse, tries - 1); - } + long delayNs = maybePauseNsToUse.getAsLong(); tries++; if (HBaseServerException.isServerOverloaded(error)) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 949ea07107b6..a046f0c7b6e5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; -import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; @@ -344,17 +342,14 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.controller = conn.rpcControllerFactory.newController(); this.controller.setPriority(priority); this.exceptions = new ArrayList<>(); - this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded); + this.pauseManager = + new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs); } private long elapsedMs() { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs); } - private long remainingTimeNs() { - return scanTimeoutNs - (System.nanoTime() - nextCallStartNs); - } - private void closeScanner() { incRPCCallsMetrics(scanMetrics, regionServerRemote); resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS); @@ -417,26 +412,14 @@ private void onError(Throwable error) { completeExceptionally(!scannerClosed); return; } - long delayNs; OptionalLong maybePauseNsToUse = - pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS); + pauseManager.getPauseNsFromException(error, tries, nextCallStartNs); if (!maybePauseNsToUse.isPresent()) { completeExceptionally(!scannerClosed); return; } - long pauseNsToUse = maybePauseNsToUse.getAsLong(); - - if (scanTimeoutNs > 0) { - long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; - if (maxDelayNs <= 0) { - completeExceptionally(!scannerClosed); - return; - } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1)); - } else { - delayNs = getPauseTime(pauseNsToUse, tries - 1); - } + long delayNs = maybePauseNsToUse.getAsLong(); if (scannerClosed) { completeWhenError(false); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 5c88af3780cb..9b44c682b4a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -280,7 +280,7 @@ static Result filterCells(Result result, Cell keepCellsAfter) { } // Add a delta to avoid timeout immediately after a retry sleeping. - static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1); + public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1); static Get toCheckExistenceOnly(Get get) { if (get.isCheckExistenceOnly()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java index 235d1d1d20e4..67f46822fe39 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client.backoff; +import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; +import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; + import java.util.OptionalLong; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseServerException; @@ -31,29 +34,59 @@ public class HBaseServerExceptionPauseManager { private final long pauseNs; private final long pauseNsForServerOverloaded; + private final long timeoutNs; - public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) { + public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded, + long timeoutNs) { this.pauseNs = pauseNs; this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; + this.timeoutNs = timeoutNs; } - public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) { + /** + * Returns the nanos, if any, for which the client should wait + * @param error The exception from the server + * @param tries The current retry count + * @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we + * should throw now + */ + public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) { long expectedSleepNs; + long remainingTimeNs = remainingTimeNs(startNs) - SLEEP_DELTA_NS; if (error instanceof RpcThrottlingException) { RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error; expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval()); - if (expectedSleepNs > remainingTimeNs) { + if (expectedSleepNs > remainingTimeNs && remainingTimeNs > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("RpcThrottlingException suggested pause of {}ns which would exceed " + + "the timeout. We should throw instead.", expectedSleepNs, rpcThrottlingException); + } return OptionalLong.empty(); } if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleepNs, + LOG.debug("Sleeping for {}ns after catching RpcThrottlingException", expectedSleepNs, rpcThrottlingException); } } else { expectedSleepNs = HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs; + // RpcThrottlingException tells us exactly how long the client should wait for, + // so we should not factor in the retry count for said exception + expectedSleepNs = getPauseTime(expectedSleepNs, tries - 1); + } + + if (timeoutNs > 0) { + if (remainingTimeNs <= 0) { + return OptionalLong.empty(); + } + expectedSleepNs = Math.min(remainingTimeNs, expectedSleepNs); } + return OptionalLong.of(expectedSleepNs); } + public long remainingTimeNs(long startNs) { + return timeoutNs - (System.nanoTime() - startNs); + } + } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java index 793fa9f6d218..ee4ee47f1850 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java @@ -47,40 +47,93 @@ public class TestHBaseServerExceptionPauseManager { private final Throwable OTHER_EXCEPTION = new RuntimeException(""); private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true); - private final HBaseServerExceptionPauseManager pauseManager = - new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED); - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class); @Test - public void itSupportsRpcThrottlingNanos() { + public void itSupportsRpcThrottlingNanosNoTimeout() { + HBaseServerExceptionPauseManager pauseManager = + new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0); + OptionalLong pauseNanos = - pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, Long.MAX_VALUE); + pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime()); + + assertTrue(pauseNanos.isPresent()); + assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS); + } + + @Test + public void itSupportsRpcThrottlingNanosLenientTimeout() { + HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager( + PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, System.nanoTime() * 2); + + OptionalLong pauseNanos = + pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime()); + assertTrue(pauseNanos.isPresent()); assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS); } @Test public void itSupportsServerOverloadedExceptionNanos() { + HBaseServerExceptionPauseManager pauseManager = + new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0); + OptionalLong pauseNanos = - pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, Long.MAX_VALUE); + pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1, System.nanoTime()); + assertTrue(pauseNanos.isPresent()); - assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS_FOR_SERVER_OVERLOADED); + // account for 1% jitter in pause time + assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 0.99); + assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 1.01); } @Test public void itSupportsOtherExceptionNanos() { - OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, Long.MAX_VALUE); + HBaseServerExceptionPauseManager pauseManager = + new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0); + + OptionalLong pauseNanos = + pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime()); + assertTrue(pauseNanos.isPresent()); - assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS); + // account for 1% jitter in pause time + assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS * 0.99); + assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS * 1.01); } @Test - public void itThrottledTimeoutFastFail() { - OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 0L); + public void itTimesOutRpcThrottlingException() { + HBaseServerExceptionPauseManager pauseManager = + new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1); + + OptionalLong pauseNanos = + pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime()); + + assertFalse(pauseNanos.isPresent()); + } + + @Test + public void itTimesOutRpcOtherException() { + HBaseServerExceptionPauseManager pauseManager = + new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1); + + OptionalLong pauseNanos = + pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime()); + assertFalse(pauseNanos.isPresent()); } + @Test + public void itDoesNotTimeOutIfDisabled() { + HBaseServerExceptionPauseManager pauseManager = + new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0); + + OptionalLong pauseNanos = + pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime()); + + assertTrue(pauseNanos.isPresent()); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java index ab63a9cb3c39..1523914213a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java @@ -29,10 +29,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -69,7 +71,10 @@ public class TestAsyncClientPauseForRpcThrottling { private static AsyncConnection CONN; private static final AtomicBoolean THROTTLE = new AtomicBoolean(false); + private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0); private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1); + private static final int RETRY_COUNT = 3; + private static final int MAX_MULTIPLIER_EXPECTATION = 2; public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices { @@ -80,6 +85,7 @@ public ThrottlingRSRpcServicesForTest(HRegionServer rs) throws IOException { @Override public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request) throws ServiceException { + maybeForceRetry(); maybeThrottle(); return super.get(controller, request); } @@ -87,6 +93,7 @@ public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRe @Override public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request) throws ServiceException { + maybeForceRetry(); maybeThrottle(); return super.multi(rpcc, request); } @@ -94,10 +101,18 @@ public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRe @Override public ClientProtos.ScanResponse scan(RpcController controller, ClientProtos.ScanRequest request) throws ServiceException { + maybeForceRetry(); maybeThrottle(); return super.scan(controller, request); } + private void maybeForceRetry() throws ServiceException { + if (FORCE_RETRIES.get() > 0) { + FORCE_RETRIES.addAndGet(-1); + throw new ServiceException(new RegionTooBusyException("Retry")); + } + } + private void maybeThrottle() throws ServiceException { if (THROTTLE.get()) { THROTTLE.set(false); @@ -121,6 +136,12 @@ protected RSRpcServices createRpcServices() throws IOException { @BeforeClass public static void setUp() throws Exception { + assertTrue( + "The MAX_MULTIPLIER_EXPECTATION must be less than HConstants.RETRY_BACKOFF[RETRY_COUNT] " + + "in order for our tests to adequately verify that we aren't " + + "multiplying throttled pauses based on the retry count.", + MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT]); + UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); UTIL.startMiniCluster(1); UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, @@ -149,9 +170,7 @@ public static void tearDown() throws Exception { } private void assertTime(Callable callable, long time, boolean isGreater) throws Exception { - long startNs = System.nanoTime(); - callable.call(); - long costNs = System.nanoTime() - startNs; + long costNs = getCostNs(callable); if (isGreater) { assertTrue(costNs > time); } else { @@ -159,6 +178,18 @@ private void assertTime(Callable callable, long time, boolean isGreater) t } } + private void assertTimeBetween(Callable callable, long minNs, long maxNs) throws Exception { + long costNs = getCostNs(callable); + assertTrue(costNs > minNs); + assertTrue(costNs < maxNs); + } + + private long getCostNs(Callable callable) throws Exception { + long startNs = System.nanoTime(); + callable.call(); + return System.nanoTime() - startNs; + } + @Test public void itWaitsForThrottledGet() throws Exception { boolean isThrottled = true; @@ -193,6 +224,21 @@ public void itDoesNotWaitForThrottledGetExceedingTimeout() throws Exception { }, WAIT_INTERVAL_NANOS, false); } + @Test + public void itDoesNotMultiplyThrottledGetWait() throws Exception { + THROTTLE.set(true); + FORCE_RETRIES.set(RETRY_COUNT); + + AsyncTable table = + CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES) + .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build(); + + assertTimeBetween(() -> { + table.get(new Get(Bytes.toBytes(0))).get(); + return null; + }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS); + } + @Test public void itWaitsForThrottledBatch() throws Exception { boolean isThrottled = true; @@ -244,6 +290,26 @@ public void itDoesNotWaitForThrottledBatchExceedingTimeout() throws Exception { }, WAIT_INTERVAL_NANOS, false); } + @Test + public void itDoesNotMultiplyThrottledBatchWait() throws Exception { + THROTTLE.set(true); + FORCE_RETRIES.set(RETRY_COUNT); + + assertTimeBetween(() -> { + List> futures = new ArrayList<>(); + try (AsyncBufferedMutator mutator = + CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES) + .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()) { + for (int i = 100; i < 110; i++) { + futures.add(mutator + .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)))); + } + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + return null; + }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS); + } + @Test public void itWaitsForThrottledScan() throws Exception { boolean isThrottled = true; @@ -291,4 +357,24 @@ public void itDoesNotWaitForThrottledScanExceedingTimeout() throws Exception { return null; }, WAIT_INTERVAL_NANOS, false); } + + @Test + public void itDoesNotMultiplyThrottledScanWait() throws Exception { + THROTTLE.set(true); + FORCE_RETRIES.set(RETRY_COUNT); + + AsyncTable table = + CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES) + .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build(); + + assertTimeBetween(() -> { + try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) { + for (int i = 0; i < 100; i++) { + Result result = scanner.next(); + assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER)); + } + } + return null; + }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS); + } }