Skip to content

Commit

Permalink
HBASE-28199 Phase I: Suspend TRSP and SCP when updating meta
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Nov 14, 2023
1 parent 23c4156 commit 00590be
Show file tree
Hide file tree
Showing 16 changed files with 726 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
Expand Down Expand Up @@ -1989,71 +1991,78 @@ public RegionInfo getRegionInfo(final String encodedRegionName) {
// Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
// and pre-assumptions are very tricky.
// ============================================================================================
private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
RegionState.State... expectedStates) throws IOException {
private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
RegionState.State newState, RegionState.State... expectedStates) {
RegionState.State state = regionNode.getState();
regionNode.transitionState(newState, expectedStates);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
regionNode.transitionState(newState, expectedStates);
} catch (UnexpectedStateException e) {
return FutureUtils.failedFuture(e);
}
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e != null) {
// revert
regionNode.setState(state);
}
}
});
return future;
}

// should be called within the synchronized block of RegionStateNode
void regionOpening(RegionStateNode regionNode) throws IOException {
CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
// As in SCP, for performance reason, there is no TRSP attached with this region, we will not
// update the region state, which means that the region could be in any state when we want to
// assign it after a RS crash. So here we do not pass the expectedStates parameter.
transitStateAndUpdate(regionNode, State.OPENING);
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
});
}

// should be called under the RegionStateNode lock
// The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
// we will persist the FAILED_OPEN state into hbase:meta.
void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
RegionState.State state = regionNode.getState();
ServerName regionLocation = regionNode.getRegionLocation();
if (giveUp) {
regionNode.setState(State.FAILED_OPEN);
regionNode.setRegionLocation(null);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
if (!giveUp) {
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
return CompletableFuture.completedFuture(null);
}
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
regionNode.setState(State.FAILED_OPEN);
regionNode.setRegionLocation(null);
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e == null) {
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
} else {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
});
return future;
}

// should be called under the RegionStateNode lock
void regionClosing(RegionStateNode regionNode) throws IOException {
transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);

RegionInfo hri = regionNode.getRegionInfo();
// Set meta has not initialized early. so people trying to create/edit tables will wait
if (isMetaRegion(hri)) {
setMetaAssigned(hri, false);
}
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
.thenAccept(r -> {
RegionInfo hri = regionNode.getRegionInfo();
// Set meta has not initialized early. so people trying to create/edit tables will wait
if (isMetaRegion(hri)) {
setMetaAssigned(hri, false);
}
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
});
}

// for open and close, they will first be persist to the procedure store in
Expand All @@ -2062,15 +2071,17 @@ void regionClosing(RegionStateNode regionNode) throws IOException {
// RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.

// should be called under the RegionStateNode lock
void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
throws UnexpectedStateException {
regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
RegionInfo regionInfo = regionNode.getRegionInfo();
regionStates.addRegionToServer(regionNode);
regionStates.removeFromFailedOpen(regionInfo);
}

// should be called under the RegionStateNode lock
void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
throws UnexpectedStateException {
ServerName regionLocation = regionNode.getRegionLocation();
regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
regionNode.setRegionLocation(null);
Expand All @@ -2080,40 +2091,41 @@ void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOEx
}
}

// should be called under the RegionStateNode lock
CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
RegionInfo regionInfo = regionNode.getRegionInfo();
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
// on table that contains state.
setMetaAssigned(regionInfo, true);
}
});
}

// should be called under the RegionStateNode lock
// for SCP
public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
RegionState.State state = regionNode.getState();
ServerName regionLocation = regionNode.getRegionLocation();
regionNode.transitionState(State.ABNORMALLY_CLOSED);
regionNode.setState(State.ABNORMALLY_CLOSED);
regionNode.setRegionLocation(null);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e == null) {
if (regionLocation != null) {
regionNode.setLastHost(regionLocation);
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
} else {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
}
if (regionLocation != null) {
regionNode.setLastHost(regionLocation);
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
}

void persistToMeta(RegionStateNode regionNode) throws IOException {
regionStateStore.updateRegionLocation(regionNode);
RegionInfo regionInfo = regionNode.getRegionInfo();
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
// on table that contains state.
setMetaAssigned(regionInfo, true);
}
});
return future;
}

