Skip to content

Commit

Permalink
HBASE-27798 Addendum changes from master branch
Browse files Browse the repository at this point in the history
Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
rmdmattingly authored Jul 11, 2023
1 parent c82cf47 commit 86e3e55
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
Expand Down Expand Up @@ -162,7 +160,8 @@ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
this.actions = new ArrayList<>(actions.size());
this.futures = new ArrayList<>(actions.size());
this.action2Future = new IdentityHashMap<>(actions.size());
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
for (int i = 0, n = actions.size(); i < n; i++) {
Row rawAction = actions.get(i);
Action action;
Expand Down Expand Up @@ -203,10 +202,6 @@ private static boolean hasIncrementOrAppend(RowMutations mutations) {
return false;
}

private long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
}

private List<ThrowableWithExtraContext> removeErrors(Action action) {
synchronized (action2Errors) {
return action2Errors.remove(action);
Expand Down Expand Up @@ -366,7 +361,7 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
long remainingNs;
if (operationTimeoutNs > 0) {
remainingNs = remainingTimeNs();
remainingNs = pauseManager.remainingTimeNs(startNs);
if (remainingNs <= 0) {
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
tries);
Expand Down Expand Up @@ -473,27 +468,13 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
groupAndSend(actions, tries);
return;
}
long delayNs;
boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
OptionalLong maybePauseNsToUse =
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
if (!maybePauseNsToUse.isPresent()) {
failAll(actions, tries);
return;
}
long pauseNsToUse = maybePauseNsToUse.getAsLong();

if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
failAll(actions, tries);
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
if (isServerOverloaded) {
long delayNs = maybePauseNsToUse.getAsLong();
if (HBaseServerException.isServerOverloaded(error)) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
}
Expand All @@ -503,7 +484,7 @@ private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
private void groupAndSend(Stream<Action> actions, int tries) {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
if (locateTimeoutNs <= 0) {
failAll(actions, tries);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;

Expand Down Expand Up @@ -93,15 +91,16 @@ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int pr
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.startNs = System.nanoTime();
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
}

private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
}

protected final long remainingTimeNs() {
return operationTimeoutNs - (System.nanoTime() - startNs);
return pauseManager.remainingTimeNs(startNs);
}

protected final void completeExceptionally() {
Expand All @@ -124,25 +123,12 @@ protected final void resetCallTimeout() {
}

private void tryScheduleRetry(Throwable error) {
OptionalLong maybePauseNsToUse =
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
if (!maybePauseNsToUse.isPresent()) {
completeExceptionally();
return;
}
long pauseNsToUse = maybePauseNsToUse.getAsLong();

long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally();
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
long delayNs = maybePauseNsToUse.getAsLong();
tries++;

if (HBaseServerException.isServerOverloaded(error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
Expand Down Expand Up @@ -344,17 +342,14 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI
this.controller = conn.rpcControllerFactory.newController();
this.controller.setPriority(priority);
this.exceptions = new ArrayList<>();
this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
this.pauseManager =
new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
}

private long elapsedMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
}

private long remainingTimeNs() {
return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
}

private void closeScanner() {
incRPCCallsMetrics(scanMetrics, regionServerRemote);
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
Expand Down Expand Up @@ -417,26 +412,14 @@ private void onError(Throwable error) {
completeExceptionally(!scannerClosed);
return;
}
long delayNs;

OptionalLong maybePauseNsToUse =
pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
if (!maybePauseNsToUse.isPresent()) {
completeExceptionally(!scannerClosed);
return;
}
long pauseNsToUse = maybePauseNsToUse.getAsLong();

if (scanTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally(!scannerClosed);
return;
}
delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
long delayNs = maybePauseNsToUse.getAsLong();
if (scannerClosed) {
completeWhenError(false);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ static Result filterCells(Result result, Cell keepCellsAfter) {
}

// Add a delta to avoid timeout immediately after a retry sleeping.
static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);

static Get toCheckExistenceOnly(Get get) {
if (get.isCheckExistenceOnly()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client.backoff;

import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;

import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseServerException;
Expand All @@ -31,29 +34,59 @@ public class HBaseServerExceptionPauseManager {

private final long pauseNs;
private final long pauseNsForServerOverloaded;
private final long timeoutNs;

public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) {
public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded,
long timeoutNs) {
this.pauseNs = pauseNs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.timeoutNs = timeoutNs;
}

public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) {
/**
* Returns the nanos, if any, for which the client should wait
* @param error The exception from the server
* @param tries The current retry count
* @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we
* should throw now
*/
public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) {
long expectedSleepNs;
long remainingTimeNs = remainingTimeNs(startNs) - SLEEP_DELTA_NS;
if (error instanceof RpcThrottlingException) {
RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
if (expectedSleepNs > remainingTimeNs) {
if (expectedSleepNs > remainingTimeNs && remainingTimeNs > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("RpcThrottlingException suggested pause of {}ns which would exceed "
+ "the timeout. We should throw instead.", expectedSleepNs, rpcThrottlingException);
}
return OptionalLong.empty();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", expectedSleepNs,
LOG.debug("Sleeping for {}ns after catching RpcThrottlingException", expectedSleepNs,
rpcThrottlingException);
}
} else {
expectedSleepNs =
HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
// RpcThrottlingException tells us exactly how long the client should wait for,
// so we should not factor in the retry count for said exception
expectedSleepNs = getPauseTime(expectedSleepNs, tries - 1);
}

if (timeoutNs > 0) {
if (remainingTimeNs <= 0) {
return OptionalLong.empty();
}
expectedSleepNs = Math.min(remainingTimeNs, expectedSleepNs);
}

return OptionalLong.of(expectedSleepNs);
}

public long remainingTimeNs(long startNs) {
return timeoutNs - (System.nanoTime() - startNs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,93 @@ public class TestHBaseServerExceptionPauseManager {
private final Throwable OTHER_EXCEPTION = new RuntimeException("");
private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);

private final HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);

@Test
public void itSupportsRpcThrottlingNanos() {
public void itSupportsRpcThrottlingNanosNoTimeout() {
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, Long.MAX_VALUE);
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());

assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
}

@Test
public void itSupportsRpcThrottlingNanosLenientTimeout() {
HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, System.nanoTime() * 2);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());

assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
}

@Test
public void itSupportsServerOverloadedExceptionNanos() {
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, Long.MAX_VALUE);
pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1, System.nanoTime());

assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS_FOR_SERVER_OVERLOADED);
// account for 1% jitter in pause time
assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 0.99);
assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 1.01);
}

@Test
public void itSupportsOtherExceptionNanos() {
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, Long.MAX_VALUE);
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());

assertTrue(pauseNanos.isPresent());
assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS);
// account for 1% jitter in pause time
assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS * 0.99);
assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS * 1.01);
}

@Test
public void itThrottledTimeoutFastFail() {
OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 0L);
public void itTimesOutRpcThrottlingException() {
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());

assertFalse(pauseNanos.isPresent());
}

@Test
public void itTimesOutRpcOtherException() {
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());

assertFalse(pauseNanos.isPresent());
}

@Test
public void itDoesNotTimeOutIfDisabled() {
HBaseServerExceptionPauseManager pauseManager =
new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);

OptionalLong pauseNanos =
pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());

assertTrue(pauseNanos.isPresent());
}

}
Loading

0 comments on commit 86e3e55

Please sign in to comment.