Skip to content

Commit

Permalink
Refactor coroutine eventbus
Browse files Browse the repository at this point in the history
Follows implementation of coroutine router in vert-x3#253

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont committed Oct 24, 2023
1 parent 6ffe3b8 commit 4297619
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 125 deletions.
15 changes: 12 additions & 3 deletions vertx-lang-kotlin-coroutines/src/main/asciidoc/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,24 @@ You can read more about it in https://github.com/Kotlin/kotlinx.coroutines/tree/

== Event bus

You can get a coroutine-aware `EventBus` by invoking `Vertx.coEventBus()`.

The `io.vertx.kotlin.coroutines.CoroutineEventBus` delegates to the underlying `EventBus` for non-suspending cases, and implements suspending cases.
The Vert.x `EventBus` and `MessageConsumer` objects are extended with support for coroutines inside a `coroutineEventBus` scope function:

[source,kotlin,indent=0]
----
include::Example.kt[tags=usingCoroutineEventBus]
----

[TIP]
====
The scope function is not necessary if the surrounding type implements `io.vertx.kotlin.coroutines.CoroutineEventBusSupport`
For example, with a coroutine verticle:
[source,kotlin,indent=0]
----
include::Example.kt[tags=VerticleWithCoroutineEventBusSupport]
----
====

== Vert.x Web

The Vert.x Web `Router` and `Route` objects are extended with support for coroutines inside a `coroutineRouter` scope function:
Expand Down
21 changes: 17 additions & 4 deletions vertx-lang-kotlin-coroutines/src/main/java/examples/Example.kt
Original file line number Diff line number Diff line change
Expand Up @@ -380,14 +380,27 @@ class ExampleVerticle : CoroutineVerticle() {

fun usingCoroutineEventBus() {
// tag::usingCoroutineEventBus[]
val bus = vertx.coEventBus()
bus.consumer<String>("some-address") {
computeSomethingWithSuspendingFunction()
it.reply("done")
val bus = vertx.eventBus()
coroutineEventBus {
bus.coConsumer<String>("some-address") {
computeSomethingWithSuspendingFunction()
it.reply("done")
}
}
// end::usingCoroutineEventBus[]
}

// tag::VerticleWithCoroutineEventBusSupport[]
class VerticleWithCoroutineEventBusSupport : CoroutineVerticle(), CoroutineEventBusSupport {
override suspend fun start() {
val bus = vertx.eventBus()
bus.coConsumer<String>("some-address") {
// call suspending functions and do something
}
}
}
// end::VerticleWithCoroutineEventBusSupport[]

fun usingCoroutineRouter() {
// tag::usingCoroutineRouter[]
val router = Router.router(vertx)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 Red Hat, Inc.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.vertx.kotlin.coroutines

import io.vertx.core.eventbus.EventBus
import io.vertx.core.eventbus.Message
import io.vertx.core.eventbus.MessageConsumer
import io.vertx.core.eventbus.ReplyFailure.RECIPIENT_FAILURE
import io.vertx.core.impl.ContextInternal
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Calls the specified function [block] with a [CoroutineEventBusSupport] object as its receiver.
*
* The receiver's scope is the [CoroutineScope] of the caller.
*/
fun CoroutineScope.coroutineEventBus(block: CoroutineEventBusSupport.() -> Unit) {
val receiver = object : CoroutineEventBusSupport {
override val coroutineContext = this@coroutineEventBus.coroutineContext
}
with(receiver) {
block()
}
}

/**
* Adds support for suspending functions to the Vert.x [EventBus].
*
* Objects of this type implement [CoroutineScope] to define a scope for new coroutines.
* Typically, this is the scope of a [CoroutineVerticle].
*/
interface CoroutineEventBusSupport : CoroutineScope {

/**
* Similar to [EventBus.consumer] but using a suspending [handler].
*
* The coroutine context is inherited from the [CoroutineScope].
* Additional context elements can be specified with the [context] argument.
*
* @param context additional context elements, [EmptyCoroutineContext] by default
*/
fun <T> EventBus.coConsumer(
address: String,
context: CoroutineContext = EmptyCoroutineContext,
handler: suspend (Message<T>) -> Unit
): MessageConsumer<T> = consumer<T>(address).coHandler(context, handler)

/**
* Similar to [MessageConsumer.handler] but using a suspending [handler].
*
* The coroutine context is inherited from the [CoroutineScope].
* Additional context elements can be specified with the [context] argument.
*
* @param context additional context elements, [EmptyCoroutineContext] by default
*/
fun <T> MessageConsumer<T>.coHandler(
context: CoroutineContext = EmptyCoroutineContext,
handler: suspend (Message<T>) -> Unit
): MessageConsumer<T> = handler {
launch((ContextInternal.current()?.dispatcher() ?: EmptyCoroutineContext) + context) {
try {
handler(it)
} catch (e: Exception) {
it.fail(RECIPIENT_FAILURE.toInt(), e.message)
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ import io.vertx.core.Vertx
import io.vertx.core.eventbus.Message
import io.vertx.core.eventbus.ReplyException
import io.vertx.core.eventbus.ReplyFailure
import io.vertx.core.impl.ContextInternal
import io.vertx.ext.unit.TestContext
import io.vertx.ext.unit.junit.VertxUnitRunner
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import org.junit.After
import org.junit.Before
import org.junit.Test
Expand Down Expand Up @@ -54,31 +52,35 @@ class EventBusTest {
@Test
fun `test EventBus consumer with handler supports suspending functions`(testContext: TestContext) {
val async = testContext.async()
val bus = vertx.coEventBus()
bus.consumer<String>("some-address") {
// Making sure that we have some kind of suspending function here
delay(10)
async.complete()
val bus = vertx.eventBus()
GlobalScope.coroutineEventBus {
bus.coConsumer<String>("some-address") {
// Making sure that we have some kind of suspending function here
withContext(Dispatchers.IO) {
Thread.sleep(100)
}
testContext.assertTrue(ContextInternal.current().isDuplicate)
async.complete()
}
}

bus.send("some-address", "some message")

async.awaitSuccess(1000)
}

@Test
fun `test EventBus consumer supports suspending functions`(testContext: TestContext) {
val async = testContext.async(1)
val bus: CoroutineEventBus = vertx.coEventBus()
bus.consumer<String>("some-address").handler {
// Making sure that we have some kind of suspending function here
delay(10)
async.countDown()
val async = testContext.async()
val bus = vertx.eventBus()
GlobalScope.coroutineEventBus {
bus.consumer<String>("some-address").coHandler {
// Making sure that we have some kind of suspending function here
withContext(Dispatchers.IO) {
Thread.sleep(100)
}
testContext.assertTrue(ContextInternal.current().isDuplicate)
async.complete()
}
}

bus.send("some-address", "some message")

async.awaitSuccess(1000)
}

@Test
Expand Down

0 comments on commit 4297619

Please sign in to comment.