From 5bd001ae2f4f6f236d29ac661b7c1b8a31ddb220 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 7 Dec 2023 12:50:32 +0800 Subject: [PATCH] fix stop --- .../hbase/procedure2/ProcedureExecutor.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index a38c08f1911a..e01a27d74675 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -621,9 +621,8 @@ public Thread newThread(Runnable r) { .setThreadFactory(backingThreadFactory).build()); executor.allowCoreThreadTimeOut(true); this.asyncTaskExecutor = executor; - forceUpdateExecutor = - Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build()); + forceUpdateExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build()); store.registerListener(new ProcedureStoreListener() { @Override @@ -694,10 +693,10 @@ public void startWorkers() throws IOException { } public void stop() { - if (!running.getAndSet(false)) { - return; - } - + // it is possible that we fail in init, while loading procedures, so we will not set running to + // true but we should have already started the ProcedureScheduler, and also the two + // ExecutorServices, so here we do not check running state, just stop them + running.set(false); LOG.info("Stopping"); scheduler.stop(); timeoutExecutor.sendStopSignal(); @@ -719,13 +718,17 @@ public void join() { worker.awaitTermination(); } try { - forceUpdateExecutor.awaitTermination(30, TimeUnit.SECONDS); + if (!forceUpdateExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.warn("There are still pending tasks in forceUpdateExecutor"); + } } catch (InterruptedException e) { LOG.warn("interrupted while waiting for forceUpdateExecutor termination", e); Thread.currentThread().interrupt(); } try { - asyncTaskExecutor.awaitTermination(30, TimeUnit.SECONDS); + if (!asyncTaskExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.warn("There are still pending tasks in asyncTaskExecutor"); + } } catch (InterruptedException e) { LOG.warn("interrupted while waiting for asyncTaskExecutor termination", e); Thread.currentThread().interrupt(); @@ -736,8 +739,7 @@ public void join() { try { threadGroup.destroy(); } catch (IllegalThreadStateException e) { - LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup, - e.getMessage()); + LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup, e); // This dumps list of threads on STDOUT. this.threadGroup.list(); }