diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 5aa11811122b..e01a27d74675 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -605,15 +606,23 @@ public void init(int numThreads, boolean abortOnCorruption) throws IOException { this.threadGroup = new ThreadGroup("PEWorkerGroup"); this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout"); this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor"); + ThreadFactory backingThreadFactory = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(threadGroup, r); + } + }; int size = Math.max(2, Runtime.getRuntime().availableProcessors()); - ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue(), new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d").build()); + ThreadPoolExecutor executor = + new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, new LinkedBlockingQueue(), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d") + .setThreadFactory(backingThreadFactory).build()); executor.allowCoreThreadTimeOut(true); this.asyncTaskExecutor = executor; - forceUpdateExecutor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build()); + forceUpdateExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build()); store.registerListener(new ProcedureStoreListener() { @Override @@ -684,10 +693,10 @@ public void startWorkers() throws IOException { } public void stop() { - if (!running.getAndSet(false)) { - return; - } - + // it is possible that we fail in init, while loading procedures, so we will not set running to + // true but we should have already started the ProcedureScheduler, and also the two + // ExecutorServices, so here we do not check running state, just stop them + running.set(false); LOG.info("Stopping"); scheduler.stop(); timeoutExecutor.sendStopSignal(); @@ -708,14 +717,29 @@ public void join() { for (WorkerThread worker : workerThreads) { worker.awaitTermination(); } + try { + if (!forceUpdateExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.warn("There are still pending tasks in forceUpdateExecutor"); + } + } catch (InterruptedException e) { + LOG.warn("interrupted while waiting for forceUpdateExecutor termination", e); + Thread.currentThread().interrupt(); + } + try { + if (!asyncTaskExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.warn("There are still pending tasks in asyncTaskExecutor"); + } + } catch (InterruptedException e) { + LOG.warn("interrupted while waiting for asyncTaskExecutor termination", e); + Thread.currentThread().interrupt(); + } // Destroy the Thread Group for the executors // TODO: Fix. #join is not place to destroy resources. try { threadGroup.destroy(); } catch (IllegalThreadStateException e) { - LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup, - e.getMessage()); + LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup, e); // This dumps list of threads on STDOUT. this.threadGroup.list(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java index 8ca4cba245da..997063c3097d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFutureUtil.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.commons.lang3.mutable.MutableBoolean; @@ -61,6 +62,14 @@ public static void suspendIfNecessary(Procedure proc, throws IOException, ProcedureSuspendedException { MutableBoolean completed = new MutableBoolean(false); Thread currentThread = Thread.currentThread(); + // This is for testing. In ProcedureTestingUtility, we will restart a ProcedureExecutor and + // reuse it, for performance, so we need to make sure that all the procedure have been stopped. + // But here, the callback of this future is not executed in a PEWorker, so in ProcedureExecutor + // we have no way to stop it. So here, we will get the asyncTaskExecutor first, in the PEWorker + // thread, where the ProcedureExecutor should have not been stopped yet, then when calling the + // callback, if the ProcedureExecutor have already been stopped and restarted, the + // asyncTaskExecutor will also be shutdown so we can not add anything back to the scheduler. + ExecutorService asyncTaskExecutor = env.getAsyncTaskExecutor(); FutureUtils.addListener(future, (r, e) -> { if (Thread.currentThread() == currentThread) { LOG.debug("The future has completed while adding callback, give up suspending procedure {}", @@ -77,7 +86,7 @@ public static void suspendIfNecessary(Procedure proc, // And what makes things worse is that, we persist procedure state to master local region, // where the AsyncFSWAL implementation will use the same netty's event loop for dealing with // I/O, which could even cause dead lock. - env.getAsyncTaskExecutor().execute(() -> { + asyncTaskExecutor.execute(() -> { // should acquire procedure execution lock to make sure that the procedure executor has // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be // race and cause unexpected result @@ -89,7 +98,7 @@ public static void suspendIfNecessary(Procedure proc, } catch (IOException ioe) { LOG.error("Error while acquiring execution lock for procedure {}" + " when trying to wake it up, aborting...", proc, ioe); - env.getMasterServices().abort("Can not acquire procedure execution lock", e); + env.getMasterServices().abort("Can not acquire procedure execution lock", ioe); return; } try {