diff --git a/vertx-async-await-incubator/src/main/java/io/vertx/await/Async.java b/vertx-async-await-incubator/src/main/java/io/vertx/await/Async.java index 16c1644..15347e0 100644 --- a/vertx-async-await-incubator/src/main/java/io/vertx/await/Async.java +++ b/vertx-async-await-incubator/src/main/java/io/vertx/await/Async.java @@ -1,16 +1,16 @@ package io.vertx.await; import io.netty.channel.EventLoop; +import io.vertx.await.impl.DefaultScheduler; import io.vertx.await.impl.EventLoopScheduler; import io.vertx.await.impl.Scheduler; import io.vertx.await.impl.VirtualThreadContext; -import io.vertx.await.impl.DefaultScheduler; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.impl.ContextInternal; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.Lock; public class Async { @@ -58,7 +58,7 @@ public static void lock(Lock lock) { ctx.lock(lock); } - public static T await(CompletionStage future) { + public static T await(CompletableFuture future) { VirtualThreadContext ctx = virtualThreadContext(); return ctx.await(future); } diff --git a/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/VirtualThreadContext.java b/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/VirtualThreadContext.java index 2f0894c..3408a1d 100644 --- a/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/VirtualThreadContext.java +++ b/vertx-async-await-incubator/src/main/java/io/vertx/await/impl/VirtualThreadContext.java @@ -158,7 +158,14 @@ public void lock(Lock lock) { } } - public T await(CompletionStage fut) { + public T await(CompletableFuture fut) { + if (fut.state() == java.util.concurrent.Future.State.SUCCESS) { + return fut.resultNow(); + } + if (fut.state() == java.util.concurrent.Future.State.FAILED) { + throwAsUnchecked(fut.exceptionNow()); + return null; + } inThread.remove(); Consumer cont = scheduler.unschedule(); CompletableFuture latch = new CompletableFuture<>(); diff --git a/vertx-async-await-incubator/src/test/java/io/vertx/await/VirtualThreadContextTestBase.java b/vertx-async-await-incubator/src/test/java/io/vertx/await/VirtualThreadContextTestBase.java index 42293cc..8d2aec0 100644 --- a/vertx-async-await-incubator/src/test/java/io/vertx/await/VirtualThreadContextTestBase.java +++ b/vertx-async-await-incubator/src/test/java/io/vertx/await/VirtualThreadContextTestBase.java @@ -12,6 +12,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; public abstract class VirtualThreadContextTestBase extends VertxTestBase { @@ -109,6 +110,41 @@ public void testAwaitCompoundFuture() { await(); } + @Test + public void testImmediateCompletedFuture() { + var flag = new AtomicInteger(); + async.run(v -> { + var completed = Future.succeededFuture("HELLO"); + vertx.getOrCreateContext().runOnContext(v2 -> { + assertEquals(2, flag.incrementAndGet()); + testComplete(); + }); + Async.await(completed); + assertEquals(1, flag.incrementAndGet()); + }); + await(); + } + + @Test + public void testImmediateFailedFuture() { + var flag = new AtomicInteger(); + async.run(v -> { + var completed = Future.failedFuture("FAILED"); + vertx.getOrCreateContext().runOnContext(v2 -> { + assertEquals(2, flag.incrementAndGet()); + testComplete(); + }); + try { + Async.await(completed); + } catch (Throwable t) { + assertEquals(1, flag.incrementAndGet()); + return; + } + fail("shouldn't reach"); + }); + await(); + } + @Test public void testDuplicateUseSameThread() { int num = 1000; @@ -116,7 +152,7 @@ public void testDuplicateUseSameThread() { async.run(v -> { ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); Thread th = Thread.currentThread(); - for (int i = 0;i < num;i++) { + for (int i = 0; i < num; i++) { ContextInternal duplicate = context.duplicate(); duplicate.runOnContext(v2 -> { // assertSame(th, Thread.currentThread()); @@ -135,7 +171,7 @@ public void testDuplicateConcurrentAwait() { ContextInternal context = (ContextInternal) vertx.getOrCreateContext(); Object lock = new Object(); List> list = new ArrayList<>(); - for (int i = 0;i < num;i++) { + for (int i = 0; i < num; i++) { ContextInternal duplicate = context.duplicate(); duplicate.runOnContext(v2 -> { Promise promise = duplicate.promise();