From 2aacdcd4e8a6f2dcc5dca83a28267cd0e44f6cd6 Mon Sep 17 00:00:00 2001 From: joerg1985 <16140691+joerg1985@users.noreply.github.com> Date: Mon, 10 Jun 2024 20:39:11 +0200 Subject: [PATCH] Fixed a deadlock in the PoolBasedSequentialScheduledExecutorService (#4247) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörg Sautter --- ...sedSequentialScheduledExecutorService.java | 94 ++++++++++++++----- .../core/common/ThreadPoolManagerTest.java | 32 +++++++ 2 files changed, 104 insertions(+), 22 deletions(-) diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/PoolBasedSequentialScheduledExecutorService.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/PoolBasedSequentialScheduledExecutorService.java index 3b49427cf62..4e6f6574d94 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/PoolBasedSequentialScheduledExecutorService.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/PoolBasedSequentialScheduledExecutorService.java @@ -18,6 +18,7 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Set; +import java.util.WeakHashMap; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -31,9 +32,11 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; +import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -50,10 +53,13 @@ * @author Jörg Sautter - Initial contribution */ @NonNullByDefault -class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService { +final class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService { + + private static final WeakHashMap PENDING_BY_POOL = new WeakHashMap<>(); private final WorkQueueEntry empty; private final ScheduledThreadPoolExecutor pool; + private final AtomicInteger pending; private final List> scheduled; private final ScheduledFuture cleaner; private @Nullable WorkQueueEntry tail; @@ -75,6 +81,20 @@ public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor p tail = empty; + // we need one pending counter per pool + synchronized (PENDING_BY_POOL) { + AtomicInteger fromCache = PENDING_BY_POOL.get(pool); + + if (fromCache == null) { + // set to one does ensure at least one thread more than tasks running + fromCache = new AtomicInteger(1); + + PENDING_BY_POOL.put(pool, fromCache); + } + + pending = fromCache; + } + // clean up to ensure we do not keep references to old tasks cleaner = this.scheduleWithFixedDelay(() -> { synchronized (this) { @@ -100,22 +120,24 @@ public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor p tail = empty; } } - }, 2, 4, TimeUnit.SECONDS); + }, + // avoid cleaners of promptly created instances to run at the same time + (System.nanoTime() % 13), 8, TimeUnit.SECONDS); } @Override public ScheduledFuture schedule(@Nullable Runnable command, long delay, @Nullable TimeUnit unit) { return schedule((origin) -> pool.schedule(() -> { - // we block the thread here, in worst case new threads are spawned - submitToWorkQueue(origin.join(), command).join(); + // we might block the thread here, in worst case new threads are spawned + submitToWorkQueue(origin.join(), command, true).join(); }, delay, unit)); } @Override public ScheduledFuture schedule(@Nullable Callable callable, long delay, @Nullable TimeUnit unit) { return schedule((origin) -> pool.schedule(() -> { - // we block the thread here, in worst case new threads are spawned - return submitToWorkQueue(origin.join(), callable).join(); + // we might block the thread here, in worst case new threads are spawned + return submitToWorkQueue(origin.join(), callable, true).join(); }, delay, unit)); } @@ -126,13 +148,13 @@ public ScheduledFuture scheduleAtFixedRate(@Nullable Runnable command, long i CompletableFuture submitted; try { - // we block the thread here, in worst case new threads are spawned - submitted = submitToWorkQueue(origin.join(), command); + submitted = submitToWorkQueue(origin.join(), command, true); } catch (RejectedExecutionException ex) { // the pool has been shutdown, scheduled tasks should cancel return; } + // we might block the thread here, in worst case new threads are spawned submitted.join(); }, initialDelay, period, unit)); } @@ -144,13 +166,13 @@ public ScheduledFuture scheduleWithFixedDelay(@Nullable Runnable command, lon CompletableFuture submitted; try { - // we block the thread here, in worst case new threads are spawned - submitted = submitToWorkQueue(origin.join(), command); + submitted = submitToWorkQueue(origin.join(), command, true); } catch (RejectedExecutionException ex) { // the pool has been shutdown, scheduled tasks should cancel return; } + // we might block the thread here, in worst case new threads are spawned submitted.join(); }, initialDelay, delay, unit)); } @@ -255,20 +277,21 @@ public boolean awaitTermination(long timeout, @Nullable TimeUnit unit) throws In @Override public Future submit(@Nullable Callable task) { - return submitToWorkQueue(null, task); + return submitToWorkQueue(null, task, false); } - private CompletableFuture submitToWorkQueue(RunnableFuture origin, @Nullable Runnable task) { + private CompletableFuture submitToWorkQueue(RunnableFuture origin, @Nullable Runnable task, boolean inPool) { Callable callable = () -> { task.run(); return null; }; - return submitToWorkQueue(origin, callable); + return submitToWorkQueue(origin, callable, inPool); } - private CompletableFuture submitToWorkQueue(@Nullable RunnableFuture origin, @Nullable Callable task) { + private CompletableFuture submitToWorkQueue(@Nullable RunnableFuture origin, @Nullable Callable task, + boolean inPool) { BiFunction action = (result, error) -> { // ignore result & error, they are from the previous task try { @@ -278,22 +301,45 @@ private CompletableFuture submitToWorkQueue(@Nullable RunnableFuture o } catch (Exception ex) { // a small hack to throw the Exception unchecked throw PoolBasedSequentialScheduledExecutorService.unchecked(ex); + } finally { + pending.decrementAndGet(); } }; + RunnableCompletableFuture cf; + boolean runNow; + synchronized (this) { if (tail == null) { throw new RejectedExecutionException("this scheduled executor has been shutdown before"); } - RunnableCompletableFuture cf = tail.future.handleAsync(action, pool); + // set the core pool size even if it does not change, this triggers idle threads to stop + pool.setCorePoolSize(pending.incrementAndGet()); - cf.setCallable(task); + // avoid waiting for one pool thread to finish inside a pool thread + runNow = inPool && tail.future.isDone(); - tail = new WorkQueueEntry(tail, origin, cf); + if (runNow) { + cf = new RunnableCompletableFuture<>(task); + tail = new WorkQueueEntry(null, origin, cf); + } else { + cf = tail.future.handleAsync(action, pool); + cf.setCallable(task); + tail = new WorkQueueEntry(tail, origin, cf); + } + } - return cf; + if (runNow) { + // ensure we do not wait for one pool thread to finish inside another pool thread + try { + cf.run(); + } finally { + pending.decrementAndGet(); + } } + + return cf; } private static E unchecked(Exception ex) throws E { @@ -306,7 +352,7 @@ public Future submit(@Nullable Runnable task, T result) { task.run(); return result; - }); + }, false); } @Override @@ -343,7 +389,7 @@ public List> invokeAll(@Nullable Collection> futures = new ArrayList<>(); for (Callable task : tasks) { - futures.add(submitToWorkQueue(null, task).orTimeout(timeout, unit)); + futures.add(submitToWorkQueue(null, task, false).orTimeout(timeout, unit)); } // wait for all futures to complete @@ -381,7 +427,7 @@ private T invokeAny(@Nullable Collection> ta List> futures = new ArrayList<>(); for (Callable task : tasks) { - futures.add(submitToWorkQueue(null, task)); + futures.add(submitToWorkQueue(null, task, false)); } // wait for any future to complete @@ -452,7 +498,11 @@ static class RunnableCompletableFuture extends CompletableFuture implement private @Nullable Callable callable; public RunnableCompletableFuture() { - callable = null; + this.callable = null; + } + + public RunnableCompletableFuture(@Nullable Callable callable) { + this.callable = callable; } public void setCallable(@Nullable Callable callable) { diff --git a/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java b/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java index d3b90550db6..e1c31abd59c 100644 --- a/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java +++ b/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java @@ -174,6 +174,38 @@ public void testSchedulingGetsBlockedByRegularTaskInSequentialExecutorService() assertFalse(latch.await(100, TimeUnit.MILLISECONDS)); } + @Test + public void testSchedulingDoesSpawnNewThreads() throws InterruptedException { + ScheduledExecutorService serviceA = ThreadPoolManager + .getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-7"); + ScheduledExecutorService serviceB = ThreadPoolManager + .getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-8"); + + for (int j = 0; j < 3; j++) { + CountDownLatch block = new CountDownLatch(1); + CountDownLatch check = new CountDownLatch(20); + + for (int i = 0; i < 20; i++) { + serviceA.schedule(() -> { + try { + block.await(); + } catch (InterruptedException e) { + + } + check.countDown(); + }, 1, TimeUnit.MILLISECONDS); + } + + Thread.sleep(80); + + serviceB.schedule(() -> { + block.countDown(); + }, 20, TimeUnit.MILLISECONDS); + + assertTrue(check.await(80, TimeUnit.MILLISECONDS)); + } + } + @Test public void testGetScheduledPool() { ThreadPoolExecutor result = ThreadPoolManager.getScheduledPoolUnwrapped("test1");