Skip to content

Commit

Permalink
Fixed a deadlock in the PoolBasedSequentialScheduledExecutorService (o…
Browse files Browse the repository at this point in the history
…penhab#4247)

Signed-off-by: Jörg Sautter <joerg.sautter@gmx.net>
  • Loading branch information
joerg1985 authored Jun 10, 2024
1 parent 8e3ca9d commit 2aacdcd
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -31,9 +32,11 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;

Expand All @@ -50,10 +53,13 @@
* @author Jörg Sautter - Initial contribution
*/
@NonNullByDefault
class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService {
final class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService {

private static final WeakHashMap<ScheduledThreadPoolExecutor, @NonNull AtomicInteger> PENDING_BY_POOL = new WeakHashMap<>();

private final WorkQueueEntry empty;
private final ScheduledThreadPoolExecutor pool;
private final AtomicInteger pending;
private final List<RunnableFuture<?>> scheduled;
private final ScheduledFuture<?> cleaner;
private @Nullable WorkQueueEntry tail;
Expand All @@ -75,6 +81,20 @@ public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor p

tail = empty;

// we need one pending counter per pool
synchronized (PENDING_BY_POOL) {
AtomicInteger fromCache = PENDING_BY_POOL.get(pool);

if (fromCache == null) {
// set to one does ensure at least one thread more than tasks running
fromCache = new AtomicInteger(1);

PENDING_BY_POOL.put(pool, fromCache);
}

pending = fromCache;
}

// clean up to ensure we do not keep references to old tasks
cleaner = this.scheduleWithFixedDelay(() -> {
synchronized (this) {
Expand All @@ -100,22 +120,24 @@ public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor p
tail = empty;
}
}
}, 2, 4, TimeUnit.SECONDS);
},
// avoid cleaners of promptly created instances to run at the same time
(System.nanoTime() % 13), 8, TimeUnit.SECONDS);
}

@Override
public ScheduledFuture<?> schedule(@Nullable Runnable command, long delay, @Nullable TimeUnit unit) {
return schedule((origin) -> pool.schedule(() -> {
// we block the thread here, in worst case new threads are spawned
submitToWorkQueue(origin.join(), command).join();
// we might block the thread here, in worst case new threads are spawned
submitToWorkQueue(origin.join(), command, true).join();
}, delay, unit));
}

@Override
public <V> ScheduledFuture<V> schedule(@Nullable Callable<V> callable, long delay, @Nullable TimeUnit unit) {
return schedule((origin) -> pool.schedule(() -> {
// we block the thread here, in worst case new threads are spawned
return submitToWorkQueue(origin.join(), callable).join();
// we might block the thread here, in worst case new threads are spawned
return submitToWorkQueue(origin.join(), callable, true).join();
}, delay, unit));
}

Expand All @@ -126,13 +148,13 @@ public ScheduledFuture<?> scheduleAtFixedRate(@Nullable Runnable command, long i
CompletableFuture<?> submitted;

try {
// we block the thread here, in worst case new threads are spawned
submitted = submitToWorkQueue(origin.join(), command);
submitted = submitToWorkQueue(origin.join(), command, true);
} catch (RejectedExecutionException ex) {
// the pool has been shutdown, scheduled tasks should cancel
return;
}

// we might block the thread here, in worst case new threads are spawned
submitted.join();
}, initialDelay, period, unit));
}
Expand All @@ -144,13 +166,13 @@ public ScheduledFuture<?> scheduleWithFixedDelay(@Nullable Runnable command, lon
CompletableFuture<?> submitted;

try {
// we block the thread here, in worst case new threads are spawned
submitted = submitToWorkQueue(origin.join(), command);
submitted = submitToWorkQueue(origin.join(), command, true);
} catch (RejectedExecutionException ex) {
// the pool has been shutdown, scheduled tasks should cancel
return;
}

// we might block the thread here, in worst case new threads are spawned
submitted.join();
}, initialDelay, delay, unit));
}
Expand Down Expand Up @@ -255,20 +277,21 @@ public boolean awaitTermination(long timeout, @Nullable TimeUnit unit) throws In

@Override
public <T> Future<T> submit(@Nullable Callable<T> task) {
return submitToWorkQueue(null, task);
return submitToWorkQueue(null, task, false);
}

