diff --git a/vertx-lang-kotlin-coroutines/src/main/asciidoc/index.adoc b/vertx-lang-kotlin-coroutines/src/main/asciidoc/index.adoc index d8822858..16d99f75 100644 --- a/vertx-lang-kotlin-coroutines/src/main/asciidoc/index.adoc +++ b/vertx-lang-kotlin-coroutines/src/main/asciidoc/index.adoc @@ -267,15 +267,24 @@ You can read more about it in https://github.com/Kotlin/kotlinx.coroutines/tree/ == Event bus -You can get a coroutine-aware `EventBus` by invoking `Vertx.coEventBus()`. - -The `io.vertx.kotlin.coroutines.CoroutineEventBus` delegates to the underlying `EventBus` for non-suspending cases, and implements suspending cases. +The Vert.x `EventBus` and `MessageConsumer` objects are extended with support for coroutines inside a `coroutineEventBus` scope function: [source,kotlin,indent=0] ---- include::Example.kt[tags=usingCoroutineEventBus] ---- +[TIP] +==== +The scope function is not necessary if the surrounding type implements `io.vertx.kotlin.coroutines.CoroutineEventBusSupport` +For example, with a coroutine verticle: + +[source,kotlin,indent=0] +---- +include::Example.kt[tags=VerticleWithCoroutineEventBusSupport] +---- +==== + == Vert.x Web The Vert.x Web `Router` and `Route` objects are extended with support for coroutines inside a `coroutineRouter` scope function: diff --git a/vertx-lang-kotlin-coroutines/src/main/java/examples/Example.kt b/vertx-lang-kotlin-coroutines/src/main/java/examples/Example.kt index 0abc9892..1bbc380d 100644 --- a/vertx-lang-kotlin-coroutines/src/main/java/examples/Example.kt +++ b/vertx-lang-kotlin-coroutines/src/main/java/examples/Example.kt @@ -380,14 +380,27 @@ class ExampleVerticle : CoroutineVerticle() { fun usingCoroutineEventBus() { // tag::usingCoroutineEventBus[] - val bus = vertx.coEventBus() - bus.consumer("some-address") { - computeSomethingWithSuspendingFunction() - it.reply("done") + val bus = vertx.eventBus() + coroutineEventBus { + bus.coConsumer("some-address") { + computeSomethingWithSuspendingFunction() + it.reply("done") + } } // end::usingCoroutineEventBus[] } + // tag::VerticleWithCoroutineEventBusSupport[] + class VerticleWithCoroutineEventBusSupport : CoroutineVerticle(), CoroutineEventBusSupport { + override suspend fun start() { + val bus = vertx.eventBus() + bus.coConsumer("some-address") { + // call suspending functions and do something + } + } + } + // end::VerticleWithCoroutineEventBusSupport[] + fun usingCoroutineRouter() { // tag::usingCoroutineRouter[] val router = Router.router(vertx) diff --git a/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineEventBus.kt b/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineEventBus.kt deleted file mode 100644 index 8ba9bf90..00000000 --- a/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineEventBus.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2019 Red Hat, Inc. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ -package io.vertx.kotlin.coroutines - -import io.vertx.core.Vertx -import io.vertx.core.eventbus.EventBus -import io.vertx.core.eventbus.Message -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch - - -fun Vertx.coEventBus(): CoroutineEventBus { - return CoroutineEventBusImpl(this) -} - -/** - * Proxy around Vert.x [EventBus] that provides support for suspending functions - * - * @author [Alexey Soshin](https://github.com/alexeysoshin) - */ -abstract class CoroutineEventBus(protected val eventBus: EventBus) : EventBus by eventBus { - abstract fun consumer(address: String, function: suspend (Message) -> Unit): CoroutineMessageConsumer - abstract override fun consumer(address: String): CoroutineMessageConsumer -} - -class CoroutineEventBusImpl(vertx: Vertx) : CoroutineEventBus(vertx.eventBus()) { - - private val scope = CoroutineScope(vertx.dispatcher()) - - override fun consumer(address: String): CoroutineMessageConsumer { - return CoroutineMessageConsumerImpl(scope, eventBus.consumer(address)) - } - - override fun consumer(address: String, function: suspend (Message) -> Unit): CoroutineMessageConsumer { - return CoroutineMessageConsumerImpl(scope, eventBus.consumer(address) { - scope.launch { - function(it) - } - }) - } -} diff --git a/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineEventBusSupport.kt b/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineEventBusSupport.kt new file mode 100644 index 00000000..9235ca02 --- /dev/null +++ b/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineEventBusSupport.kt @@ -0,0 +1,84 @@ +/* + * Copyright 2023 Red Hat, Inc. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.kotlin.coroutines + +import io.vertx.core.eventbus.EventBus +import io.vertx.core.eventbus.Message +import io.vertx.core.eventbus.MessageConsumer +import io.vertx.core.eventbus.ReplyFailure.RECIPIENT_FAILURE +import io.vertx.core.impl.ContextInternal +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.launch +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Calls the specified function [block] with a [CoroutineEventBusSupport] object as its receiver. + * + * The receiver's scope is the [CoroutineScope] of the caller. + */ +fun CoroutineScope.coroutineEventBus(block: CoroutineEventBusSupport.() -> Unit) { + val receiver = object : CoroutineEventBusSupport { + override val coroutineContext = this@coroutineEventBus.coroutineContext + } + with(receiver) { + block() + } +} + +/** + * Adds support for suspending functions to the Vert.x [EventBus]. + * + * Objects of this type implement [CoroutineScope] to define a scope for new coroutines. + * Typically, this is the scope of a [CoroutineVerticle]. + */ +interface CoroutineEventBusSupport : CoroutineScope { + + /** + * Similar to [EventBus.consumer] but using a suspending [handler]. + * + * The coroutine context is inherited from the [CoroutineScope]. + * Additional context elements can be specified with the [context] argument. + * + * @param context additional context elements, [EmptyCoroutineContext] by default + */ + fun EventBus.coConsumer( + address: String, + context: CoroutineContext = EmptyCoroutineContext, + handler: suspend (Message) -> Unit + ): MessageConsumer = consumer(address).coHandler(context, handler) + + /** + * Similar to [MessageConsumer.handler] but using a suspending [handler]. + * + * The coroutine context is inherited from the [CoroutineScope]. + * Additional context elements can be specified with the [context] argument. + * + * @param context additional context elements, [EmptyCoroutineContext] by default + */ + fun MessageConsumer.coHandler( + context: CoroutineContext = EmptyCoroutineContext, + handler: suspend (Message) -> Unit + ): MessageConsumer = handler { + launch((ContextInternal.current()?.dispatcher() ?: EmptyCoroutineContext) + context) { + try { + handler(it) + } catch (e: Exception) { + it.fail(RECIPIENT_FAILURE.toInt(), e.message) + } + } + } +} diff --git a/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineMessageConsumer.kt b/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineMessageConsumer.kt deleted file mode 100644 index 8456e0bb..00000000 --- a/vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineMessageConsumer.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2019 Red Hat, Inc. - * - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ -package io.vertx.kotlin.coroutines - -import io.vertx.core.eventbus.Message -import io.vertx.core.eventbus.MessageConsumer -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch - -/** - * Proxy around Vert.x [MessageConsumer] that provides support for suspending functions - * - * @author [Alexey Soshin](https://github.com/alexeysoshin) - */ -abstract class CoroutineMessageConsumer(consumer: MessageConsumer) : MessageConsumer by consumer { - abstract fun handler(function: suspend (Message) -> Unit): CoroutineMessageConsumer -} - -class CoroutineMessageConsumerImpl( - private val scope: CoroutineScope, - private val consumer: MessageConsumer -) : CoroutineMessageConsumer(consumer) { - override fun handler(function: suspend (Message) -> Unit) = apply { - consumer.handler { message -> - scope.launch { - function(message) - } - } - } -} diff --git a/vertx-lang-kotlin-coroutines/src/test/kotlin/io/vertx/kotlin/coroutines/EventBusTest.kt b/vertx-lang-kotlin-coroutines/src/test/kotlin/io/vertx/kotlin/coroutines/EventBusTest.kt index 0d33205e..b5d50a60 100644 --- a/vertx-lang-kotlin-coroutines/src/test/kotlin/io/vertx/kotlin/coroutines/EventBusTest.kt +++ b/vertx-lang-kotlin-coroutines/src/test/kotlin/io/vertx/kotlin/coroutines/EventBusTest.kt @@ -19,12 +19,10 @@ import io.vertx.core.Vertx import io.vertx.core.eventbus.Message import io.vertx.core.eventbus.ReplyException import io.vertx.core.eventbus.ReplyFailure +import io.vertx.core.impl.ContextInternal import io.vertx.ext.unit.TestContext import io.vertx.ext.unit.junit.VertxUnitRunner -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import org.junit.After import org.junit.Before import org.junit.Test @@ -54,31 +52,35 @@ class EventBusTest { @Test fun `test EventBus consumer with handler supports suspending functions`(testContext: TestContext) { val async = testContext.async() - val bus = vertx.coEventBus() - bus.consumer("some-address") { - // Making sure that we have some kind of suspending function here - delay(10) - async.complete() + val bus = vertx.eventBus() + GlobalScope.coroutineEventBus { + bus.coConsumer("some-address") { + // Making sure that we have some kind of suspending function here + withContext(Dispatchers.IO) { + Thread.sleep(100) + } + testContext.assertTrue(ContextInternal.current().isDuplicate) + async.complete() + } } - bus.send("some-address", "some message") - - async.awaitSuccess(1000) } @Test fun `test EventBus consumer supports suspending functions`(testContext: TestContext) { - val async = testContext.async(1) - val bus: CoroutineEventBus = vertx.coEventBus() - bus.consumer("some-address").handler { - // Making sure that we have some kind of suspending function here - delay(10) - async.countDown() + val async = testContext.async() + val bus = vertx.eventBus() + GlobalScope.coroutineEventBus { + bus.consumer("some-address").coHandler { + // Making sure that we have some kind of suspending function here + withContext(Dispatchers.IO) { + Thread.sleep(100) + } + testContext.assertTrue(ContextInternal.current().isDuplicate) + async.complete() + } } - bus.send("some-address", "some message") - - async.awaitSuccess(1000) } @Test