Skip to content

Commit

Permalink
Use to the new Vert.x coroutine APIs introduced in vert-x3/vertx-lang…
Browse files Browse the repository at this point in the history
…-kotlin#253 / vert-x3/vertx-lang-kotlin@e841975

There is no noticeable performance degradation in the "plaintext" and "JSON serialization" tests.
  • Loading branch information
ShreckYe committed Oct 29, 2024
1 parent 11a2473 commit fe60c70
Showing 1 changed file with 99 additions and 99 deletions.
198 changes: 99 additions & 99 deletions frameworks/Kotlin/vertx-web-kotlinx/src/main/kotlin/MainVerticle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import io.vertx.ext.web.Route
import io.vertx.ext.web.Router
import io.vertx.ext.web.RoutingContext
import io.vertx.kotlin.core.http.httpServerOptionsOf
import io.vertx.kotlin.coroutines.CoroutineRouterSupport
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.coAwait
import io.vertx.kotlin.coroutines.coroutineRouter
import io.vertx.kotlin.pgclient.pgConnectOptionsOf
import io.vertx.pgclient.PgConnection
import io.vertx.sqlclient.PreparedQuery
import io.vertx.sqlclient.Row
import io.vertx.sqlclient.RowSet
import io.vertx.sqlclient.Tuple
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.html.*
import kotlinx.html.stream.appendHTML
import kotlinx.io.buffered
Expand All @@ -30,20 +31,6 @@ import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
inline fun Route.checkedCoroutineHandlerUnconfined(crossinline requestHandler: suspend (RoutingContext) -> Unit): Route =
handler { ctx ->
/* Some conclusions from the Plaintext test results with trailing `await()`s:
1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
launch(Dispatchers.Unconfined) {
try {
requestHandler(ctx)
} catch (t: Throwable) {
ctx.fail(t)
}
}
}

// `PgConnection`s as used in the "vertx" portion offers better performance than `PgPool`s.
lateinit var pgConnection: PgConnection
lateinit var date: String
Expand Down Expand Up @@ -118,36 +105,47 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
}

inline fun <reified T : Any> Route.jsonResponseHandler(
serializer: SerializationStrategy<T>, crossinline requestHandler: suspend (RoutingContext) -> @Serializable T
) =
checkedCoroutineHandlerUnconfined {
it.response().run {
putJsonResponseHeader()

/*
// approach 1
end(Json.encodeToString(serializer, requestHandler(it)))/*.coAwait()*/
*/

/*
// approach 2
// java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
toRawSink().buffered().use { bufferedSink ->
@OptIn(ExperimentalSerializationApi::class)
Json.encodeToSink(serializer, requestHandler(it), bufferedSink)
}
*/
inner /*value*/ class CoroutineRouterSupportExt(val coroutineRouterSupport: CoroutineRouterSupport) {
fun Route.coHandlerUnconfined(requestHandler: suspend (RoutingContext) -> Unit): Route =
with(coroutineRouterSupport) {
/* Some conclusions from the Plaintext test results with trailing `await()`s:
1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
coHandler(Dispatchers.Unconfined, requestHandler)
}

// approach 3
end(Buffer.buffer().apply {
inline fun <reified T : Any> Route.jsonResponseCoHandler(
serializer: SerializationStrategy<T>,
crossinline requestHandler: suspend (RoutingContext) -> @Serializable T
) =
coHandlerUnconfined {
it.response().run {
putJsonResponseHeader()

/*
// approach 1
end(Json.encodeToString(serializer, requestHandler(it)))/*.coAwait()*/
*/

/*
// approach 2
// java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
toRawSink().buffered().use { bufferedSink ->
@OptIn(ExperimentalSerializationApi::class)
Json.encodeToSink(serializer, requestHandler(it), bufferedSink)
}
})
*/

// approach 3
end(Buffer.buffer().apply {
toRawSink().buffered().use { bufferedSink ->
@OptIn(ExperimentalSerializationApi::class)
Json.encodeToSink(serializer, requestHandler(it), bufferedSink)
}
})
}
}
}
}