private CompletableFuture<?> submitToWorkQueue(RunnableFuture<?> origin, @Nullable Runnable task) {
private CompletableFuture<?> submitToWorkQueue(RunnableFuture<?> origin, @Nullable Runnable task, boolean inPool) {
Callable<?> callable = () -> {
task.run();

return null;
};

return submitToWorkQueue(origin, callable);
return submitToWorkQueue(origin, callable, inPool);
}

private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> origin, @Nullable Callable<T> task) {
private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> origin, @Nullable Callable<T> task,
boolean inPool) {
BiFunction<? super Object, Throwable, T> action = (result, error) -> {
// ignore result & error, they are from the previous task
try {
Expand All @@ -278,22 +301,45 @@ private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> o
} catch (Exception ex) {
// a small hack to throw the Exception unchecked
throw PoolBasedSequentialScheduledExecutorService.unchecked(ex);
} finally {
pending.decrementAndGet();
}
};

RunnableCompletableFuture<T> cf;
boolean runNow;

synchronized (this) {
if (tail == null) {
throw new RejectedExecutionException("this scheduled executor has been shutdown before");
}

RunnableCompletableFuture<T> cf = tail.future.handleAsync(action, pool);
// set the core pool size even if it does not change, this triggers idle threads to stop
pool.setCorePoolSize(pending.incrementAndGet());

cf.setCallable(task);
// avoid waiting for one pool thread to finish inside a pool thread
runNow = inPool && tail.future.isDone();

tail = new WorkQueueEntry(tail, origin, cf);
if (runNow) {
cf = new RunnableCompletableFuture<>(task);
tail = new WorkQueueEntry(null, origin, cf);
} else {
cf = tail.future.handleAsync(action, pool);
cf.setCallable(task);
tail = new WorkQueueEntry(tail, origin, cf);
}
}

return cf;
if (runNow) {
// ensure we do not wait for one pool thread to finish inside another pool thread
try {
cf.run();
} finally {
pending.decrementAndGet();
}
}

return cf;
}

private static <E extends RuntimeException> E unchecked(Exception ex) throws E {
Expand All @@ -306,7 +352,7 @@ public <T> Future<T> submit(@Nullable Runnable task, T result) {
task.run();

return result;
});
}, false);
}

@Override
Expand Down Expand Up @@ -343,7 +389,7 @@ public <T> List<Future<T>> invokeAll(@Nullable Collection<? extends @Nullable Ca
List<Future<T>> futures = new ArrayList<>();

for (Callable<T> task : tasks) {
futures.add(submitToWorkQueue(null, task).orTimeout(timeout, unit));
futures.add(submitToWorkQueue(null, task, false).orTimeout(timeout, unit));
}

// wait for all futures to complete
Expand Down Expand Up @@ -381,7 +427,7 @@ private <T> T invokeAny(@Nullable Collection<? extends @Nullable Callable<T>> ta
List<CompletableFuture<T>> futures = new ArrayList<>();

for (Callable<T> task : tasks) {
futures.add(submitToWorkQueue(null, task));
futures.add(submitToWorkQueue(null, task, false));
}

// wait for any future to complete
Expand Down Expand Up @@ -452,7 +498,11 @@ static class RunnableCompletableFuture<V> extends CompletableFuture<V> implement
private @Nullable Callable<V> callable;

public RunnableCompletableFuture() {
callable = null;
this.callable = null;
}

public RunnableCompletableFuture(@Nullable Callable<V> callable) {
this.callable = callable;
}

public void setCallable(@Nullable Callable<V> callable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,38 @@ public void testSchedulingGetsBlockedByRegularTaskInSequentialExecutorService()
assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
}

@Test
public void testSchedulingDoesSpawnNewThreads() throws InterruptedException {
ScheduledExecutorService serviceA = ThreadPoolManager
.getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-7");
ScheduledExecutorService serviceB = ThreadPoolManager
.getPoolBasedSequentialScheduledExecutorService("unit-test", "thread-8");

for (int j = 0; j < 3; j++) {
CountDownLatch block = new CountDownLatch(1);
CountDownLatch check = new CountDownLatch(20);

for (int i = 0; i < 20; i++) {
serviceA.schedule(() -> {
try {
block.await();
} catch (InterruptedException e) {

}
check.countDown();
}, 1, TimeUnit.MILLISECONDS);
}

Thread.sleep(80);

serviceB.schedule(() -> {
block.countDown();
}, 20, TimeUnit.MILLISECONDS);

assertTrue(check.await(80, TimeUnit.MILLISECONDS));
}
}

@Test
public void testGetScheduledPool() {
ThreadPoolExecutor result = ThreadPoolManager.getScheduledPoolUnwrapped("test1");
Expand Down

0 comments on commit 2aacdcd

Please sign in to comment.