diff --git a/rsocket-core/build.gradle.kts b/rsocket-core/build.gradle.kts index d1a0b957..f7bfabfb 100644 --- a/rsocket-core/build.gradle.kts +++ b/rsocket-core/build.gradle.kts @@ -24,6 +24,8 @@ kotlin { sourceSets { commonMain { dependencies { + implementation(projects.rsocketInternalIo) + api(libs.kotlinx.coroutines.core) api(libs.ktor.io) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt index df648af8..b1144874 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,16 +19,17 @@ package io.rsocket.kotlin.frame.io import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.payload.* internal fun ByteReadPacket.readMetadata(pool: ObjectPool): ByteReadPacket { - val length = readLength() + val length = readInt24() return readPacket(pool, length) } internal fun BytePacketBuilder.writeMetadata(metadata: ByteReadPacket?) { metadata?.let { - writeLength(it.remaining.toInt()) + writeInt24(it.remaining.toInt()) writePacket(it) } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt index 80cb16d4..83a228e6 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,17 +28,6 @@ internal inline fun T.closeOnError(block: (T) -> R): R { } } -private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close - -@Suppress("FunctionName") -internal fun SafeChannel(capacity: Int): Channel = - Channel(capacity, onUndeliveredElement = onUndeliveredCloseable) - internal fun SendChannel.safeTrySend(element: E) { trySend(element).onFailure { element.close() } } - -internal fun Channel.fullClose(cause: Throwable?) { - close(cause) // close channel to provide right cause - cancel() // force call of onUndeliveredElement to release buffered elements -} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt index abeaab68..59d55511 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package io.rsocket.kotlin.internal import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.io.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* @@ -24,8 +25,8 @@ import kotlinx.coroutines.selects.* private val selectFrame: suspend (Frame) -> Frame = { it } internal class Prioritizer { - private val priorityChannel = SafeChannel(Channel.UNLIMITED) - private val commonChannel = SafeChannel(Channel.UNLIMITED) + private val priorityChannel = channelForCloseable(Channel.UNLIMITED) + private val commonChannel = channelForCloseable(Channel.UNLIMITED) suspend fun send(frame: Frame) { currentCoroutineContext().ensureActive() @@ -43,7 +44,7 @@ internal class Prioritizer { } fun close(error: Throwable?) { - priorityChannel.fullClose(error) - commonChannel.fullClose(error) + priorityChannel.cancelWithCause(error) + commonChannel.cancelWithCause(error) } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt index 7959b2d4..edf2fbca 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.handler.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -77,7 +78,7 @@ internal class RSocketRequester( val id = streamsStorage.nextId() - val channel = SafeChannel(Channel.UNLIMITED) + val channel = channelForCloseable(Channel.UNLIMITED) val handler = RequesterRequestStreamFrameHandler(id, streamsStorage, channel, pool) streamsStorage.save(id, handler) @@ -93,7 +94,7 @@ internal class RSocketRequester( val id = streamsStorage.nextId() - val channel = SafeChannel(Channel.UNLIMITED) + val channel = channelForCloseable(Channel.UNLIMITED) val limiter = Limiter(0) val payloadsJob = Job(this@RSocketRequester.coroutineContext.job) val handler = RequesterRequestChannelFrameHandler(id, streamsStorage, limiter, payloadsJob, channel, pool) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt index 1c3c1c78..ad5f007a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal.handler import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -42,7 +43,7 @@ internal class RequesterRequestChannelFrameHandler( override fun handleError(cause: Throwable) { streamsStorage.remove(id) - channel.fullClose(cause) + channel.cancelWithCause(cause) sender.cancel("Request failed", cause) } @@ -55,7 +56,7 @@ internal class RequesterRequestChannelFrameHandler( } override fun cleanup(cause: Throwable?) { - channel.fullClose(cause) + channel.cancelWithCause(cause) sender.cancel("Connection closed", cause) } @@ -78,7 +79,7 @@ internal class RequesterRequestChannelFrameHandler( if (sender.isCancelled) return false val isFailed = streamsStorage.remove(id) != null - if (isFailed) channel.fullClose(cause) + if (isFailed) channel.cancelWithCause(cause) return isFailed } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt index dc85935b..fbc1bc32 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal.handler import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.channels.* @@ -39,11 +40,11 @@ internal class RequesterRequestStreamFrameHandler( override fun handleError(cause: Throwable) { streamsStorage.remove(id) - channel.fullClose(cause) + channel.cancelWithCause(cause) } override fun cleanup(cause: Throwable?) { - channel.fullClose(cause) + channel.cancelWithCause(cause) } override fun onReceiveComplete() { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt index f2d81b4b..45be9a63 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -32,7 +33,7 @@ internal class ResponderRequestChannelFrameHandler( pool: ObjectPool ) : ResponderFrameHandler(pool), ReceiveFrameHandler { val limiter = Limiter(initialRequest) - val channel = SafeChannel(Channel.UNLIMITED) + val channel = channelForCloseable(Channel.UNLIMITED) @OptIn(ExperimentalStreamsApi::class) override fun start(payload: Payload): Job = responder.handleRequestChannel(payload, id, this) @@ -47,13 +48,13 @@ internal class ResponderRequestChannelFrameHandler( override fun handleError(cause: Throwable) { streamsStorage.remove(id) - channel.fullClose(cause) + channel.cancelWithCause(cause) } override fun handleCancel() { streamsStorage.remove(id) val cancelError = CancellationException("Request cancelled") - channel.fullClose(cancelError) + channel.cancelWithCause(cancelError) job?.cancel(cancelError) } @@ -62,7 +63,7 @@ internal class ResponderRequestChannelFrameHandler( } override fun cleanup(cause: Throwable?) { - channel.fullClose(cause) + channel.cancelWithCause(cause) } override fun onSendComplete() { @@ -72,7 +73,7 @@ internal class ResponderRequestChannelFrameHandler( override fun onSendFailed(cause: Throwable): Boolean { val isFailed = streamsStorage.remove(id) != null - if (isFailed) channel.fullClose(cause) + if (isFailed) channel.cancelWithCause(cause) return isFailed } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt index ad0854d7..3b40dd82 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.io.* +import io.rsocket.kotlin.internal.io.* @ExperimentalMetadataApi public fun CompositeMetadata(vararg entries: Metadata): CompositeMetadata = @@ -39,7 +40,7 @@ public sealed interface CompositeMetadata : Metadata { override fun BytePacketBuilder.writeSelf() { entries.forEach { writeMimeType(it.mimeType) - writeLength(it.content.remaining.toInt()) //write metadata length + writeInt24(it.content.remaining.toInt()) //write metadata length writePacket(it.content) //write metadata content } } @@ -58,7 +59,7 @@ public sealed interface CompositeMetadata : Metadata { val list = mutableListOf() while (isNotEmpty) { val type = readMimeType() - val length = readLength() + val length = readInt24() val packet = readPacket(pool, length) list.add(Entry(type, packet)) } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt index 3b8f77e1..31ebe822 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt @@ -21,7 +21,7 @@ import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.* -import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* @@ -37,13 +37,13 @@ class TestConnection : Connection, ClientTransport { override val coroutineContext: CoroutineContext = Job() + Dispatchers.Unconfined + TestExceptionHandler - private val sendChannel = Channel(Channel.UNLIMITED) - private val receiveChannel = Channel(Channel.UNLIMITED) + private val sendChannel = channelForCloseable(Channel.UNLIMITED) + private val receiveChannel = channelForCloseable(Channel.UNLIMITED) init { coroutineContext.job.invokeOnCompletion { sendChannel.close(it) - receiveChannel.fullClose(it) + receiveChannel.cancelWithCause(it) } } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt index 1e5dab41..9c14011b 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,17 +18,18 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* import io.rsocket.kotlin.frame.io.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.test.* import kotlin.test.* internal fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) { val packet = toPacket(InUseTrackingPool) - writeLength(packet.remaining.toInt()) + writeInt24(packet.remaining.toInt()) writePacket(packet) } internal fun ByteReadPacket.toFrameWithLength(): Frame { - val length = readLength() + val length = readInt24() assertEquals(length, remaining.toInt()) return readFrame(InUseTrackingPool) } diff --git a/rsocket-internal-io/api/rsocket-internal-io.api b/rsocket-internal-io/api/rsocket-internal-io.api new file mode 100644 index 00000000..b51a5afd --- /dev/null +++ b/rsocket-internal-io/api/rsocket-internal-io.api @@ -0,0 +1,10 @@ +public final class io/rsocket/kotlin/internal/io/ChannelsKt { + public static final fun cancelWithCause (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Throwable;)V + public static final fun channelForCloseable (I)Lkotlinx/coroutines/channels/Channel; +} + +public final class io/rsocket/kotlin/internal/io/Int24Kt { + public static final fun readInt24 (Lio/ktor/utils/io/core/ByteReadPacket;)I + public static final fun writeInt24 (Lio/ktor/utils/io/core/BytePacketBuilder;I)V +} + diff --git a/rsocket-internal-io/build.gradle.kts b/rsocket-internal-io/build.gradle.kts new file mode 100644 index 00000000..d636b4d7 --- /dev/null +++ b/rsocket-internal-io/build.gradle.kts @@ -0,0 +1,33 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id("rsocket.template.library") + id("rsocket.target.all") +} + +kotlin { + sourceSets { + commonMain { + dependencies { + api(libs.kotlinx.coroutines.core) + api(libs.ktor.io) + } + } + } +} + +description = "rsocket-kotlin internal IO support" diff --git a/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Channels.kt b/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Channels.kt new file mode 100644 index 00000000..72718d97 --- /dev/null +++ b/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Channels.kt @@ -0,0 +1,30 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.io + +import io.ktor.utils.io.core.* +import kotlinx.coroutines.channels.* + +private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close + +public fun channelForCloseable(capacity: Int): Channel = + Channel(capacity, onUndeliveredElement = onUndeliveredCloseable) + +public fun Channel.cancelWithCause(cause: Throwable?) { + close(cause) // close channel to provide right cause + cancel() // force call of onUndeliveredElement to release buffered elements +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/length.kt b/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Int24.kt similarity index 83% rename from rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/length.kt rename to rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Int24.kt index 6dcc5e1d..ef439d5a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/length.kt +++ b/rsocket-internal-io/src/commonMain/kotlin/io/rsocket/kotlin/internal/io/Int24.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,20 +14,20 @@ * limitations under the License. */ -package io.rsocket.kotlin.frame.io +package io.rsocket.kotlin.internal.io import io.ktor.utils.io.core.* private const val lengthMask: Int = 0xFFFFFF.inv() -internal fun ByteReadPacket.readLength(): Int { +public fun ByteReadPacket.readInt24(): Int { val b = readByte().toInt() and 0xFF shl 16 val b1 = readByte().toInt() and 0xFF shl 8 val b2 = readByte().toInt() and 0xFF return b or b1 or b2 } -internal fun BytePacketBuilder.writeLength(length: Int) { +public fun BytePacketBuilder.writeInt24(length: Int) { require(length and lengthMask == 0) { "Length is larger than 24 bits" } writeByte((length shr 16).toByte()) writeByte((length shr 8).toByte()) diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/build.gradle.kts index 5f9b346b..4ed03fb4 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/build.gradle.kts @@ -24,6 +24,8 @@ kotlin { sourceSets { commonMain { dependencies { + implementation(projects.rsocketInternalIo) + api(projects.rsocketTransportKtor) api(libs.ktor.network) } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpConnection.kt b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpConnection.kt index 1aa74f79..1fb18421 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpConnection.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpConnection.kt @@ -24,8 +24,7 @@ import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.Connection -import io.rsocket.kotlin.frame.io.* -import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.io.* import kotlinx.coroutines.* import kotlin.coroutines.* @@ -37,8 +36,8 @@ internal class TcpConnection( ) : Connection { private val socketConnection = socket.connection() - private val sendChannel = @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") SafeChannel(8) - private val receiveChannel = @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") SafeChannel(8) + private val sendChannel = channelForCloseable(8) + private val receiveChannel = channelForCloseable(8) init { launch { @@ -48,7 +47,7 @@ internal class TcpConnection( val length = packet.remaining.toInt() try { writePacket { - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") writeLength(length) + writeInt24(length) writePacket(packet) } flush() @@ -62,7 +61,7 @@ internal class TcpConnection( launch { socketConnection.input.apply { while (isActive) { - val length = @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") readPacket(3).readLength() + val length = readPacket(3).readInt24() val packet = readPacket(length) try { receiveChannel.send(packet) @@ -74,8 +73,8 @@ internal class TcpConnection( } } coroutineContext.job.invokeOnCompletion { - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") sendChannel.fullClose(it) - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") receiveChannel.fullClose(it) + sendChannel.cancelWithCause(it) + receiveChannel.cancelWithCause(it) socketConnection.input.cancel(it) socketConnection.output.close(it) socketConnection.socket.close() diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/build.gradle.kts index 1cb9d0df..fd581fb3 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/build.gradle.kts @@ -23,7 +23,6 @@ kotlin { sourceSets { commonMain { dependencies { - api(projects.rsocketCore) api(projects.rsocketTransportKtor.rsocketTransportKtorWebsocket) api(libs.ktor.client.core) api(libs.ktor.client.websockets) diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts index 4c7cac81..f28d6934 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts @@ -24,7 +24,6 @@ kotlin { sourceSets { commonMain { dependencies { - api(projects.rsocketCore) api(projects.rsocketTransportKtor.rsocketTransportKtorWebsocket) api(libs.ktor.server.host.common) api(libs.ktor.server.websockets) diff --git a/rsocket-transport-local/build.gradle.kts b/rsocket-transport-local/build.gradle.kts index 39f5d38e..caa16c4d 100644 --- a/rsocket-transport-local/build.gradle.kts +++ b/rsocket-transport-local/build.gradle.kts @@ -23,6 +23,8 @@ kotlin { sourceSets { commonMain { dependencies { + implementation(projects.rsocketInternalIo) + api(projects.rsocketCore) } } diff --git a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt index ee6cb3b3..e0a34f93 100644 --- a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt +++ b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt @@ -23,14 +23,14 @@ import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlin.coroutines.* public fun LocalServerTransport( - pool: ObjectPool = ChunkBuffer.Pool + pool: ObjectPool = ChunkBuffer.Pool, ): ServerTransport = ServerTransport { accept -> val connections = Channel() val handlerJob = launch { @@ -46,15 +46,15 @@ public fun LocalServerTransport( public class LocalServer internal constructor( private val pool: ObjectPool, private val connections: Channel, - override val coroutineContext: CoroutineContext + override val coroutineContext: CoroutineContext, ) : ClientTransport { override suspend fun connect(): Connection { - val clientChannel = @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") SafeChannel(Channel.UNLIMITED) - val serverChannel = @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") SafeChannel(Channel.UNLIMITED) + val clientChannel = channelForCloseable(Channel.UNLIMITED) + val serverChannel = channelForCloseable(Channel.UNLIMITED) val connectionJob = Job(coroutineContext[Job]) connectionJob.invokeOnCompletion { - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") clientChannel.fullClose(it) - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") serverChannel.fullClose(it) + clientChannel.cancelWithCause(it) + serverChannel.cancelWithCause(it) } val connectionContext = coroutineContext + connectionJob val clientConnection = LocalConnection( diff --git a/rsocket-transport-nodejs-tcp/build.gradle.kts b/rsocket-transport-nodejs-tcp/build.gradle.kts index d0570b3d..3a0137ec 100644 --- a/rsocket-transport-nodejs-tcp/build.gradle.kts +++ b/rsocket-transport-nodejs-tcp/build.gradle.kts @@ -23,6 +23,8 @@ kotlin { sourceSets { jsMain { dependencies { + implementation(projects.rsocketInternalIo) + api(projects.rsocketCore) } } diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt index c22d4c48..5f40d17b 100644 --- a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt @@ -17,10 +17,10 @@ package io.rsocket.kotlin.transport.nodejs.tcp import io.ktor.utils.io.core.* -import io.rsocket.kotlin.frame.io.* +import io.rsocket.kotlin.internal.io.* internal fun ByteReadPacket.withLength(): ByteReadPacket = buildPacket { - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") writeLength(this@withLength.remaining.toInt()) + writeInt24(this@withLength.remaining.toInt()) writePacket(this@withLength) } @@ -36,7 +36,7 @@ internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPac while (true) when { expectedFrameLength == 0 && packetBuilder.size < 3 -> return // no length expectedFrameLength == 0 -> withTemp { // has length - expectedFrameLength = @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") it.readLength() + expectedFrameLength = it.readInt24() if (it.remaining >= expectedFrameLength) build(it) // if has length and frame } packetBuilder.size < expectedFrameLength -> return // not enough bytes to read frame diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt index c08c1890..4d49201b 100644 --- a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt @@ -21,7 +21,7 @@ import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.js.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.io.* import io.rsocket.kotlin.transport.nodejs.tcp.internal.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* @@ -32,11 +32,11 @@ import kotlin.coroutines.* internal class TcpConnection( override val coroutineContext: CoroutineContext, override val pool: ObjectPool, - private val socket: Socket + private val socket: Socket, ) : Connection { - private val sendChannel = @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") SafeChannel(8) - private val receiveChannel = @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") SafeChannel(Channel.UNLIMITED) + private val sendChannel = channelForCloseable(8) + private val receiveChannel = channelForCloseable(Channel.UNLIMITED) init { launch { diff --git a/settings.gradle.kts b/settings.gradle.kts index 5da1fcb8..8dc48565 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -50,6 +50,8 @@ rootProject.name = "rsocket-kotlin" //include("benchmarks") +include("rsocket-internal-io") + include("rsocket-core") include("rsocket-test")