Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2] HBASE-27798: Client side should back off based on wait interval in RpcThrottlingException #5320

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
Expand Down Expand Up @@ -162,7 +160,8 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
this.actions = new ArrayList<>(actions.size());
this.futures = new ArrayList<>(actions.size());
this.action2Future = new IdentityHashMap<>(actions.size());
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
for (int i = 0, n = actions.size(); i < n; i++) {
Row rawAction = actions.get(i);
Action action;
Expand Down Expand Up @@ -203,10 +202,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) {
return false;
}

private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}

private List<ThrowableWithExtraContext> removeErrors(Action action) {
synchronized (action2Errors) {
return action2Errors.remove(action);
Expand Down Expand Up @@ -366,7 +361,7 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
long remainingNs;
if (operationTimeoutNs > 0) {
remainingNs = remainingTimeNs();
remainingNs = pauseManager.remainingTimeNs(startNs);
if (remainingNs <= 0) {
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
tries);
Expand Down Expand Up @@ -473,27 +468,13 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
groupAndSend(actions, tries);
return;
}
long delayNs;
boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
OptionalLong maybePauseNsToUse =
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
if (!maybePauseNsToUse.isPresent()) {
failAll(actions, tries);
return;
}
long pauseNsToUse = maybePauseNsToUse.getAsLong();

if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
failAll(actions, tries);
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
if (isServerOverloaded) {
long delayNs = maybePauseNsToUse.getAsLong();
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
Expand All @@ -503,7 +484,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
private void groupAndSend(Stream<Action> actions, int tries) {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
if (locateTimeoutNs <= 0) {
failAll(actions, tries);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;

Expand Down Expand Up @@ -93,15 +91,16 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
}

/** Returns the time elapsed since {@code startNs}, converted from nanoseconds to milliseconds. */
private long elapsedMs() {
  long elapsedNs = System.nanoTime() - startNs;
  return TimeUnit.NANOSECONDS.toMillis(elapsedNs);
}

protected final long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
return pauseManager.remainingTimeNs(startNs);
}

protected final void completeExceptionally() {
Expand All @@ -124,25 +123,12 @@ protected final void resetCallTimeout() {
}

private void tryScheduleRetry(Throwable error) {
OptionalLong maybePauseNsToUse =
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
if (!maybePauseNsToUse.isPresent()) {
completeExceptionally();
return;
}
long pauseNsToUse = maybePauseNsToUse.getAsLong();

long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally();
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
long delayNs = maybePauseNsToUse.getAsLong();
tries++;

if (HBaseServerException.isServerOverloaded(error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
Expand Down Expand Up @@ -344,17 +342,14 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
}

/**
 * Returns the time elapsed since {@code nextCallStartNs}, converted from nanoseconds to
 * milliseconds.
 */
private long elapsedMs() {
  long elapsedNs = System.nanoTime() - nextCallStartNs;
  return TimeUnit.NANOSECONDS.toMillis(elapsedNs);
}

private long remainingTimeNs() {
return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
}

private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
Expand Down Expand Up @@ -417,26 +412,14 @@ private void onError(Throwable error) {
completeExceptionally(!scannerClosed);
return;
}
long delayNs;

OptionalLong maybePauseNsToUse =
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
if (!maybePauseNsToUse.isPresent()) {
completeExceptionally(!scannerClosed);
return;
}
long pauseNsToUse = maybePauseNsToUse.getAsLong();

if (scanTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally(!scannerClosed);
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
long delayNs = maybePauseNsToUse.getAsLong();
if (scannerClosed) {
completeWhenError(false);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ static Result filterCells(Result result, Cell keepCellsAfter) {
}

// Add a delta to avoid timing out immediately after sleeping for a retry.
static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);

static Get toCheckExistenceOnly(Get get) {
if (get.isCheckExistenceOnly()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client.backoff;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;

import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseServerException;
Expand All @@ -31,29 +34,59 @@ public class HBaseServerExceptionPauseManager {

private final long pauseNs;
private final long pauseNsForServerOverloaded;
private final long timeoutNs;

public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) {
public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded,
long timeoutNs) {
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.timeoutNs = timeoutNs;
}

public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) {
/**
 * Returns the nanos, if any, for which the client should wait
 * @param error The exception from the server
 * @param tries The current retry count
 * @param startNs The {@code System.nanoTime()} value from which the timeout is measured
 * @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we
 * should throw now
 */
public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) {
long expectedSleepNs;
long remainingTimeNs = remainingTimeNs(startNs) - SLEEP_DELTA_NS;
if (error instanceof RpcThrottlingException) {
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
if (expectedSleepNs > remainingTimeNs) {
if (expectedSleepNs > remainingTimeNs && remainingTimeNs > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("RpcThrottlingException suggested pause of {}ns which would exceed "
+ "the timeout. We should throw instead.", expectedSleepNs, rpcThrottlingException);
}
return OptionalLong.empty();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleepNs,
LOG.debug("Sleeping for {}ns after catching RpcThrottlingException", expectedSleepNs,
rpcThrottlingException);
}
} else {
expectedSleepNs =
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
// RpcThrottlingException tells us exactly how long the client should wait for,
// so we should not factor in the retry count for said exception
expectedSleepNs = getPauseTime(expectedSleepNs, tries - 1);
}

if (timeoutNs > 0) {
if (remainingTimeNs <= 0) {
return OptionalLong.empty();
}
expectedSleepNs = Math.min(remainingTimeNs, expectedSleepNs);
}

return OptionalLong.of(expectedSleepNs);
}

/**
 * Returns how much of the configured timeout is left, in nanoseconds.
 * <p>
 * The value is not clamped, so it may be zero or negative.
 * @param startNs The {@code System.nanoTime()} value from which the timeout is measured
 * @return {@code timeoutNs} minus the nanoseconds elapsed since {@code startNs}
 */
public long remainingTimeNs(long startNs) {
  long elapsedNs = System.nanoTime() - startNs;
  return timeoutNs - elapsedNs;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,93 @@ public class TestHBaseServerExceptionPauseManager {
private final Throwable OTHER_EXCEPTION = new RuntimeException("");
private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);

private final HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);

@Test
public void itSupportsRpcThrottlingNanos() {
public void itSupportsRpcThrottlingNanosNoTimeout() {
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, Long.MAX_VALUE);
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());

assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
}

/**
 * With a timeout enabled but effectively unlimited, the pause returned for an
 * RpcThrottlingException must be the exception's wait interval, unchanged.
 */
@Test
public void itSupportsRpcThrottlingNanosLenientTimeout() {
  // Use Long.MAX_VALUE rather than System.nanoTime() * 2: nanoTime() has an arbitrary origin, so
  // doubling it can yield a negative timeout (which disables the timeout path under test) or a
  // short one. The elapsed time subtracted from it is non-negative, so this cannot overflow.
  HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
    PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, Long.MAX_VALUE);

  OptionalLong pauseNanos =
    pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());

  assertTrue(pauseNanos.isPresent());
  // JUnit convention: expected value first, actual value second.
  assertEquals(WAIT_INTERVAL_NANOS, pauseNanos.getAsLong());
}

@Test
public void itSupportsServerOverloadedExceptionNanos() {
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, Long.MAX_VALUE);
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1, System.nanoTime());

assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS_FOR_SERVER_OVERLOADED);
// account for 1% jitter in pause time
assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 0.99);
assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 1.01);
}

