diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java index 17bc078cd88..b3b04d5b743 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java @@ -60,6 +60,22 @@ static CompletableFuture invokeStatic(Supplier supplier) { return create().invoke(supplier); } + /** + * Convenience method to avoid having to call {@link #create()}. Also accepts + * an {@code onStart} future to inform of async task startup. + * + * @param supplier supplier of value (or a method reference) + * @param onStart future completed when async task starts + * @param type of returned value + * @return a future that is a "promise" of the future result + */ + static CompletableFuture invokeStatic(Supplier supplier, CompletableFuture onStart) { + return new Builder() + .onStart(onStart) + .build() + .invoke(supplier); + } + /** * A new builder to build a customized {@link Async} instance. * @return a new builder @@ -73,6 +89,7 @@ static Builder builder() { */ class Builder implements io.helidon.common.Builder { private LazyValue executor = FaultTolerance.executor(); + private CompletableFuture onStart; private Builder() { } @@ -104,8 +121,23 @@ public Builder executor(ExecutorService executor) { return this; } + /** + * Configure future that shall be completed when async task starts. + * + * @param onStart future completed when async task starts + * @return updated builder instance + */ + public Builder onStart(CompletableFuture onStart) { + this.onStart = onStart; + return this; + } + LazyValue executor() { return executor; } + + CompletableFuture onStart() { + return onStart; + } } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java index 24db300d475..ffed5e097ee 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java @@ -32,6 +32,7 @@ */ class AsyncImpl implements Async { private final LazyValue executor; + private final CompletableFuture onStart; AsyncImpl() { this(Async.builder()); @@ -39,6 +40,7 @@ class AsyncImpl implements Async { AsyncImpl(Builder builder) { this.executor = builder.executor(); + this.onStart = builder.onStart(); } @Override @@ -54,6 +56,9 @@ public boolean cancel(boolean mayInterruptIfRunning) { Future future = executor.get().submit(() -> { Thread thread = Thread.currentThread(); thread.setName(thread.getName() + ": async"); + if (onStart != null) { + onStart.complete(this); + } try { T t = supplier.get(); result.complete(t); diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java index d2aac3bd65a..d28560a916d 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java @@ -25,8 +25,6 @@ import io.helidon.logging.common.LogConfig; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static java.lang.System.Logger.Level.INFO; @@ -42,24 +40,18 @@ class BulkheadTest { private static final System.Logger LOGGER = System.getLogger(BulkheadTest.class.getName()); - private static final long WAIT_TIMEOUT_MILLIS = 10000; - - private CountDownLatch enqueuedSubmitted; + private static final long WAIT_TIMEOUT_MILLIS = 5000; @BeforeAll static void setupTest() { LogConfig.configureRuntime(); } - @BeforeEach - void resetLatch() { - enqueuedSubmitted = new CountDownLatch(1); - } - - @Disabled + @Test void testBulkhead() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException { // Create bulkhead of 1 with queue length 1 String name = "unit:testBulkhead"; + CountDownLatch enqueuedSubmitted = new CountDownLatch(1); Bulkhead bulkhead = Bulkhead.builder() .limit(1) .queueLength(1) @@ -87,15 +79,19 @@ public void enqueueing(Supplier supplier) { CompletableFuture enqueuedResult = Async.invokeStatic( () -> bulkhead.invoke(enqueued::run)); - // Wait until previous task is "likely" queued + // Wait until previous task is queued if (!enqueuedSubmitted.await(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { fail("Task enqueued never submitted"); } + assertEventually(() -> bulkhead.stats().waitingQueueSize() == 1, WAIT_TIMEOUT_MILLIS); // Submit new task that should be rejected Task rejected = new Task(2); + CompletableFuture asyncRejected = new CompletableFuture<>(); CompletableFuture rejectedResult = Async.invokeStatic( - () -> bulkhead.invoke(rejected::run)); + () -> bulkhead.invoke(rejected::run), + asyncRejected); + asyncRejected.get(); // waits for async to start assertThat(inProgress.isStarted(), is(true)); assertThat(inProgress.isBlocked(), is(true)); @@ -258,4 +254,15 @@ CompletableFuture future() { return future; } } + + private static void assertEventually(Supplier predicate, long millis) throws InterruptedException { + long start = System.currentTimeMillis(); + do { + if (predicate.get()) { + return; + } + Thread.sleep(100); + } while (System.currentTimeMillis() - start <= millis); + fail("Predicate failed after " + millis + " milliseconds"); + } }