Skip to content

Commit

Permalink
fix race
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Nov 15, 2023
1 parent 52ab06d commit 2ff62c4
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@
import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -73,7 +72,7 @@ public class MigrateReplicationQueueFromZkToTableProcedure

private List<String> disabledPeerIds;

private CompletableFuture<?> future;
private CompletableFuture<Void> future;

private ExecutorService executor;

Expand All @@ -84,6 +83,14 @@ public String getGlobalId() {
return getClass().getSimpleName();
}

private CompletableFuture<Void> getFuture() {
return future;
}

private void setFuture(CompletableFuture<Void> f) {
future = f;
}

private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
throws ProcedureSuspendedException {
if (retryCounter == null) {
Expand Down Expand Up @@ -153,6 +160,12 @@ private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSu
LOG.info("No pending peer procedures found, continue...");
}

private void finishMigartion() {
shutdownExecutorService();
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
resetRetry();
}

@Override
protected Flow executeFromState(MasterProcedureEnv env,
MigrateReplicationQueueFromZkToTableState state)
Expand Down Expand Up @@ -195,52 +208,23 @@ protected Flow executeFromState(MasterProcedureEnv env,
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
if (future != null) {
// should have finished when we arrive here
assert future.isDone();
try {
future.get();
} catch (Exception e) {
future = null;
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
backoff / 1000, e));
try {
if (
ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
this::finishMigartion)
) {
return Flow.HAS_MORE_STATE;
}
shutdownExecutorService();
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
resetRetry();
return Flow.HAS_MORE_STATE;
ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
env.getReplicationPeerManager()
.migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()),
env, this::finishMigartion);
} catch (IOException e) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
backoff / 1000, e));
}
future = env.getReplicationPeerManager()
.migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
FutureUtils.addListener(future, (r, e) -> {
// should acquire procedure execution lock to make sure that the procedure executor has
// finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
// race and cause unexpected result
IdLock procLock =
env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
IdLock.Entry lockEntry;
try {
lockEntry = procLock.getLockEntry(getProcId());
} catch (IOException ioe) {
LOG.error("Error while acquiring execution lock for procedure {}"
+ " when trying to wake it up, aborting...", this, ioe);
env.getMasterServices().abort("Can not acquire procedure execution lock", e);
return;
}
try {
setTimeoutFailure(env);
} finally {
procLock.releaseLockEntry(lockEntry);
}
});
// here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us
setTimeout(-1);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
// skip persistence is a must now since when restarting, if the procedure is in
// WAITING_TIMEOUT state and has -1 as timeout, it will block there forever...
skipPersistence();
throw new ProcedureSuspendedException();
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
long rsWithLowerVersion =
env.getMasterServices().getServerManager().getOnlineServers().values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService
/**
* Submit the migration tasks to the given {@code executor}.
*/
CompletableFuture<?> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
CompletableFuture<Void> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
// the replication queue table creation is asynchronous and will be triggered by addPeer, so
// here we need to manually initialize it since we will not call addPeer.
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

Expand All @@ -35,6 +39,8 @@
@InterfaceAudience.Private
public final class ProcedureFutureUtil {

private static final Logger LOG = LoggerFactory.getLogger(ProcedureFutureUtil.class);

private ProcedureFutureUtil() {
}

Expand All @@ -55,19 +61,44 @@ public static void suspendIfNecessary(Procedure<?> proc,
Consumer<CompletableFuture<Void>> setFuture, CompletableFuture<Void> future,
MasterProcedureEnv env, Runnable actionAfterDone)
throws IOException, ProcedureSuspendedException {
if (future.isDone()) {
FutureUtils.get(future);
actionAfterDone.run();
return;
}
// suspend the procedure and add it back when future completes
setFuture.accept(future);
MutableBoolean completed = new MutableBoolean(false);
Thread currentThread = Thread.currentThread();
FutureUtils.addListener(future, (r, e) -> {
synchronized (proc) {
proc.setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().addFront(proc);
if (Thread.currentThread() == currentThread) {
// this means the future has already been completed, as we call the callback directly while
// calling addListener, so here we just set completed to true without doing anything
completed.setTrue();
return;
}
// should acquire procedure execution lock to make sure that the procedure executor has
// finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
// race and cause unexpected result
IdLock procLock = env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
IdLock.Entry lockEntry;
try {
lockEntry = procLock.getLockEntry(proc.getProcId());
} catch (IOException ioe) {
LOG.error("Error while acquiring execution lock for procedure {}"
+ " when trying to wake it up, aborting...", proc, ioe);
env.getMasterServices().abort("Can not acquire procedure execution lock", e);
return;
}
try {
synchronized (proc) {
proc.setState(ProcedureProtos.ProcedureState.RUNNABLE);
env.getProcedureScheduler().addFront(proc);
}
} finally {
procLock.releaseLockEntry(lockEntry);
}
});
proc.suspend(Procedure.NO_TIMEOUT, false);
if (completed.getValue()) {
FutureUtils.get(future);
actionAfterDone.run();
} else {
// suspend the procedure
setFuture.accept(future);
proc.suspend(Procedure.NO_TIMEOUT, false);
}
}
}

0 comments on commit 2ff62c4

Please sign in to comment.