From cc5f68bd88061d687e1730ef48b43c0b6c4d4551 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 13 Nov 2023 19:45:03 +0800 Subject: [PATCH] HBASE-28199 Phase I: Suspend TRSP and SCP when updating meta --- .../hbase/procedure2/ProcedureExecutor.java | 57 ++++-- .../master/assignment/AssignmentManager.java | 150 ++++++++-------- .../assignment/RegionRemoteProcedureBase.java | 46 +++-- .../master/assignment/RegionStateNode.java | 33 +++- .../assignment/RegionStateNodeLock.java | 166 ++++++++++++++++++ .../master/assignment/RegionStateStore.java | 67 +++++-- .../TransitRegionStateProcedure.java | 126 +++++++++---- .../master/procedure/MasterProcedureEnv.java | 8 + .../procedure/ServerCrashProcedure.java | 57 +++++- .../procedure/TruncateRegionProcedure.java | 2 +- ...eplicationQueueFromZkToTableProcedure.java | 78 ++++---- .../replication/ReplicationPeerManager.java | 2 +- .../hbase/procedure2/ProcedureFutureUtil.java | 112 ++++++++++++ .../master/assignment/MockMasterServices.java | 4 +- .../assignment/TestAssignmentManagerUtil.java | 3 +- .../TestOpenRegionProcedureBackoff.java | 7 +- .../assignment/TestRaceBetweenSCPAndTRSP.java | 13 +- .../assignment/TestRegionStateNodeLock.java | 139 +++++++++++++++ .../master/assignment/TestRollbackSCP.java | 8 +- .../procedure/TestProcedurePriority.java | 20 ++- 20 files changed, 879 insertions(+), 219 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNodeLock.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStateNodeLock.java diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 3099c64e00f6..5aa11811122b 100644 --- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -32,8 +32,10 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -237,6 +239,12 @@ public interface ProcedureExecutorListener { */ private TimeoutExecutorThread workerMonitorExecutor; + private ExecutorService forceUpdateExecutor; + + // A thread pool for executing some asynchronous tasks for procedures, you can find references to + // getAsyncTaskExecutor to see the usage + private ExecutorService asyncTaskExecutor; + private int corePoolSize; private int maxPoolSize; @@ -247,9 +255,6 @@ public interface ProcedureExecutorListener { */ private final ProcedureScheduler scheduler; - private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build()); - private final AtomicLong lastProcId = new AtomicLong(-1); private final AtomicLong workerId = new AtomicLong(0); private final AtomicInteger activeExecutorCount = new AtomicInteger(0); @@ -317,19 +322,6 @@ public ProcedureExecutor(final Configuration conf, final TEnvironment environmen this.conf = conf; this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); refreshConfiguration(conf); - store.registerListener(new ProcedureStoreListener() { - - @Override - public void forceUpdate(long[] procIds) { - Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() 
-> { - try { - forceUpdateProcedure(procId); - } catch (IOException e) { - LOG.warn("Failed to force update procedure with pid={}", procId); - } - })); - } - }); } private void load(final boolean abortOnCorruption) throws IOException { @@ -614,6 +606,28 @@ public void init(int numThreads, boolean abortOnCorruption) throws IOException { this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout"); this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor"); + int size = Math.max(2, Runtime.getRuntime().availableProcessors()); + ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, + new LinkedBlockingQueue(), new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d").build()); + executor.allowCoreThreadTimeOut(true); + this.asyncTaskExecutor = executor; + forceUpdateExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build()); + store.registerListener(new ProcedureStoreListener() { + + @Override + public void forceUpdate(long[] procIds) { + Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> { + try { + forceUpdateProcedure(procId); + } catch (IOException e) { + LOG.warn("Failed to force update procedure with pid={}", procId); + } + })); + } + }); + // Create the workers workerId.set(0); workerThreads = new CopyOnWriteArrayList<>(); @@ -678,6 +692,8 @@ public void stop() { scheduler.stop(); timeoutExecutor.sendStopSignal(); workerMonitorExecutor.sendStopSignal(); + forceUpdateExecutor.shutdown(); + asyncTaskExecutor.shutdown(); } public void join() { @@ -2055,6 +2071,13 @@ public IdLock getProcExecutionLock() { return procExecutionLock; } + /** + * Get a thread pool for executing some asynchronous tasks + */ + public ExecutorService getAsyncTaskExecutor() { + return asyncTaskExecutor; + } + // 
========================================================================== // Worker Thread // ========================================================================== diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 804757959d5c..474b95a2a69b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; @@ -1989,71 +1991,78 @@ public RegionInfo getRegionInfo(final String encodedRegionName) { // Should only be called in TransitRegionStateProcedure(and related procedures), as the locking // and pre-assumptions are very tricky. // ============================================================================================ - private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState, - RegionState.State... expectedStates) throws IOException { + private CompletableFuture transitStateAndUpdate(RegionStateNode regionNode, + RegionState.State newState, RegionState.State... 
expectedStates) { RegionState.State state = regionNode.getState(); - regionNode.transitionState(newState, expectedStates); - boolean succ = false; try { - regionStateStore.updateRegionLocation(regionNode); - succ = true; - } finally { - if (!succ) { + regionNode.transitionState(newState, expectedStates); + } catch (UnexpectedStateException e) { + return FutureUtils.failedFuture(e); + } + CompletableFuture future = regionStateStore.updateRegionLocation(regionNode); + FutureUtils.addListener(future, (r, e) -> { + if (e != null) { // revert regionNode.setState(state); } - } + }); + return future; } // should be called within the synchronized block of RegionStateNode - void regionOpening(RegionStateNode regionNode) throws IOException { + CompletableFuture regionOpening(RegionStateNode regionNode) { // As in SCP, for performance reason, there is no TRSP attached with this region, we will not // update the region state, which means that the region could be in any state when we want to // assign it after a RS crash. So here we do not pass the expectedStates parameter. - transitStateAndUpdate(regionNode, State.OPENING); - regionStates.addRegionToServer(regionNode); - // update the operation count metrics - metrics.incrementOperationCounter(); + return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> { + regionStates.addRegionToServer(regionNode); + // update the operation count metrics + metrics.incrementOperationCounter(); + }); } // should be called under the RegionStateNode lock // The parameter 'giveUp' means whether we will try to open the region again, if it is true, then // we will persist the FAILED_OPEN state into hbase:meta. 
- void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException { + CompletableFuture regionFailedOpen(RegionStateNode regionNode, boolean giveUp) { RegionState.State state = regionNode.getState(); ServerName regionLocation = regionNode.getRegionLocation(); - if (giveUp) { - regionNode.setState(State.FAILED_OPEN); - regionNode.setRegionLocation(null); - boolean succ = false; - try { - regionStateStore.updateRegionLocation(regionNode); - succ = true; - } finally { - if (!succ) { - // revert - regionNode.setState(state); - regionNode.setRegionLocation(regionLocation); - } + if (!giveUp) { + if (regionLocation != null) { + regionStates.removeRegionFromServer(regionLocation, regionNode); } + return CompletableFuture.completedFuture(null); } - if (regionLocation != null) { - regionStates.removeRegionFromServer(regionLocation, regionNode); - } + regionNode.setState(State.FAILED_OPEN); + regionNode.setRegionLocation(null); + CompletableFuture future = regionStateStore.updateRegionLocation(regionNode); + FutureUtils.addListener(future, (r, e) -> { + if (e == null) { + if (regionLocation != null) { + regionStates.removeRegionFromServer(regionLocation, regionNode); + } + } else { + // revert + regionNode.setState(state); + regionNode.setRegionLocation(regionLocation); + } + }); + return future; } // should be called under the RegionStateNode lock - void regionClosing(RegionStateNode regionNode) throws IOException { - transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING); - - RegionInfo hri = regionNode.getRegionInfo(); - // Set meta has not initialized early. 
so people trying to create/edit tables will wait - if (isMetaRegion(hri)) { - setMetaAssigned(hri, false); - } - regionStates.addRegionToServer(regionNode); - // update the operation count metrics - metrics.incrementOperationCounter(); + CompletableFuture regionClosing(RegionStateNode regionNode) { + return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING) + .thenAccept(r -> { + RegionInfo hri = regionNode.getRegionInfo(); + // Set meta has not initialized early. so people trying to create/edit tables will wait + if (isMetaRegion(hri)) { + setMetaAssigned(hri, false); + } + regionStates.addRegionToServer(regionNode); + // update the operation count metrics + metrics.incrementOperationCounter(); + }); } // for open and close, they will first be persist to the procedure store in @@ -2062,7 +2071,8 @@ void regionClosing(RegionStateNode regionNode) throws IOException { // RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta. // should be called under the RegionStateNode lock - void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { + void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) + throws UnexpectedStateException { regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN); RegionInfo regionInfo = regionNode.getRegionInfo(); regionStates.addRegionToServer(regionNode); @@ -2070,7 +2080,8 @@ void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOEx } // should be called under the RegionStateNode lock - void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException { + void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) + throws UnexpectedStateException { ServerName regionLocation = regionNode.getRegionLocation(); regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED); regionNode.setRegionLocation(null); @@ -2080,40 +2091,41 @@ void 
regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOEx } } + // should be called under the RegionStateNode lock + CompletableFuture persistToMeta(RegionStateNode regionNode) { + return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> { + RegionInfo regionInfo = regionNode.getRegionInfo(); + if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) { + // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it + // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager + // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state + // on table that contains state. + setMetaAssigned(regionInfo, true); + } + }); + } + // should be called under the RegionStateNode lock // for SCP - public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException { + public CompletableFuture regionClosedAbnormally(RegionStateNode regionNode) { RegionState.State state = regionNode.getState(); ServerName regionLocation = regionNode.getRegionLocation(); - regionNode.transitionState(State.ABNORMALLY_CLOSED); + regionNode.setState(State.ABNORMALLY_CLOSED); regionNode.setRegionLocation(null); - boolean succ = false; - try { - regionStateStore.updateRegionLocation(regionNode); - succ = true; - } finally { - if (!succ) { + CompletableFuture future = regionStateStore.updateRegionLocation(regionNode); + FutureUtils.addListener(future, (r, e) -> { + if (e == null) { + if (regionLocation != null) { + regionNode.setLastHost(regionLocation); + regionStates.removeRegionFromServer(regionLocation, regionNode); + } + } else { // revert regionNode.setState(state); regionNode.setRegionLocation(regionLocation); } - } - if (regionLocation != null) { - regionNode.setLastHost(regionLocation); - regionStates.removeRegionFromServer(regionLocation, regionNode); - } - } - - void persistToMeta(RegionStateNode regionNode) throws IOException { - 
regionStateStore.updateRegionLocation(regionNode); - RegionInfo regionInfo = regionNode.getRegionInfo(); - if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) { - // Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it - // can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager - // which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state - // on table that contains state. - setMetaAssigned(regionInfo, true); - } + }); + return future; } // ============================================================================================ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java index 6b6da9e33965..d27e0068b0ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; @@ -73,6 +75,8 @@ public abstract class RegionRemoteProcedureBase extends Procedure future; + protected RegionRemoteProcedureBase() { } @@ -268,11 +272,21 @@ private void 
unattach(MasterProcedureEnv env) { getParent(env).unattachRemoteProc(this); } + private CompletableFuture getFuture() { + return future; + } + + private void setFuture(CompletableFuture f) { + future = f; + } + @Override protected Procedure[] execute(MasterProcedureEnv env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { RegionStateNode regionNode = getRegionNode(env); - regionNode.lock(); + if (future == null) { + regionNode.lock(this); + } try { switch (state) { case REGION_REMOTE_PROCEDURE_DISPATCH: { @@ -294,16 +308,29 @@ protected Procedure[] execute(MasterProcedureEnv env) throw new ProcedureSuspendedException(); } case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED: - env.getAssignmentManager().persistToMeta(regionNode); - unattach(env); + if ( + ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, + () -> unattach(env)) + ) { + return null; + } + ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, + env.getAssignmentManager().persistToMeta(regionNode), env, () -> unattach(env)); return null; case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL: // the remote call is failed so we do not need to change the region state, just return. 
unattach(env); return null; case REGION_REMOTE_PROCEDURE_SERVER_CRASH: - env.getAssignmentManager().regionClosedAbnormally(regionNode); - unattach(env); + if ( + ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, + () -> unattach(env)) + ) { + return null; + } + ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, + env.getAssignmentManager().regionClosedAbnormally(regionNode), env, + () -> unattach(env)); return null; default: throw new IllegalStateException("Unknown state: " + state); @@ -314,12 +341,11 @@ protected Procedure[] execute(MasterProcedureEnv env) } long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e); - setTimeout(Math.toIntExact(backoff)); - setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); - skipPersistence(); - throw new ProcedureSuspendedException(); + throw suspend(Math.toIntExact(backoff), true); } finally { - regionNode.unlock(); + if (future == null) { + regionNode.unlock(this); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java index 91c0222facd1..de00ca92e4c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java @@ -19,8 +19,6 @@ import java.util.Arrays; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -30,6 +28,7 @@ import org.apache.hadoop.hbase.exceptions.UnexpectedStateException; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState.State; 
+import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; @@ -75,7 +74,7 @@ public AssignmentProcedureEvent(final RegionInfo regionInfo) { } } - final Lock lock = new ReentrantLock(); + private final RegionStateNodeLock lock; private final RegionInfo regionInfo; private final ProcedureEvent event; private final ConcurrentMap ritMap; @@ -106,6 +105,7 @@ public AssignmentProcedureEvent(final RegionInfo regionInfo) { this.regionInfo = regionInfo; this.event = new AssignmentProcedureEvent(regionInfo); this.ritMap = ritMap; + this.lock = new RegionStateNodeLock(regionInfo); } /** @@ -319,6 +319,9 @@ public void checkOnline() throws DoNotRetryRegionException { } } + // The below 3 methods are for normal locking operation, where the thread owner is the current + // thread. Typically you just need to use these 3 methods, and use try..finally to release the + // lock in the finally block public void lock() { lock.lock(); } @@ -330,4 +333,28 @@ public boolean tryLock() { public void unlock() { lock.unlock(); } + + // The below 3 methods are for locking region state node when executing procedures, where we may + // do some time consuming work under the lock, for example, updating meta. As we may suspend the + // procedure while holding the lock and then release it when the procedure is back, in another + // thread, so we need to use the procedure itself as owner, instead of the current thread. You can + // see the usage in TRSP, SCP, and RegionRemoteProcedureBase for more details. + // Notice that, this does not mean you must use these 3 methods when locking region state node in + // procedure, you are free to use the above 3 methods if you do not want to hold the lock when + // suspending the procedure. 
+ public void lock(Procedure proc) { + lock.lock(proc); + } + + public boolean tryLock(Procedure proc) { + return lock.tryLock(proc); + } + + public void unlock(Procedure proc) { + lock.unlock(proc); + } + + boolean isLocked() { + return lock.isLocked(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNodeLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNodeLock.java new file mode 100644 index 000000000000..a672425c8ed2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNodeLock.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.assignment; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A lock implementation which supports unlock by another thread. + *
<p>
+ * This is because we need to hold region state node lock while updating region state to meta(for + * keeping consistency), so it is better to yield the procedure to release the procedure worker. But + * after waking up the procedure, we may use another procedure worker to execute the procedure, + * which means we need to unlock by another thread. See HBASE-28196 for more details. + */ +@InterfaceAudience.Private +class RegionStateNodeLock { + + // for better logging message + private final RegionInfo regionInfo; + + private final Lock lock = new ReentrantLock(); + + private final Condition cond = lock.newCondition(); + + private Object owner; + + private int count; + + RegionStateNodeLock(RegionInfo regionInfo) { + this.regionInfo = regionInfo; + } + + private void lock0(Object lockBy) { + lock.lock(); + try { + for (;;) { + if (owner == null) { + owner = lockBy; + count = 1; + return; + } + if (owner == lockBy) { + count++; + return; + } + cond.awaitUninterruptibly(); + } + } finally { + lock.unlock(); + } + } + + private boolean tryLock0(Object lockBy) { + if (!lock.tryLock()) { + return false; + } + try { + if (owner == null) { + owner = lockBy; + count = 1; + return true; + } + if (owner == lockBy) { + count++; + return true; + } + return false; + } finally { + lock.unlock(); + } + } + + private void unlock0(Object unlockBy) { + lock.lock(); + try { + if (owner == null) { + throw new IllegalMonitorStateException("RegionStateNode " + regionInfo + " is not locked"); + } + if (owner != unlockBy) { + throw new IllegalMonitorStateException("RegionStateNode " + regionInfo + " is locked by " + + owner + ", can not be unlocked by " + unlockBy); + } + count--; + if (count == 0) { + owner = null; + cond.signal(); + } + } finally { + lock.unlock(); + } + } + + /** + * Normal lock, will set the current thread as owner. Typically you should use try...finally to + * call unlock in the finally block. 
+ */ + void lock() { + lock0(Thread.currentThread()); + } + + /** + * Normal tryLock, will set the current thread as owner. Typically you should use try...finally to + * call unlock in the finally block. + */ + boolean tryLock() { + return tryLock0(Thread.currentThread()); + } + + /** + * Normal unLock, will use the current thread as owner. Typically you should use try...finally to + * call unlock in the finally block. + */ + void unlock() { + unlock0(Thread.currentThread()); + } + + /** + * Lock by a procedure. You can release the lock in another thread. + */ + void lock(Procedure proc) { + lock0(proc); + } + + /** + * TryLock by a procedure. You can release the lock in another thread. + */ + boolean tryLock(Procedure proc) { + return tryLock0(proc); + } + + /** + * Unlock by a procedure. You do not need to call this method in the same thread with lock. + */ + void unlock(Procedure proc) { + unlock0(proc); + } + + boolean isLocked() { + lock.lock(); + try { + return owner != null; + } finally { + lock.unlock(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index 3561e0cd055b..4d506365f238 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -176,7 +176,7 @@ public static void visitMetaEntry(final RegionStateVisitor visitor, final Result } } - void updateRegionLocation(RegionStateNode regionStateNode) throws IOException { + private Put generateUpdateRegionLocationPut(RegionStateNode regionStateNode) throws IOException { long time = EnvironmentEdgeManager.currentTime(); long openSeqNum = regionStateNode.getState() == State.OPEN ? 
regionStateNode.getOpenSeqNum() @@ -221,11 +221,34 @@ && hasGlobalReplicationScope(regionInfo.getTable()) .setTimestamp(put.getTimestamp()).setType(Cell.Type.Put).setValue(Bytes.toBytes(state.name())) .build()); LOG.info(info.toString()); - updateRegionLocation(regionInfo, state, put); + return put; + } + + CompletableFuture updateRegionLocation(RegionStateNode regionStateNode) { + Put put; + try { + put = generateUpdateRegionLocationPut(regionStateNode); + } catch (IOException e) { + return FutureUtils.failedFuture(e); + } + RegionInfo regionInfo = regionStateNode.getRegionInfo(); + State state = regionStateNode.getState(); + CompletableFuture future = updateRegionLocation(regionInfo, state, put); if (regionInfo.isMetaRegion() && regionInfo.isFirst()) { // mirror the meta location to zookeeper - mirrorMetaLocation(regionInfo, regionLocation, state); + // we store meta location in master local region which means the above method is + // synchronous(we just wrap the result with a CompletableFuture to make it look like + // asynchronous), so it is OK to just call this method directly here + assert future.isDone(); + if (!future.isCompletedExceptionally()) { + try { + mirrorMetaLocation(regionInfo, regionStateNode.getRegionLocation(), state); + } catch (IOException e) { + return FutureUtils.failedFuture(e); + } + } } + return future; } private void mirrorMetaLocation(RegionInfo regionInfo, ServerName serverName, State state) @@ -249,25 +272,31 @@ private void removeMirrorMetaLocation(int oldReplicaCount, int newReplicaCount) } } - private void updateRegionLocation(RegionInfo regionInfo, State state, Put put) - throws IOException { - try { - if (regionInfo.isMetaRegion()) { + private CompletableFuture updateRegionLocation(RegionInfo regionInfo, State state, + Put put) { + CompletableFuture future; + if (regionInfo.isMetaRegion()) { + try { masterRegion.update(r -> r.put(put)); - } else { - try (Table table = master.getConnection().getTable(TableName.META_TABLE_NAME)) { 
- table.put(put); - } + future = CompletableFuture.completedFuture(null); + } catch (Exception e) { + future = FutureUtils.failedFuture(e); } - } catch (IOException e) { - // TODO: Revist!!!! Means that if a server is loaded, then we will abort our host! - // In tests we abort the Master! - String msg = String.format("FAILED persisting region=%s state=%s", - regionInfo.getShortNameToLog(), state); - LOG.error(msg, e); - master.abort(msg, e); - throw e; + } else { + AsyncTable table = master.getAsyncConnection().getTable(TableName.META_TABLE_NAME); + future = table.put(put); } + FutureUtils.addListener(future, (r, e) -> { + if (e != null) { + // TODO: Revist!!!! Means that if a server is loaded, then we will abort our host! + // In tests we abort the Master! + String msg = String.format("FAILED persisting region=%s state=%s", + regionInfo.getShortNameToLog(), state); + LOG.error(msg, e); + master.abort(msg, e); + } + }); + return future; } private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java index 81397915647d..911c0f3111e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java @@ -24,6 +24,7 @@ import edu.umd.cs.findbugs.annotations.Nullable; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.ServerName; @@ -38,11 +39,13 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; 
+import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -137,6 +140,8 @@ public class TransitRegionStateProcedure private long forceRetainmentTotalWait; + private CompletableFuture future; + public TransitRegionStateProcedure() { } @@ -268,21 +273,54 @@ private void queueAssign(MasterProcedureEnv env, RegionStateNode regionNode) } } - private void openRegion(MasterProcedureEnv env, RegionStateNode regionNode) throws IOException { + private CompletableFuture getFuture() { + return future; + } + + private void setFuture(CompletableFuture f) { + future = f; + } + + private void openRegionAfterUpdatingMeta(ServerName loc) { + addChildProcedure(new OpenRegionProcedure(this, getRegion(), loc)); + setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED); + } + + private void openRegion(MasterProcedureEnv env, RegionStateNode regionNode) + throws IOException, ProcedureSuspendedException { ServerName loc = regionNode.getRegionLocation(); + if ( + ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, + () -> openRegionAfterUpdatingMeta(loc)) + ) { + return; + } if (loc == null || BOGUS_SERVER_NAME.equals(loc)) { LOG.warn("No location specified for {}, jump back to state {} to get one", getRegion(), RegionStateTransitionState.REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE); setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE); throw new HBaseIOException("Failed to open region, the location is null or 
bogus."); } - env.getAssignmentManager().regionOpening(regionNode); - addChildProcedure(new OpenRegionProcedure(this, getRegion(), loc)); - setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED); + ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, + env.getAssignmentManager().regionOpening(regionNode), env, + () -> openRegionAfterUpdatingMeta(loc)); + } + + private void regionFailedOpenAfterUpdatingMeta(MasterProcedureEnv env, + RegionStateNode regionNode) { + setFailure(getClass().getSimpleName(), new RetriesExhaustedException( + "Max attempts " + env.getAssignmentManager().getAssignMaxAttempts() + " exceeded")); + regionNode.unsetProcedure(this); } private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode) - throws IOException { + throws IOException, ProcedureSuspendedException { + if ( + ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, + () -> regionFailedOpenAfterUpdatingMeta(env, regionNode)) + ) { + return Flow.NO_MORE_STATE; + } if (regionNode.isInState(State.OPEN)) { retryCounter = null; if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) { @@ -306,14 +344,16 @@ private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode) LOG.info("Retry={} of max={}; {}; {}", retries, maxAttempts, this, regionNode.toShortString()); if (retries >= maxAttempts) { - env.getAssignmentManager().regionFailedOpen(regionNode, true); - setFailure(getClass().getSimpleName(), new RetriesExhaustedException( - "Max attempts " + env.getAssignmentManager().getAssignMaxAttempts() + " exceeded")); - regionNode.unsetProcedure(this); + ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, + env.getAssignmentManager().regionFailedOpen(regionNode, true), env, + () -> regionFailedOpenAfterUpdatingMeta(env, regionNode)); return Flow.NO_MORE_STATE; } - env.getAssignmentManager().regionFailedOpen(regionNode, false); + // if not giving up, we will not update 
meta, so the returned CompletableFuture should be a fake + // one, which should have been completed already + CompletableFuture future = env.getAssignmentManager().regionFailedOpen(regionNode, false); + assert future.isDone(); // we failed to assign the region, force a new plan forceNewPlan = true; regionNode.setRegionLocation(null); @@ -329,17 +369,29 @@ private Flow confirmOpened(MasterProcedureEnv env, RegionStateNode regionNode) } } - private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) throws IOException { + private void closeRegionAfterUpdatingMeta(RegionStateNode regionNode) { + CloseRegionProcedure closeProc = isSplit + ? new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), assignCandidate, + true) + : new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), assignCandidate, + evictCache); + addChildProcedure(closeProc); + setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED); + } + + private void closeRegion(MasterProcedureEnv env, RegionStateNode regionNode) + throws IOException, ProcedureSuspendedException { + if ( + ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, + () -> closeRegionAfterUpdatingMeta(regionNode)) + ) { + return; + } if (regionNode.isInState(State.OPEN, State.CLOSING, State.MERGING, State.SPLITTING)) { // this is the normal case - env.getAssignmentManager().regionClosing(regionNode); - CloseRegionProcedure closeProc = isSplit - ? 
new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), - assignCandidate, true) - : new CloseRegionProcedure(this, getRegion(), regionNode.getRegionLocation(), - assignCandidate, evictCache); - addChildProcedure(closeProc); - setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED); + ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, + env.getAssignmentManager().regionClosing(regionNode), env, + () -> closeRegionAfterUpdatingMeta(regionNode)); } else { forceNewPlan = true; regionNode.setRegionLocation(null); @@ -393,11 +445,18 @@ protected Procedure[] execute(MasterProcedureEnv env) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { RegionStateNode regionNode = env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(getRegion()); - regionNode.lock(); + if (future == null) { + // if future is not null, we will not release the regionNode lock, so do not need to lock it + // again + regionNode.lock(this); + } try { return super.execute(env); } finally { - regionNode.unlock(); + if (future == null) { + // release the lock if there is no pending updating meta operation + regionNode.unlock(this); + } } } @@ -452,10 +511,7 @@ protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionSta "Failed transition, suspend {}secs {}; {}; waiting on rectified condition fixed " + "by other Procedure or operator intervention", backoff / 1000, this, regionNode.toShortString(), e); - setTimeout(Math.toIntExact(backoff)); - setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); - skipPersistence(); - throw new ProcedureSuspendedException(); + throw suspend(Math.toIntExact(backoff), true); } } @@ -492,15 +548,25 @@ public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, } // Should be called with RegionStateNode locked - public void serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, - ServerName serverName, boolean 
forceNewPlan) throws IOException { + public CompletableFuture serverCrashed(MasterProcedureEnv env, RegionStateNode regionNode, + ServerName serverName, boolean forceNewPlan) { this.forceNewPlan = forceNewPlan; if (remoteProc != null) { // this means we are waiting for the sub procedure, so wake it up - remoteProc.serverCrashed(env, regionNode, serverName); + try { + remoteProc.serverCrashed(env, regionNode, serverName); + } catch (Exception e) { + return FutureUtils.failedFuture(e); + } + return CompletableFuture.completedFuture(null); } else { - // we are in RUNNING state, just update the region state, and we will process it later. - env.getAssignmentManager().regionClosedAbnormally(regionNode); + if (regionNode.isInState(State.ABNORMALLY_CLOSED)) { + // should be a retry, where we have already changed the region state to abnormally closed + return CompletableFuture.completedFuture(null); + } else { + // we are in RUNNING state, just update the region state, and we will process it later. 
+ return env.getAssignmentManager().regionClosedAbnormally(regionNode); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index f7f4146bd0d5..218d3096d8df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -86,6 +87,13 @@ public MasterProcedureEnv(final MasterServices master, this.remoteDispatcher = remoteDispatcher; } + /** + * Get a thread pool for executing some asynchronous tasks + */ + public ExecutorService getAsyncTaskExecutor() { + return master.getMasterProcedureExecutor().getAsyncTaskExecutor(); + } + public User getRequestUser() { return RpcServer.getRequestUser().orElse(Superusers.getSystemUser()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 97976756d828..901cc38a7be7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask; 
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; @@ -107,6 +109,10 @@ public class ServerCrashProcedure extends // progress will not update the state because the actual state is overwritten by its next state private ServerCrashState currentRunningState = getInitialState(); + private CompletableFuture updateMetaFuture; + + private int processedRegions = 0; + /** * Call this constructor queuing up a Procedure. * @param serverName Name of the crashed server. @@ -532,6 +538,14 @@ protected boolean isMatchingRegionLocation(RegionStateNode rsn) { return this.serverName.equals(rsn.getRegionLocation()); } + private CompletableFuture getUpdateMetaFuture() { + return updateMetaFuture; + } + + private void setUpdateMetaFuture(CompletableFuture f) { + updateMetaFuture = f; + } + /** * Assign the regions on the crashed RS to other Rses. *

@@ -542,14 +556,30 @@ protected boolean isMatchingRegionLocation(RegionStateNode rsn) { * We will also check whether the table for a region is enabled, if not, we will skip assigning * it. */ - private void assignRegions(MasterProcedureEnv env, List regions) throws IOException { + private void assignRegions(MasterProcedureEnv env, List regions) + throws IOException, ProcedureSuspendedException { AssignmentManager am = env.getMasterServices().getAssignmentManager(); boolean retainAssignment = env.getMasterConfiguration().getBoolean(MASTER_SCP_RETAIN_ASSIGNMENT, DEFAULT_MASTER_SCP_RETAIN_ASSIGNMENT); - for (RegionInfo region : regions) { + // Since we may suspend in the middle of this loop, so here we use processedRegions to record + // the progress, so next time we can locate the correct region + // We do not need to persist the processedRegions when serializing the procedure, as when master + // restarts, the sub procedure list will be cleared when rescheduling this SCP again, so we need + // to start from beginning. + for (int n = regions.size(); processedRegions < n; processedRegions++) { + RegionInfo region = regions.get(processedRegions); RegionStateNode regionNode = am.getRegionStates().getOrCreateRegionStateNode(region); - regionNode.lock(); + if (updateMetaFuture == null) { + regionNode.lock(this); + } try { + if ( + ProcedureFutureUtil.checkFuture(this, this::getUpdateMetaFuture, + this::setUpdateMetaFuture, () -> { + }) + ) { + continue; + } // This is possible, as when a server is dead, TRSP will fail to schedule a RemoteProcedure // and then try to assign the region to a new RS. 
And before it has updated the region // location to the new RS, we may have already called the am.getRegionsOnServer so we will @@ -572,8 +602,10 @@ private void assignRegions(MasterProcedureEnv env, List regions) thr } if (regionNode.getProcedure() != null) { LOG.info("{} found RIT {}; {}", this, regionNode.getProcedure(), regionNode); - regionNode.getProcedure().serverCrashed(env, regionNode, getServerName(), - !retainAssignment); + ProcedureFutureUtil.suspendIfNecessary(this, this::setUpdateMetaFuture, regionNode + .getProcedure().serverCrashed(env, regionNode, getServerName(), !retainAssignment), env, + () -> { + }); continue; } if ( @@ -583,7 +615,9 @@ private void assignRegions(MasterProcedureEnv env, List regions) thr // We need to change the state here otherwise the TRSP scheduled by DTP will try to // close the region from a dead server and will never succeed. Please see HBASE-23636 // for more details. - env.getAssignmentManager().regionClosedAbnormally(regionNode); + ProcedureFutureUtil.suspendIfNecessary(this, this::setUpdateMetaFuture, + env.getAssignmentManager().regionClosedAbnormally(regionNode), env, () -> { + }); LOG.info("{} found table disabling for region {}, set it state to ABNORMALLY_CLOSED.", this, regionNode); continue; @@ -599,11 +633,20 @@ private void assignRegions(MasterProcedureEnv env, List regions) thr TransitRegionStateProcedure proc = TransitRegionStateProcedure.assign(env, region, !retainAssignment, null); regionNode.setProcedure(proc); + // It is OK to still use addChildProcedure even if we suspend in the middle of this loop, as + // the subProcList will only be cleared when we successfully returned from the + // executeFromState method. 
This means we will submit all the TRSPs after we successfully + // finished this loop addChildProcedure(proc); } finally { - regionNode.unlock(); + if (updateMetaFuture == null) { + regionNode.unlock(this); + } } } + // we will call this method two times if the region server carries meta, so we need to reset it + // to 0 after successfully finished the above loop + processedRegions = 0; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateRegionProcedure.java index 5e907c1681ac..9730391baf22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateRegionProcedure.java @@ -110,8 +110,8 @@ assert getRegion().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID || isFailed() private void deleteRegionFromFileSystem(final MasterProcedureEnv env) throws IOException { RegionStateNode regionNode = env.getAssignmentManager().getRegionStates().getRegionStateNode(getRegion()); + regionNode.lock(); try { - regionNode.lock(); final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), getTableName()); HRegionFileSystem.deleteRegionFromFileSystem(env.getMasterConfiguration(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java index c88d613e5260..cff1b3879360 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -36,6 
+36,7 @@ import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; @@ -43,8 +44,6 @@ import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; -import org.apache.hadoop.hbase.util.FutureUtils; -import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.yetus.audience.InterfaceAudience; @@ -73,7 +72,7 @@ public class MigrateReplicationQueueFromZkToTableProcedure private List disabledPeerIds; - private CompletableFuture future; + private CompletableFuture future; private ExecutorService executor; @@ -84,6 +83,14 @@ public String getGlobalId() { return getClass().getSimpleName(); } + private CompletableFuture getFuture() { + return future; + } + + private void setFuture(CompletableFuture f) { + future = f; + } + private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer) throws ProcedureSuspendedException { if (retryCounter == null) { @@ -153,6 +160,12 @@ private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSu LOG.info("No pending peer procedures found, continue..."); } + private void finishMigartion() { + shutdownExecutorService(); + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING); + resetRetry(); + } + @Override protected Flow executeFromState(MasterProcedureEnv env, MigrateReplicationQueueFromZkToTableState state) @@ 
-195,52 +208,23 @@ protected Flow executeFromState(MasterProcedureEnv env, setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE); return Flow.HAS_MORE_STATE; case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE: - if (future != null) { - // should have finished when we arrive here - assert future.isDone(); - try { - future.get(); - } catch (Exception e) { - future = null; - throw suspend(env.getMasterConfiguration(), - backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later", - backoff / 1000, e)); + try { + if ( + ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture, + this::finishMigartion) + ) { + return Flow.HAS_MORE_STATE; } - shutdownExecutorService(); - setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING); - resetRetry(); - return Flow.HAS_MORE_STATE; + ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture, + env.getReplicationPeerManager() + .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()), + env, this::finishMigartion); + } catch (IOException e) { + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later", + backoff / 1000, e)); } - future = env.getReplicationPeerManager() - .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()); - FutureUtils.addListener(future, (r, e) -> { - // should acquire procedure execution lock to make sure that the procedure executor has - // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be - // race and cause unexpected result - IdLock procLock = - env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock(); - IdLock.Entry lockEntry; - try { - lockEntry = procLock.getLockEntry(getProcId()); - } catch (IOException ioe) { - LOG.error("Error while acquiring execution lock for procedure {}" - + " when trying to wake it up, aborting...", this, ioe); - 
env.getMasterServices().abort("Can not acquire procedure execution lock", e); - return; - } - try { - setTimeoutFailure(env); - } finally { - procLock.releaseLockEntry(lockEntry); - } - }); - // here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us - setTimeout(-1); - setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); - // skip persistence is a must now since when restarting, if the procedure is in - // WAITING_TIMEOUT state and has -1 as timeout, it will block there forever... - skipPersistence(); - throw new ProcedureSuspendedException(); + return Flow.HAS_MORE_STATE; case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING: long rsWithLowerVersion = env.getMasterServices().getServerManager().getOnlineServers().values().stream() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 988c519f781d..322b5bb7fc78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -797,7 +797,7 @@ private CompletableFuture runAsync(ExceptionalRunnable task, ExecutorService /** * Submit the migration tasks to the given {@code executor}. */ - CompletableFuture migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) { + CompletableFuture migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) { // the replication queue table creation is asynchronous and will be triggered by addPeer, so // here we need to manually initialize it since we will not call addPeer. 
try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java new file mode 100644 index 000000000000..8ca4cba245da --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IdLock; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A helper class for switching procedure out(yielding) while it is doing some time consuming + * operation, such as updating meta, where we can get a {@link CompletableFuture} about the + * operation. 
+ */ +@InterfaceAudience.Private +public final class ProcedureFutureUtil { + + private static final Logger LOG = LoggerFactory.getLogger(ProcedureFutureUtil.class); + + private ProcedureFutureUtil() { + } + + public static boolean checkFuture(Procedure proc, Supplier> getFuture, + Consumer> setFuture, Runnable actionAfterDone) throws IOException { + CompletableFuture future = getFuture.get(); + if (future == null) { + return false; + } + // reset future + setFuture.accept(null); + FutureUtils.get(future); + actionAfterDone.run(); + return true; + } + + public static void suspendIfNecessary(Procedure proc, + Consumer> setFuture, CompletableFuture future, + MasterProcedureEnv env, Runnable actionAfterDone) + throws IOException, ProcedureSuspendedException { + MutableBoolean completed = new MutableBoolean(false); + Thread currentThread = Thread.currentThread(); + FutureUtils.addListener(future, (r, e) -> { + if (Thread.currentThread() == currentThread) { + LOG.debug("The future has completed while adding callback, give up suspending procedure {}", + proc); + // this means the future has already been completed, as we call the callback directly while + // calling addListener, so here we just set completed to true without doing anything + completed.setTrue(); + return; + } + LOG.debug("Going to wake up procedure {} because future has completed", proc); + // This callback may be called inside netty's event loop, so we should not block it for a long + // time. The worker executor will hold the execution lock while executing the procedure, and + // we may persist the procedure state inside the lock, which is a time consuming operation. + // And what makes things worse is that, we persist procedure state to master local region, + // where the AsyncFSWAL implementation will use the same netty's event loop for dealing with + // I/O, which could even cause dead lock. 
+ env.getAsyncTaskExecutor().execute(() -> { + // should acquire procedure execution lock to make sure that the procedure executor has + // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be + // race and cause unexpected result + IdLock procLock = + env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock(); + IdLock.Entry lockEntry; + try { + lockEntry = procLock.getLockEntry(proc.getProcId()); + } catch (IOException ioe) { + LOG.error("Error while acquiring execution lock for procedure {}" + + " when trying to wake it up, aborting...", proc, ioe); + env.getMasterServices().abort("Can not acquire procedure execution lock", e); + return; + } + try { + env.getProcedureScheduler().addFront(proc); + } finally { + procLock.releaseLockEntry(lockEntry); + } + }); + }); + if (completed.getValue()) { + FutureUtils.get(future); + actionAfterDone.run(); + } else { + // suspend the procedure + setFuture.accept(future); + proc.skipPersistence(); + throw new ProcedureSuspendedException(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java index c601425e5f0a..6bc4c9d14e6f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CoordinatedStateManager; @@ -305,7 +306,8 @@ public MockRegionStateStore(MasterServices master, MasterRegion masterRegion) { } @Override - public void updateRegionLocation(RegionStateNode regionNode) throws IOException { + public CompletableFuture 
updateRegionLocation(RegionStateNode regionNode) { + return CompletableFuture.completedFuture(null); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerUtil.java index eb6f069474a9..69381b37e38c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManagerUtil.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.List; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -80,7 +79,7 @@ public void tearDownAfterTest() throws IOException { for (RegionInfo region : UTIL.getAdmin().getRegions(TABLE_NAME)) { RegionStateNode regionNode = AM.getRegionStates().getRegionStateNode(region); // confirm that we have released the lock - assertFalse(((ReentrantLock) regionNode.lock).isLocked()); + assertFalse(regionNode.isLocked()); TransitRegionStateProcedure proc = regionNode.getProcedure(); if (proc != null) { regionNode.unsetProcedure(proc); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java index 2757e0dd9f20..2f88f6087dd4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestOpenRegionProcedureBackoff.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FutureUtils; import 
org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -61,11 +62,11 @@ public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion } @Override - void persistToMeta(RegionStateNode regionNode) throws IOException { + CompletableFuture persistToMeta(RegionStateNode regionNode) { if (FAIL) { - throw new IOException("Inject Error!"); + return FutureUtils.failedFuture(new IOException("Inject Error!")); } - super.persistToMeta(regionNode); + return super.persistToMeta(regionNode); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndTRSP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndTRSP.java index 1d13912fb72c..05179c5eadb7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndTRSP.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRaceBetweenSCPAndTRSP.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; @@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -76,8 +78,14 @@ public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion } @Override - void regionOpening(RegionStateNode regionNode) throws IOException { - super.regionOpening(regionNode); + CompletableFuture regionOpening(RegionStateNode regionNode) { + CompletableFuture future = super.regionOpening(regionNode); + try { + // wait until the operation done, then trigger later processing, 
to make the test more + // stable + FutureUtils.get(future); + } catch (IOException e) { + } if (regionNode.getRegionInfo().getTable().equals(NAME) && ARRIVE_REGION_OPENING != null) { ARRIVE_REGION_OPENING.countDown(); ARRIVE_REGION_OPENING = null; @@ -86,6 +94,7 @@ void regionOpening(RegionStateNode regionNode) throws IOException { } catch (InterruptedException e) { } } + return future; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStateNodeLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStateNodeLock.java new file mode 100644 index 000000000000..c308b69c98cf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRegionStateNodeLock.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.assignment; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.AtomicUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestRegionStateNodeLock { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionStateNodeLock.class); + + private final RegionInfo regionInfo = + RegionInfoBuilder.newBuilder(TableName.valueOf("test")).build(); + + private RegionStateNodeLock lock; + + @Before + public void setUp() { + lock = new RegionStateNodeLock(regionInfo); + } + + @Test + public void testLockByThread() { + assertFalse(lock.isLocked()); + assertThrows(IllegalMonitorStateException.class, () -> lock.unlock()); + lock.lock(); + assertTrue(lock.isLocked()); + // reentrant + assertTrue(lock.tryLock()); + lock.unlock(); + assertTrue(lock.isLocked()); + lock.unlock(); + assertFalse(lock.isLocked()); + } + + @Test + public void testLockByProc() { + NoopProcedure proc = new NoopProcedure(); + 
assertFalse(lock.isLocked()); + assertThrows(IllegalMonitorStateException.class, () -> lock.unlock(proc)); + lock.lock(proc); + assertTrue(lock.isLocked()); + // reentrant + assertTrue(lock.tryLock(proc)); + lock.unlock(proc); + assertTrue(lock.isLocked()); + lock.unlock(proc); + assertFalse(lock.isLocked()); + } + + @Test + public void testLockProcThenThread() { + NoopProcedure proc = new NoopProcedure(); + assertFalse(lock.isLocked()); + lock.lock(proc); + assertFalse(lock.tryLock()); + assertThrows(IllegalMonitorStateException.class, () -> lock.unlock()); + long startNs = System.nanoTime(); + new Thread(() -> { + Threads.sleepWithoutInterrupt(2000); + lock.unlock(proc); + }).start(); + lock.lock(); + long costNs = System.nanoTime() - startNs; + assertThat(TimeUnit.NANOSECONDS.toMillis(costNs), greaterThanOrEqualTo(1800L)); + assertTrue(lock.isLocked()); + lock.unlock(); + assertFalse(lock.isLocked()); + } + + @Test + public void testLockMultiThread() throws InterruptedException { + int nThreads = 10; + AtomicLong concurrency = new AtomicLong(0); + AtomicLong maxConcurrency = new AtomicLong(0); + Thread[] threads = new Thread[nThreads]; + for (int i = 0; i < nThreads; i++) { + threads[i] = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + lock.lock(); + try { + long c = concurrency.incrementAndGet(); + AtomicUtils.updateMax(maxConcurrency, c); + concurrency.decrementAndGet(); + } finally { + lock.unlock(); + } + Threads.sleepWithoutInterrupt(1); + } + }); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + assertEquals(0, concurrency.get()); + assertEquals(1, maxConcurrency.get()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java index 3d1a2c4caa94..cd73e09af6db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java 
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestRollbackSCP.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FutureUtils; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -85,7 +87,7 @@ public AssignmentManagerForTest(MasterServices master, MasterRegion masterRegion } @Override - void persistToMeta(RegionStateNode regionNode) throws IOException { + CompletableFuture persistToMeta(RegionStateNode regionNode) { TransitRegionStateProcedure proc = regionNode.getProcedure(); if (!regionNode.getRegionInfo().isMetaRegion() && proc.hasParent()) { Procedure p = @@ -96,10 +98,10 @@ void persistToMeta(RegionStateNode regionNode) throws IOException { ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback( getMaster().getMasterProcedureExecutor(), true); } - throw new RuntimeException("inject code bug"); + return FutureUtils.failedFuture(new RuntimeException("inject code bug")); } } - super.persistToMeta(regionNode); + return super.persistToMeta(regionNode); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java index 3319a761eb4c..d1e7dc147615 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java @@ -42,6 +42,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -55,10 +56,16 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + /** * Test to ensure that the priority for procedures and stuck checker can partially solve the problem * describe in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain * period of time. + *

+ * As of HBASE-28199, we no longer block a worker when updating meta, so this test can no longer + * verify that extra procedure workers get added, but it can still be used to make sure that we + * make progress when meta is gone and we have a lot of pending TRSPs. */ @Category({ MasterTests.class, LargeTests.class }) public class TestProcedurePriority { @@ -129,6 +136,7 @@ public static void setUp() throws Exception { } UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build()); UTIL.waitUntilNoRegionsInTransition(); + UTIL.getAdmin().balancerSwitch(false, true); } @AfterClass @@ -144,22 +152,26 @@ public void test() throws Exception { HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer()); FAIL = true; UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName()); - // wait until all the worker thread are stuck, which means that the stuck checker will start to - // add new worker thread. ProcedureExecutor executor = UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + // wait until we have way more TRSPs than the core pool size, and then make sure we can recover + // normally UTIL.waitFor(60000, new ExplainingPredicate() { @Override public boolean evaluate() throws Exception { - return executor.getWorkerThreadCount() > CORE_POOL_SIZE; + return executor.getProcedures().stream().filter(p -> !p.isFinished()) + .filter(p -> p.getState() != ProcedureState.INITIALIZING) + .filter(p -> p instanceof TransitRegionStateProcedure).count() > 5 * CORE_POOL_SIZE; } @Override public String explainFailure() throws Exception { - return "Stuck checker does not add new worker thread"; + return "Not enough TRSPs scheduled"; } }); + // sleep more time to make sure the TRSPs have been executed + Thread.sleep(10000); UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName()); rsWithMetaThread.join(); FAIL = false;