Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that repeatable tasks are always cancellable #164

Merged
merged 1 commit into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2536,11 +2536,12 @@ public String toString() {

static final int ASF_ST_WAITING = 0;
static final int ASF_ST_CANCELLED = 1;
static final int ASF_ST_SUBMITTED = 2;
static final int ASF_ST_RUNNING = 3;
static final int ASF_ST_FINISHED = 4;
static final int ASF_ST_FAILED = 5;
static final int ASF_ST_REJECTED = 6;
static final int ASF_ST_CANCEL_PENDING = 2;
static final int ASF_ST_SUBMITTED = 3;
static final int ASF_ST_RUNNING = 4;
static final int ASF_ST_FINISHED = 5;
static final int ASF_ST_FAILED = 6;
static final int ASF_ST_REJECTED = 7;

static final AbstractScheduledFuture<?>[] NO_FUTURES = new AbstractScheduledFuture<?>[0];

Expand Down Expand Up @@ -2597,6 +2598,7 @@ public long getDelay(final TimeUnit unit) {
}

public boolean isCancelled() {
int state = this.state;
return state == ASF_ST_CANCELLED;
}

Expand All @@ -2617,11 +2619,16 @@ public boolean cancel(final boolean mayInterruptIfRunning) {
doCancel();
return true;
}
case ASF_ST_CANCEL_PENDING:
case ASF_ST_RUNNING: {
this.state = ASF_ST_CANCEL_PENDING;
if (mayInterruptIfRunning) {
liveThread.interrupt();
Thread liveThread = this.liveThread;
if (liveThread != null) {
liveThread.interrupt();
}
}
return false;
return true;
}
case ASF_ST_CANCELLED: {
return true;
Expand All @@ -2643,6 +2650,7 @@ public V get() throws InterruptedException, ExecutionException {
for (;;) {
state = this.state;
switch (state) {
case ASF_ST_CANCEL_PENDING:
case ASF_ST_WAITING:
case ASF_ST_SUBMITTED:
case ASF_ST_RUNNING: {
Expand All @@ -2659,6 +2667,7 @@ public V get() throws InterruptedException, ExecutionException {
throw new ExecutionException((Throwable) result);
}
case ASF_ST_FINISHED: {
// never happens for repeatable tasks
return (V) result;
}
}
Expand All @@ -2675,6 +2684,7 @@ public V get(final long timeout, final TimeUnit unit) throws InterruptedExceptio
for (;;) {
state = this.state;
switch (state) {
case ASF_ST_CANCEL_PENDING:
case ASF_ST_WAITING:
case ASF_ST_SUBMITTED:
case ASF_ST_RUNNING: {
Expand Down Expand Up @@ -2817,6 +2827,7 @@ void reject(RejectedExecutionException e) {
void fail(Throwable t) {
synchronized (this) {
switch (state) {
case ASF_ST_CANCEL_PENDING:
case ASF_ST_WAITING:
case ASF_ST_SUBMITTED:
case ASF_ST_RUNNING: {
Expand All @@ -2840,11 +2851,13 @@ void fail(Throwable t) {
void finish(V result) {
// overridden in subclasses where the task repeats
synchronized (this) {
liveThread = null;
switch (state) {
case ASF_ST_CANCEL_PENDING:
case ASF_ST_RUNNING: {
// for non-repeating tasks, a pending cancel does not invalidate finishing the task
this.result = result;
this.state = ASF_ST_FINISHED;
liveThread = null;
notifyAll();
return;
}
Expand Down Expand Up @@ -2936,29 +2949,31 @@ abstract class RepeatingScheduledFuture<V> extends AbstractScheduledFuture<V> {
*/
abstract void adjustTime();

public void run() {
super.run();
// if an exception is thrown, we will have failed already anyway
adjustTime();
void finish(final V result) {
synchronized (this) {
liveThread = null;
switch (state) {
case ASF_ST_CANCEL_PENDING: {
this.state = ASF_ST_CANCELLED;
notifyAll();
return;
}
case ASF_ST_RUNNING: {
// repeating tasks never actually finish
adjustTime();
state = ASF_ST_WAITING;
schedulerTask.schedule(this);
return;
}
default: {
// in all other cases, we failed so the task should not be rescheduled
// invalid state
fail(badState());
return;
}
}
}
}

void finish(final V result) {
// repeating tasks never actually finish
}

StringBuilder toString(final StringBuilder b) {
return super.toString(b.append("repeating "));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static org.junit.jupiter.api.Assertions.*;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -39,21 +41,16 @@ public void testCancelWhileRunning() throws Exception {
ScheduledFuture<Boolean> future = eqe.schedule(() -> { latch.countDown(); Thread.sleep(1_000_000_000L); return Boolean.TRUE; }, 1, TimeUnit.NANOSECONDS);
assertTrue(latch.await(5, TimeUnit.SECONDS), "Timely task execution");
assertFalse(future.isCancelled());
// task is running; cancel will fail
assertFalse(future.cancel(false));
// task is running
assertTrue(future.cancel(false));
assertFalse(future.isCancelled());
assertFalse(future.isDone());
// now try to interrupt it (cancel still fails but the interrupt should be delivered)
assertFalse(future.cancel(true));
// now try to interrupt it
assertTrue(future.cancel(true));
assertFalse(future.isCancelled());
// now get it
try {
future.get(100L, TimeUnit.MILLISECONDS);
fail("Expected exception");
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
assertTrue(cause instanceof InterruptedException, "Expected " + cause + " to be an InterruptedException");
}
Throwable cause = assertThrows(ExecutionException.class, () -> future.get(100L, TimeUnit.MILLISECONDS)).getCause();
assertInstanceOf(InterruptedException.class, cause);
assertTrue(future.isDone());
eqe.shutdown();
assertTrue(eqe.awaitTermination(5, TimeUnit.SECONDS), "Timely shutdown");
Expand Down Expand Up @@ -146,6 +143,22 @@ public void testFixedDelayExecution() throws Exception {
}
}

@Test
public void testThatFixedDelayTerminatesTask() {
EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
var r = new Runnable() {
final ScheduledFuture<?> future = eqe.scheduleWithFixedDelay(this, 0, 100, TimeUnit.MILLISECONDS);
final ArrayList<LocalDateTime> times = new ArrayList<>();
public void run() {
times.add(LocalDateTime.now());
if (times.size() >= 5) {
future.cancel(false);
}
}
};
assertThrows(CancellationException.class, () -> r.future.get(5, TimeUnit.SECONDS));
}

@Test
public void testCancelOnShutdown() throws Exception {
EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
Expand Down