diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
index f25f2681f72e..30201ca5b300 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
@@ -21,9 +21,13 @@
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.NonceKey;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
+import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -335,20 +339,35 @@ public static LockServiceProtos.LockedResource convertToProtoLockedResource(
return builder.build();
}
+ public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
+ "hbase.procedure.retry.sleep.interval.ms";
+
+ // default to 1 second
+ public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000;
+
+ public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
+ "hbase.procedure.retry.max.sleep.time.ms";
+
+ // default to 10 minutes
+ public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
+ TimeUnit.MINUTES.toMillis(10);
+
/**
- * Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max
- * backoff time is 10 minutes. This is the general backoff policy for most procedure
- * implementation.
+ * Create a {@link RetryCounter} for computing the backoff time between procedure retries. It
+ * uses the {@link ExponentialBackoffPolicyWithLimit} policy; by default the base sleep interval
+ * is 1 second and the max sleep time is 10 minutes.
+ *
+ * For UTs, you can lower {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
+ * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} so that retries happen more frequently and your UT
+ * does not time out.
*/
- public static long getBackoffTimeMs(int attempts) {
- long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now.
- // avoid overflow
- if (attempts >= 30) {
- return maxBackoffTime;
- }
- long backoffTimeMs = Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime);
- // 1% possible jitter
- long jitter = (long) (backoffTimeMs * ThreadLocalRandom.current().nextFloat() * 0.01f);
- return backoffTimeMs + jitter;
+ public static RetryCounter createRetryCounter(Configuration conf) {
+ RetryConfig backoffConfig = new RetryConfig() // both values below are in milliseconds
+ .setSleepInterval(
+ conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS))
+ .setMaxSleepTime(
+ conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS))
+ .setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
+ return new RetryCounter(backoffConfig);
}
}
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
index 3629fb74bc6c..4d57c37ac619 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureUtil.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.procedure2;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -59,24 +57,6 @@ public void testConvert() throws Exception {
assertEquals("Procedure protobuf does not match", proto1, proto2);
}
- @Test
- public void testGetBackoffTimeMs() {
- for (int i = 30; i < 1000; i++) {
- assertEquals(TimeUnit.MINUTES.toMillis(10), ProcedureUtil.getBackoffTimeMs(30));
- }
- long backoffTimeMs = ProcedureUtil.getBackoffTimeMs(0);
- assertTrue(backoffTimeMs >= 1000);
- assertTrue(backoffTimeMs <= 1000 * 1.01f);
-
- backoffTimeMs = ProcedureUtil.getBackoffTimeMs(1);
- assertTrue(backoffTimeMs >= 2000);
- assertTrue(backoffTimeMs <= 2000 * 1.01f);
-
- backoffTimeMs = ProcedureUtil.getBackoffTimeMs(5);
- assertTrue(backoffTimeMs >= 32000);
- assertTrue(backoffTimeMs <= 32000 * 1.01f);
- }
-
public static class TestProcedureNoDefaultConstructor extends TestProcedure {
public TestProcedureNoDefaultConstructor(int x) {}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
index 4a6f375d31ce..9377d895d6bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +69,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure[] execute(MasterProcedureEnv env)
throw new IllegalStateException("Unknown state: " + state);
}
} catch (IOException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
+ if (retryCounter == null) {
+ retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
index 716db69a31d8..9e017fdff2b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,7 +116,7 @@ public class TransitRegionStateProcedure
private boolean forceNewPlan;
- private int attempt;
+ private RetryCounter retryCounter;
private RegionRemoteProcedureBase remoteProc;
@@ -210,7 +211,7 @@ private void openRegion(MasterProcedureEnv env, RegionStateNode regionNode) thro
private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException {
if (regionNode.isInState(State.OPEN)) {
- attempt = 0;
+ retryCounter = null;
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
// we are the last state, finish
regionNode.unsetProcedure(this);
@@ -271,7 +272,7 @@ private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) thr
private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException {
if (regionNode.isInState(State.CLOSED)) {
- attempt = 0;
+ retryCounter = null;
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
// we are the last state, finish
regionNode.unsetProcedure(this);
@@ -300,7 +301,7 @@ private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode)
regionNode.unsetProcedure(this);
return Flow.NO_MORE_STATE;
}
- attempt = 0;
+ retryCounter = null;
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE);
return Flow.HAS_MORE_STATE;
}
@@ -347,7 +348,10 @@ protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionSta
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
+ if (retryCounter == null) {
+ retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn(
"Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
"by other Procedure or operator intervention",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
index 5ca7972fc5a8..952f3b1df3fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/InitMetaProcedure.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class InitMetaProcedure extends AbstractStateMachineTableProcedure regions = Collections.emptyList();
- private int attempt;
+ private RetryCounter retryCounter;
public ReopenTableRegionsProcedure() {
}
@@ -125,13 +126,16 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
return Flow.NO_MORE_STATE;
}
if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) {
- attempt = 0;
+ retryCounter = null;
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE;
}
// We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
// again.
- long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
+ if (retryCounter == null) {
+ retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.info(
"There are still {} region(s) which need to be reopened for table {} are in " +
"OPENING state, suspend {}secs and try again later",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
index 7e70dedba06c..fd741b5473c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -53,7 +54,7 @@ public class SplitWALProcedure
private String walPath;
private ServerName worker;
private ServerName crashedServer;
- private int attempts = 0;
+ private RetryCounter retryCounter;
public SplitWALProcedure() {
}
@@ -82,11 +83,16 @@ protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.Sp
try {
finished = splitWALManager.isSplitWALFinished(walPath);
} catch (IOException ioe) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts++);
- LOG.warn(
- "Failed to check whether splitting wal {} success, wait {} seconds to retry",
+ if (retryCounter == null) {
+ retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+ LOG.warn("Failed to check whether splitting wal {} success, wait {} seconds to retry",
walPath, backoff / 1000, ioe);
- throw suspend(backoff);
+ setTimeout(Math.toIntExact(backoff));
+ setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
+ throw new ProcedureSuspendedException();
}
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
if (!finished) {
@@ -157,15 +163,6 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
return false;
}
- protected final ProcedureSuspendedException suspend(long backoff)
- throws ProcedureSuspendedException {
- attempts++;
- setTimeout(Math.toIntExact(backoff));
- setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
- skipPersistence();
- throw new ProcedureSuspendedException();
- }
-
public String getWAL() {
return walPath;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java
index 1b080b06731a..4dd84ca4c1b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.quotas.RpcThrottleStorage;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +44,11 @@ public class SwitchRpcThrottleProcedure
private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class);
- RpcThrottleStorage rpcThrottleStorage;
- boolean rpcThrottleEnabled;
- ProcedurePrepareLatch syncLatch;
- ServerName serverName;
- int attempts;
+ private RpcThrottleStorage rpcThrottleStorage;
+ private boolean rpcThrottleEnabled;
+ private ProcedurePrepareLatch syncLatch;
+ private ServerName serverName;
+ private RetryCounter retryCounter;
public SwitchRpcThrottleProcedure() {
}
@@ -68,7 +69,10 @@ protected Flow executeFromState(MasterProcedureEnv env, SwitchRpcThrottleState s
try {
switchThrottleState(env, rpcThrottleEnabled);
} catch (IOException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++);
+ if (retryCounter == null) {
+ retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to store rpc throttle value {}, sleep {} secs and retry",
rpcThrottleEnabled, backoff / 1000, e);
setTimeout(Math.toIntExact(backoff));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
index 8f8e1e1921e3..41b26bd02cc8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
+import java.util.function.LongConsumer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData;
@@ -38,7 +42,7 @@ public abstract class AbstractPeerNoLockProcedure
protected String peerId;
- protected int attempts;
+ private RetryCounter retryCounter;
protected AbstractPeerNoLockProcedure() {
}
@@ -87,12 +91,20 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
return false;
}
- protected final ProcedureSuspendedException suspend(long backoff)
- throws ProcedureSuspendedException {
- attempts++;
+ protected final ProcedureSuspendedException suspend(Configuration conf,
+ LongConsumer backoffConsumer) throws ProcedureSuspendedException { // never returns normally
+ if (retryCounter == null) { // created lazily on first use, and again after resetRetry()
+ retryCounter = ProcedureUtil.createRetryCounter(conf);
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); // milliseconds
+ backoffConsumer.accept(backoff); // hand the backoff to the caller (callers log it) first
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}
+
+ protected final void resetRetry() {
+ retryCounter = null; // the next suspend() call will build a fresh RetryCounter
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index d5d27796942a..c4df6131f376 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -28,7 +28,6 @@
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -172,24 +171,22 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st
releaseLatch(env);
return Flow.NO_MORE_STATE;
} catch (ReplicationException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
- getClass().getName(), peerId, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (ReplicationException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(),
- peerId, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
@@ -200,24 +197,22 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st
try {
reopenRegions(env);
} catch (Exception e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(),
- peerId, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("{} reopen regions for peer {} failed, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try {
updateLastPushedSequenceIdForSerialPeer(env);
} catch (Exception e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
- getClass().getName(), peerId, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
: PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
@@ -225,12 +220,11 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st
try {
enablePeer(env);
} catch (ReplicationException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
- getClass().getName(), peerId, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
@@ -241,10 +235,10 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st
try {
postPeerModification(env);
} catch (ReplicationException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs",
- getClass().getName(), peerId, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "{} failed to call postPeerModification for peer {}, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e));
} catch (IOException e) {
LOG.warn("{} failed to call post CP hook for peer {}, " +
"ignore since the procedure has already done", getClass().getName(), peerId, e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
index 19f7aea58c53..4858bd442c1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALProcedure.java
@@ -23,7 +23,6 @@
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,10 +74,9 @@ protected Flow executeFromState(MasterProcedureEnv env, SyncReplicationReplayWAL
try {
finished = syncReplicationReplayWALManager.isReplayWALFinished(wals.get(0));
} catch (IOException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn("Failed to check whether replay wals {} finished for peer id={}" +
- ", sleep {} secs and retry", wals, peerId, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn("Failed to check whether replay wals {} finished for peer id={}" +
+ ", sleep {} secs and retry", wals, peerId, backoff / 1000, e));
}
syncReplicationReplayWALManager.releasePeerWorker(peerId, worker,
env.getProcedureScheduler());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index fcf41bee72f0..358fd5e3492b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -28,7 +28,6 @@
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -255,14 +254,13 @@ protected Flow executeFromState(MasterProcedureEnv env,
try {
setPeerNewSyncReplicationState(env);
} catch (ReplicationException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn(
- "Failed to update peer storage for peer {} when starting transiting sync " +
- "replication peer state from {} to {}, sleep {} secs and retry",
- peerId, fromState, toState, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "Failed to update peer storage for peer {} when starting transiting sync " +
+ "replication peer state from {} to {}, sleep {} secs and retry",
+ peerId, fromState, toState, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
return Flow.HAS_MORE_STATE;
@@ -287,13 +285,13 @@ protected Flow executeFromState(MasterProcedureEnv env,
try {
setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
} catch (Exception e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn(
- "Failed to update last pushed sequence id for peer {} when transiting sync " +
- "replication peer state from {} to {}, sleep {} secs and retry",
- peerId, fromState, toState, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "Failed to update last pushed sequence id for peer {} when transiting sync " +
+ "replication peer state from {} to {}, sleep {} secs and retry",
+ peerId, fromState, toState, backoff / 1000, e));
}
+ resetRetry();
setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
return Flow.HAS_MORE_STATE;
case REPLAY_REMOTE_WAL_IN_PEER:
@@ -305,14 +303,13 @@ protected Flow executeFromState(MasterProcedureEnv env,
try {
removeAllReplicationQueues(env);
} catch (ReplicationException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn(
- "Failed to remove all replication queues peer {} when starting transiting" +
- " sync replication peer state from {} to {}, sleep {} secs and retry",
- peerId, fromState, toState, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "Failed to remove all replication queues peer {} when starting transiting" +
+ " sync replication peer state from {} to {}, sleep {} secs and retry",
+ peerId, fromState, toState, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(fromState.equals(SyncReplicationState.ACTIVE)
? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
: PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
@@ -321,14 +318,13 @@ protected Flow executeFromState(MasterProcedureEnv env,
try {
transitPeerSyncReplicationState(env);
} catch (ReplicationException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn(
- "Failed to update peer storage for peer {} when ending transiting sync " +
- "replication peer state from {} to {}, sleep {} secs and retry",
- peerId, fromState, toState, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "Failed to update peer storage for peer {} when ending transiting sync " +
+ "replication peer state from {} to {}, sleep {} secs and retry",
+ peerId, fromState, toState, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(
PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
return Flow.HAS_MORE_STATE;
@@ -342,14 +338,13 @@ protected Flow executeFromState(MasterProcedureEnv env,
try {
enablePeer(env);
} catch (ReplicationException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn(
- "Failed to set peer enabled for peer {} when transiting sync replication peer " +
- "state from {} to {}, sleep {} secs and retry",
- peerId, fromState, toState, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "Failed to set peer enabled for peer {} when transiting sync replication peer " +
+ "state from {} to {}, sleep {} secs and retry",
+ peerId, fromState, toState, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(
PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
@@ -361,14 +356,13 @@ protected Flow executeFromState(MasterProcedureEnv env,
try {
createDirForRemoteWAL(env);
} catch (IOException e) {
- long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
- LOG.warn(
- "Failed to create remote wal dir for peer {} when transiting sync replication " +
- "peer state from {} to {}, sleep {} secs and retry",
- peerId, fromState, toState, backoff / 1000, e);
- throw suspend(backoff);
+ throw suspend(env.getMasterConfiguration(),
+ backoff -> LOG.warn(
+ "Failed to create remote wal dir for peer {} when transiting sync replication " +
+ "peer state from {} to {}, sleep {} secs and retry",
+ peerId, fromState, toState, backoff / 1000, e));
}
- attempts = 0;
+ resetRetry();
setNextState(
PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
return Flow.HAS_MORE_STATE;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
index 9f3acebd383d..73b940aae672 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerBase.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
@@ -144,6 +145,9 @@ protected void setupConfiguration(Configuration conf) throws Exception {
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts());
+ // make retry for TRSP more frequent
+ conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10);
+ conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100);
}
@Before