Skip to content

Commit

Permalink
fix stop
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Dec 7, 2023
1 parent 933211b commit 5bd001a
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,8 @@ public Thread newThread(Runnable r) {
.setThreadFactory(backingThreadFactory).build());
executor.allowCoreThreadTimeOut(true);
this.asyncTaskExecutor = executor;
forceUpdateExecutor =
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build());
forceUpdateExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build());
store.registerListener(new ProcedureStoreListener() {

@Override
Expand Down Expand Up @@ -694,10 +693,10 @@ public void startWorkers() throws IOException {
}

public void stop() {
if (!running.getAndSet(false)) {
return;
}

// it is possible that we fail in init, while loading procedures, so we will not set running to
// true but we should have already started the ProcedureScheduler, and also the two
// ExecutorServices, so here we do not check running state, just stop them
running.set(false);
LOG.info("Stopping");
scheduler.stop();
timeoutExecutor.sendStopSignal();
Expand All @@ -719,13 +718,17 @@ public void join() {
worker.awaitTermination();
}
try {
forceUpdateExecutor.awaitTermination(30, TimeUnit.SECONDS);
if (!forceUpdateExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("There are still pending tasks in forceUpdateExecutor");
}
} catch (InterruptedException e) {
LOG.warn("interrupted while waiting for forceUpdateExecutor termination", e);
Thread.currentThread().interrupt();
}
try {
asyncTaskExecutor.awaitTermination(30, TimeUnit.SECONDS);
if (!asyncTaskExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("There are still pending tasks in asyncTaskExecutor");
}
} catch (InterruptedException e) {
LOG.warn("interrupted while waiting for asyncTaskExecutor termination", e);
Thread.currentThread().interrupt();
Expand All @@ -736,8 +739,7 @@ public void join() {
try {
threadGroup.destroy();
} catch (IllegalThreadStateException e) {
LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup,
e.getMessage());
LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup, e);
// This dumps list of threads on STDOUT.
this.threadGroup.list();
}
Expand Down

0 comments on commit 5bd001a

Please sign in to comment.