suspend fun selectRandomWorlds(queries: Int): List<World> {
val rowSets = List(queries) {
Expand All @@ -156,83 +154,85 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
return rowSets.map { it.single().toWorld() }
}

fun Router.routes() {
get("/json").jsonResponseHandler(Serializers.message) {
jsonSerializationMessage
}
fun Router.routes() = coroutineRouter {
with(CoroutineRouterSupportExt(this)) {
get("/json").jsonResponseCoHandler(Serializers.message) {
jsonSerializationMessage
}

get("/db").jsonResponseHandler(Serializers.world) {
val rowSet = selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000())).coAwait()
rowSet.single().toWorld()
}
get("/db").jsonResponseCoHandler(Serializers.world) {
val rowSet = selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000())).coAwait()
rowSet.single().toWorld()
}

get("/queries").jsonResponseHandler(Serializers.worlds) {
val queries = it.request().getQueries()
selectRandomWorlds(queries)
}
get("/queries").jsonResponseCoHandler(Serializers.worlds) {
val queries = it.request().getQueries()
selectRandomWorlds(queries)
}

get("/fortunes").checkedCoroutineHandlerUnconfined {
val fortunes = mutableListOf<Fortune>()
selectFortuneQuery.execute().coAwait()
.mapTo(fortunes) { it.toFortune() }
get("/fortunes").coHandlerUnconfined {
val fortunes = mutableListOf<Fortune>()
selectFortuneQuery.execute().coAwait()
.mapTo(fortunes) { it.toFortune() }

fortunes.add(Fortune(0, "Additional fortune added at request time."))
fortunes.sortBy { it.message }
fortunes.add(Fortune(0, "Additional fortune added at request time."))
fortunes.sortBy { it.message }

val htmlString = buildString {
append("<!DOCTYPE html>")
appendHTML(false).html {
head {
title("Fortunes")
}
body {
table {
tr {
th { +"id" }
th { +"message" }
}
for (fortune in fortunes)
val htmlString = buildString {
append("<!DOCTYPE html>")
appendHTML(false).html {
head {
title("Fortunes")
}
body {
table {
tr {
td { +fortune.id.toString() }
td { +fortune.message }
th { +"id" }
th { +"message" }
}
for (fortune in fortunes)
tr {
td { +fortune.id.toString() }
td { +fortune.message }
}
}
}
}
}
}

it.response().run {
putCommonHeaders()
putHeader(HttpHeaders.CONTENT_TYPE, "text/html; charset=utf-8")
end(htmlString)/*.coAwait()*/
it.response().run {
putCommonHeaders()
putHeader(HttpHeaders.CONTENT_TYPE, "text/html; charset=utf-8")
end(htmlString)/*.coAwait()*/
}
}
}

get("/updates").jsonResponseHandler(Serializers.worlds) {
val queries = it.request().getQueries()
val worlds = selectRandomWorlds(queries)
val updatedWorlds = worlds.map { it.copy(randomNumber = randomIntBetween1And10000()) }
get("/updates").jsonResponseCoHandler(Serializers.worlds) {
val queries = it.request().getQueries()
val worlds = selectRandomWorlds(queries)
val updatedWorlds = worlds.map { it.copy(randomNumber = randomIntBetween1And10000()) }

// Approach 1
// The updated worlds need to be sorted first to avoid deadlocks.
updateWordQuery
.executeBatch(updatedWorlds.sortedBy { it.id }.map { Tuple.of(it.randomNumber, it.id) }).coAwait()
// Approach 1
// The updated worlds need to be sorted first to avoid deadlocks.
updateWordQuery
.executeBatch(updatedWorlds.sortedBy { it.id }.map { Tuple.of(it.randomNumber, it.id) }).coAwait()

/*
// Approach 2, worse performance
updatedWorlds.map {
pgPool.preparedQuery(UPDATE_WORLD_SQL).execute(Tuple.of(it.randomNumber, it.id))
}.awaitAll()
*/
/*
// Approach 2, worse performance
updatedWorlds.map {
pgPool.preparedQuery(UPDATE_WORLD_SQL).execute(Tuple.of(it.randomNumber, it.id))
}.awaitAll()
*/

updatedWorlds
}
updatedWorlds
}

get("/plaintext").checkedCoroutineHandlerUnconfined {
it.response().run {
putCommonHeaders()
putHeader(HttpHeaders.CONTENT_TYPE, "text/plain")
end("Hello, World!")/*.coAwait()*/
get("/plaintext").coHandlerUnconfined {
it.response().run {
putCommonHeaders()
putHeader(HttpHeaders.CONTENT_TYPE, "text/plain")
end("Hello, World!")/*.coAwait()*/
}
}
}
}
Expand Down

0 comments on commit fe60c70

Please sign in to comment.