HBASE-22343 Make procedure retry interval configurable in test
Apache9 committed May 4, 2019
1 parent 68f14c1 commit e884a25
Showing 13 changed files with 165 additions and 147 deletions.
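The two configuration keys introduced below, hbase.procedure.retry.sleep.interval.ms and hbase.procedure.retry.max.sleep.time.ms, let a test shorten the exponential backoff that procedures sleep between retries. A minimal sketch of how a unit test might tune them; the helper method, the HBaseTestingUtility parameter, and the chosen values are illustrative, not part of this commit:

  // Sketch only: shorten procedure retry backoff for a test.
  // UTIL and the values below are assumptions for illustration.
  private static void shortenProcedureRetry(HBaseTestingUtility UTIL) {
    Configuration conf = UTIL.getConfiguration();
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 100);  // base sleep: 100 ms instead of 1 s
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 5000); // max sleep: 5 s instead of 10 min
  }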
@@ -21,9 +21,13 @@
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -335,20 +339,35 @@ public static LockServiceProtos.LockedResource convertToProtoLockedResource(
return builder.build();
}

public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
"hbase.procedure.retry.sleep.interval.ms";

// default to 1 second
public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000;

public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
"hbase.procedure.retry.max.sleep.time.ms";

// default to 10 minutes
public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
TimeUnit.MINUTES.toMillis(10);

/**
* Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max
* backoff time is 10 minutes. This is the general backoff policy for most procedure
* implementation.
* Get a retry counter for getting the backoff time. We will use the
* {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time
* is 10 minutes by default.
* <p/>
* For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
* {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not
* timeout.
*/
public static long getBackoffTimeMs(int attempts) {
long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now.
// avoid overflow
if (attempts >= 30) {
return maxBackoffTime;
}
long backoffTimeMs = Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime);
// 1% possible jitter
long jitter = (long) (backoffTimeMs * ThreadLocalRandom.current().nextFloat() * 0.01f);
return backoffTimeMs + jitter;
public static RetryCounter createRetryCounter(Configuration conf) {
long sleepIntervalMs =
conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS);
long maxSleepTimeMs =
conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS);
RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs)
.setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
return new RetryCounter(retryConfig);
}
}
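The procedure classes changed below all adopt the same retry pattern around the new factory method: lazily create the counter from the master configuration, ask it for the next backoff, and reset by nulling the field once the step succeeds. A condensed sketch of that pattern; the enclosing method is illustrative, while the field, factory call, and backoff call match the hunks below:

  // Condensed form of the retry pattern adopted by the procedures in this commit.
  private RetryCounter retryCounter;

  private long nextBackoffMs(MasterProcedureEnv env) {
    if (retryCounter == null) {
      // Created lazily so it picks up any test-tuned retry configuration.
      retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
    }
    return retryCounter.getBackoffTimeAndIncrementAttempts();
  }
  // On success, the procedures reset the backoff by setting retryCounter = null.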
@@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -59,24 +57,6 @@ public void testConvert() throws Exception {
assertEquals("Procedure protobuf does not match", proto1, proto2);
}

@Test
public void testGetBackoffTimeMs() {
for (int i = 30; i < 1000; i++) {
assertEquals(TimeUnit.MINUTES.toMillis(10), ProcedureUtil.getBackoffTimeMs(30));
}
long backoffTimeMs = ProcedureUtil.getBackoffTimeMs(0);
assertTrue(backoffTimeMs >= 1000);
assertTrue(backoffTimeMs <= 1000 * 1.01f);

backoffTimeMs = ProcedureUtil.getBackoffTimeMs(1);
assertTrue(backoffTimeMs >= 2000);
assertTrue(backoffTimeMs <= 2000 * 1.01f);

backoffTimeMs = ProcedureUtil.getBackoffTimeMs(5);
assertTrue(backoffTimeMs >= 32000);
assertTrue(backoffTimeMs <= 32000 * 1.01f);
}

public static class TestProcedureNoDefaultConstructor extends TestProcedure {
public TestProcedureNoDefaultConstructor(int x) {}
}
@@ -34,6 +34,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +69,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur

private long seqId;

private int attempt;
private RetryCounter retryCounter;

protected RegionRemoteProcedureBase() {
}
@@ -268,7 +269,10 @@ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throw new IllegalStateException("Unknown state: " + state);
}
} catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
@@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,7 +116,7 @@ public class TransitRegionStateProcedure

private boolean forceNewPlan;

private int attempt;
private RetryCounter retryCounter;

private RegionRemoteProcedureBase remoteProc;

@@ -210,7 +211,7 @@ private void openRegion(MasterProcedureEnv env, RegionStateNode regionNode) thro
private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException {
if (regionNode.isInState(State.OPEN)) {
attempt = 0;
retryCounter = null;
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
// we are the last state, finish
regionNode.unsetProcedure(this);
@@ -271,7 +272,7 @@ private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) thr
private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode)
throws IOException {
if (regionNode.isInState(State.CLOSED)) {
attempt = 0;
retryCounter = null;
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
// we are the last state, finish
regionNode.unsetProcedure(this);
@@ -300,7 +301,7 @@ private Flow confirmClosed(MasterProcedureEnv env, RegionStateNode regionNode)
regionNode.unsetProcedure(this);
return Flow.NO_MORE_STATE;
}
attempt = 0;
retryCounter = null;
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE);
return Flow.HAS_MORE_STATE;
}
@@ -347,7 +348,10 @@ protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionSta
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn(
"Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " +
"by other Procedure or operator intervention",
@@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class InitMetaProcedure extends AbstractStateMachineTableProcedure<InitMe

private CountDownLatch latch = new CountDownLatch(1);

private int attempts;
private RetryCounter retryCounter;

@Override
public TableName getTableName() {
@@ -85,7 +86,10 @@ protected Flow executeFromState(MasterProcedureEnv env, InitMetaState state)
insertNamespaceToMeta(env.getMasterServices().getConnection(), DEFAULT_NAMESPACE);
insertNamespaceToMeta(env.getMasterServices().getConnection(), SYSTEM_NAMESPACE);
} catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++);
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to init default and system namespaces, suspend {}secs", backoff, e);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class ReopenTableRegionsProcedure

private List<HRegionLocation> regions = Collections.emptyList();

private int attempt;
private RetryCounter retryCounter;

public ReopenTableRegionsProcedure() {
}
@@ -125,13 +126,16 @@ protected Flow executeFromState(MasterProcedureEnv env, ReopenTableRegionsState
return Flow.NO_MORE_STATE;
}
if (regions.stream().anyMatch(loc -> canSchedule(env, loc))) {
attempt = 0;
retryCounter = null;
setNextState(ReopenTableRegionsState.REOPEN_TABLE_REGIONS_REOPEN_REGIONS);
return Flow.HAS_MORE_STATE;
}
// We can not schedule TRSP for all the regions need to reopen, wait for a while and retry
// again.
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempt++);
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.info(
"There are still {} region(s) which need to be reopened for table {} are in " +
"OPENING state, suspend {}secs and try again later",
@@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -53,7 +54,7 @@ public class SplitWALProcedure
private String walPath;
private ServerName worker;
private ServerName crashedServer;
private int attempts = 0;
private RetryCounter retryCounter;

public SplitWALProcedure() {
}
@@ -82,11 +83,16 @@ protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.Sp
try {
finished = splitWALManager.isSplitWALFinished(walPath);
} catch (IOException ioe) {
long backoff = ProcedureUtil.getBackoffTimeMs(attempts++);
LOG.warn(
"Failed to check whether splitting wal {} success, wait {} seconds to retry",
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to check whether splitting wal {} success, wait {} seconds to retry",
walPath, backoff / 1000, ioe);
throw suspend(backoff);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
if (!finished) {
@@ -157,15 +163,6 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
return false;
}

protected final ProcedureSuspendedException suspend(long backoff)
throws ProcedureSuspendedException {
attempts++;
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}

public String getWAL() {
return walPath;
}
@@ -25,6 +25,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.quotas.RpcThrottleStorage;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +44,11 @@ public class SwitchRpcThrottleProcedure

private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class);

RpcThrottleStorage rpcThrottleStorage;
boolean rpcThrottleEnabled;
ProcedurePrepareLatch syncLatch;
ServerName serverName;
int attempts;
private RpcThrottleStorage rpcThrottleStorage;
private boolean rpcThrottleEnabled;
private ProcedurePrepareLatch syncLatch;
private ServerName serverName;
private RetryCounter retryCounter;

public SwitchRpcThrottleProcedure() {
}
@@ -68,7 +69,10 @@ protected Flow executeFromState(MasterProcedureEnv env, SwitchRpcThrottleState s
try {
switchThrottleState(env, rpcThrottleEnabled);
} catch (IOException e) {
long backoff = ProcedureUtil.getBackoffTimeMs(this.attempts++);
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed to store rpc throttle value {}, sleep {} secs and retry",
rpcThrottleEnabled, backoff / 1000, e);
setTimeout(Math.toIntExact(backoff));
@@ -18,11 +18,15 @@
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.function.LongConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerProcedureStateData;
@@ -38,7 +42,7 @@ public abstract class AbstractPeerNoLockProcedure<TState>

protected String peerId;

protected int attempts;
private RetryCounter retryCounter;

protected AbstractPeerNoLockProcedure() {
}
@@ -87,12 +91,20 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
return false;
}

protected final ProcedureSuspendedException suspend(long backoff)
throws ProcedureSuspendedException {
attempts++;
protected final ProcedureSuspendedException suspend(Configuration conf,
LongConsumer backoffConsumer) throws ProcedureSuspendedException {
if (retryCounter == null) {
retryCounter = ProcedureUtil.createRetryCounter(conf);
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
backoffConsumer.accept(backoff);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
}

protected final void resetRetry() {
retryCounter = null;
}
}
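With the new suspend(Configuration, LongConsumer) signature, a peer procedure subclass no longer tracks its own attempt count; it passes its logging as the consumer and rethrows the suspension. A hedged sketch of a call site; the failing operation, the LOG field, and the message are illustrative, only the suspend signature and the peerId field come from this commit:

  // Illustrative call site for the new suspend(Configuration, LongConsumer) helper.
  try {
    refreshPeerStorage(env); // assumed step that may throw IOException
  } catch (IOException e) {
    throw suspend(env.getMasterConfiguration(),
      backoff -> LOG.warn("Failed to update storage for peer {}, sleep {} secs",
        peerId, backoff / 1000, e));
  }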