diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java index 13ab3ed47cee..d3d948f372d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallDroppedException.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase; -import java.io.IOException; - import org.apache.yetus.audience.InterfaceAudience; /** @@ -28,7 +26,7 @@ */ @SuppressWarnings("serial") @InterfaceAudience.Public -public class CallDroppedException extends IOException { +public class CallDroppedException extends ServerOverloadedException { public CallDroppedException() { super(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java index 12fa242693c8..0924c087ac03 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CallQueueTooBigException.java @@ -18,13 +18,15 @@ package org.apache.hadoop.hbase; -import java.io.IOException; - import org.apache.yetus.audience.InterfaceAudience; +/** + * Returned to clients when their request could not be enqueued due to the server being + * overloaded. Clients should retry upon receiving it. 
+ */ @SuppressWarnings("serial") @InterfaceAudience.Public -public class CallQueueTooBigException extends IOException { +public class CallQueueTooBigException extends ServerOverloadedException { public CallQueueTooBigException() { super(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerOverloadedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerOverloadedException.java new file mode 100644 index 000000000000..b466da2c7def --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerOverloadedException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Base class for exceptions thrown when the hbase server is overloaded. 
+ */ +@InterfaceAudience.Public +public class ServerOverloadedException extends HBaseIOException { + public ServerOverloadedException() { + } + + public ServerOverloadedException(String message) { + super(message); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java index 49bc350bb9a6..1072546af4be 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java @@ -50,21 +50,21 @@ public interface AsyncAdminBuilder { * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. * @return this for invocation chaining - * @see #setRetryPauseForCQTBE(long, TimeUnit) + * @see #setRetryPauseForServerOverloaded(long, TimeUnit) */ AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit); /** - * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an + * Set the base pause time for retrying when we hit {@code ServerOverloadedException}. We use an * exponential policy to generate sleep time when retrying. *

* This value should be greater than the normal pause value which could be set with the above - * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException} + * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code ServerOverloadedException} * means the server is overloaded. We just use the normal pause value for - * {@code CallQueueTooBigException} if here you specify a smaller value. + * {@code ServerOverloadedException} if here you specify a smaller value. * @see #setRetryPause(long, TimeUnit) */ - AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit); + AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit); /** * Set the max retry times for an admin operation. Usually it is the max attempt times minus 1. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java index ffb3ae97ecff..ab39d34f3843 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java @@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { protected long pauseNs; - protected long pauseForCQTBENs; + protected long pauseForServerOverloaded; protected int maxAttempts; @@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder { this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); this.operationTimeoutNs = connConf.getOperationTimeoutNs(); this.pauseNs = connConf.getPauseNs(); - this.pauseForCQTBENs = connConf.getPauseForCQTBENs(); + this.pauseForServerOverloaded = connConf.getPauseForServerOverloaded(); this.maxAttempts = connConf.getMaxRetries(); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -67,8 +67,8 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) { } @Override - public 
AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(timeout); + public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) { + this.pauseForServerOverloaded = unit.toNanos(timeout); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index 7a381db39c82..a40eb8fe0c1c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -44,10 +44,10 @@ public interface Callable { private ServerName serverName; public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + super(retryTimer, conn, priority, pauseNs, pauseForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 7af385da2d83..963366ab598e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -45,13 +45,13 @@ import java.util.stream.Collectors; import 
java.util.stream.Stream; import org.apache.commons.lang3.mutable.MutableBoolean; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RetryImmediatelyException; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ServerOverloadedException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.MultiResponse.RegionResult; import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; @@ -104,7 +104,7 @@ class AsyncBatchRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -150,13 +150,14 @@ public int getPriority() { } public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - TableName tableName, List actions, long pauseNs, long pauseForCQTBENs, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + TableName tableName, List actions, long pauseNs, + long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, + long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -466,17 +467,17 @@ private void onError(Map actionsByRegion, int tries, Thro .collect(Collectors.toList()); addError(copiedActions, error, serverName); tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException, - error instanceof CallQueueTooBigException); + error instanceof 
ServerOverloadedException); } private void tryResubmit(Stream actions, int tries, boolean immediately, - boolean isCallQueueTooBig) { + boolean isServerOverloadedException) { if (immediately) { groupAndSend(actions, tries); return; } long delayNs; - long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = isServerOverloadedException ? pauseNsForServerOverloaded : pauseNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 48f004c0a29c..6f405a0582aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -73,7 +73,7 @@ class AsyncClientScanner { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -86,7 +86,7 @@ class AsyncClientScanner { private final ScanResultCache resultCache; public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, - AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs, + AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); @@ -100,7 +100,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN this.conn = conn; this.retryTimer = retryTimer; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = 
rpcTimeoutNs; @@ -170,7 +170,8 @@ private void startScan(OpenScannerResponse resp) { .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp), (hasMore, error) -> { if (error != null) { @@ -191,7 +192,8 @@ private CompletableFuture openScanner(int replicaId) { .priority(scan.getPriority()) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index a6d403f597b8..e50a9b616512 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -31,6 +31,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE; +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER; import static 
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; @@ -92,7 +93,7 @@ class AsyncConnectionConfiguration { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseForServerOverloaded; private final int maxRetries; @@ -137,15 +138,17 @@ class AsyncConnectionConfiguration { this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutMs)); long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); - long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs); - if (pauseForCQTBEMs < pauseMs) { + long pauseForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, + conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); + if (pauseForServerOverloaded < pauseMs) { LOG.warn( "The {} setting: {} ms is less than the {} setting: {} ms, use the greater one instead", - HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE, pauseMs); - pauseForCQTBEMs = pauseMs; + HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, pauseForServerOverloaded, + HBASE_CLIENT_PAUSE, pauseMs); + pauseForServerOverloaded = pauseMs; } this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs); - this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs); + this.pauseForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(pauseForServerOverloaded); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT); @@ -196,8 +199,8 @@ long getPauseNs() { return pauseNs; } - long getPauseForCQTBENs() { - return pauseForCQTBENs; + long getPauseForServerOverloaded() { + return pauseForServerOverloaded; } int getMaxRetries() { diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index de2778cf6d78..976e9e78477c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -44,10 +44,10 @@ public interface Callable { private final Callable callable; public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - Callable callable, int priority, long pauseNs, long pauseForCQTBENs, int maxRetries, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, + int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxRetries, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index 8648572a04a3..e7a6b2c7866b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -31,9 +31,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.NotServingRegionException; +import 
org.apache.hadoop.hbase.ServerOverloadedException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.TableNotFoundException; @@ -60,7 +60,7 @@ public abstract class AsyncRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private int tries = 1; @@ -81,13 +81,13 @@ public abstract class AsyncRpcRetryingCaller { protected final HBaseRpcController controller; public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; this.priority = priority; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -127,7 +127,8 @@ protected final void resetCallTimeout() { } private void tryScheduleRetry(Throwable error) { - long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = error instanceof ServerOverloadedException ? 
+ pauseNsForServerOverloaded : pauseNs; long delayNs; if (operationTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 48bde4434be7..c3a426eeb74c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -58,7 +58,7 @@ private abstract class BuilderBase { protected long pauseNs = conn.connConf.getPauseNs(); - protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs(); + protected long pauseNsForServerOverloaded = conn.connConf.getPauseForServerOverloaded(); protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries()); @@ -119,8 +119,8 @@ public SingleRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public SingleRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public SingleRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -156,8 +156,8 @@ private void preCheck() { public AsyncSingleRequestRpcRetryingCaller build() { preCheck(); return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, - locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + locateType, callable, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -263,8 +263,8 @@ public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - 
this.pauseForCQTBENs = unit.toNanos(pause); + public ScanSingleRegionCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -292,8 +292,8 @@ public AsyncScanSingleRegionRpcRetryingCaller build() { preCheck(); return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, - scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + scannerLeaseTimeoutPeriodNs, pauseNs, pauseNsForServerOverloaded, maxAttempts, + scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -347,8 +347,8 @@ public BatchCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public BatchCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -364,7 +364,8 @@ public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) { public AsyncBatchRpcRetryingCaller build() { return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName, actions, pauseNs, - pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } public List> call() { @@ -406,8 +407,8 @@ public MasterRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public MasterRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public MasterRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -438,7 +439,8 @@ private void preCheck() { public AsyncMasterRequestRpcRetryingCaller 
build() { preCheck(); return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, callable, priority, - pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + pauseNs, pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } /** @@ -487,8 +489,8 @@ public AdminRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public AdminRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public AdminRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -514,8 +516,9 @@ public AdminRequestCallerBuilder priority(int priority) { public AsyncAdminRequestRetryingCaller build() { return new AsyncAdminRequestRetryingCaller(retryTimer, conn, priority, pauseNs, - pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, - checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), + checkNotNull(callable, "action is null")); } public CompletableFuture call() { @@ -558,8 +561,8 @@ public ServerRequestCallerBuilder pause(long pause, TimeUnit unit) { return this; } - public ServerRequestCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public ServerRequestCallerBuilder pauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseNsForServerOverloaded = unit.toNanos(pause); return this; } @@ -579,7 +582,8 @@ public ServerRequestCallerBuilder serverName(ServerName serverName) { } public AsyncServerRequestRpcRetryingCaller build() { - return new AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, pauseForCQTBENs, + return new 
AsyncServerRequestRpcRetryingCaller(retryTimer, conn, pauseNs, + pauseNsForServerOverloaded, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 7f19180a0ab2..f5bb7bbde796 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -34,11 +34,11 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerOverloadedException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; @@ -99,7 +99,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -308,7 +308,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, - long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long 
rpcTimeoutNs, + long pauseNsForServerOverloaded, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; @@ -321,7 +321,7 @@ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionI this.regionServerRemote = isRegionServerRemote; this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs; this.pauseNs = pauseNs; - this.pauseForCQTBENs = pauseForCQTBENs; + this.pauseNsForServerOverloaded = pauseNsForServerOverloaded; this.maxAttempts = maxAttempts; this.scanTimeoutNs = scanTimeoutNs; this.rpcTimeoutNs = rpcTimeoutNs; @@ -411,7 +411,8 @@ private void onError(Throwable error) { return; } long delayNs; - long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs; + long pauseNsToUse = error instanceof ServerOverloadedException ? + pauseNsForServerOverloaded : pauseNs; if (scanTimeoutNs > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index 52a2abe39440..8c6cf81f4c71 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -46,10 +46,10 @@ public interface Callable { private ServerName serverName; public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs, + long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs, maxAttempts, - 
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseNsForServerOverloaded, + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 2a552c71b3dd..31fa1834bb70 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -56,10 +56,10 @@ CompletableFuture call(HBaseRpcController controller, HRegionLocation loc, public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, - Callable callable, int priority, long pauseNs, long pauseForCQTBENs, int maxAttempts, - long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + Callable callable, int priority, long pauseNs, long pauseNsForServerOverloaded, + int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts, + operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.tableName = tableName; this.row = row; this.replicaId = replicaId; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 4c883a8332d7..17248085572c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -76,21 +76,21 @@ public interface AsyncTableBuilder { /** * Set the base pause time for retrying. We use an exponential policy to generate sleep time when * retrying. - * @see #setRetryPauseForCQTBE(long, TimeUnit) + * @see #setRetryPauseForServerOverloaded(long, TimeUnit) */ AsyncTableBuilder setRetryPause(long pause, TimeUnit unit); /** - * Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an + * Set the base pause time for retrying when we hit {@code ServerOverloadedException}. We use an * exponential policy to generate sleep time when retrying. *

* This value should be greater than the normal pause value which could be set with the above - * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException} + * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code ServerOverloadedException} * means the server is overloaded. We just use the normal pause value for - * {@code CallQueueTooBigException} if here you specify a smaller value. + * {@code ServerOverloadedException} if here you specify a smaller value. * @see #setRetryPause(long, TimeUnit) */ - AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit); + AsyncTableBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit); /** * Set the max retry times for an operation. Usually it is the max attempt times minus 1. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 399d9ddfaffe..731bc7255f80 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -45,7 +45,7 @@ abstract class AsyncTableBuilderBase protected long pauseNs; - protected long pauseForCQTBENs; + protected long pauseForServerOverloaded; protected int maxAttempts; @@ -60,7 +60,7 @@ abstract class AsyncTableBuilderBase this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); this.pauseNs = connConf.getPauseNs(); - this.pauseForCQTBENs = connConf.getPauseForCQTBENs(); + this.pauseForServerOverloaded = connConf.getPauseForServerOverloaded(); this.maxAttempts = retries2Attempts(connConf.getMaxRetries()); this.startLogErrorsCnt = connConf.getStartLogErrorsCnt(); } @@ -102,8 +102,8 @@ public AsyncTableBuilderBase setRetryPause(long pause, TimeUnit unit) { } @Override - public AsyncTableBuilderBase 
setRetryPauseForCQTBE(long pause, TimeUnit unit) { - this.pauseForCQTBENs = unit.toNanos(pause); + public AsyncTableBuilderBase setRetryPauseForServerOverloaded(long pause, TimeUnit unit) { + this.pauseForServerOverloaded = unit.toNanos(pause); return this; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index b7bf6c5c9c48..8341a540cda7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -364,7 +364,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -380,15 +380,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { this.rpcTimeoutNs = builder.rpcTimeoutNs; this.operationTimeoutNs = builder.operationTimeoutNs; this.pauseNs = builder.pauseNs; - if (builder.pauseForCQTBENs < builder.pauseNs) { + if (builder.pauseForServerOverloaded < builder.pauseNs) { LOG.warn( - "Configured value of pauseForCQTBENs is {} ms, which is less than" + + "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" + " the normal pause value {} ms, use the greater one instead", - TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), + TimeUnit.NANOSECONDS.toMillis(builder.pauseForServerOverloaded), TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); - this.pauseForCQTBENs = builder.pauseNs; + this.pauseNsForServerOverloaded = builder.pauseNs; } else { - this.pauseForCQTBENs = builder.pauseForCQTBENs; + this.pauseNsForServerOverloaded = builder.pauseForServerOverloaded; } this.maxAttempts = builder.maxAttempts; this.startLogErrorsCnt = builder.startLogErrorsCnt; @@ -399,7 +399,8 @@ MasterRequestCallerBuilder newMasterCaller() { return 
this.connection.callerFactory. masterRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @@ -407,7 +408,8 @@ private AdminRequestCallerBuilder newAdminCaller() { return this.connection.callerFactory. adminRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @@ -3523,7 +3525,8 @@ ServerRequestCallerBuilder newServerCaller() { return this.connection.callerFactory. 
serverRequest() .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index a144550022b3..cb6609840f1a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -110,7 +110,7 @@ class RawAsyncTableImpl implements AsyncTable { private final long pauseNs; - private final long pauseForCQTBENs; + private final long pauseNsForServerOverloaded; private final int maxAttempts; @@ -126,15 +126,15 @@ class RawAsyncTableImpl implements AsyncTable { this.operationTimeoutNs = builder.operationTimeoutNs; this.scanTimeoutNs = builder.scanTimeoutNs; this.pauseNs = builder.pauseNs; - if (builder.pauseForCQTBENs < builder.pauseNs) { + if (builder.pauseForServerOverloaded < builder.pauseNs) { LOG.warn( - "Configured value of pauseForCQTBENs is {} ms, which is less than" + + "Configured value of pauseNsForServerOverloaded is {} ms, which is less than" + " the normal pause value {} ms, use the greater one instead", - TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs), + TimeUnit.NANOSECONDS.toMillis(builder.pauseForServerOverloaded), TimeUnit.NANOSECONDS.toMillis(builder.pauseNs)); - this.pauseForCQTBENs = builder.pauseNs; + this.pauseNsForServerOverloaded = builder.pauseNs; } else { - this.pauseForCQTBENs = builder.pauseForCQTBENs; + this.pauseNsForServerOverloaded = builder.pauseForServerOverloaded; } this.maxAttempts = builder.maxAttempts; 
this.startLogErrorsCnt = builder.startLogErrorsCnt; @@ -204,7 +204,8 @@ private SingleRequestCallerBuilder newCaller(byte[] row, int priority, lo return conn.callerFactory. single().table(tableName).row(row).priority(priority) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS) + .pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt); } @@ -618,7 +619,8 @@ private Scan setDefaultScanConfig(Scan scan) { @Override public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, - pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt) + pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, + startLogErrorsCnt) .start(); } @@ -722,7 +724,8 @@ private List> batch(List actions, long r return conn.callerFactory.batch().table(tableName).actions(actions) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) + .maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).call(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java index 2482a632ca8d..0e40e97eee17 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ClientExceptionsUtil.java @@ -111,28 
+111,6 @@ public static Throwable findException(Object exception) { return null; } - /** - * Checks if the exception is CallQueueTooBig exception (maybe wrapped - * into some RemoteException). - * @param t exception to check - * @return true if it's a CQTBE, false otherwise - */ - public static boolean isCallQueueTooBigException(Throwable t) { - t = findException(t); - return (t instanceof CallQueueTooBigException); - } - - /** - * Checks if the exception is CallDroppedException (maybe wrapped - * into some RemoteException). - * @param t exception to check - * @return true if it's a CQTBE, false otherwise - */ - public static boolean isCallDroppedException(Throwable t) { - t = findException(t); - return (t instanceof CallDroppedException); - } - // This list covers most connectivity exceptions but not all. // For example, in SocketOutputStream a plain IOException is thrown at times when the channel is // closed. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index d5deb0927305..63728623a824 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -798,9 +798,19 @@ public enum OperationStatusCode { /** * Parameter name for client pause value for special case such as call queue too big, etc. + * @deprecated Since 2.6.0, will be removed in 4.0.0. Please use + * hbase.client.pause.server.overloaded instead. */ + @Deprecated public static final String HBASE_CLIENT_PAUSE_FOR_CQTBE = "hbase.client.pause.cqtbe"; + /** + * Parameter name for client pause when server is overloaded, denoted by a + * subclass of ServerOverloadedException. + */ + public static final String HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED = + "hbase.client.pause.server.overloaded"; + /** * The maximum number of concurrent connections the client will maintain. 
*/ diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index f07c614dd201..7b9221b0950b 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -493,12 +493,13 @@ possible configurations would overwhelm and obscure the important. this initial pause amount and how this pause works w/ retries. - hbase.client.pause.cqtbe + hbase.client.pause.server.overloaded - Whether or not to use a special client pause for - CallQueueTooBigException (cqtbe). Set this property to a higher value - than hbase.client.pause if you observe frequent CQTBE from the same - RegionServer and the call queue there keeps full + Pause time when encountering an exception indicating a + server is overloaded, CallQueueTooBigException or CallDroppedException. + Set this property to a higher value than hbase.client.pause if you + observe frequent CallQueueTooBigException or CallDroppedException from the same + RegionServer and the call queue there keeps filling up hbase.client.retries.number diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java index 726559fc28c0..36ff1b13f276 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java @@ -53,7 +53,7 @@ public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer, AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs, RegionInfo replica, List entries) { super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()), - conn.connConf.getPauseNs(), conn.connConf.getPauseForCQTBENs(), maxAttempts, + conn.connConf.getPauseNs(), 
conn.connConf.getPauseForServerOverloaded(), maxAttempts, operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); this.replica = replica; this.entries = entries.toArray(new Entry[0]); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java similarity index 69% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java index ba871066eb3c..5c65fe045ac5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForServerOverloaded.java @@ -36,9 +36,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.CallRunner; +import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; +import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; @@ -56,31 +58,44 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; @Category({ MediumTests.class, ClientTests.class }) -public class TestAsyncClientPauseForCallQueueTooBig { +public class TestAsyncClientPauseForServerOverloaded { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class); + 
HBaseClassTestRule.forClass(TestAsyncClientPauseForServerOverloaded.class); private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); - private static TableName TABLE_NAME = TableName.valueOf("CQTBE"); + private static TableName TABLE_NAME = TableName.valueOf("ServerOverloaded"); private static byte[] FAMILY = Bytes.toBytes("Family"); private static byte[] QUALIFIER = Bytes.toBytes("Qualifier"); - private static long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1); + private static long PAUSE_FOR_SERVER_OVERLOADED_NS = TimeUnit.SECONDS.toNanos(1); private static AsyncConnection CONN; - private static boolean FAIL = false; + private static volatile FailMode MODE = null; - private static ConcurrentMap INVOKED = new ConcurrentHashMap<>(); + enum FailMode { + CALL_QUEUE_TOO_BIG, + CALL_DROPPED; - public static final class CQTBERpcScheduler extends SimpleRpcScheduler { + private ConcurrentMap invoked = new ConcurrentHashMap<>(); - public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, + // this is for test scan, where we will send an open scanner first and then a next, and we + // expect to hit a ServerOverloadedException (per the active FailMode) two times. 
+ private boolean shouldFail(CallRunner callRunner) { + MethodDescriptor method = callRunner.getRpcCall().getMethod(); + return invoked.computeIfAbsent(method, + k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0; + } + } + + public static final class OverloadedRpcScheduler extends SimpleRpcScheduler { + + public OverloadedRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, Abortable server, int highPriorityLevel) { super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, @@ -89,25 +104,36 @@ public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandl @Override public boolean dispatch(CallRunner callTask) { - if (FAIL) { - MethodDescriptor method = callTask.getRpcCall().getMethod(); - // this is for test scan, where we will send a open scanner first and then a next, and we - // expect that we hit CQTBE two times. - if (INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement() % 2 == 0) { - return false; - } + if (MODE == FailMode.CALL_QUEUE_TOO_BIG && MODE.shouldFail(callTask)) { + return false; } return super.dispatch(callTask); } } - public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory { + public static final class OverloadedQueue extends TestPluggableQueueImpl { + + public OverloadedQueue(int maxQueueLength, PriorityFunction priority, Configuration conf) { + super(maxQueueLength, priority, conf); + } + + @Override + public boolean offer(CallRunner callRunner) { + if (MODE == FailMode.CALL_DROPPED && MODE.shouldFail(callRunner)) { + callRunner.drop(); + return true; + } + return super.offer(callRunner); + } + } + + public static final class OverloadedRpcSchedulerFactory extends SimpleRpcSchedulerFactory { @Override public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) { int handlerCount = 
conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); - return new CQTBERpcScheduler(conf, handlerCount, + return new OverloadedRpcScheduler(conf, handlerCount, conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT), conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT, @@ -122,10 +148,13 @@ public RpcScheduler create(Configuration conf, PriorityFunction priority, Aborta @BeforeClass public static void setUp() throws Exception { UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10); - UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, - TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS)); + UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, + TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_SERVER_OVERLOADED_NS)); + UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable"); + UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name", + OverloadedQueue.class, PluggableBlockingQueue.class); UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, - CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class); + OverloadedRpcSchedulerFactory.class, RpcSchedulerFactory.class); UTIL.startMiniCluster(1); CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); } @@ -143,22 +172,28 @@ public void setUpBeforeTest() throws IOException { table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))); } } - FAIL = true; + MODE = FailMode.CALL_QUEUE_TOO_BIG; } @After public void tearDownAfterTest() throws IOException { - FAIL = false; - INVOKED.clear(); + for (FailMode mode : FailMode.values()) { + mode.invoked.clear(); + } + MODE = null; UTIL.getAdmin().disableTable(TABLE_NAME); UTIL.getAdmin().deleteTable(TABLE_NAME); } private void assertTime(Callable callable, long 
time) throws Exception { - long startNs = System.nanoTime(); - callable.call(); - long costNs = System.nanoTime() - startNs; - assertTrue(costNs > time); + for (FailMode mode : FailMode.values()) { + MODE = mode; + + long startNs = System.nanoTime(); + callable.call(); + long costNs = System.nanoTime() - startNs; + assertTrue(costNs > time); + } } @Test @@ -167,7 +202,7 @@ public void testGet() throws Exception { Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get(); assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER)); return null; - }, PAUSE_FOR_CQTBE_NS); + }, PAUSE_FOR_SERVER_OVERLOADED_NS); } @Test @@ -181,7 +216,7 @@ public void testBatch() throws Exception { } } return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); - }, PAUSE_FOR_CQTBE_NS); + }, PAUSE_FOR_SERVER_OVERLOADED_NS); } @Test @@ -197,6 +232,6 @@ public void testScan() throws Exception { assertNull(scanner.next()); } return null; - }, PAUSE_FOR_CQTBE_NS * 2); + }, PAUSE_FOR_SERVER_OVERLOADED_NS * 2); } }