Skip to content

Commit

Permalink
Introduce new module rsocket-internal-io which is a support module …
Browse files Browse the repository at this point in the history
…for core and transport implementations (#247)

This allows to drop using internal declarations, as well as not exposing it to library consumers, until explicitly requested
  • Loading branch information
whyoleg authored Mar 2, 2024
1 parent e5e292b commit 45f7158
Show file tree
Hide file tree
Showing 25 changed files with 151 additions and 74 deletions.
2 changes: 2 additions & 0 deletions rsocket-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ kotlin {
sourceSets {
commonMain {
dependencies {
implementation(projects.rsocketInternalIo)

api(libs.kotlinx.coroutines.core)
api(libs.ktor.io)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,16 +19,17 @@ package io.rsocket.kotlin.frame.io
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*

internal fun ByteReadPacket.readMetadata(pool: ObjectPool<ChunkBuffer>): ByteReadPacket {
val length = readLength()
val length = readInt24()
return readPacket(pool, length)
}

internal fun BytePacketBuilder.writeMetadata(metadata: ByteReadPacket?) {
metadata?.let {
writeLength(it.remaining.toInt())
writeInt24(it.remaining.toInt())
writePacket(it)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,17 +28,6 @@ internal inline fun <T : Closeable, R> T.closeOnError(block: (T) -> R): R {
}
}

private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close

@Suppress("FunctionName")
internal fun <E : Closeable> SafeChannel(capacity: Int): Channel<E> =
Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)

internal fun <E : Closeable> SendChannel<E>.safeTrySend(element: E) {
trySend(element).onFailure { element.close() }
}

internal fun Channel<out Closeable>.fullClose(cause: Throwable?) {
close(cause) // close channel to provide right cause
cancel() // force call of onUndeliveredElement to release buffered elements
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,15 +17,16 @@
package io.rsocket.kotlin.internal

import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.io.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

private val selectFrame: suspend (Frame) -> Frame = { it }

internal class Prioritizer {
private val priorityChannel = SafeChannel<Frame>(Channel.UNLIMITED)
private val commonChannel = SafeChannel<Frame>(Channel.UNLIMITED)
private val priorityChannel = channelForCloseable<Frame>(Channel.UNLIMITED)
private val commonChannel = channelForCloseable<Frame>(Channel.UNLIMITED)

suspend fun send(frame: Frame) {
currentCoroutineContext().ensureActive()
Expand All @@ -43,7 +44,7 @@ internal class Prioritizer {
}

fun close(error: Throwable?) {
priorityChannel.fullClose(error)
commonChannel.fullClose(error)
priorityChannel.cancelWithCause(error)
commonChannel.cancelWithCause(error)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.handler.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
Expand Down Expand Up @@ -77,7 +78,7 @@ internal class RSocketRequester(

val id = streamsStorage.nextId()

val channel = SafeChannel<Payload>(Channel.UNLIMITED)
val channel = channelForCloseable<Payload>(Channel.UNLIMITED)
val handler = RequesterRequestStreamFrameHandler(id, streamsStorage, channel, pool)
streamsStorage.save(id, handler)

Expand All @@ -93,7 +94,7 @@ internal class RSocketRequester(

val id = streamsStorage.nextId()

val channel = SafeChannel<Payload>(Channel.UNLIMITED)
val channel = channelForCloseable<Payload>(Channel.UNLIMITED)
val limiter = Limiter(0)
val payloadsJob = Job(this@RSocketRequester.coroutineContext.job)
val handler = RequesterRequestChannelFrameHandler(id, streamsStorage, limiter, payloadsJob, channel, pool)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal.handler
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
Expand All @@ -42,7 +43,7 @@ internal class RequesterRequestChannelFrameHandler(

override fun handleError(cause: Throwable) {
streamsStorage.remove(id)
channel.fullClose(cause)
channel.cancelWithCause(cause)
sender.cancel("Request failed", cause)
}

Expand All @@ -55,7 +56,7 @@ internal class RequesterRequestChannelFrameHandler(
}

override fun cleanup(cause: Throwable?) {
channel.fullClose(cause)
channel.cancelWithCause(cause)
sender.cancel("Connection closed", cause)
}

Expand All @@ -78,7 +79,7 @@ internal class RequesterRequestChannelFrameHandler(
if (sender.isCancelled) return false

val isFailed = streamsStorage.remove(id) != null
if (isFailed) channel.fullClose(cause)
if (isFailed) channel.cancelWithCause(cause)
return isFailed
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal.handler
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.channels.*

Expand All @@ -39,11 +40,11 @@ internal class RequesterRequestStreamFrameHandler(

override fun handleError(cause: Throwable) {
streamsStorage.remove(id)
channel.fullClose(cause)
channel.cancelWithCause(cause)
}

override fun cleanup(cause: Throwable?) {
channel.fullClose(cause)
channel.cancelWithCause(cause)
}

override fun onReceiveComplete() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
Expand All @@ -32,7 +33,7 @@ internal class ResponderRequestChannelFrameHandler(
pool: ObjectPool<ChunkBuffer>
) : ResponderFrameHandler(pool), ReceiveFrameHandler {
val limiter = Limiter(initialRequest)
val channel = SafeChannel<Payload>(Channel.UNLIMITED)
val channel = channelForCloseable<Payload>(Channel.UNLIMITED)

@OptIn(ExperimentalStreamsApi::class)
override fun start(payload: Payload): Job = responder.handleRequestChannel(payload, id, this)
Expand All @@ -47,13 +48,13 @@ internal class ResponderRequestChannelFrameHandler(

override fun handleError(cause: Throwable) {
streamsStorage.remove(id)
channel.fullClose(cause)
channel.cancelWithCause(cause)
}

override fun handleCancel() {
streamsStorage.remove(id)
val cancelError = CancellationException("Request cancelled")
channel.fullClose(cancelError)
channel.cancelWithCause(cancelError)
job?.cancel(cancelError)
}

Expand All @@ -62,7 +63,7 @@ internal class ResponderRequestChannelFrameHandler(
}

override fun cleanup(cause: Throwable?) {
channel.fullClose(cause)
channel.cancelWithCause(cause)
}

override fun onSendComplete() {
Expand All @@ -72,7 +73,7 @@ internal class ResponderRequestChannelFrameHandler(

override fun onSendFailed(cause: Throwable): Boolean {
val isFailed = streamsStorage.remove(id) != null
if (isFailed) channel.fullClose(cause)
if (isFailed) channel.cancelWithCause(cause)
return isFailed
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*

@ExperimentalMetadataApi
public fun CompositeMetadata(vararg entries: Metadata): CompositeMetadata =
Expand All @@ -39,7 +40,7 @@ public sealed interface CompositeMetadata : Metadata {
override fun BytePacketBuilder.writeSelf() {
entries.forEach {
writeMimeType(it.mimeType)
writeLength(it.content.remaining.toInt()) //write metadata length
writeInt24(it.content.remaining.toInt()) //write metadata length
writePacket(it.content) //write metadata content
}
}
Expand All @@ -58,7 +59,7 @@ public sealed interface CompositeMetadata : Metadata {
val list = mutableListOf<Entry>()
while (isNotEmpty) {
val type = readMimeType()
val length = readLength()
val length = readInt24()
val packet = readPacket(pool, length)
list.add(Entry(type, packet))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.test.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
Expand All @@ -37,13 +37,13 @@ class TestConnection : Connection, ClientTransport {
override val coroutineContext: CoroutineContext =
Job() + Dispatchers.Unconfined + TestExceptionHandler

private val sendChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
private val receiveChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
private val sendChannel = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)
private val receiveChannel = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)

init {
coroutineContext.job.invokeOnCompletion {
sendChannel.close(it)
receiveChannel.fullClose(it)
receiveChannel.cancelWithCause(it)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,17 +18,18 @@ package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.test.*
import kotlin.test.*

internal fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) {
val packet = toPacket(InUseTrackingPool)
writeLength(packet.remaining.toInt())
writeInt24(packet.remaining.toInt())
writePacket(packet)
}

internal fun ByteReadPacket.toFrameWithLength(): Frame {
val length = readLength()
val length = readInt24()
assertEquals(length, remaining.toInt())
return readFrame(InUseTrackingPool)
}
Expand Down
10 changes: 10 additions & 0 deletions rsocket-internal-io/api/rsocket-internal-io.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
public final class io/rsocket/kotlin/internal/io/ChannelsKt {
public static final fun cancelWithCause (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Throwable;)V
public static final fun channelForCloseable (I)Lkotlinx/coroutines/channels/Channel;
}

public final class io/rsocket/kotlin/internal/io/Int24Kt {
public static final fun readInt24 (Lio/ktor/utils/io/core/ByteReadPacket;)I
public static final fun writeInt24 (Lio/ktor/utils/io/core/BytePacketBuilder;I)V
}

33 changes: 33 additions & 0 deletions rsocket-internal-io/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id("rsocket.template.library")
id("rsocket.target.all")
}

kotlin {
sourceSets {
commonMain {
dependencies {
api(libs.kotlinx.coroutines.core)
api(libs.ktor.io)
}
}
}
}

description = "rsocket-kotlin internal IO support"
Loading

0 comments on commit 45f7158

Please sign in to comment.