Skip to content

Commit 75cb5d2

Browse files
authored
Merge pull request #37270 from mkouba/issue-37222
Vert.x: report exception for blocking message consumer methods
2 parents cd3a1d2 + 9917e6b commit 75cb5d2

File tree

2 files changed

+12
-4
lines changed

2 files changed

+12
-4
lines changed

extensions/vertx/deployment/src/test/java/io/quarkus/vertx/deployment/MessageConsumerFailureTest.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ public void testFailure() throws InterruptedException {
5858

5959
@Test
6060
public void testFailureNoReplyHandler() throws InterruptedException {
61+
verifyFailureNoReply("foo", "Foo is dead", IllegalStateException.class);
62+
verifyFailureNoReply("foo-blocking", "Red is dead", IllegalStateException.class);
63+
}
64+
65+
void verifyFailureNoReply(String address, String expectedMessage, Class<? extends Exception> expectedException)
66+
throws InterruptedException {
6167
Handler<Throwable> oldHandler = vertx.exceptionHandler();
6268
try {
6369
BlockingQueue<Object> synchronizer = new LinkedBlockingQueue<>();
@@ -71,10 +77,10 @@ public void handle(Throwable event) {
7177
}
7278
}
7379
});
74-
eventBus.send("foo", "bar");
80+
eventBus.send(address, "hello");
7581
Object ret = synchronizer.poll(2, TimeUnit.SECONDS);
76-
assertTrue(ret instanceof IllegalStateException);
77-
assertEquals("Foo is dead", ((IllegalStateException) ret).getMessage());
82+
assertTrue(expectedException.isAssignableFrom(ret.getClass()));
83+
assertEquals(expectedMessage, ((Throwable) ret).getMessage());
7884
} finally {
7985
vertx.exceptionHandler(oldHandler);
8086
}

extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.smallrye.common.vertx.VertxContext;
3838
import io.vertx.core.AsyncResult;
3939
import io.vertx.core.Context;
40+
import io.vertx.core.Future;
4041
import io.vertx.core.Handler;
4142
import io.vertx.core.Vertx;
4243
import io.vertx.core.eventbus.EventBus;
@@ -147,7 +148,7 @@ public void run() {
147148
}
148149
});
149150
} else {
150-
dup.executeBlocking(new Callable<Void>() {
151+
Future<Void> future = dup.executeBlocking(new Callable<Void>() {
151152
@Override
152153
public Void call() {
153154
try {
@@ -163,6 +164,7 @@ public Void call() {
163164
return null;
164165
}
165166
}, invoker.isOrdered());
167+
future.onFailure(context::reportException);
166168
}
167169
} else {
168170
// Will run on the context used for the consumer registration.

0 commit comments

Comments
 (0)