Skip to content

Commit

Permalink
HBASE-26807 Unify CallQueueTooBigException special pause with CallDro…
Browse files Browse the repository at this point in the history
…ppedException
  • Loading branch information
bbeaudreault committed Mar 10, 2022
1 parent addace2 commit e20d78b
Show file tree
Hide file tree
Showing 24 changed files with 242 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -28,7 +26,7 @@
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
public class CallDroppedException extends IOException {
public class CallDroppedException extends ServerOverloadedException {
public CallDroppedException() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Returned to clients when their request could not be enqueued due to the server being
* overloaded. Clients should retry upon receiving it.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
public class CallQueueTooBigException extends IOException {
public class CallQueueTooBigException extends ServerOverloadedException {
public CallQueueTooBigException() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Base class for exceptions thrown when the hbase server is overloaded.
*/
@InterfaceAudience.Public
public class ServerOverloadedException extends HBaseIOException {
public ServerOverloadedException() {
}

public ServerOverloadedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,21 @@ public interface AsyncAdminBuilder {
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
* @return this for invocation chaining
* @see #setRetryPauseForCQTBE(long, TimeUnit)
* @see #setRetryPauseForServerOverloaded(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);

/**
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
* Set the base pause time for retrying when we hit {@code ServerOverloadedException}. We use an
* exponential policy to generate sleep time when retrying.
* <p/>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code ServerOverloadedException}
* means the server is overloaded. We just use the normal pause value for
* {@code CallQueueTooBigException} if here you specify a smaller value.
* {@code ServerOverloadedException} if here you specify a smaller value.
* @see #setRetryPause(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit);

/**
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {

protected long pauseNs;

protected long pauseForCQTBENs;
protected long pauseForServerOverloaded;

protected int maxAttempts;

Expand All @@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.pauseForServerOverloaded = connConf.getPauseForServerOverloaded();
this.maxAttempts = connConf.getMaxRetries();
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
Expand All @@ -67,8 +67,8 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) {
}

@Override
public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(timeout);
public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) {
this.pauseForServerOverloaded = unit.toNanos(timeout);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ public interface Callable<T> {
private ServerName serverName;

public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long pauseNs, long pauseForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
super(retryTimer, conn, priority, pauseNs, pauseForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ServerOverloadedException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
Expand Down Expand Up @@ -104,7 +104,7 @@ class AsyncBatchRpcRetryingCaller<T> {

private final long pauseNs;

private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;

private final int maxAttempts;

Expand Down Expand Up @@ -150,13 +150,14 @@ public int getPriority() {
}

public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
TableName tableName, List<? extends Row> actions, long pauseNs,
long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
Expand Down Expand Up @@ -466,17 +467,17 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
error instanceof CallQueueTooBigException);
error instanceof ServerOverloadedException);
}

private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
boolean isCallQueueTooBig) {
boolean isServerOverloadedException) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
long delayNs;
long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
long pauseNsToUse = isServerOverloadedException ? pauseNsForServerOverloaded : pauseNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class AsyncClientScanner {

private final long pauseNs;

private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;

private final int maxAttempts;

Expand All @@ -86,7 +86,7 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
Expand All @@ -100,7 +100,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
Expand Down Expand Up @@ -170,7 +170,8 @@ private void startScan(OpenScannerResponse resp) {
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
Expand All @@ -191,7 +192,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
Expand Down Expand Up @@ -92,7 +93,7 @@ class AsyncConnectionConfiguration {

private final long pauseNs;

private final long pauseForCQTBENs;
private final long pauseForServerOverloaded;

private final int maxRetries;

Expand Down Expand Up @@ -137,15 +138,17 @@ class AsyncConnectionConfiguration {
this.writeRpcTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs));
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
if (pauseForCQTBEMs < pauseMs) {
long pauseForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
if (pauseForServerOverloaded < pauseMs) {
LOG.warn(
"The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead",
HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs);
pauseForCQTBEMs = pauseMs;
HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseForServerOverloaded,
HBASE_CLIENT_PAUSE, pauseMs);
pauseForServerOverloaded = pauseMs;
}
this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
this.pauseForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseForServerOverloaded);
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
Expand Down Expand Up @@ -196,8 +199,8 @@ long getPauseNs() {
return pauseNs;
}

long getPauseForCQTBENs() {
return pauseForCQTBENs;
long getPauseForServerOverloaded() {
return pauseForServerOverloaded;
}

int getMaxRetries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ public interface Callable<T> {
private final Callable<T> callable;

public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
Callable<T> callable, int priority, long pauseNs, long pauseNsForServerOverloaded,
int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.callable = callable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerOverloadedException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
Expand All @@ -60,7 +60,7 @@ public abstract class AsyncRpcRetryingCaller<T> {

private final long pauseNs;

private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;

private int tries = 1;

Expand All @@ -81,13 +81,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;

public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.priority = priority;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
Expand Down Expand Up @@ -127,7 +127,8 @@ protected final void resetCallTimeout() {
}

private void tryScheduleRetry(Throwable error) {
long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
long pauseNsToUse = error instanceof ServerOverloadedException ?
pauseNsForServerOverloaded : pauseNs;
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
Expand Down
Loading

0 comments on commit e20d78b

Please sign in to comment.