From 0f89b16ff7770e8b763db9bac6ffebb9cfce06a0 Mon Sep 17 00:00:00 2001 From: Oleg Yukhnevich Date: Sat, 18 May 2024 23:42:53 +0300 Subject: [PATCH] benchmarks setup --- benchmarks/build.gradle.kts | 116 --------------- benchmarks/rsocket-java/build.gradle.kts | 67 +++++++++ .../kotlin/LocalRSocketJavaBenchmark.kt | 35 +++++ .../jvmMain/kotlin}/RSocketJavaBenchmark.kt | 66 +++++---- .../jvmMain/kotlin/TcpRSocketJavaBenchmark.kt | 37 +++++ .../rsocket-kotlin-old/build.gradle.kts | 90 ++++++++++++ .../kotlin/KtorTcpRSocketKotlinBenchmark.kt | 55 +++++++ .../kotlin/LocalRSocketKotlinOldBenchmark.kt | 61 ++++++++ .../kotlin/RSocketKotlinOldBenchmark.kt | 92 ++++++++++++ benchmarks/rsocket-kotlin/build.gradle.kts | 126 ++++++++++++++++ .../kotlin/KtorTcpRSocketKotlinBenchmark.kt | 100 +++++++++++++ .../kotlin/LocalRSocketKotlinBenchmark.kt | 71 +++++++++ .../kotlin/RSocketKotlinBenchmark.kt | 91 ++++++++++++ .../kotlin/NettyQuicRSocketKotlinBenchmark.kt | 79 ++++++++++ .../kotlin/NettyTcpRSocketKotlinBenchmark.kt | 68 +++++++++ benchmarks/shared/build.gradle.kts | 37 +++++ .../src/commonMain/kotlin/RSocketBenchmark.kt | 100 +++++++++++++ .../kotlin/benchmarks/RSocketBenchmark.kt | 138 ------------------ .../benchmarks/RSocketKotlinBenchmark.kt | 87 ----------- gradle/libs.versions.toml | 8 +- settings.gradle.kts | 7 + 21 files changed, 1161 insertions(+), 370 deletions(-) delete mode 100644 benchmarks/build.gradle.kts create mode 100644 benchmarks/rsocket-java/build.gradle.kts create mode 100644 benchmarks/rsocket-java/src/jvmMain/kotlin/LocalRSocketJavaBenchmark.kt rename benchmarks/{src/javaMain/kotlin/io/rsocket/kotlin/benchmarks => rsocket-java/src/jvmMain/kotlin}/RSocketJavaBenchmark.kt (62%) create mode 100644 benchmarks/rsocket-java/src/jvmMain/kotlin/TcpRSocketJavaBenchmark.kt create mode 100644 benchmarks/rsocket-kotlin-old/build.gradle.kts create mode 100644 benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt create mode 100644 benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/LocalRSocketKotlinOldBenchmark.kt create mode 100644 benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/RSocketKotlinOldBenchmark.kt create mode 100644 benchmarks/rsocket-kotlin/build.gradle.kts create mode 100644 benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt create mode 100644 benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt create mode 100644 benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt create mode 100644 benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt create mode 100644 benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt create mode 100644 benchmarks/shared/build.gradle.kts create mode 100644 benchmarks/shared/src/commonMain/kotlin/RSocketBenchmark.kt delete mode 100644 benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt delete mode 100644 benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts deleted file mode 100644 index 3d070c14..00000000 --- a/benchmarks/build.gradle.kts +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2015-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import kotlinx.benchmark.gradle.* -import org.gradle.kotlin.dsl.benchmark -import org.jetbrains.kotlin.gradle.plugin.mpp.* - -plugins { - rsocket.multiplatform - - alias(libs.plugins.kotlin.allopen) - alias(libs.plugins.kotlinx.benchmark) -} - -val jmhVersionOverride: String by rootProject - -kotlin { - val jvm = jvm() //common jvm source set - val kotlinJvm = jvm("kotlin") //kotlin benchmark - val javaJvm = jvm("java") - - sourceSets { - val commonMain by getting { - dependencies { - implementation(libs.kotlinx.benchmark) - implementation(libs.kotlinx.coroutines.core) - } - } - - val jvmMain by getting - - val kotlinMain by getting { - dependsOn(jvmMain) - dependencies { - implementation(projects.rsocketCore) - implementation(projects.rsocketTransportLocal) - } - } - - val javaMain by getting { - dependsOn(jvmMain) - dependencies { - implementation(libs.kotlinx.coroutines.reactor) - implementation(libs.rsocket.java.core) - implementation(libs.rsocket.java.transport.local) - } - } - } -} - -allOpen { - annotation("org.openjdk.jmh.annotations.State") -} - -benchmark { - targets { - register("jvm") - register("kotlin") - register("java") - } - - targets.withType { jmhVersion = libs.versions.jmh.get() } -} - -tasks.register("jmhProfilers") { - group = "benchmark" - description = "Lists the available JMH profilers" - classpath = (kotlin.targets["jvm"].compilations["main"] as KotlinJvmCompilation).runtimeDependencyFiles - main = "org.openjdk.jmh.Main" - args("-lprof") -} - -fun registerJmhGCTask(target: String): TaskProvider<*> = tasks.register("${target}BenchmarkGC") { - group = "benchmark" - - val buildFolder = buildDir.resolve("benchmarks").resolve(target) - val compilation = (kotlin.targets[target].compilations["main"] as KotlinJvmCompilation) - classpath( - file(buildFolder.resolve("classes")), - file(buildFolder.resolve("resources")), - compilation.runtimeDependencyFiles, - compilation.output.allOutputs - ) - main = "org.openjdk.jmh.Main" - - dependsOn("${target}BenchmarkCompile") - args("-prof", "gc") - args("-jvmArgsPrepend", "-Xmx3072m") - args("-jvmArgsPrepend", "-Xms3072m") - args("-foe", "true") //fail-on-error - args("-v", "NORMAL") //verbosity [SILENT, NORMAL, EXTRA] - args("-rf", "json") - args("-rff", project.file("build/reports/benchmarks/$target/result.json").also { it.parentFile.mkdirs() }) -} - -val t1 = registerJmhGCTask("java") -val t2 = registerJmhGCTask("kotlin") - -tasks.register("benchmarkGC") { - group = "benchmark" - dependsOn(t1.get()) - dependsOn(t2.get()) -} diff --git a/benchmarks/rsocket-java/build.gradle.kts b/benchmarks/rsocket-java/build.gradle.kts new file mode 100644 index 00000000..f251b8ae --- /dev/null +++ b/benchmarks/rsocket-java/build.gradle.kts @@ -0,0 +1,67 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import rsocketbuild.* + +plugins { + id("rsocketbuild.multiplatform-base") + alias(libs.plugins.kotlin.plugin.allopen) + alias(libs.plugins.kotlinx.benchmark) +} + +kotlin { + jvmTarget() + + sourceSets { + jvmMain.dependencies { + implementation(projects.benchmarksShared) + + implementation(libs.kotlinx.coroutines.reactor) + implementation(libs.rsocket.java.transport.local) + implementation(libs.rsocket.java.transport.netty) + } + } +} + +allOpen { + annotation("org.openjdk.jmh.annotations.State") +} + +benchmark { + targets { + register("jvm") { + this as kotlinx.benchmark.gradle.JvmBenchmarkTarget + jmhVersion = libs.versions.jmh.get() + } + } + configurations { + configureEach { + reportFormat = "text" + } + named("main") { + // all params + param("payloadSize", "0", "64", "1024", "131072", "1048576") + } + register("localPayloadSize") { + include("LocalRSocketJavaBenchmark") + param("payloadSize", "0", "64") + } + register("tcpPayloadSize") { + include("TcpRSocketJavaBenchmark") + param("payloadSize", "0", "64") + } + } +} diff --git a/benchmarks/rsocket-java/src/jvmMain/kotlin/LocalRSocketJavaBenchmark.kt b/benchmarks/rsocket-java/src/jvmMain/kotlin/LocalRSocketJavaBenchmark.kt new file mode 100644 index 00000000..9230cee8 --- /dev/null +++ b/benchmarks/rsocket-java/src/jvmMain/kotlin/LocalRSocketJavaBenchmark.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.java + +import io.rsocket.transport.* +import io.rsocket.transport.local.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@State(Scope.Benchmark) +class LocalRSocketJavaBenchmark : RSocketJavaBenchmark() { + @Param("0") + override var payloadSize: Int = 0 + + override val serverTransport: ServerTransport<*> = LocalServerTransport.create("local") + override val clientTransport: ClientTransport = LocalClientTransport.create("local") +} diff --git a/benchmarks/src/javaMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketJavaBenchmark.kt b/benchmarks/rsocket-java/src/jvmMain/kotlin/RSocketJavaBenchmark.kt similarity index 62% rename from benchmarks/src/javaMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketJavaBenchmark.kt rename to benchmarks/rsocket-java/src/jvmMain/kotlin/RSocketJavaBenchmark.kt index 96685d7d..05d5c6fa 100644 --- a/benchmarks/src/javaMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketJavaBenchmark.kt +++ b/benchmarks/rsocket-java/src/jvmMain/kotlin/RSocketJavaBenchmark.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,32 +14,52 @@ * limitations under the License. */ -package io.rsocket.kotlin.benchmarks +package io.rsocket.kotlin.benchmarks.java import io.rsocket.* import io.rsocket.core.* import io.rsocket.frame.decoder.* -import io.rsocket.transport.local.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.transport.* import io.rsocket.util.* +import kotlinx.benchmark.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.reactivestreams.* import reactor.core.publisher.* import kotlin.random.* -class RSocketJavaBenchmark : RSocketBenchmark() { +abstract class RSocketJavaBenchmark : RSocketBenchmark() { + protected abstract val clientTransport: ClientTransport + protected abstract val serverTransport: ServerTransport<*> - lateinit var client: RSocket - lateinit var server: Closeable + private lateinit var payload: Payload + private lateinit var payloadMono: Mono + private lateinit var payloadsFlux: Flux + private lateinit var payloadsFlow: Flow + private lateinit var client: RSocket + private lateinit var server: Closeable - lateinit var payload: Payload - lateinit var payloadMono: Mono - lateinit var payloadsFlux: Flux - lateinit var payloadsFlow: Flow + override fun createPayload(size: Int): Payload = if (size == 0) EmptyPayload.INSTANCE else ByteBufPayload.create( + ByteArray(size / 2).also { Random.nextBytes(it) }, + ByteArray(size / 2).also { Random.nextBytes(it) } + ) + + override fun createPayloadCopy(): Payload = payload.retain() + + override fun releasePayload(payload: Payload) { + payload.release() + } + + override fun consumePayload(bh: Blackhole, value: Payload) = bh.consume(value) + + override suspend fun doRequestResponse(): Payload = client.requestResponse(payload.retain()).awaitSingle() + override fun doRequestStream(): Flow = client.requestStream(payload.retain()).asFlow() + override fun doRequestChannel(): Flow = client.requestChannel(payloadsFlow.asPublisher()).asFlow() + @Setup override fun setup() { payload = createPayload(payloadSize) - payloadMono = Mono.fromSupplier(payload::retain) payloadsFlux = Flux.range(0, 5000).map { payload.retain() } payloadsFlow = flow { repeat(5000) { emit(payload.retain()) } } @@ -61,33 +81,27 @@ class RSocketJavaBenchmark : RSocketBenchmark() { }) } .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bind(LocalServerTransport.create("server")) + .bind(serverTransport) .block()!! client = RSocketConnector.create() .payloadDecoder(PayloadDecoder.ZERO_COPY) - .connect(LocalClientTransport.create("server")) + .connect(clientTransport) .block()!! } + @TearDown override fun cleanup() { client.dispose() server.dispose() } - override fun createPayload(size: Int): Payload = if (size == 0) EmptyPayload.INSTANCE else ByteBufPayload.create( - ByteArray(size / 2).also { Random.nextBytes(it) }, - ByteArray(size / 2).also { Random.nextBytes(it) } - ) - - override fun releasePayload(payload: Payload) { - payload.release() - } - - override suspend fun doRequestResponse(): Payload = client.requestResponse(payload.retain()).awaitSingle() - - override suspend fun doRequestStream(): Flow = client.requestStream(payload.retain()).asFlow() + @Benchmark + override fun requestResponseBlocking(bh: Blackhole) = super.requestResponseBlocking(bh) - override suspend fun doRequestChannel(): Flow = client.requestChannel(payloadsFlow.asPublisher()).asFlow() + @Benchmark + override fun requestResponseParallel(bh: Blackhole) = super.requestResponseParallel(bh) + @Benchmark + override fun requestResponseConcurrent(bh: Blackhole) = super.requestResponseConcurrent(bh) } diff --git a/benchmarks/rsocket-java/src/jvmMain/kotlin/TcpRSocketJavaBenchmark.kt b/benchmarks/rsocket-java/src/jvmMain/kotlin/TcpRSocketJavaBenchmark.kt new file mode 100644 index 00000000..a0d0a43d --- /dev/null +++ b/benchmarks/rsocket-java/src/jvmMain/kotlin/TcpRSocketJavaBenchmark.kt @@ -0,0 +1,37 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.java + +import io.rsocket.transport.* +import io.rsocket.transport.local.* +import io.rsocket.transport.netty.client.* +import io.rsocket.transport.netty.server.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@State(Scope.Benchmark) +class TcpRSocketJavaBenchmark : RSocketJavaBenchmark() { + @Param("0") + override var payloadSize: Int = 0 + + override val serverTransport: ServerTransport<*> = TcpServerTransport.create(9000) + override val clientTransport: ClientTransport = TcpClientTransport.create(9000) +} diff --git a/benchmarks/rsocket-kotlin-old/build.gradle.kts b/benchmarks/rsocket-kotlin-old/build.gradle.kts new file mode 100644 index 00000000..42fc577e --- /dev/null +++ b/benchmarks/rsocket-kotlin-old/build.gradle.kts @@ -0,0 +1,90 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import rsocketbuild.* + +plugins { + id("rsocketbuild.multiplatform-base") + alias(libs.plugins.kotlin.plugin.allopen) + alias(libs.plugins.kotlinx.benchmark) +} + +kotlin { + jvmTarget() + + // no mingw because of cio + macosX64() + macosArm64() + linuxX64() + + sourceSets { + commonMain.dependencies { + implementation(projects.benchmarksShared) + implementation("io.rsocket.kotlin:rsocket-transport-local:0.15.4") + implementation("io.rsocket.kotlin:rsocket-transport-ktor-tcp:0.15.4") +// implementation(projects.rsocketTransportKtorWebsocketClient) +// implementation(projects.rsocketTransportKtorWebsocketServer) + + // ktor engines +// implementation(libs.ktor.server.cio) +// implementation(libs.ktor.client.cio) + } + } +} + +allOpen { + annotation("org.openjdk.jmh.annotations.State") +} + +benchmark { + targets { + register("jvm") { + this as kotlinx.benchmark.gradle.JvmBenchmarkTarget + jmhVersion = libs.versions.jmh.get() + } + register("macosArm64") + } + + configurations { + configureEach { + reportFormat = "text" + } + named("main") { + // all params, should not be called really + include("LocalRSocketKotlinOldBenchmark") + param("payloadSize", "0") + param("dispatcher", "DEFAULT", "UNCONFINED") + } + register("fast") { + include("LocalRSocketKotlinOldBenchmark") + param("payloadSize", "0") + param("dispatcher", "UNCONFINED") + } + register("localPayloadSize") { + include("LocalRSocketKotlinOldBenchmark") + param("payloadSize", "0", "64") + param("dispatcher", "UNCONFINED") + } + register("ktorTcp") { + include("KtorTcpRSocketKotlinOldBenchmark") + param("payloadSize", "0") + } + register("ktorTcpPayloadSize") { + include("KtorTcpRSocketKotlinOldBenchmark") + param("payloadSize", "0", "64") + } + } +} diff --git a/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..ca97c5c2 --- /dev/null +++ b/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt @@ -0,0 +1,55 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.network.sockets.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlin.random.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@State(Scope.Benchmark) +class KtorTcpRSocketKotlinOldBenchmark : RSocketKotlinOldBenchmark() { + @Param("0") + override var payloadSize: Int = 0 + + override val serverDispatcher: CoroutineDispatcher = Dispatchers.IO + + override val serverTransport: ServerTransport<*> by lazy { + TcpServerTransport(port = 9000 + Random.nextInt(100)) + } + + override suspend fun clientTransport(server: Any?): ClientTransport { + return TcpClientTransport( + (server as TcpServer).serverSocket.await().localAddress as InetSocketAddress + ) + } + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + } +} diff --git a/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/LocalRSocketKotlinOldBenchmark.kt b/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/LocalRSocketKotlinOldBenchmark.kt new file mode 100644 index 00000000..e29a2b87 --- /dev/null +++ b/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/LocalRSocketKotlinOldBenchmark.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.local.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@State(Scope.Benchmark) +@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) +class LocalRSocketKotlinOldBenchmark : RSocketKotlinOldBenchmark() { + @Param("0") + override var payloadSize: Int = 0 + + @Param("") + var dispatcher: String = "" + + override val serverDispatcher: CoroutineDispatcher by lazy { + when (dispatcher) { + "DEFAULT" -> Dispatchers.Default + "IO" -> Dispatchers.IO + "UNCONFINED" -> Dispatchers.Unconfined + "1" -> newSingleThreadContext("dispatcher") + else -> error("wrong parameter 'dispatcher=$dispatcher'") + } + } + override val serverTransport: ServerTransport<*> by lazy { + LocalServerTransport() + } + + override suspend fun clientTransport(server: Any?): ClientTransport = server as LocalServer + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + (serverDispatcher as? CloseableCoroutineDispatcher)?.close() + } +} diff --git a/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/RSocketKotlinOldBenchmark.kt b/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/RSocketKotlinOldBenchmark.kt new file mode 100644 index 00000000..e29a251b --- /dev/null +++ b/benchmarks/rsocket-kotlin-old/src/commonMain/kotlin/RSocketKotlinOldBenchmark.kt @@ -0,0 +1,92 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.transport.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.random.* + +@OptIn(ExperimentalStreamsApi::class) +abstract class RSocketKotlinOldBenchmark : RSocketBenchmark() { + protected abstract suspend fun clientTransport(server: Any?): ClientTransport + protected abstract val serverTransport: ServerTransport<*> + protected abstract val serverDispatcher: CoroutineDispatcher + + private val requestStrategy = PrefetchStrategy(64, 0) + + protected val benchJob = Job() + private lateinit var client: RSocket + private lateinit var payload: Payload + private lateinit var payloadsFlow: Flow + + override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload( + data = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }), + metadata = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }) + ) + + override fun createPayloadCopy(): Payload = payload.copy() + override fun consumePayload(bh: Blackhole, value: Payload) = bh.consume(value) + override fun releasePayload(payload: Payload) = payload.close() + + override suspend fun doRequestResponse(): Payload = client.requestResponse(createPayloadCopy()) + override fun doRequestStream(): Flow = client.requestStream(createPayloadCopy()).flowOn(requestStrategy) + override fun doRequestChannel(): Flow = client.requestChannel(createPayloadCopy(), payloadsFlow).flowOn(requestStrategy) + + override fun setup(): Unit = runBlocking { + payload = createPayload(payloadSize) + payloadsFlow = flow { repeat(5000) { emit(createPayloadCopy()) } } + + val server = RSocketServer().bindIn(CoroutineScope(benchJob + serverDispatcher), serverTransport) { + RSocketRequestHandler { + requestResponse { + it.close() + createPayloadCopy() + } + requestStream { + it.close() + payloadsFlow + } + requestChannel { init, payloads -> + init.close() + payloads.flowOn(requestStrategy) + } + } + } + client = RSocketConnector().connect(clientTransport(server)) + } + + override fun cleanup(): Unit = runBlocking { + client.coroutineContext.job.cancelAndJoin() + benchJob.cancelAndJoin() + } + + @Benchmark + override fun requestResponseBlocking(bh: Blackhole) = super.requestResponseBlocking(bh) + + @Benchmark + override fun requestResponseParallel(bh: Blackhole) = super.requestResponseParallel(bh) + + @Benchmark + override fun requestResponseConcurrent(bh: Blackhole) = super.requestResponseConcurrent(bh) +} diff --git a/benchmarks/rsocket-kotlin/build.gradle.kts b/benchmarks/rsocket-kotlin/build.gradle.kts new file mode 100644 index 00000000..da544f24 --- /dev/null +++ b/benchmarks/rsocket-kotlin/build.gradle.kts @@ -0,0 +1,126 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import rsocketbuild.* + +plugins { + id("rsocketbuild.multiplatform-base") + alias(libs.plugins.kotlin.plugin.allopen) + alias(libs.plugins.kotlinx.benchmark) +} + +kotlin { + jvmTarget() + + // no mingw because of cio + macosX64() + macosArm64() + linuxX64() + + sourceSets { + commonMain.dependencies { + implementation(projects.benchmarksShared) + implementation(projects.rsocketTransportLocal) + implementation(projects.rsocketTransportKtorTcp) + implementation(projects.rsocketTransportKtorWebsocketClient) + implementation(projects.rsocketTransportKtorWebsocketServer) + + // ktor engines + implementation(libs.ktor.server.cio) + implementation(libs.ktor.client.cio) + } + jvmMain.dependencies { + implementation(projects.rsocketTransportNettyTcp) + implementation(projects.rsocketTransportNettyWebsocket) + implementation(projects.rsocketTransportNettyQuic) + implementation(libs.netty.codec.quic.map { + val javaOsName = System.getProperty("os.name") + val javaOsArch = System.getProperty("os.arch") + val suffix = when { + javaOsName.contains("mac", ignoreCase = true) -> "osx" + javaOsName.contains("linux", ignoreCase = true) -> "linux" + javaOsName.contains("windows", ignoreCase = true) -> "windows" + else -> error("Unknown os.name: $javaOsName") + } + "-" + when (javaOsArch) { + "x86_64", "amd64" -> "x86_64" + "arm64", "aarch64" -> "aarch_64" + else -> error("Unknown os.arch: $javaOsArch") + } + "$it:$suffix" + }) + implementation(libs.bouncycastle) + } + } +} + +allOpen { + annotation("org.openjdk.jmh.annotations.State") +} + +benchmark { + targets { + register("jvm") { + this as kotlinx.benchmark.gradle.JvmBenchmarkTarget + jmhVersion = libs.versions.jmh.get() + } + register("macosArm64") + } + + configurations { + configureEach { + reportFormat = "text" + } + named("main") { + // all params, should not be called really +// param("payloadSize", "0") +// param("channels", "S:BUFFERED", "S:UNLIMITED", "M:BUFFERED", "M:UNLIMITED") +// param("dispatcher", "DEFAULT", "UNCONFINED") + } + register("fast") { + include("LocalRSocketKotlinBenchmark") + param("payloadSize", "0") + param("channels", "S:UNLIMITED") + param("dispatcher", "UNCONFINED") + } + register("localPayloadSize") { + include("LocalRSocketKotlinBenchmark") + param("payloadSize", "0", "64") + param("channels", "S:UNLIMITED") + param("dispatcher", "UNCONFINED") + } + + register("ktorTcp") { + include("KtorTcpRSocketKotlinBenchmark") + param("payloadSize", "0") + param("dispatcher", "IO", "DEFAULT", "UNCONFINED") + param("selectorDispatcher", "IO", "1", "2", "4", "8", "l1", "l2", "l4", "l8") + } + register("ktorTcpPayloadSize") { + include("KtorTcpRSocketKotlinBenchmark") + param("payloadSize", "0", "64") + } + + register("nettyTcp") { + include("NettyTcpRSocketKotlinBenchmark") + param("payloadSize", "0") + param("shareGroup", "true", "false") + } + register("nettyQuic") { + include("NettyQuicRSocketKotlinBenchmark") + param("payloadSize", "0") + } + } +} diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..e4b2c836 --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt @@ -0,0 +1,100 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.network.selector.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@State(Scope.Benchmark) +@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) +class KtorTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() { + @Param("0") + override var payloadSize: Int = 0 + + @Param("") + var dispatcher: String = "" + + @Param("") + var selectorDispatcher: String = "" + + private val dispatcherV by lazy { + when (dispatcher) { + "DEFAULT" -> Dispatchers.Default + "IO" -> Dispatchers.IO + "UNCONFINED" -> Dispatchers.Unconfined + else -> error("wrong parameter 'dispatcher=$dispatcher'") + } + } + + private val selectorDispatcherV by lazy { + when (selectorDispatcher) { + "DEFAULT" -> Dispatchers.Default + "IO" -> Dispatchers.IO + "1" -> newSingleThreadContext("selectorDispatcher") + "2" -> newFixedThreadPoolContext(2, "selectorDispatcher") + "4" -> newFixedThreadPoolContext(4, "selectorDispatcher") + "8" -> newFixedThreadPoolContext(8, "selectorDispatcher") + "l1" -> Dispatchers.IO.limitedParallelism(1) + "l2" -> Dispatchers.IO.limitedParallelism(2) + "l4" -> Dispatchers.IO.limitedParallelism(4) + "l8" -> Dispatchers.IO.limitedParallelism(8) + else -> error("wrong parameter 'selectorDispatcher=$selectorDispatcher'") + } + } + + private val selector by lazy { + SelectorManager(selectorDispatcherV) + } + + override val serverTarget: RSocketServerTarget<*> by lazy { + KtorTcpServerTransport(benchJob) { + dispatcher(dispatcherV) + selectorManager(selector, manage = false) + }.target(port = 9000) + } + + override val clientTarget: RSocketClientTarget by lazy { + KtorTcpClientTransport(benchJob) { + dispatcher(dispatcherV) + selectorManager(selector, manage = false) + }.target("0.0.0.0", port = 9000) + } + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + selector.close() + if ( + selectorDispatcherV != Dispatchers.Default && + selectorDispatcherV != Dispatchers.IO && + selectorDispatcherV is CloseableCoroutineDispatcher + ) { + (selectorDispatcherV as CloseableCoroutineDispatcher).close() + } + } +} diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..63dacdfd --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt @@ -0,0 +1,71 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.local.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@State(Scope.Benchmark) +class LocalRSocketKotlinBenchmark : RSocketKotlinBenchmark() { + @Param("0") + override var payloadSize: Int = 0 + + @Param("") + var channels: String = "" + + @Param("") + var dispatcher: String = "" + + private val dispatcherV by lazy { + when (dispatcher) { + "DEFAULT" -> Dispatchers.Default + "IO" -> Dispatchers.IO + "UNCONFINED" -> Dispatchers.Unconfined + else -> error("wrong parameter 'dispatcher=$dispatcher'") + } + } + override val serverTarget: RSocketServerTarget<*> by lazy { + LocalServerTransport(benchJob) { + dispatcher(dispatcherV) + when (channels) { + "S:BUFFERED" -> sequential(Channel.BUFFERED) + "S:UNLIMITED" -> sequential(Channel.UNLIMITED) + "M:BUFFERED" -> multiplexed(Channel.BUFFERED, Channel.BUFFERED) + "M:UNLIMITED" -> multiplexed(Channel.UNLIMITED, Channel.UNLIMITED) + else -> error("wrong parameter 'channels=$channels'") + } + }.target("local") + } + + override val clientTarget: RSocketClientTarget by lazy { + LocalClientTransport(benchJob) { + dispatcher(dispatcherV) + }.target("local") + } + + @Setup + override fun setup() = super.setup() + + @TearDown + override fun cleanup() = super.cleanup() +} diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt new file mode 100644 index 00000000..b406d39f --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.transport.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.random.* + +@OptIn(ExperimentalStreamsApi::class) +abstract class RSocketKotlinBenchmark : RSocketBenchmark() { + protected abstract val clientTarget: RSocketClientTarget + protected abstract val serverTarget: RSocketServerTarget<*> + + private val requestStrategy = PrefetchStrategy(64, 0) + + protected val benchJob = Job() + private lateinit var client: RSocket + private lateinit var payload: Payload + private lateinit var payloadsFlow: Flow + + override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload( + data = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }), + metadata = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }) + ) + + override fun createPayloadCopy(): Payload = payload.copy() + override fun consumePayload(bh: Blackhole, value: Payload) = bh.consume(value) + override fun releasePayload(payload: Payload) = payload.close() + + override suspend fun doRequestResponse(): Payload = client.requestResponse(createPayloadCopy()) + override fun doRequestStream(): Flow = client.requestStream(createPayloadCopy()).flowOn(requestStrategy) + override fun doRequestChannel(): Flow = client.requestChannel(createPayloadCopy(), payloadsFlow).flowOn(requestStrategy) + + override fun setup(): Unit = runBlocking { + payload = createPayload(payloadSize) + payloadsFlow = flow { repeat(5000) { emit(createPayloadCopy()) } } + + RSocketServer().startServer(serverTarget) { + RSocketRequestHandler { + requestResponse { + it.close() + createPayloadCopy() + } + requestStream { + it.close() + payloadsFlow + } + requestChannel { init, payloads -> + init.close() + payloads.flowOn(requestStrategy) + } + } + } + client = RSocketConnector().connect(clientTarget) + } + + override fun cleanup(): Unit = runBlocking { + client.coroutineContext.job.cancelAndJoin() + benchJob.cancelAndJoin() + } + + @Benchmark + override fun requestResponseBlocking(bh: Blackhole) = super.requestResponseBlocking(bh) + + @Benchmark + override fun requestResponseParallel(bh: Blackhole) = super.requestResponseParallel(bh) + + @Benchmark + override fun requestResponseConcurrent(bh: Blackhole) = super.requestResponseConcurrent(bh) +} diff --git a/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..8080d8a8 --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.network.selector.* +import io.netty.channel.nio.* +import io.netty.handler.ssl.util.* +import io.netty.incubator.codec.quic.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import io.rsocket.kotlin.transport.netty.quic.* +import io.rsocket.kotlin.transport.netty.tcp.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@State(Scope.Benchmark) +class NettyQuicRSocketKotlinBenchmark : RSocketKotlinBenchmark() { + @Param("0") + override var payloadSize: Int = 0 + + private val certificates = SelfSignedCertificate() + + private val protos = arrayOf("hq-29") + + private val sharedGroup by lazy { + NioEventLoopGroup() + } + + override val serverTarget: RSocketServerTarget<*> by lazy { + NettyQuicServerTransport(benchJob) { + eventLoopGroup(sharedGroup, manage = false) + ssl { + keyManager(certificates.privateKey(), null, certificates.certificate()) + applicationProtocols(*protos) + } + codec { + tokenHandler(InsecureQuicTokenHandler.INSTANCE) + } + }.target(port = 9009) + } + + override val clientTarget: RSocketClientTarget by lazy { + NettyQuicClientTransport(benchJob) { + eventLoopGroup(sharedGroup, manage = false) + ssl { + trustManager(InsecureTrustManagerFactory.INSTANCE) + applicationProtocols(*protos) + } + }.target("127.0.0.1", port = 9009) + } + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + sharedGroup.shutdownGracefully().await(1000) + } +} diff --git a/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..4bf5c5c9 --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.network.selector.* +import io.netty.channel.nio.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import io.rsocket.kotlin.transport.netty.tcp.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@State(Scope.Benchmark) +class NettyTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() { + @Param("0") + override var payloadSize: Int = 0 + + @Param("true", "false") + var shareGroup: Boolean = true + + private val sharedGroup by lazy { + if (shareGroup) NioEventLoopGroup() else null + } + + override val serverTarget: RSocketServerTarget<*> by lazy { + NettyTcpServerTransport(benchJob) { + if (sharedGroup != null) { + eventLoopGroup(sharedGroup!!, manage = false) + } + }.target(port = 9000) + } + + override val clientTarget: RSocketClientTarget by lazy { + NettyTcpClientTransport(benchJob) { + if (sharedGroup != null) { + eventLoopGroup(sharedGroup!!, manage = false) + } + }.target("0.0.0.0", port = 9000) + } + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + sharedGroup?.shutdownGracefully()?.await(1000) + } +} diff --git a/benchmarks/shared/build.gradle.kts b/benchmarks/shared/build.gradle.kts new file mode 100644 index 00000000..09089f0b --- /dev/null +++ b/benchmarks/shared/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import rsocketbuild.* + +plugins { + id("rsocketbuild.multiplatform-base") +} + +kotlin { + jvmTarget() + + // no mingw because of cio + macosX64() + macosArm64() + linuxX64() + + sourceSets { + commonMain.dependencies { + api(libs.kotlinx.coroutines.core) + api(libs.kotlinx.benchmark) + } + } +} diff --git a/benchmarks/shared/src/commonMain/kotlin/RSocketBenchmark.kt b/benchmarks/shared/src/commonMain/kotlin/RSocketBenchmark.kt new file mode 100644 index 00000000..347009da --- /dev/null +++ b/benchmarks/shared/src/commonMain/kotlin/RSocketBenchmark.kt @@ -0,0 +1,100 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +abstract class RSocketBenchmark { + + // payload operations + abstract val payloadSize: Int + abstract fun createPayload(size: Int): Payload + abstract fun createPayloadCopy(): Payload + abstract fun releasePayload(payload: Payload) + + // something wrong with compilation here + abstract fun consumePayload(bh: Blackhole, value: Payload) + + // lifecycle + + abstract fun setup() + abstract fun cleanup() + + // benchmarks + + // Benchmark annotation doesn't inherit on jvm + open fun requestResponseBlocking(bh: Blackhole) = blocking(bh, ::requestResponse) + open fun requestResponseParallel(bh: Blackhole) = parallel(bh, 1000, ::requestResponse) + open fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 1000, ::requestResponse) + + + // operations + + abstract suspend fun doRequestResponse(): Payload + abstract fun doRequestStream(): Flow + abstract fun doRequestChannel(): Flow + + private suspend fun requestResponse(bh: Blackhole) { + doRequestResponse().also { + releasePayload(it) + consumePayload(bh, it) + } + } + + private suspend fun requestStream(bh: Blackhole) { + doRequestStream().collect { + releasePayload(it) + consumePayload(bh, it) + } + } + + private suspend fun requestChannel(bh: Blackhole) { + doRequestChannel().collect { + releasePayload(it) + consumePayload(bh, it) + } + } + + // execution strategies + + // plain blocking + private inline fun blocking( + bh: Blackhole, + crossinline block: suspend (bh: Blackhole) -> Unit, + ): Unit = runBlocking { + block(bh) + } + + // Run every request in a separate coroutine which will be dispatched on Default dispatcher (thread amount = cores amount) + private inline fun parallel( + bh: Blackhole, + p: Int, + crossinline block: suspend (bh: Blackhole) -> Unit, + ): Unit = runBlocking(Dispatchers.Default) { + repeat(p) { launch { block(bh) } } + } + + // Run every request in separate coroutine, but on single thread dispatcher + private inline fun concurrent( + bh: Blackhole, + p: Int, + crossinline block: suspend (bh: Blackhole) -> Unit, + ): Unit = runBlocking { + repeat(p) { launch { block(bh) } } + } +} diff --git a/benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt b/benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt deleted file mode 100644 index c00ae1ac..00000000 --- a/benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2015-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -@file:OptIn(DelicateCoroutinesApi::class) - -package io.rsocket.kotlin.benchmarks - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import org.openjdk.jmh.annotations.* -import org.openjdk.jmh.infra.* -import java.util.concurrent.locks.* - -@BenchmarkMode(Mode.Throughput) -@Fork(value = 2) -@Warmup(iterations = 5, time = 5) -@Measurement(iterations = 5, time = 5) -@State(Scope.Benchmark) -abstract class RSocketBenchmark { - - @Param("0", "64", "1024", "131072", "1048576", "15728640") - var payloadSize: Int = 0 - - @Setup - abstract fun setup() - - @TearDown - abstract fun cleanup() - - @TearDown(Level.Iteration) - fun awaitToBeConsumed() { - LockSupport.parkNanos(2000) - } - - abstract fun createPayload(size: Int): Payload - - abstract fun releasePayload(payload: Payload) - - abstract suspend fun doRequestResponse(): Payload - - abstract suspend fun doRequestStream(): Flow - - abstract suspend fun doRequestChannel(): Flow - - - @Benchmark - fun requestResponseBlocking(bh: Blackhole) = blocking(bh, ::requestResponse) - - @Benchmark - fun requestResponseParallel(bh: Blackhole) = parallel(bh, 1000, ::requestResponse) - - @Benchmark - fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 1000, ::requestResponse) - - - @Benchmark - fun requestStreamBlocking(bh: Blackhole) = blocking(bh, ::requestStream) - - @Benchmark - fun requestStreamParallel(bh: Blackhole) = parallel(bh, 10, ::requestStream) - - @Benchmark - fun requestStreamConcurrent(bh: Blackhole) = concurrent(bh, 10, ::requestStream) - - - @Benchmark - fun requestChannelBlocking(bh: Blackhole) = blocking(bh, ::requestChannel) - - @Benchmark - fun requestChannelParallel(bh: Blackhole) = parallel(bh, 10, ::requestChannel) - - @Benchmark - fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 10, ::requestChannel) - - - private suspend fun requestResponse(bh: Blackhole) { - doRequestResponse().also { - releasePayload(it) - bh.consume(it) - } - } - - private suspend fun requestStream(bh: Blackhole) { - doRequestStream().collect { - releasePayload(it) - bh.consume(it) - } - } - - private suspend fun requestChannel(bh: Blackhole) { - doRequestChannel().collect { - releasePayload(it) - bh.consume(it) - } - } - - //plain blocking - private inline fun blocking(bh: Blackhole, crossinline block: suspend (bh: Blackhole) -> Unit): Unit = runBlocking { - block(bh) - } - - //Run every request in separate coroutine which will be dispatched on Default dispatcher (threads amount = cores amount) - private inline fun parallel(bh: Blackhole, p: Int, crossinline block: suspend (bh: Blackhole) -> Unit): Unit = - runBlocking { - (0..p).map { - GlobalScope.async { block(bh) } - }.awaitAll() - } - - //Run every request in separate coroutine, but on single thread dispatcher: - // - do request 1 - // - suspend on awaiting of result 1 - // - do request 2 - // - suspend on awaiting of result 2 - // - receive result on request 1 - // - receive result on request 2 - // - .... - //working with requests is single threaded but concurrent - private inline fun concurrent(bh: Blackhole, p: Int, crossinline block: suspend (bh: Blackhole) -> Unit): Unit = - runBlocking { - (0..p).map { - async { block(bh) } - }.awaitAll() - } -} diff --git a/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt b/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt deleted file mode 100644 index 310d0078..00000000 --- a/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2015-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.benchmarks - -import io.ktor.utils.io.core.* -import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.payload.* -import io.rsocket.kotlin.transport.local.* -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import kotlin.random.* - -@OptIn(ExperimentalStreamsApi::class, DelicateCoroutinesApi::class) -class RSocketKotlinBenchmark : RSocketBenchmark() { - private val requestStrategy = PrefetchStrategy(64, 0) - - private val benchJob = Job() - lateinit var client: RSocket - - lateinit var payload: Payload - lateinit var payloadsFlow: Flow - - fun payloadCopy(): Payload = payload.copy() - - override fun setup() { - payload = createPayload(payloadSize) - payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } } - val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) { - RSocketRequestHandler { - requestResponse { - it.close() - payloadCopy() - } - requestStream { - it.close() - payloadsFlow - } - requestChannel { init, payloads -> - init.close() - payloads.flowOn(requestStrategy) - } - } - } - client = runBlocking { - RSocketConnector().connect(server) - } - } - - override fun cleanup() { - runBlocking { - client.coroutineContext.job.cancelAndJoin() - benchJob.cancelAndJoin() - } - } - - override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload( - data = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }), - metadata = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }) - ) - - override fun releasePayload(payload: Payload) { - payload.close() - } - - override suspend fun doRequestResponse(): Payload = client.requestResponse(payloadCopy()) - - override suspend fun doRequestStream(): Flow = client.requestStream(payloadCopy()).flowOn(requestStrategy) - - override suspend fun doRequestChannel(): Flow = - client.requestChannel(payloadCopy(), payloadsFlow).flowOn(requestStrategy) - -} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 21085930..fa8c0e1d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -3,7 +3,7 @@ kotlin = "2.0.0" kotlinx-atomicfu = "0.24.0" kotlinx-coroutines = "1.8.1" -kotlinx-benchmark = "0.4.8" +kotlinx-benchmark = "0.4.10" kotlinx-bcv = "0.14.0" ktor = "2.3.11" @@ -16,9 +16,9 @@ bouncycastle = "1.78.1" turbine = "1.1.0" -rsocket-java = "1.1.3" +rsocket-java = "1.1.4" -jmh = "1.36" +jmh = "1.37" maven-publish = "0.28.0" @@ -55,6 +55,7 @@ turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" } rsocket-java-core = { module = 'io.rsocket:rsocket-core', version.ref = "rsocket-java" } rsocket-java-transport-local = { module = 'io.rsocket:rsocket-transport-local', version.ref = "rsocket-java" } +rsocket-java-transport-netty = { module = 'io.rsocket:rsocket-transport-netty', version.ref = "rsocket-java" } kotlin-gradle-plugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } kotlinx-atomicfu-gradle-plugin = { module = "org.jetbrains.kotlinx:atomicfu-gradle-plugin", version.ref = "kotlinx-atomicfu" } @@ -63,4 +64,5 @@ maven-publish-gradle-plugin = { module = "com.vanniktech:gradle-maven-publish-pl [plugins] kotlin-multiplatform = { id = "org.jetbrains.kotlin.multiplatform", version.ref = "kotlin" } +kotlin-plugin-allopen = { id = "org.jetbrains.kotlin.plugin.allopen", version.ref = "kotlin" } kotlinx-benchmark = { id = "org.jetbrains.kotlinx.benchmark", version.ref = "kotlinx-benchmark" } diff --git a/settings.gradle.kts b/settings.gradle.kts index e278615f..cf95187d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -56,4 +56,11 @@ projects("rsocket-kotlin") { module("rsocket-ktor-client") module("rsocket-ktor-server") } + + folder("benchmarks") { + module("shared") + module("rsocket-kotlin-old") + module("rsocket-kotlin") + module("rsocket-java") + } }