diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java index 13ab3ed47cee..3feaaaf17a81 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase; -import java.io.IOException; - import org.apache.yetus.audience.InterfaceAudience; /** @@ -28,14 +26,16 @@ */ @SuppressWarnings("serial") @InterfaceAudience.Public -public class CallDroppedException extends IOException { +public class CallDroppedException extends HBaseServerException { public CallDroppedException() { - super(); + // For now all call drops are due to server being overloaded. + // We could decouple this if desired. + super(true); } // Absence of this constructor prevents proper unwrapping of // remote exception on the client side public CallDroppedException(String message) { - super(message); + super(true, message); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java index 12fa242693c8..6bf68bc4ad0e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java @@ -18,13 +18,15 @@ package org.apache.hadoop.hbase; -import java.io.IOException; - import org.apache.yetus.audience.InterfaceAudience; +/** + * Returned to clients when their request was dropped because the call queue was too big to + * accept a new call. Clients should retry upon receiving it. + */ @SuppressWarnings("serial") @InterfaceAudience.Public -public class CallQueueTooBigException extends IOException { +public class CallQueueTooBigException extends CallDroppedException { public CallQueueTooBigException() { super(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HBaseServerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HBaseServerException.java new file mode 100644 index 000000000000..c72ed19e486b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HBaseServerException.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Base class for exceptions thrown by an HBase server. May contain extra info about + * the state of the server when the exception was thrown. 
+ */ +@InterfaceAudience.Public +public class HBaseServerException extends HBaseIOException { + private boolean serverOverloaded; + + public HBaseServerException() { + this(false); + } + + public HBaseServerException(String message) { + this(false, message); + } + + public HBaseServerException(boolean serverOverloaded) { + this.serverOverloaded = serverOverloaded; + } + + public HBaseServerException(boolean serverOverloaded, String message) { + super(message); + this.serverOverloaded = serverOverloaded; + } + + /** + * @param t throwable to check for server overloaded state + * @return True if the server was considered overloaded when the exception was thrown + */ + public static boolean isServerOverloaded(Throwable t) { + if (t instanceof HBaseServerException) { + return ((HBaseServerException) t).isServerOverloaded(); + } + return false; + } + + /** + * Necessary for parsing RemoteException on client side + * @param serverOverloaded True if server was overloaded when exception was thrown + */ + public void setServerOverloaded(boolean serverOverloaded) { + this.serverOverloaded = serverOverloaded; + } + + /** + * @return True if server was considered overloaded when exception was thrown + */ + public boolean isServerOverloaded() { + return serverOverloaded; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java index 49bc350bb9a6..c55977dba5e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -50,21 +51,42 @@ public interface AsyncAdminBuilder { * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. * @return this for invocation chaining - * @see #setRetryPauseForCQTBE(long, TimeUnit) + * @see #setRetryPauseForServerOverloaded(long, TimeUnit) */ AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit); /** - * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an - * exponential policy to generate sleep time when retrying. + * Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}. + * We use an exponential policy to generate sleep time from this base when retrying. *
<p/>
* This value should be greater than the normal pause value which could be set with the above - * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException} - * means the server is overloaded. We just use the normal pause value for - * {@code CallQueueTooBigException} if here you specify a smaller value. + * {@link #setRetryPause(long, TimeUnit)} method, as usually + * {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use + * the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you + * specify a smaller value. + * * @see #setRetryPause(long, TimeUnit) + * @deprecated Since 2.5.0, will be removed in 4.0.0. Please use + * {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead. */ - AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit); + @Deprecated + default AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) { + return setRetryPauseForServerOverloaded(pause, unit); + } + + /** + * Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}. + * We use an exponential policy to generate sleep time when retrying. + *
<p/>
+ * This value should be greater than the normal pause value which could be set with the above + * {@link #setRetryPause(long, TimeUnit)} method, as usually + * {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use + * the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you + * specify a smaller value. + * + * @see #setRetryPause(long, TimeUnit) + */ + AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit); /** * Set the max retry times for an admin operation. Usually it is the max attempt times minus 1. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java index ffb3ae97ecff..cd023d8134d8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java @@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { protected long pauseNs; - protected long pauseForCQTBENs; + protected long pauseNsForServerOverloaded; protected int maxAttempts; @@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); this.operationTimeoutNs = connConf.getOperationTimeoutNs(); this.pauseNs = connConf.getPauseNs(); - this.pauseForCQTBENs = connConf.getPauseForCQTBENs(); + this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); this.maxAttempts = connConf.getMaxRetries(); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -67,8 +67,8 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) { } @Override - public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(timeout); + public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(timeout); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index 7a381db39c82..f03e8b5cacb3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -44,10 +44,10 @@ public interface Callable { private ServerName serverName; public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 7af385da2d83..6e4ed552931f 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -45,9 +45,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.mutable.MutableBoolean; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RetryImmediatelyException; @@ -63,9 +63,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.io.netty.util.Timer; - import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; @@ -104,7 +102,7 @@ class AsyncBatchRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -150,13 +148,14 @@ public int getPriority() { } public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - TableName tableName, List actions, long pauseNs, long pauseForCQTBENs, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + TableName tableName, List actions, long pauseNs, + long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -466,17 +465,17 @@ private void onError(Map actionsByRegion, int tries, Thro .collect(Collectors.toList()); addError(copiedActions, error, serverName); tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, - error instanceof CallQueueTooBigException); + HBaseServerException.isServerOverloaded(error)); } private void tryResubmit(Stream actions, int tries, boolean immediately, - boolean isCallQueueTooBig) { + boolean isServerOverloaded) { if (immediately) { groupAndSend(actions, tries); return; } long delayNs; - long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = isServerOverloaded ? 
pauseNsForServerOverloaded : pauseNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index d7984628319a..9ea26b4afb3c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -75,7 +75,7 @@ class AsyncClientScanner { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -90,7 +90,7 @@ class AsyncClientScanner { private final Span span; public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, - AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs, + AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); @@ -104,7 +104,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN this.conn = conn; this.retryTimer = retryTimer; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -198,7 +198,8 @@ private void startScan(OpenScannerResponse resp) { .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp), (hasMore, error) -> { try (Scope ignored = span.makeCurrent()) { @@ -232,7 +233,8 @@ private CompletableFuture openScanner(int replicaId) { .priority(scan.getPriority()) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index a6d403f597b8..e376ae597d44 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -30,7 +30,6 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; -import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE; import static 
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; @@ -52,6 +51,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +64,23 @@ class AsyncConnectionConfiguration { private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionConfiguration.class); + /** + * Parameter name for client pause when server is overloaded, denoted by + * {@link org.apache.hadoop.hbase.HBaseServerException#isServerOverloaded()} + */ + public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED = + "hbase.client.pause.server.overloaded"; + + static { + // This is added where the configs are referenced. It may be too late to happen before + // any user _sets_ the old cqtbe config onto a Configuration option. So we still need + // to handle checking both properties in parsing below. The benefit of calling this is + // that it should still cause Configuration to log a warning if we do end up falling + // through to the old deprecated config. + Configuration.addDeprecation( + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); + } + /** * Configure the number of failures after which the client will start logging. A few failures * is fine: region moved, then is not opened, then is overloaded. We try to have an acceptable @@ -92,7 +109,7 @@ class AsyncConnectionConfiguration { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxRetries; @@ -137,15 +154,17 @@ class AsyncConnectionConfiguration { this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs)); long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); - long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs); - if (pauseForCQTBEMs < pauseMs) { + long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, + conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); + if (pauseMsForServerOverloaded < pauseMs) { LOG.warn( "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", - HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs); - pauseForCQTBEMs = pauseMs; + HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseMsForServerOverloaded, + HBASE_CLIENT_PAUSE, pauseMs); + pauseMsForServerOverloaded = pauseMs; } this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs); - this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs); + this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseMsForServerOverloaded); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -196,8 +215,8 @@ long getPauseNs() { return pauseNs; } - long getPauseForCQTBENs() { - return pauseForCQTBENs; + long getPauseNsForServerOverloaded() { + return pauseNsForServerOverloaded; } int getMaxRetries() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index de2778cf6d78..976e9e78477c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -44,10 +44,10 @@ public interface Callable { private final Callable callable; public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - Callable callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, + int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 8648572a04a3..65fbbd53f4a1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -31,8 +31,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -44,7 +44,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.io.netty.util.Timer; @InterfaceAudience.Private @@ -60,7 +59,7 @@ public abstract class AsyncRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private int tries = 1; @@ -81,13 +80,13 @@ public abstract class AsyncRpcRetryingCaller { protected final HBaseRpcController controller; public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.priority = priority; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -127,7 +126,8 @@ protected final void resetCallTimeout() { } private void tryScheduleRetry(Throwable error) { - long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = HBaseServerException.isServerOverloaded(error) ? 
+ pauseNsForServerOverloaded : pauseNs; long delayNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 48bde4434be7..d501998f8684 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -58,7 +58,7 @@ private abstract class BuilderBase { protected long pauseNs = conn.connConf.getPauseNs(); - protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs(); + protected long pauseNsForServerOverloaded = conn.connConf.getPauseNsForServerOverloaded(); protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); @@ -119,8 +119,8 @@ public SingleRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public SingleRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public SingleRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -156,8 +156,8 @@ private void preCheck() { public AsyncSingleRequestRpcRetryingCaller build() { preCheck(); return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, - locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -263,8 +263,8 @@ public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -292,8 +292,8 @@ public AsyncScanSingleRegionRpcRetryingCaller build() { preCheck(); return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, - scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, + scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -347,8 +347,8 @@ public BatchCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -364,7 +364,8 @@ public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { public AsyncBatchRpcRetryingCaller build() { return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, - pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } public List> call() { @@ -406,8 +407,8 @@ public 
MasterRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public MasterRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public MasterRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -438,7 +439,8 @@ private void preCheck() { public AsyncMasterRequestRpcRetryingCaller build() { preCheck(); return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, callable, priority, - pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } /** @@ -487,8 +489,8 @@ public AdminRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public AdminRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public AdminRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -514,8 +516,9 @@ public AdminRequestCallerBuilder priority(int priority) { public AsyncAdminRequestRetryingCaller build() { return new AsyncAdminRequestRetryingCaller(retryTimer, conn, priority, pauseNs, - pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, - checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), + checkNotNull(callable, "action is null")); } public CompletableFuture call() { @@ -558,8 +561,8 @@ public ServerRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public ServerRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public ServerRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -579,7 +582,8 @@ public ServerRequestCallerBuilder serverName(ServerName serverName) { } public AsyncServerRequestRpcRetryingCaller build() { - return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, pauseForCQTBENs, + return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 48e038ecd2e7..84d14aefcebe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -35,8 +35,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import 
org.apache.hadoop.hbase.NotServingRegionException; @@ -98,7 +98,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -307,7 +307,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, - long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, + long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; @@ -320,7 +320,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.regionServerRemote = isRegionServerRemote; this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -410,7 +410,8 @@ private void onError(Throwable error) { return; } long delayNs; - long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = HBaseServerException.isServerOverloaded(error) ? + pauseNsForServerOverloaded : pauseNs; if (scanTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index 52a2abe39440..8c6cf81f4c71 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -46,10 +46,10 @@ public interface Callable { private ServerName serverName; public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseNsForServerOverloaded, + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 2a552c71b3dd..31fa1834bb70 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -56,10 +56,10 @@ CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, 
AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, - Callable callable, int priority, long pauseNs, long pauseForCQTBENs, int maxAttempts, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.tableName = tableName; this.row = row; this.replicaId = replicaId; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 4c883a8332d7..ebf98f98bc3e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -76,21 +77,42 @@ public interface AsyncTableBuilder { /** * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. - * @see #setRetryPauseForCQTBE(long, TimeUnit) + * @see #setRetryPauseForServerOverloaded(long, TimeUnit) */ AsyncTableBuilder setRetryPause(long pause, TimeUnit unit); /** - * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an - * exponential policy to generate sleep time when retrying. + * Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}. + * We use an exponential policy to generate sleep time when retrying. *
<p/>
* This value should be greater than the normal pause value which could be set with the above - * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException} - * means the server is overloaded. We just use the normal pause value for - * {@code CallQueueTooBigException} if here you specify a smaller value. + * {@link #setRetryPause(long, TimeUnit)} method, as usually + * {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use + * the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you + * specify a smaller value. + * * @see #setRetryPause(long, TimeUnit) + * @deprecated Since 2.5.0, will be removed in 4.0.0. Please use + * {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead. */ - AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit); + @Deprecated + default AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) { + return setRetryPauseForServerOverloaded(pause, unit); + } + + /** + * Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}. + * We use an exponential policy to generate sleep time when retrying. + *
<p/>
+ * This value should be greater than the normal pause value which could be set with the above + * {@link #setRetryPause(long, TimeUnit)} method, as usually + * {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use + * the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you + * specify a smaller value. + * + * @see #setRetryPause(long, TimeUnit) + */ + AsyncTableBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit); /** * Set the max retry times for an operation. Usually it is the max attempt times minus 1. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 399d9ddfaffe..bec9f1236907 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -45,7 +45,7 @@ abstract class AsyncTableBuilderBase protected long pauseNs; - protected long pauseForCQTBENs; + protected long pauseNsForServerOverloaded; protected int maxAttempts; @@ -60,7 +60,7 @@ abstract class AsyncTableBuilderBase this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); this.pauseNs = connConf.getPauseNs(); - this.pauseForCQTBENs = connConf.getPauseForCQTBENs(); + this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); this.maxAttempts = retries2Attempts(connConf.getMaxRetries()); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -102,8 +102,8 @@ public AsyncTableBuilderBase setRetryPause(long pause, TimeUnit unit) { } @Override - public AsyncTableBuilderBase setRetryPauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public AsyncTableBuilderBase setRetryPauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 19ca9adbf3f4..232d7cdf0750 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -62,7 +62,7 @@ public class ConnectionConfiguration { // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; - /** + /** * Constructor * @param conf Configuration object */ @@ -206,5 +206,4 @@ public boolean isClientScannerAsyncPrefetch() { public int getRpcTimeout() { return rpcTimeout; } - } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index e50d308ec46d..15d5775be3dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -193,7 +193,8 @@ public Table build() { conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS) .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS) .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS) - .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(), + 
.setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS) + .build(), poolSupplier); } }; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 572eb0960ea1..ad0fd7ac807e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -364,7 +364,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -380,15 +380,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { this.rpcTimeoutNs = builder.rpcTimeoutNs; this.operationTimeoutNs = builder.operationTimeoutNs; this.pauseNs = builder.pauseNs; - if (builder.pauseForCQTBENs < builder.pauseNs) { + if (builder.pauseNsForServerOverloaded < builder.pauseNs) { LOG.warn( - "Configured value of pauseForCQTBENs is {} ms, which is less than" + + "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" + " the normal pause value {} ms, use the greater one instead", - TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), + TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); - this.pauseForCQTBENs = builder.pauseNs; + this.pauseNsForServerOverloaded = builder.pauseNs; } else { - this.pauseForCQTBENs = builder.pauseForCQTBENs; + this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; @@ -399,7 +399,8 @@ MasterRequestCallerBuilder newMasterCaller() { return this.connection.callerFactory. masterRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @@ -407,7 +408,8 @@ private AdminRequestCallerBuilder newAdminCaller() { return this.connection.callerFactory. adminRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @@ -3551,7 +3553,8 @@ ServerRequestCallerBuilder newServerCaller() { return this.connection.callerFactory. 
serverRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index c3d16e78ebf4..e51ae136e5d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -110,7 +110,7 @@ class RawAsyncTableImpl implements AsyncTable { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -126,15 +126,15 @@ class RawAsyncTableImpl implements AsyncTable { this.operationTimeoutNs = builder.operationTimeoutNs; this.scanTimeoutNs = builder.scanTimeoutNs; this.pauseNs = builder.pauseNs; - if (builder.pauseForCQTBENs < builder.pauseNs) { + if (builder.pauseNsForServerOverloaded < builder.pauseNs) { LOG.warn( - "Configured value of pauseForCQTBENs is {} ms, which is less than" + + "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" + " the normal pause value {} ms, use the greater one instead", - TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), + TimeUnit.NANOSECONDS.toMillis(builder.pauseNsForServerOverloaded), TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); - this.pauseForCQTBENs = builder.pauseNs; + this.pauseNsForServerOverloaded = builder.pauseNs; } else { - this.pauseForCQTBENs = builder.pauseForCQTBENs; + this.pauseNsForServerOverloaded = builder.pauseNsForServerOverloaded; } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; @@ -204,7 +204,8 @@ private SingleRequestCallerBuilder newCaller(byte[] row, int priority, lo return conn.callerFactory. 
single().table(tableName).row(row).priority(priority) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @@ -618,7 +619,8 @@ private Scan setDefaultScanConfig(Scan scan) { @Override public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, - pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt) + pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, + startLogErrorsCnt) .start(); } @@ -718,7 +720,8 @@ private List> batch(List actions, long r return conn.callerFactory.batch().table(tableName).actions(actions) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java index 2482a632ca8d..0e40e97eee17 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java @@ -111,28 +111,6 @@ public static Throwable findException(Object exception) { return null; } - /** - * Checks if the exception is CallQueueTooBig exception (maybe wrapped - * into some RemoteException). - * @param t exception to check - * @return true if it's a CQTBE, false otherwise - */ - public static boolean isCallQueueTooBigException(Throwable t) { - t = findException(t); - return (t instanceof CallQueueTooBigException); - } - - /** - * Checks if the exception is CallDroppedException (maybe wrapped - * into some RemoteException). - * @param t exception to check - * @return true if it's a CQTBE, false otherwise - */ - public static boolean isCallDroppedException(Throwable t) { - t = findException(t); - return (t instanceof CallDroppedException); - } - // This list covers most connectivity exceptions but not all. // For example, in SocketOutputStream a plain IOException is thrown at times when the channel is // closed. 
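Illustrative note, not part of the patch: the two helpers removed above, isCallQueueTooBigException and isCallDroppedException, are superseded by the single static check on the new base class, which also works on exceptions unwrapped from a RemoteException. A minimal sketch of the pause selection that the retrying callers now perform; only HBaseServerException.isServerOverloaded comes from this change, the class and method names below are hypothetical.

import org.apache.hadoop.hbase.HBaseServerException;

// Sketch only: back off longer whenever the server reported itself as overloaded,
// regardless of whether the concrete type is CallQueueTooBigException,
// CallDroppedException, or some other HBaseServerException with serverOverloaded = true.
final class OverloadAwareBackoff {
  static long choosePauseNs(Throwable error, long pauseNs, long pauseNsForServerOverloaded) {
    return HBaseServerException.isServerOverloaded(error)
      ? pauseNsForServerOverloaded
      : pauseNs;
  }
}

This is the same substitution made in AsyncRpcRetryingCaller, AsyncBatchRpcRetryingCaller, and AsyncScanSingleRegionRpcRetryingCaller earlier in the patch.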
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index fd42214d1d30..2b71493e76c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -40,14 +40,12 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal; - import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; @@ -140,11 +138,13 @@ static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) static RemoteException createRemoteException(final ExceptionResponse e) { String innerExceptionClassName = e.getExceptionClassName(); boolean doNotRetry = e.getDoNotRetry(); + boolean serverOverloaded = e.hasServerOverloaded() && e.getServerOverloaded(); return e.hasHostname() ? - // If a hostname then add it to the RemoteWithExtrasException - new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), - e.getPort(), doNotRetry) - : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry); + // If a hostname then add it to the RemoteWithExtrasException + new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(), + e.getPort(), doNotRetry, serverOverloaded) : + new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry, + serverOverloaded); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java index bfc66e622e85..4f3a039de251 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.util.DynamicClassLoader; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -41,6 +42,7 @@ public class RemoteWithExtrasException extends RemoteException { private final String hostname; private final int port; private final boolean doNotRetry; + private final boolean serverOverloaded; /** * Dynamic class loader to load filter/comparators @@ -58,15 +60,26 @@ private final static class ClassLoaderHolder { } public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry) { - this(className, msg, null, -1, doNotRetry); + this(className, msg, doNotRetry, false); + } + + public RemoteWithExtrasException(String className, String msg, final boolean 
doNotRetry, + final boolean serverOverloaded) { + this(className, msg, null, -1, doNotRetry, serverOverloaded); + } + + public RemoteWithExtrasException(String className, String msg, final String hostname, + final int port, final boolean doNotRetry) { + this(className, msg, hostname, port, doNotRetry, false); } public RemoteWithExtrasException(String className, String msg, final String hostname, - final int port, final boolean doNotRetry) { + final int port, final boolean doNotRetry, final boolean serverOverloaded) { super(className, msg); this.hostname = hostname; this.port = port; this.doNotRetry = doNotRetry; + this.serverOverloaded = serverOverloaded; } @Override @@ -98,6 +111,11 @@ private IOException instantiateException(Class cls) throw cn.setAccessible(true); IOException ex = cn.newInstance(this.getMessage()); ex.initCause(this); + + if (ex instanceof HBaseServerException) { + ((HBaseServerException) ex).setServerOverloaded(serverOverloaded); + } + return ex; } @@ -121,4 +139,11 @@ public int getPort() { public boolean isDoNotRetry() { return this.doNotRetry; } + + /** + * @return True if the server was considered overloaded when the exception was thrown. + */ + public boolean isServerOverloaded() { + return serverOverloaded; + } } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java index b2d5b872e757..ec79c5f815f5 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionConfiguration.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.junit.Assert.assertEquals; - +import static org.junit.Assert.assertTrue; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -40,6 +40,23 @@ public class TestAsyncConnectionConfiguration { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncConnectionConfiguration.class); + @Test + public void itHandlesDeprecatedPauseForCQTBE() { + Configuration conf = new Configuration(); + long timeoutMs = 1000; + conf.setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, timeoutMs); + AsyncConnectionConfiguration config = new AsyncConnectionConfiguration(conf); + + assertTrue(Configuration.isDeprecated(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE)); + long expected = TimeUnit.MILLISECONDS.toNanos(timeoutMs); + assertEquals(expected, config.getPauseNsForServerOverloaded()); + + conf = new Configuration(); + conf.setLong(AsyncConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, timeoutMs); + config = new AsyncConnectionConfiguration(conf); + assertEquals(expected, config.getPauseNsForServerOverloaded()); + } + @Test public void testDefaultReadWriteRpcTimeout() { Configuration conf = HBaseConfiguration.create(); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index d5deb0927305..0c55c5f3c056 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -798,7 +798,10 @@ public enum OperationStatusCode { /** * Parameter name for client pause value for special case such as call queue too big, etc. 
+ * @deprecated Since 2.5.0, will be removed in 4.0.0. Please use + * hbase.client.pause.server.overloaded instead. */ + @Deprecated public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe"; /** diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index f07c614dd201..628b1e2feceb 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -493,12 +493,14 @@ possible configurations would overwhelm and obscure the important. this initial pause amount and how this pause works w/ retries. - hbase.client.pause.cqtbe + hbase.client.pause.server.overloaded - Whether or not to use a special client pause for - CallQueueTooBigException (cqtbe). Set this property to a higher value - than hbase.client.pause if you observe frequent CQTBE from the same - RegionServer and the call queue there keeps full + Pause time when encountering an exception indicating a + server is overloaded, CallQueueTooBigException or CallDroppedException. + Set this property to a higher value than hbase.client.pause if you + observe frequent CallQueueTooBigException or CallDroppedException from the same + RegionServer and the call queue there keeps filling up. This config used to be + called hbase.client.pause.cqtbe, which has been deprecated as of 2.5.0. hbase.client.retries.number diff --git a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto index 7ef3eafa3cb2..6426f0cb06cb 100644 --- a/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto +++ b/hbase-protocol-shaded/src/main/protobuf/rpc/RPC.proto @@ -119,6 +119,8 @@ message ExceptionResponse { optional int32 port = 4; // Set if we are NOT to retry on receipt of this exception optional bool do_not_retry = 5; + // Set true if the server was considered to be overloaded when exception was thrown + optional bool server_overloaded = 6; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java index 726559fc28c0..734baad1e0a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java @@ -53,7 +53,7 @@ public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer, AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs, RegionInfo replica, List entries) { super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()), - conn.connConf.getPauseNs(), conn.connConf.getPauseForCQTBENs(), maxAttempts, + conn.connConf.getPauseNs(), conn.connConf.getPauseNsForServerOverloaded(), maxAttempts, operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); this.replica = replica; this.entries = entries.toArray(new Entry[0]); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index f98bfc5cbcaf..2ed3ebb99219 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.CellScanner; import 
org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseServerException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; @@ -320,6 +321,9 @@ static void setExceptionResponse(Throwable t, String errorMsg, RegionMovedException rme = (RegionMovedException)t; exceptionBuilder.setHostname(rme.getHostname()); exceptionBuilder.setPort(rme.getPort()); + } else if (t instanceof HBaseServerException) { + HBaseServerException hse = (HBaseServerException) t; + exceptionBuilder.setServerOverloaded(hse.isServerOverloaded()); } // Set the exception as the result of the method invocation. headerBuilder.setException(exceptionBuilder.build()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java similarity index 66% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java index ba871066eb3c..1ea37411a97e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -36,9 +37,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; +import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; @@ -56,31 +59,46 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; @Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncClientPauseForCallQueueTooBig { +public class TestAsyncClientPauseForServerOverloaded { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class); + HBaseClassTestRule.forClass(TestAsyncClientPauseForServerOverloaded.class); private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); - private static TableName TABLE_NAME = TableName.valueOf("CQTBE"); + private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded"); private static byte[] FAMILY = Bytes.toBytes("Family"); private static byte[] QUALIFIER = Bytes.toBytes("Qualifier"); - private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1); + private static long PAUSE_FOR_SERVER_OVERLOADED_NANOS = TimeUnit.SECONDS.toNanos(1); + private static long PAUSE_FOR_SERVER_OVERLOADED_MILLIS = TimeUnit.NANOSECONDS.toMillis( + 
PAUSE_FOR_SERVER_OVERLOADED_NANOS); private static AsyncConnection CONN; - private static boolean FAIL = false; + private static volatile FailMode MODE = null; - private static ConcurrentMap INVOKED = new ConcurrentHashMap<>(); + enum FailMode { + CALL_QUEUE_TOO_BIG, + CALL_DROPPED; - public static final class CQTBERpcScheduler extends SimpleRpcScheduler { + private ConcurrentMap invoked = new ConcurrentHashMap<>(); - public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, + // this is for test scan, where we will send an open scanner first and then a next, and we + // expect to hit the configured failure mode twice. + private boolean shouldFail(CallRunner callRunner) { + MethodDescriptor method = callRunner.getRpcCall().getMethod(); + return invoked.computeIfAbsent(method, + k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0; + } + } + + public static final class OverloadedRpcScheduler extends SimpleRpcScheduler { + + public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, Abortable server, int highPriorityLevel) { super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, @@ -89,25 +107,36 @@ public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandl @Override public boolean dispatch(CallRunner callTask) { - if (FAIL) { - MethodDescriptor method = callTask.getRpcCall().getMethod(); - // this is for test scan, where we will send a open scanner first and then a next, and we - // expect that we hit CQTBE two times. - if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) { - return false; - } + if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) { + return false; } return super.dispatch(callTask); } } - public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory { + public static final class OverloadedQueue extends TestPluggableQueueImpl { + + public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) { + super(maxQueueLength, priority, conf); + } + + @Override + public boolean offer(CallRunner callRunner) { + if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) { + callRunner.drop(); + return true; + } + return super.offer(callRunner); + } + } + + public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory { @Override public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); - return new CQTBERpcScheduler(conf, handlerCount, + return new OverloadedRpcScheduler(conf, handlerCount, conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, @@ -122,12 +151,16 @@ public RpcScheduler create(Configuration conf, PriorityFunction priority, Aborta @BeforeClass public static void setUp() throws Exception { UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); - UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, - TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS)); + UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable"); + UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name", +
OverloadedQueue.class, PluggableBlockingQueue.class); UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class); + OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class); UTIL.startMiniCluster(1); - CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + + Configuration conf = new Configuration(UTIL.getConfiguration()); + conf.setLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, PAUSE_FOR_SERVER_OVERLOADED_MILLIS); + CONN = ConnectionFactory.createAsyncConnection(conf).get(); } @AfterClass @@ -143,22 +176,28 @@ public void setUpBeforeTest() throws IOException { table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))); } } - FAIL = true; + MODE = FailMode.CALL_QUEUE_TOO_BIG; } @After public void tearDownAfterTest() throws IOException { - FAIL = false; - INVOKED.clear(); + for (FailMode mode : FailMode.values()) { + mode.invoked.clear(); + } + MODE = null; UTIL.getAdmin().disableTable(TABLE_NAME); UTIL.getAdmin().deleteTable(TABLE_NAME); } private void assertTime(Callable callable, long time) throws Exception { - long startNs = System.nanoTime(); - callable.call(); - long costNs = System.nanoTime() - startNs; - assertTrue(costNs > time); + for (FailMode mode : FailMode.values()) { + MODE = mode; + + long startNs = System.nanoTime(); + callable.call(); + long costNs = System.nanoTime() - startNs; + assertTrue(costNs > time); + } } @Test @@ -167,7 +206,7 @@ public void testGet() throws Exception { Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get(); assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER)); return null; - }, PAUSE_FOR_CQTBE_NS); + }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); } @Test @@ -181,7 +220,7 @@ public void testBatch() throws Exception { } } return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); - }, PAUSE_FOR_CQTBE_NS); + }, PAUSE_FOR_SERVER_OVERLOADED_NANOS); } @Test @@ -197,6 +236,6 @@ public void testScan() throws Exception { assertNull(scanner.next()); } return null; - }, PAUSE_FOR_CQTBE_NS * 2); + }, PAUSE_FOR_SERVER_OVERLOADED_NANOS * 2); } }
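
The change above propagates an overload signal end to end: the server marks server_overloaded on the ExceptionResponse when the thrown exception is an HBaseServerException subtype such as CallQueueTooBigException or CallDroppedException, and the client restores that flag when it rebuilds the exception from RemoteWithExtrasException. The built-in retrying callers consume it through the pause-for-server-overloaded settings. For application code that runs its own retry loop on top of the client, a minimal sketch of checking the same flag follows; the class name, pause parameters, and loop structure are illustrative assumptions, not APIs added by this patch.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;

public final class OverloadAwareGetExample {
  private OverloadAwareGetExample() {
  }

  /**
   * Retries a Get a few times, pausing longer when the thrown exception carried the
   * server-overloaded flag (e.g. CallQueueTooBigException or CallDroppedException).
   * Assumes maxAttempts >= 1.
   */
  public static Result get(Table table, Get get, int maxAttempts, long normalPauseMs,
      long overloadedPauseMs) throws IOException, InterruptedException {
    IOException lastFailure = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return table.get(get);
      } catch (IOException e) {
        lastFailure = e;
        // isServerOverloaded walks the HBaseServerException hierarchy introduced by this change.
        long pauseMs =
            HBaseServerException.isServerOverloaded(e) ? overloadedPauseMs : normalPauseMs;
        Thread.sleep(pauseMs);
      }
    }
    throw lastFailure;
  }
}

A caller would typically pass the same values it uses for hbase.client.pause and hbase.client.pause.server.overloaded as the two pause arguments.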
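
On the configuration side, the deprecated hbase.client.pause.cqtbe now maps onto hbase.client.pause.server.overloaded, which covers both CallQueueTooBigException and CallDroppedException. A minimal sketch of setting the new key from code is below; the 100 ms and 1000 ms values are illustrative, and the string literal is used rather than assuming the AsyncConnectionConfiguration constant is public API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public final class OverloadPauseConfigExample {
  private OverloadPauseConfigExample() {
  }

  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // Normal pause between client retries.
    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 100);
    // Larger pause applied when the server reports it is overloaded; replaces the
    // deprecated hbase.client.pause.cqtbe, which the patch still maps to this setting.
    conf.setLong("hbase.client.pause.server.overloaded", 1000);
    return conf;
  }
}

The equivalent hbase-site.xml entries would set the same two properties.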