@Test
public void itSupportsOtherExceptionNanos() {
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, Long.MAX_VALUE);
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());

assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS);
// account for 1% jitter in pause time
assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS * 0.99);
assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS * 1.01);
}

@Test
public void itThrottledTimeoutFastFail() {
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 0L);
public void itTimesOutRpcThrottlingException() {
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());

assertFalse(pauseNanos.isPresent());
}

/**
 * A 1ns timeout is smaller than SLEEP_DELTA_NS, so no time remains and no pause is offered for
 * an ordinary (non-throttling) exception.
 */
@Test
public void itTimesOutRpcOtherException() {
  long timeoutNs = 1;
  HBaseServerExceptionPauseManager manager =
    new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, timeoutNs);
  long startNs = System.nanoTime();

  OptionalLong pause = manager.getPauseNsFromException(OTHER_EXCEPTION, 1, startNs);

  assertFalse(pause.isPresent());
}

/** A timeout of 0 skips the timeout check, so a pause is still returned. */
@Test
public void itDoesNotTimeOutIfDisabled() {
  long disabledTimeoutNs = 0;
  HBaseServerExceptionPauseManager manager = new HBaseServerExceptionPauseManager(PAUSE_NANOS,
    PAUSE_NANOS_FOR_SERVER_OVERLOADED, disabledTimeoutNs);
  long startNs = System.nanoTime();

  OptionalLong pause = manager.getPauseNsFromException(OTHER_EXCEPTION, 1, startNs);

  assertTrue(pause.isPresent());
}

}
Loading