Skip to content

Commit

Permalink
HBASE-28199 Phase I: Suspend TRSP and SCP when updating meta (apache#…
Browse files Browse the repository at this point in the history
…5520)

Signed-off-by: Yu Li <liyu@apache.org>
  • Loading branch information
Apache9 authored and kadirozde committed Jan 5, 2024
1 parent 63656e9 commit 3e57bdc
Show file tree
Hide file tree
Showing 20 changed files with 879 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -237,6 +239,12 @@ public interface ProcedureExecutorListener {
*/
private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

private ExecutorService forceUpdateExecutor;

// A thread pool for executing some asynchronous tasks for procedures, you can find references to
// getAsyncTaskExecutor to see the usage
private ExecutorService asyncTaskExecutor;

private int corePoolSize;
private int maxPoolSize;

Expand All @@ -247,9 +255,6 @@ public interface ProcedureExecutorListener {
*/
private final ProcedureScheduler scheduler;

private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

private final AtomicLong lastProcId = new AtomicLong(-1);
private final AtomicLong workerId = new AtomicLong(0);
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
Expand Down Expand Up @@ -317,19 +322,6 @@ public ProcedureExecutor(final Configuration conf, final TEnvironment environmen
this.conf = conf;
this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
refreshConfiguration(conf);
store.registerListener(new ProcedureStoreListener() {

@Override
public void forceUpdate(long[] procIds) {
Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
try {
forceUpdateProcedure(procId);
} catch (IOException e) {
LOG.warn("Failed to force update procedure with pid={}", procId);
}
}));
}
});
}

private void load(final boolean abortOnCorruption) throws IOException {
Expand Down Expand Up @@ -614,6 +606,28 @@ public void init(int numThreads, boolean abortOnCorruption) throws IOException {
this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

int size = Math.max(2, Runtime.getRuntime().availableProcessors());
ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d").build());
executor.allowCoreThreadTimeOut(true);
this.asyncTaskExecutor = executor;
forceUpdateExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());
store.registerListener(new ProcedureStoreListener() {

@Override
public void forceUpdate(long[] procIds) {
Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
try {
forceUpdateProcedure(procId);
} catch (IOException e) {
LOG.warn("Failed to force update procedure with pid={}", procId);
}
}));
}
});

// Create the workers
workerId.set(0);
workerThreads = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -678,6 +692,8 @@ public void stop() {
scheduler.stop();
timeoutExecutor.sendStopSignal();
workerMonitorExecutor.sendStopSignal();
forceUpdateExecutor.shutdown();
asyncTaskExecutor.shutdown();
}

public void join() {
Expand Down Expand Up @@ -2055,6 +2071,13 @@ public IdLock getProcExecutionLock() {
return procExecutionLock;
}

/**
* Get a thread pool for executing some asynchronous tasks
*/
public ExecutorService getAsyncTaskExecutor() {
return asyncTaskExecutor;
}

// ==========================================================================
// Worker Thread
// ==========================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
Expand Down Expand Up @@ -1989,71 +1991,78 @@ public RegionInfo getRegionInfo(final String encodedRegionName) {
// Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
// and pre-assumptions are very tricky.
// ============================================================================================
private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
RegionState.State... expectedStates) throws IOException {
private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
RegionState.State newState, RegionState.State... expectedStates) {
RegionState.State state = regionNode.getState();
regionNode.transitionState(newState, expectedStates);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
regionNode.transitionState(newState, expectedStates);
} catch (UnexpectedStateException e) {
return FutureUtils.failedFuture(e);
}
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e != null) {
// revert
regionNode.setState(state);
}
}
});
return future;
}

// should be called within the synchronized block of RegionStateNode
void regionOpening(RegionStateNode regionNode) throws IOException {
CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
// As in SCP, for performance reason, there is no TRSP attached with this region, we will not
// update the region state, which means that the region could be in any state when we want to
// assign it after a RS crash. So here we do not pass the expectedStates parameter.
transitStateAndUpdate(regionNode, State.OPENING);
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
});
}

// should be called under the RegionStateNode lock
// The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
// we will persist the FAILED_OPEN state into hbase:meta.
void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
RegionState.State state = regionNode.getState();
ServerName regionLocation = regionNode.getRegionLocation();
if (giveUp) {
regionNode.setState(State.FAILED_OPEN);
regionNode.setRegionLocation(null);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
if (!giveUp) {
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
return CompletableFuture.completedFuture(null);
}
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
regionNode.setState(State.FAILED_OPEN);
regionNode.setRegionLocation(null);
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e == null) {
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
} else {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
});
return future;
}

// should be called under the RegionStateNode lock
void regionClosing(RegionStateNode regionNode) throws IOException {
transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);

RegionInfo hri = regionNode.getRegionInfo();
// Set meta has not initialized early. so people trying to create/edit tables will wait
if (isMetaRegion(hri)) {
setMetaAssigned(hri, false);
}
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
.thenAccept(r -> {
RegionInfo hri = regionNode.getRegionInfo();
// Set meta has not initialized early. so people trying to create/edit tables will wait
if (isMetaRegion(hri)) {
setMetaAssigned(hri, false);
}
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
});
}

// for open and close, they will first be persist to the procedure store in
Expand All @@ -2062,15 +2071,17 @@ void regionClosing(RegionStateNode regionNode) throws IOException {
// RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.

// should be called under the RegionStateNode lock
void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
throws UnexpectedStateException {
regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
RegionInfo regionInfo = regionNode.getRegionInfo();
regionStates.addRegionToServer(regionNode);
regionStates.removeFromFailedOpen(regionInfo);
}

// should be called under the RegionStateNode lock
void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
throws UnexpectedStateException {
ServerName regionLocation = regionNode.getRegionLocation();
regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
regionNode.setRegionLocation(null);
Expand All @@ -2080,40 +2091,41 @@ void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOEx
}
}

// should be called under the RegionStateNode lock
CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
RegionInfo regionInfo = regionNode.getRegionInfo();
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
// on table that contains state.
setMetaAssigned(regionInfo, true);
}
});
}

// should be called under the RegionStateNode lock
// for SCP
public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
RegionState.State state = regionNode.getState();
ServerName regionLocation = regionNode.getRegionLocation();
regionNode.transitionState(State.ABNORMALLY_CLOSED);
regionNode.setState(State.ABNORMALLY_CLOSED);
regionNode.setRegionLocation(null);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e == null) {
if (regionLocation != null) {
regionNode.setLastHost(regionLocation);
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
} else {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
}
if (regionLocation != null) {
regionNode.setLastHost(regionLocation);
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
}

void persistToMeta(RegionStateNode regionNode) throws IOException {
regionStateStore.updateRegionLocation(regionNode);
RegionInfo regionInfo = regionNode.getRegionInfo();
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
// on table that contains state.
setMetaAssigned(regionInfo, true);
}
});
return future;
}

// ============================================================================================
Expand Down
Loading

0 comments on commit 3e57bdc

Please sign in to comment.