Skip to content

Commit

Permalink
Coroutine execution performed with dispatch in executeBlocking (#250)
Browse files Browse the repository at this point in the history
When a coroutine is launched in executeBlocking, execution must be performed with dispatch.

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont authored Oct 17, 2023
1 parent f8a8204 commit e94596a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private class ContextCoroutineDispatcher(val vertxContext: ContextInternal) : Co

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
val current = ContextInternal.current() ?: return true
return current != vertxContext && current.unwrap() != vertxContext
return (current != vertxContext && current.unwrap() != vertxContext) || !vertxContext.inThread()
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.vertx.core.http.HttpClientOptions
import io.vertx.core.http.HttpMethod
import io.vertx.core.http.HttpServerOptions
import io.vertx.core.http.RequestOptions
import io.vertx.core.impl.ContextInternal
import io.vertx.core.impl.VertxInternal
import io.vertx.ext.unit.TestContext
import io.vertx.ext.unit.junit.RunTestOnContext
Expand Down Expand Up @@ -92,15 +93,15 @@ class VertxCoroutineTest {
val server = vertx.createHttpServer(HttpServerOptions().setPort(8080))
server.requestHandler { req ->
GlobalScope.launch(vertx.dispatcher()) {
val res = awaitResult<String> { ai.methodWithParamsAndHandlerNoReturn("oranges", 23, it) }
val res = awaitResult { ai.methodWithParamsAndHandlerNoReturn("oranges", 23, it) }
assertEquals("oranges23", res)
req.response().end()
}
}
server.listen().onComplete { res ->
assertTrue(res.succeeded())
val client = vertx.createHttpClient(HttpClientOptions().setDefaultPort(8080))
client.request(HttpMethod.GET, "/somepath").onComplete() { ar1 ->
client.request(HttpMethod.GET, "/somepath").onComplete { ar1 ->
assertTrue(ar1.succeeded())
val req = ar1.result()
req.send().onComplete { ar2 ->
Expand All @@ -119,7 +120,7 @@ class VertxCoroutineTest {
val async = testContext.async()
val th = Thread.currentThread()
GlobalScope.launch(vertx.dispatcher()) {
val res = awaitResult<String> { ai.methodWithParamsAndHandlerNoReturn("oranges", 23, it) }
val res = awaitResult { ai.methodWithParamsAndHandlerNoReturn("oranges", 23, it) }
assertEquals("oranges23", res)
assertSame(Thread.currentThread(), th)
async.complete()
Expand All @@ -130,7 +131,7 @@ class VertxCoroutineTest {
fun `test synchronous execution of methodWithNoParamsAndHandlerNoReturn`(testContext: TestContext) {
GlobalScope.launch(vertx.dispatcher()) {
val async = testContext.async()
val res = awaitResult<String> { ai.methodWithNoParamsAndHandlerNoReturn(it) }
val res = awaitResult { ai.methodWithNoParamsAndHandlerNoReturn(it) }
assertEquals("wibble", res)
async.complete()
}
Expand All @@ -140,7 +141,7 @@ class VertxCoroutineTest {
fun `test synchronous execution of methodWithParamsAndHandlerWithReturn`(testContext: TestContext) {
GlobalScope.launch(vertx.dispatcher()) {
val async = testContext.async()
val res = awaitResult<String> { ai.methodWithParamsAndHandlerWithReturn("oranges", 23, it) }
val res = awaitResult { ai.methodWithParamsAndHandlerWithReturn("oranges", 23, it) }
assertEquals("oranges23", res)
async.complete()
}
Expand All @@ -150,7 +151,7 @@ class VertxCoroutineTest {
fun `test synchronous execution of methodWithNoParamsAndHandlerWithReturn`(testContext: TestContext) {
GlobalScope.launch(vertx.dispatcher()) {
val async = testContext.async()
val res = awaitResult<String> { ai.methodWithNoParamsAndHandlerWithReturn(it) }
val res = awaitResult { ai.methodWithNoParamsAndHandlerWithReturn(it) }
assertEquals("wibble", res)
async.complete()
}
Expand All @@ -161,7 +162,7 @@ class VertxCoroutineTest {
GlobalScope.launch(vertx.dispatcher()) {
val async = testContext.async()
val res = withTimeout(2000) {
awaitResult<String> { ai.methodWithNoParamsAndHandlerWithReturnTimeout(it, 1000) }
awaitResult { ai.methodWithNoParamsAndHandlerWithReturnTimeout(it, 1000) }
}
testContext.assertEquals("wibble", res)
async.complete()
Expand All @@ -174,7 +175,7 @@ class VertxCoroutineTest {
val async = testContext.async()
try {
withTimeout(500) {
awaitResult<String> { ai.methodWithNoParamsAndHandlerWithReturnTimeout(it, 1000) }
awaitResult { ai.methodWithNoParamsAndHandlerWithReturnTimeout(it, 1000) }
}
testContext.fail()
} catch (e: CancellationException) {
Expand All @@ -188,9 +189,9 @@ class VertxCoroutineTest {
fun `test synchronous execution of methodWithParamsAndHandlerInterface`(testContext: TestContext) {
GlobalScope.launch(vertx.dispatcher()) {
val async = testContext.async()
val returned = awaitResult<ReturnedInterface> { ai.methodWithParamsAndHandlerInterface("apples", 123, it) }
val returned = awaitResult { ai.methodWithParamsAndHandlerInterface("apples", 123, it) }
assertNotNull(returned)
val res = awaitResult<String> { returned.methodWithParamsAndHandlerNoReturn("bananas", 100, it) }
val res = awaitResult { returned.methodWithParamsAndHandlerNoReturn("bananas", 100, it) }
testContext.assertEquals(res, "bananas100")
async.complete()
}
Expand All @@ -201,7 +202,7 @@ class VertxCoroutineTest {
GlobalScope.launch(vertx.dispatcher()) {
val async = testContext.async()
try {
awaitResult<String> { ai.methodThatFails("oranges", it) }
awaitResult { ai.methodThatFails("oranges", it) }
testContext.fail("Should throw exception")
} catch (e: Exception) {
testContext.assertEquals("oranges", e.message)
Expand All @@ -215,7 +216,7 @@ class VertxCoroutineTest {
GlobalScope.launch(vertx.dispatcher()) {
val async = testContext.async()
try {
awaitResult<String> { ai.methodThatThrowsException("oranges", it) }
awaitResult { ai.methodThatThrowsException("oranges", it) }
testContext.fail("Should throw exception")
} catch (e: Exception) {
testContext.assertEquals("ouch", e.message)
Expand Down Expand Up @@ -429,18 +430,20 @@ class VertxCoroutineTest {
delay(500)
throw java.lang.RuntimeException("Boom")
}
future.onComplete(testContext.asyncAssertFailure() {
future.onComplete(testContext.asyncAssertFailure {
testContext.assertEquals(it.message, "Boom")
})
}

@Test
fun `test no StackOverflowError caused by two yield calls`(testContext: TestContext) {
val latch = testContext.async()
GlobalScope.launch(vertx.dispatcher()) {
repeat(1000) {
yield()
yield()
}
latch.complete()
}
}

Expand All @@ -461,4 +464,21 @@ class VertxCoroutineTest {
}
}
}

@Test
fun `test Coroutine execution performed with dispatch in executeBlocking`(testContext: TestContext) {
val latch = testContext.async(2)
GlobalScope.launch(Vertx.currentContext().dispatcher()) {
vertx.executeBlocking {
val captured = ContextInternal.current()
testContext.assertFalse(captured.isWorkerContext)
launch {
testContext.assertEquals(captured, ContextInternal.current())
testContext.assertTrue(captured.inThread())
latch.countDown()
}
}.await()
latch.countDown()
}
}
}

0 comments on commit e94596a

Please sign in to comment.