// ============================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -29,6 +30,7 @@
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
Expand Down Expand Up @@ -73,6 +75,8 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur

private RetryCounter retryCounter;

private CompletableFuture<Void> future;

protected RegionRemoteProcedureBase() {
}

Expand Down Expand Up @@ -268,11 +272,21 @@ private void unattach(MasterProcedureEnv env) {
getParent(env).unattachRemoteProc(this);
}

private CompletableFuture<Void> getFuture() {
return future;
}

private void setFuture(CompletableFuture<Void> f) {
future = f;
}

@Override
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
RegionStateNode regionNode = getRegionNode(env);
regionNode.lock();
if (future == null) {
regionNode.lock(this);
}
try {
switch (state) {
case REGION_REMOTE_PROCEDURE_DISPATCH: {
Expand All @@ -294,16 +308,29 @@ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throw new ProcedureSuspendedException();
}
case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
env.getAssignmentManager().persistToMeta(regionNode);
unattach(env);
if (
ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
() -> unattach(env))
) {
return null;
}
ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
env.getAssignmentManager().persistToMeta(regionNode), env, () -> unattach(env));
return null;
case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
// the remote call is failed so we do not need to change the region state, just return.
unattach(env);
return null;
case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
env.getAssignmentManager().regionClosedAbnormally(regionNode);
unattach(env);
if (
ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
() -> unattach(env))
) {
return null;
}
ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
env.getAssignmentManager().regionClosedAbnormally(regionNode), env,
() -> unattach(env));
return null;
default:
throw new IllegalStateException("Unknown state: " + state);
Expand All @@ -314,12 +341,11 @@ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
throw suspend(Math.toIntExact(backoff), true);
} finally {
regionNode.unlock();
if (future == null) {
regionNode.unlock(this);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -30,6 +28,7 @@
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -75,7 +74,7 @@ public AssignmentProcedureEvent(final RegionInfo regionInfo) {
}
}

final Lock lock = new ReentrantLock();
private final RegionStateNodeLock lock;
private final RegionInfo regionInfo;
private final ProcedureEvent<?> event;
private final ConcurrentMap<RegionInfo, RegionStateNode> ritMap;
Expand Down Expand Up @@ -106,6 +105,7 @@ public AssignmentProcedureEvent(final RegionInfo regionInfo) {
this.regionInfo = regionInfo;
this.event = new AssignmentProcedureEvent(regionInfo);
this.ritMap = ritMap;
this.lock = new RegionStateNodeLock(regionInfo);
}

/**
Expand Down Expand Up @@ -319,6 +319,9 @@ public void checkOnline() throws DoNotRetryRegionException {
}
}

// The below 3 methods are for normal locking operation, where the thread owner is the current
// thread. Typically you just need to use these 3 methods, and use try..finally to release the
// lock in the finally block
public void lock() {
lock.lock();
}
Expand All @@ -330,4 +333,28 @@ public boolean tryLock() {
public void unlock() {
lock.unlock();
}

// The below 3 methods are for locking region state node when executing procedures, where we may
// do some time consuming work under the lock, for example, updating meta. As we may suspend the
// procedure while holding the lock and then release it when the procedure is back, in another
// thread, so we need to use the procedure itself as owner, instead of the current thread. You can
// see the usage in TRSP, SCP, and RegionRemoteProcedureBase for more details.
// Notice that, this does not mean you must use these 3 methods when locking region state node in
// procedure, you are free to use the above 3 methods if you do not want to hold the lock when
// suspending the procedure.
public void lock(Procedure<?> proc) {
lock.lock(proc);
}

public boolean tryLock(Procedure<?> proc) {
return lock.tryLock(proc);
}

public void unlock(Procedure<?> proc) {
lock.unlock(proc);
}

boolean isLocked() {
return lock.isLocked();
}
}
Loading

0 comments on commit 00590be

Please sign in to comment.