-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement a Vert.x Web Router that provides support for suspending functions #253
Conversation
…nctions See vert-x3#194 This router implements CoroutineScope to define a scope for new coroutines. Typically, this is the scope of a CoroutineVerticle. Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
@bbyk would you mind taking a look at this one? |
I can add more context from our experience with coroutines and point out possible challenges.
For example, we're wrapping all client calls in "proxies" (transparent for user via custom suspend fun <T> onVertxContext(vertx: Vertx, block: suspend () -> T): T {
val ctx = coroutineContext.tracingContext()
val vertxCtx = when (val currentContext = Vertx.currentContext()) {
null -> (vertx as VertxInternal).getOrCreateContext()
else -> currentContext
}.dispatcher()
return withContext(vertxCtx) {
VertxTracingUtils.setContext(ctx)
try {
block()
} finally {
VertxTracingUtils.clearContext()
}
}
} Otherwise vertx HTTP/SQL client will not propagate context. There's @Override
public <R> Span sendRequest(
final Context context,
final SpanKind kind,
final TracingPolicy policy,
final R request,
final String operation,
final BiConsumer<String, String> headers,
final TagExtractor<R> tagExtractor
) {
if (TracingPolicy.IGNORE.equals(policy) || request == null) {
return null;
}
io.opentelemetry.context.Context propagatedContext = context.getLocal(ACTIVE_CONTEXT);
// ... Point is, adding coroutine support is not so easy as adding router, and if done w/o preparations it'll make code non-compatible with other modules :( |
@tsegismont I am curious if you considered going extension route. Basically, for the
So then you can use with block to have both receivers e.g.
Could be less code this way |
@bbyk thanks for the tip, I thought about this then: fun withCoroutineSupport(
vertx: Vertx,
scope: CoroutineScope,
block: CoroutineVertxSupport.() -> Unit
) {
val coroutineSupport = object : CoroutineVertxSupport {
override fun getVertx() = vertx
override val coroutineContext = scope.coroutineContext
}
with(coroutineSupport) { block() }
}
interface CoroutineVertxSupport : CoroutineScope {
fun getVertx(): Vertx
fun Router.coErrorHandler(statusCode: Int, errorHandler: suspend (RoutingContext) -> Unit): Router =
errorHandler(statusCode) {
launch {
try {
errorHandler(it)
} catch (e: Exception) {
it.fail(e)
}
}
}
fun Route.coHandler(fn: suspend (RoutingContext) -> Unit): Route = handler {
launch {
try {
fn(it)
} catch (e: Exception) {
it.fail(e)
}
}
}
fun Route.coFailureHandler(fn: suspend (RoutingContext) -> Unit): Route = failureHandler {
launch {
try {
fn(it)
} catch (e: Exception) {
it.fail(e)
}
}
}
fun <T> Route.coRespond(fn: suspend (RoutingContext) -> T): Route = respond {
val vertx = it.vertx() as VertxInternal
val promise = vertx.promise<T>()
launch {
try {
promise.complete(fn.invoke(it))
} catch (e: Exception) {
promise.fail(e)
}
}
promise.future()
}
} We can make abstract class CoroutineVerticle : Verticle, CoroutineVertxSupport And we still get a good UX when implementing the webapp: class TestVerticle : CoroutineVerticle() {
@Volatile
var actualPort: Int = 0
override suspend fun start() {
val router = Router.router(vertx)
router.coErrorHandler(404) { rc ->
delay(100)
rc.response().setStatusCode(404).end("Too bad...")
}
router.route().handler { rc ->
rc.put("capturedContext", ContextInternal.current())
rc.next()
}
router.get("/suspendingHandler").coHandler { rc ->
delay(100)
val current = ContextInternal.current()
if (current.isDuplicate && current == rc.get("capturedContext")) {
rc.end()
} else {
rc.fail(500)
}
}
router.get("/suspendingRespond").coRespond { rc ->
delay(100)
val current = ContextInternal.current()
if (!current.isDuplicate || current != rc.get("capturedContext")) {
throw RuntimeException()
}
"foobar"
}
router.get("/suspendingFailureHandler").coHandler { it.fail(RuntimeException()) }.coFailureHandler { rc ->
delay(100)
val current = ContextInternal.current()
if (current.isDuplicate && current == rc.get("capturedContext")) {
rc.end("baz")
} else {
rc.response().setStatusCode(500).end()
}
}
val externalRouteHandler = ExternalRouteHandler()
router.get("/externalRoute").coHandler { externalRouteHandler.handle(it) }
router.route("/parent/*").subRouter(createSubRouter(vertx, this))
val httpServer = vertx.createHttpServer()
.requestHandler(router)
.listen(0)
.await()
actualPort = httpServer.actualPort()
}
}
class ExternalRouteHandler {
suspend fun handle(rc: RoutingContext) {
delay(100)
val current = ContextInternal.current()
if (!current.isDuplicate || current != rc.get("capturedContext")) {
rc.fail(500)
}
rc.end("someone kicked the ball")
}
}
suspend fun createSubRouter(vertx: Vertx, scope: CoroutineScope): Router {
val router = Router.router(vertx)
withCoroutineSupport(vertx, scope) {
router.get("/child").coRespond { rc ->
delay(100)
val current = ContextInternal.current()
if (!current.isDuplicate || current != rc.get("capturedContext")) {
throw RuntimeException()
}
"Hello, IT"
}
}
return router
} It's very similar to your proposal except that:
|
@tsegismont It is great! A couple of minor notes:
|
so that you would just have
|
@bbyk I tried your proposal with fun CoroutineScope.withCoroutineEventBus(
vertx: Vertx,
block: CoroutineEventBusSupport.() -> Unit
) {
val coroutineSupport = object : CoroutineEventBusSupport {
override fun getVertx() = vertx
override val coroutineContext = this@withCoroutineEventBus.coroutineContext
}
with(coroutineSupport) { block() }
}
interface CoroutineEventBusSupport : CoroutineScope {
fun getVertx(): Vertx
fun <T> MessageConsumer<T>.coHandler(block: suspend (Message<T>) -> Unit): MessageConsumer<T> = handler {
launch {
try {
block(it)
} catch (e: Exception) {
it.fail(RECIPIENT_FAILURE.toInt(), e.message)
}
}
}
fun <T> EventBus.coConsumer(address: String, block: suspend (Message<T>) -> Unit): MessageConsumer<T> =
consumer<T>(address).coHandler(block)
} This is a quite compact form compared to the existing code |
Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
So in the second iteration, I pushed this: fun CoroutineScope.coroutineRouter(
vertx: Vertx,
block: CoroutineRouterSupport.() -> Unit
) = with(object : CoroutineRouterSupport {
override fun getVertx() = vertx
override val coroutineContext = this@coroutineRouter.coroutineContext
}) { block() }
/**
* Adds support for suspending function in the Vert.x Web [Router].
*
* Objects of this type implement [CoroutineScope] to define a scope for new coroutines.
* Typically, this is the scope of [CoroutineVerticle].
*/
interface CoroutineRouterSupport : CoroutineScope {
/**
* The [Vertx] instance related to this scope.
*/
fun getVertx(): Vertx
/**
* The [CoroutineDispatcher] used to dispatch new coroutines.
*
* By default, this is the [Vertx.dispatcher].
*/
fun getDispatcher(): CoroutineDispatcher = getVertx().dispatcher()
/**
* Similar to [Router.errorHandler] but using a suspended [errorHandler].
*/
fun Router.coErrorHandler(statusCode: Int, errorHandler: suspend (RoutingContext) -> Unit): Router =
errorHandler(statusCode) {
launch(getDispatcher()) {
try {
errorHandler(it)
} catch (t: Throwable) {
it.fail(t)
}
}
}
/**
* Similar to [Route.handler] but using a suspended [requestHandler].
*/
fun Route.coHandler(requestHandler: suspend (RoutingContext) -> Unit): Route = handler {
launch(getDispatcher()) {
try {
requestHandler(it)
} catch (t: Throwable) {
it.fail(t)
}
}
}
/**
* Similar to [Route.failureHandler] but using a suspended [failureHandler].
*/
fun Route.coFailureHandler(failureHandler: suspend (RoutingContext) -> Unit): Route = failureHandler {
launch(getDispatcher()) {
try {
failureHandler(it)
} catch (t: Throwable) {
it.fail(t)
}
}
}
/**
* Similar to [Route.respond] but using a suspended [function].
*/
fun <T> Route.coRespond(function: suspend (RoutingContext) -> T): Route = respond {
val vertx = it.vertx() as VertxInternal
val promise = vertx.promise<T>()
launch(getDispatcher()) {
try {
promise.complete(function.invoke(it))
} catch (t: Throwable) {
it.fail(t)
}
}
promise.future()
}
} Then, a user only has to add the class TestVerticle : CoroutineVerticle(), CoroutineRouterSupport By default, |
vertx-lang-kotlin-coroutines/src/main/java/io/vertx/kotlin/coroutines/CoroutineRouterSupport.kt
Outdated
Show resolved
Hide resolved
thanks, that looks exactly what I've been waiting for -- a bridge for vertx and coroutine on the library level, though I can't see changes for context restoration (eg ThreadContextElement) For example:
I suppose to solve such cases one still should implement its own helper methods, right? |
I wonder if it makes sense to double down on some more idiomatic Kotlin coroutine patterns here. For example, with the following code:
you can basically drop And
If your
Or if you want to run on different dispatcher, you can always do this
|
I might be wrong on this one:
It seems that the
|
@sanyarnd Possibly, yes. Would you mind sharing with me a small reproducer on a public Git repo? So I fully understand the case you're talking about. Thanks |
It might not be a problem, thanks to the changes made recently. A dispatcher created for a base context shall take into account duplicated context when evaluating if dispatch is needed and when dispatching. |
Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
@bbyk I've given your proposal some thought today. I agree, we can drop the I believe your idea of creating a coroutine context element key for the Vert.x context is a good one. However, it can be done in another pull-request and is not required to implement a Vert.x Web Router that provides support for suspending functions. You're welcome to make another review if you have time for this. I'm going to merge the PR tomorrow so that we can get this enhancement in Vert.x 4.5 Thanks again for your very good feedback. It is much appreciated. |
vertx-lang-kotlin-coroutines/src/test/kotlin/io/vertx/kotlin/coroutines/CoroutineRouterTest.kt
Show resolved
Hide resolved
@tsegismont Thank you! I still think the correct implementation might be to always launch on current context to cover all corner cases. Please see my comment on the PR: https://github.com/vert-x3/vertx-lang-kotlin/pull/253/files#r1369034860 . For example, |
@tsegismont Sorry for the off-topic: please check out #256 for the upcoming release |
Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
The default behavior should be to use the coroutine dispatcher of the Vert.x context. Users can override this by providing additional context elements. Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
context: CoroutineContext = EmptyCoroutineContext, | ||
requestHandler: suspend (RoutingContext) -> Unit | ||
): Route = handler { | ||
launch(it.vertx().dispatcher() + context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea! You probably want to use newCoroutineContext
instead of plus
as it does a little bit more corner case analysis. As far as I understand, plus could be used directly when you plus something more specific as in + CoroutineName("...")
and if it's a generic context then it's probably best to delegate to newCoroutineContext
launch(it.vertx().dispatcher().newCoroutineContext(context)) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newCoroutineContext
will be called by launch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. You're right. Even if the context
arg of coHandler
had, say, CopyableThreadContextElement
the plus would get the element into the resulting context without copying but newCoroutineContext
in launch
will copy from the element since it'd be on the addedContext
side of the fold.
…nctions Backported from #253 Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
Follows implementation of coroutine router in vert-x3#253 Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
Follows implementation of coroutine router in #253 Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
Follows implementation of coroutine router in #253 Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
…-kotlin#253 / vert-x3/vertx-lang-kotlin@e841975 There is no noticeable performance degradation in the "plaintext" and "JSON serialization" tests.
…-kotlin-coroutines" (#9374) * Revamp the "vertx-web-kotlinx" portion project Changes: 1. bump the Gradle versions and the dependency versions to the latest 1. bump the JVM version to 21 with the new `jvmToolchain` DSL 1. resolve deprecations 1. update the dockerfile to use the `installDist` Gradle task so the time for archiving and unarchiving is saved * Run the revamped benchmark There are no build or runtime errors. Single query performance seems to have been improved by 2% with the bumped versions (I think most likely due to Java 21). Vagrant reduces performance by about 20% compared to running directly with Docker. * Enable io_uring and run the benchmark The single query performance is improved by 5% - 10%. * Try fixing the performance issues of the "vertx-web-kotlinx" portion in the "single query" and "JSON serialization" tests in the [Continuous Benchmarking results](https://tfb-status.techempower.com/) by using static kotlinx.serialization serializers The "vertx-web-kotlinx" results in the Continuous Benchmarking results are much lower than those of the "vertx-web-kotlin-coroutines" portion. See [the latest results](https://www.techempower.com/benchmarks/#section=test&runid=592cab59-a9db-463b-a9c9-33d2f9484e92&hw=ph&test=db) for example. Looking at the "single query" results, I first suspected that it was caused by there being not enough memory for the JVM runtime, so I added some logging code that prints the memory usage using `Runtime.totalMemory` and `Runtime.maxMemory`. It showed that there was about 7 GB max memory available during the benchmark runs, and the program only used 400 MB to 1 GB. I then tried allocating a 4 GB array during the run to ensure that the memory was usable and it worked with no problem. Then looking at the "JSON serialization" results again, I saw that "vertx-web-kotlinx" performs a lot worse in this test too, and decided that this is more likely to be the bottleneck. Therefore, the static serializers are provided explicitly and the performance is improved slightly as tested on my machine. (Also, see commit 315b4e3 for an attempt before.) I then copied the "JSON serialization" test code from "vertx-web-kotlin-coroutines" and ran the benchmark to see if there were other factors, such as project configuration differences, affecting the performance, and the answer was no. On my machine, the "JSON serialization" performance of "vertx-web-kotlinx" is about 80% - 85% of that of "vertx-web-kotlin-coroutines". And I think the bottleneck possibly lies in kotlinx.serialization serializing an object to a byte array first and then copying it to a Vert.x buffer. Remove the broken tag in "vertx-web-kotlin-coroutines" BTW, which was added in commit e53e026, for the benchmark runs without problems now as I tested. * Update README.md correspondingly * Add `--no-daemon` to the Gradle command in the Dockerfiles * Update the "Connection reset" exception to the io_uring one that's ignored in logging * Use `encodeToBufferedSink` in "kotlinx-serialization-json-okio" with a custom-implemented `VertxBufferSink` in JSON serialization The "JSON serialization" performance is on par with using `io.vertx.core.json.Json.encodeToBuffer` as tested on my machine after this change. * Replace Okio with kotlinx-io and remove unneeded code The "JSON serialization" performance seems to be slightly less. * Rename 2 classes to clarify * Use to the new Vert.x coroutine APIs introduced in vert-x3/vertx-lang-kotlin#253 / vert-x3/vertx-lang-kotlin@e841975 There is no noticeable performance degradation in the "plaintext" and "JSON serialization" tests. * Simply the code introduced in the previous commit by making `MainVerticle` implement `CoroutineRouterSupport` I didn't go through [the docs](https://vertx.io/docs/vertx-lang-kotlin-coroutines/kotlin/#_vert_x_web) thoroughly before implementing this. * Revamp the "vertx-web-kotlin-coroutines" portion project too following the changes made to the "vertx-web-kotlinx" portion The "gradlew" script somehow had incorrect access permissions and is fixed by bumping the Gradle wrapper. To keep the dependencies consistent with the "vert-web" portion, some dependencies are not updated to the latest versions. Remove 2 useless `COPY`s in "vertx-web-kotlin-coroutines-postgres.dockerfile" BTW. * Remove unneeded and incorrect comments Wrapping something as a Vert.x `Buffer` by implementing the `Buffer` interface is not viable because `BufferImpl` contains casts from a `Buffer` to a `BufferImpl`.
See #194
This router implements
CoroutineScope
to define a scope for new coroutines. Typically, this is the scope of aCoroutineVerticle
.