Skip to content

Commit

Permalink
Fix flaky tests in AsyncTest (#1076)
Browse files Browse the repository at this point in the history
Ensure that the tests all pass consistently by canceling the futures in the tests
where we expect to timeout before the tasks complete. Without canceling them,
tests which call methods in Async that use the common ForkJoinPool can be
blocked by tasks that are still executing from previous tests! So, in the three tests
where waitFor/waitForAll/waitForAllIgnoringTypes time out before the tasks complete,
wrap the assertions in a try/finally and cancel the futures in the finally block. This
commit also removes the RetryingTest annotations on the four tests which had them
and replaces them with the standard JUnit Test annotation. Hopefully this will be
the last time we ever need to deal with flaky tests in Async.

Closes #1070
  • Loading branch information
sleberknight authored Nov 18, 2023
1 parent 6ea5385 commit 7b13e28
Showing 1 changed file with 102 additions and 78 deletions.
180 changes: 102 additions & 78 deletions src/test/java/org/kiwiproject/concurrent/AsyncTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import static org.assertj.core.api.Assertions.catchThrowable;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_HUNDRED_MILLISECONDS;
import static org.kiwiproject.base.KiwiStrings.f;

import lombok.extern.slf4j.Slf4j;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.RetryingTest;
import org.junit.jupiter.api.TestInfo;
import org.kiwiproject.base.DefaultEnvironment;
import org.kiwiproject.base.KiwiEnvironment;
import org.kiwiproject.concurrent.Async.Mode;
Expand All @@ -35,6 +36,15 @@ class AsyncTest {

private static final KiwiEnvironment ENV = new DefaultEnvironment();

private String testName;

@BeforeEach
void setUp(TestInfo info) {
testName = f("{}#{}",
info.getTestClass().orElseThrow().getSimpleName(),
info.getTestMethod().orElseThrow().getName());
}

@AfterEach
void tearDown() {
Async.setUnitTestAsyncMode(Mode.ENABLED);
Expand All @@ -58,7 +68,7 @@ class DoAsyncWithRunnable {

@Test
void shouldNotBlock_WhenAsyncModeIsEnabled() {
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Void> future = Async.runAsync(task::run);

// verify that immediately after triggering run, the count is still 0
Expand All @@ -74,7 +84,7 @@ void shouldNotBlock_WhenAsyncModeIsEnabled() {
@Test
void shouldNotThrowExceptionWhenCompletesExceptionally() {
var ex = new RuntimeException("oops");
var task = new ConcurrentTask().withException(ex);
var task = new ConcurrentTask(testName).withException(ex);
CompletableFuture<Void> future = Async.runAsync(task::run);

// verify that immediately after triggering run, the count is still 0
Expand All @@ -90,7 +100,7 @@ void shouldNotThrowExceptionWhenCompletesExceptionally() {
@Test
void shouldBlock_WhenAsyncModeIsDisabled() {
Async.setUnitTestAsyncMode(Mode.DISABLED);
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Void> future = Async.runAsync(task::run);

// verify that immediately after triggering run with mode=DISABLED, the count is 1
Expand Down Expand Up @@ -123,7 +133,7 @@ void tearDown() {

@Test
void shouldNotBlock_WhenAsyncModeIsEnabled() {
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Void> future = Async.runAsync(task::run, executor);

// verify that immediately after triggering run, the count is still 0
Expand All @@ -136,7 +146,7 @@ void shouldNotBlock_WhenAsyncModeIsEnabled() {
@Test
void shouldBlock_WhenAsyncModeIsDisabled() {
Async.setUnitTestAsyncMode(Mode.DISABLED);
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Void> future = Async.runAsync(task::run, executor);

// verify that immediately after triggering run with mode=DISABLED, the count is 1
Expand Down Expand Up @@ -192,7 +202,7 @@ class DoAsyncWithSupplier {

@Test
void shouldNotBlock_WhenAsyncModeIsEnabled() {
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Integer> future = Async.supplyAsync(task::supply);

// verify that immediately after triggering run, the count is still 0
Expand All @@ -206,7 +216,7 @@ void shouldNotBlock_WhenAsyncModeIsEnabled() {
@Test
void shouldBlock_WhenAsyncModeIsDisabled() {
Async.setUnitTestAsyncMode(Mode.DISABLED);
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Integer> future = Async.supplyAsync(task::supply);

// verify that immediately after triggering run with mode=DISABLED, the count is 1
Expand Down Expand Up @@ -237,7 +247,7 @@ void tearDown() {

@Test
void shouldNotBlock_WhenAsyncModeIsEnabled() {
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Integer> future = Async.supplyAsync(task::supply, executor);

// verify that immediately after triggering run, the count is still 0
Expand All @@ -251,7 +261,7 @@ void shouldNotBlock_WhenAsyncModeIsEnabled() {
@Test
void shouldBlock_WhenAsyncModeIsDisabled() {
Async.setUnitTestAsyncMode(Mode.DISABLED);
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Integer> future = Async.supplyAsync(task::supply, executor);

// verify that immediately after triggering run with mode=DISABLED, the count is 1
Expand All @@ -265,13 +275,9 @@ void shouldBlock_WhenAsyncModeIsDisabled() {
@Nested
class WaitFor {

/**
* @implNote This test has been failing intermittently running in GitHub actions, mainly on JDK 21 but
* sometimes on JDK 17. For now, making it a "retrying test". Also, see issue #1070.
*/
@RetryingTest(3)
@Test
void shouldSucceed_WhenTheFutureCompletes_BeforeTimeout() {
var task = new ConcurrentTask();
var task = new ConcurrentTask(testName);
CompletableFuture<Integer> future = Async.doAsync(task::supply);

Async.waitFor(future, 250, TimeUnit.MILLISECONDS);
Expand All @@ -281,22 +287,22 @@ void shouldSucceed_WhenTheFutureCompletes_BeforeTimeout() {
assertThat(future).isCompleted();
}

/*
* @implNote This is a "retrying" test with a higher task duration because we have seen this test
* fail (see issue #1065) when run individually, i.e. in an IDE.
*/
@RetryingTest(3)
@Test
void shouldThrowAsyncException_WhenTimesOut_BeforeTheFutureCompletes() {
var duration = Duration.ofMillis(100);
var task = new ConcurrentTask(duration);
var task = new ConcurrentTask(testName, duration);
CompletableFuture<Integer> future = Async.doAsync(task::supply);

assertThatThrownBy(() -> Async.waitFor(future, 1, TimeUnit.MILLISECONDS))
.isExactlyInstanceOf(AsyncException.class)
.hasMessage("TimeoutException occurred (maximum wait was specified as 1 MILLISECONDS)")
.hasCauseInstanceOf(TimeoutException.class);

assertThat(task.getCurrentCount()).isZero();
try {
assertThatThrownBy(() -> Async.waitFor(future, 1, TimeUnit.MILLISECONDS))
.isExactlyInstanceOf(AsyncException.class)
.hasMessage("TimeoutException occurred (maximum wait was specified as 1 MILLISECONDS)")
.hasCauseInstanceOf(TimeoutException.class);

assertThat(task.getCurrentCount()).isZero();
} finally {
cancel(future);
}
}
}

Expand All @@ -305,13 +311,13 @@ class WaitForAll {

@Test
void shouldSucceed_WhenAllTheFuturesComplete_BeforeTimeout() {
var task1 = new ConcurrentTask();
var task1 = new ConcurrentTask(testName + "#task1");
CompletableFuture<Integer> future1 = Async.doAsync(task1::supply);

var task2 = new ConcurrentTask();
var task2 = new ConcurrentTask(testName + "#task2");
CompletableFuture<Integer> future2 = Async.doAsync(task2::supply);

var task3 = new ConcurrentTask();
var task3 = new ConcurrentTask(testName + "#task3");
CompletableFuture<Integer> future3 = Async.doAsync(task3::supply);

var futures = List.of(future1, future2, future3);
Expand All @@ -326,47 +332,49 @@ void shouldSucceed_WhenAllTheFuturesComplete_BeforeTimeout() {
assertThat(future3).isCompleted();
}

/*
* @implNote This is a "retrying" test with a higher task duration because we have seen this test
* fail (see issue #1065) when run individually, i.e. in an IDE.
*/
@RetryingTest(3)
@Test
void shouldThrowAsyncException_WhenTimesOut_BeforeAllFuturesComplete() {
var duration = Duration.ofMillis(200);
var task1 = new ConcurrentTask(duration);
var task1 = new ConcurrentTask(testName + "#task1", duration);
CompletableFuture<Integer> future1 = Async.doAsync(task1::supply);

var task2 = new ConcurrentTask(duration);
var task2 = new ConcurrentTask(testName + "#task2", duration);
CompletableFuture<Integer> future2 = Async.doAsync(task2::supply);

var task3 = new ConcurrentTask(duration);
var task3 = new ConcurrentTask(testName + "#task3", duration);
CompletableFuture<Integer> future3 = Async.doAsync(task3::supply);

var futures = List.of(future1, future2, future3);
assertThatThrownBy(() -> Async.waitForAll(futures, 1, TimeUnit.MILLISECONDS))
.isExactlyInstanceOf(AsyncException.class)
.hasMessage("TimeoutException occurred (maximum wait was specified as 1 MILLISECONDS)")
.hasCauseInstanceOf(TimeoutException.class);

assertThat(task1.getCurrentCount()).isZero();
assertThat(task2.getCurrentCount()).isZero();
assertThat(task3.getCurrentCount()).isZero();
try {
var futures = List.of(future1, future2, future3);
assertThatThrownBy(() -> Async.waitForAll(futures, 1, TimeUnit.MILLISECONDS))
.isExactlyInstanceOf(AsyncException.class)
.hasMessage("TimeoutException occurred (maximum wait was specified as 1 MILLISECONDS)")
.hasCauseInstanceOf(TimeoutException.class);

assertThat(task1.getCurrentCount()).isZero();
assertThat(task2.getCurrentCount()).isZero();
assertThat(task3.getCurrentCount()).isZero();
} finally {
cancel(future1);
cancel(future2);
cancel(future3);
}
}
}

@Nested
class WaitForAllIgnoringType {

@SuppressWarnings({"rawtypes", "unchecked"})
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void shouldSucceed_WhenAllTheFuturesComplete_BeforeTimeout() {
var task1 = new ConcurrentTask();
var task1 = new ConcurrentTask(testName + "#task1");
CompletableFuture future1 = Async.doAsync(task1::supply);

var task2 = new ConcurrentTask();
var task2 = new ConcurrentTask(testName + "#task2");
CompletableFuture future2 = Async.doAsync(task2::supply);

var task3 = new ConcurrentTask();
var task3 = new ConcurrentTask(testName + "#task3");
CompletableFuture future3 = Async.doAsync(task3::supply);

var futures = List.of(future1, future2, future3);
Expand All @@ -381,32 +389,34 @@ void shouldSucceed_WhenAllTheFuturesComplete_BeforeTimeout() {
assertThat(future3).isCompleted();
}

/*
* @implNote This is a "retrying" test with a higher task duration because we have seen this test
* fail (see issue #1065) when run individually, i.e. in an IDE.
*/
@SuppressWarnings("rawtypes")
@RetryingTest(3)
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void shouldThrowAsyncException_WhenTimesOut_BeforeAllFuturesComplete() {
var duration = Duration.ofMillis(200);
var task1 = new ConcurrentTask(duration);
var task1 = new ConcurrentTask(testName + "#task1", duration);
CompletableFuture future1 = Async.doAsync(task1::supply);

var task2 = new ConcurrentTask(duration);
var task2 = new ConcurrentTask(testName + "#task2", duration);
CompletableFuture future2 = Async.doAsync(task2::supply);

var task3 = new ConcurrentTask(duration);
var task3 = new ConcurrentTask(testName + "#task3", duration);
CompletableFuture future3 = Async.doAsync(task3::supply);

var futures = List.of(future1, future2, future3);
assertThatThrownBy(() -> Async.waitForAllIgnoringType(futures, 1, TimeUnit.MILLISECONDS))
.isExactlyInstanceOf(AsyncException.class)
.hasMessage("TimeoutException occurred (maximum wait was specified as 1 MILLISECONDS)")
.hasCauseInstanceOf(TimeoutException.class);

assertThat(task1.getCurrentCount()).isZero();
assertThat(task2.getCurrentCount()).isZero();
assertThat(task3.getCurrentCount()).isZero();
try {
var futures = List.of(future1, future2, future3);
assertThatThrownBy(() -> Async.waitForAllIgnoringType(futures, 1, TimeUnit.MILLISECONDS))
.isExactlyInstanceOf(AsyncException.class)
.hasMessage("TimeoutException occurred (maximum wait was specified as 1 MILLISECONDS)")
.hasCauseInstanceOf(TimeoutException.class);

assertThat(task1.getCurrentCount()).isZero();
assertThat(task2.getCurrentCount()).isZero();
assertThat(task3.getCurrentCount()).isZero();
} finally {
cancel(future1);
cancel(future2);
cancel(future3);
}
}
}

Expand All @@ -415,7 +425,7 @@ class WithMaxTimeout {

@Test
void shouldTimeout_WhenTaskTakesLongerThan_MaxTimeout() {
var task = new ConcurrentTask(Duration.ofSeconds(10));
var task = new ConcurrentTask(testName, Duration.ofSeconds(10));
CompletableFuture<Integer> future = Async.doAsync(task::supply);
CompletableFuture<Integer> futureWithTimeout = Async.withMaxTimeout(future, 5, TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -445,7 +455,19 @@ void shouldTimeout_WhenTaskTakesLongerThan_MaxTimeout() {
}
}

private void confirmCompletion(ConcurrentTask task) {
/**
* Cancel the given CompletableFuture. This should be called by tests that are testing timeout situations
* such as when testing {@link Async#waitFor(CompletableFuture, long, TimeUnit)} and the other similar
* methods that wrap a CompletableFuture and apply a timeout. If the test is verifying that waitFor methods
* work as expected when a CompletableFuture has not completed when the timeout period expires, then that
* test should call this method to ensure the outstanding tasks are canceled.
*/
private static <T> void cancel(CompletableFuture<T> future) {
var wasCancelled = future.cancel(true);
assertThat(wasCancelled).isTrue();
}

private static void confirmCompletion(ConcurrentTask task) {
awaitAtMost500msWith25MsPoll().until(() -> task.getCurrentCount() == 1);
}

Expand All @@ -462,16 +484,18 @@ private static ConditionFactory awaitAtMost500msWith25MsPoll() {
@Slf4j
static class ConcurrentTask {

private final String name;
private final AtomicInteger counter;
private final long durationMillis;

private RuntimeException exceptionToThrow;

ConcurrentTask() {
this(Duration.ofMillis(10));
ConcurrentTask(String name) {
this(name, Duration.ofMillis(10));
}

ConcurrentTask(Duration duration) {
ConcurrentTask(String name, Duration duration) {
this.name = name;
this.counter = new AtomicInteger();
this.durationMillis = duration.toMillis();
}
Expand All @@ -486,15 +510,15 @@ void run() {
}

Integer supply() {
LOG.debug("executing concurrent task with duration of: {}ms", durationMillis);
LOG.debug("executing concurrent task {} with duration of: {}ms", name, durationMillis);
try {
var startTime = System.nanoTime();
performWait();
long endTime = System.nanoTime();
long elapsed = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);

var completionStatus = isNull(exceptionToThrow) ? "successfully" : "exceptionally";
LOG.debug("performed task {} in: {}ms", completionStatus, elapsed);
LOG.debug("performed task {} {} in: {}ms", name, completionStatus, elapsed);

var updatedCount = counter.incrementAndGet();

Expand All @@ -505,7 +529,7 @@ Integer supply() {
return updatedCount;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.debug("Wait interrupted", e);
LOG.debug("Wait interrupted for task {}", name, e);
}

return counter.get();
Expand Down

0 comments on commit 7b13e28

Please sign in to comment.