Skip to content

Commit

Permalink
migrate ktor websocket transport to new API
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Apr 11, 2024
1 parent 501aef0 commit f664dc9
Show file tree
Hide file tree
Showing 13 changed files with 538 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport : io/rsocket/kotlin/transport/RSocketTransport {
public static final field Factory Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport$Factory;
public abstract fun target (Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
public abstract fun target (Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
public abstract fun target (Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;Lio/ktor/http/HttpMethod;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
}

public final class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
}

public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun httpEngine (Lio/ktor/client/engine/HttpClientEngine;Lkotlin/jvm/functions/Function1;)V
public abstract fun httpEngine (Lio/ktor/client/engine/HttpClientEngineFactory;Lkotlin/jvm/functions/Function1;)V
public abstract fun httpEngine (Lkotlin/jvm/functions/Function1;)V
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder;Lio/ktor/client/engine/HttpClientEngine;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/client/KtorWebSocketClientTransportBuilder;Lio/ktor/client/engine/HttpClientEngineFactory;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
public abstract fun webSocketsConfig (Lkotlin/jvm/functions/Function1;)V
}

public final class io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransportKt {
public static final fun WebSocketClientTransport (Lio/ktor/client/engine/HttpClientEngineFactory;Ljava/lang/String;Ljava/lang/Integer;Ljava/lang/String;ZLkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ClientTransport;
public static final fun WebSocketClientTransport (Lio/ktor/client/engine/HttpClientEngineFactory;Ljava/lang/String;ZLkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ClientTransport;
Expand Down
1 change: 1 addition & 0 deletions rsocket-transports/ktor-websocket-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ kotlin {
sourceSets {
commonMain.dependencies {
implementation(projects.rsocketTransportKtorWebsocketInternal)
implementation(projects.rsocketInternalIo)
api(projects.rsocketCore)
api(libs.ktor.client.core)
api(libs.ktor.client.websockets)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.client

import io.ktor.client.*
import io.ktor.client.engine.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.internal.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public sealed interface KtorWebSocketClientTransport : RSocketTransport {
public fun target(request: HttpRequestBuilder.() -> Unit): RSocketClientTarget
public fun target(urlString: String, request: HttpRequestBuilder.() -> Unit = {}): RSocketClientTarget

public fun target(
method: HttpMethod = HttpMethod.Get,
host: String? = null,
port: Int? = null,
path: String? = null,
request: HttpRequestBuilder.() -> Unit = {},
): RSocketClientTarget

public companion object Factory :
RSocketTransportFactory<KtorWebSocketClientTransport, KtorWebSocketClientTransportBuilder>(::KtorWebSocketClientTransportBuilderImpl)
}

public sealed interface KtorWebSocketClientTransportBuilder : RSocketTransportBuilder<KtorWebSocketClientTransport> {
public fun httpEngine(configure: HttpClientEngineConfig.() -> Unit)
public fun httpEngine(engine: HttpClientEngine, configure: HttpClientEngineConfig.() -> Unit = {})
public fun <T : HttpClientEngineConfig> httpEngine(factory: HttpClientEngineFactory<T>, configure: T.() -> Unit = {})

public fun webSocketsConfig(block: WebSockets.Config.() -> Unit)
}

private class KtorWebSocketClientTransportBuilderImpl : KtorWebSocketClientTransportBuilder {
private var httpClientFactory: HttpClientFactory = HttpClientFactory.Default
private var webSocketsConfig: WebSockets.Config.() -> Unit = {}

override fun httpEngine(configure: HttpClientEngineConfig.() -> Unit) {
this.httpClientFactory = HttpClientFactory.FromConfiguration(configure)
}

override fun httpEngine(engine: HttpClientEngine, configure: HttpClientEngineConfig.() -> Unit) {
this.httpClientFactory = HttpClientFactory.FromEngine(engine, configure)
}

override fun <T : HttpClientEngineConfig> httpEngine(factory: HttpClientEngineFactory<T>, configure: T.() -> Unit) {
this.httpClientFactory = HttpClientFactory.FromFactory(factory, configure)
}

override fun webSocketsConfig(block: WebSockets.Config.() -> Unit) {
this.webSocketsConfig = block
}

@RSocketTransportApi
override fun buildTransport(context: CoroutineContext): KtorWebSocketClientTransport {
val httpClient = httpClientFactory.createHttpClient {
install(WebSockets, webSocketsConfig)
}
// only dispatcher of a client is used - it looks like it's Dispatchers.IO now
val newContext = context.supervisorContext() + (httpClient.coroutineContext[ContinuationInterceptor] ?: EmptyCoroutineContext)
val newJob = newContext.job
val httpClientJob = httpClient.coroutineContext.job

httpClientJob.invokeOnCompletion { newJob.cancel("HttpClient closed", it) }
newJob.invokeOnCompletion { httpClientJob.cancel("KtorWebSocketClientTransport closed", it) }

return KtorWebSocketClientTransportImpl(
coroutineContext = newContext,
httpClient = httpClient,
)
}
}

private class KtorWebSocketClientTransportImpl(
override val coroutineContext: CoroutineContext,
private val httpClient: HttpClient,
) : KtorWebSocketClientTransport {
override fun target(request: HttpRequestBuilder.() -> Unit): RSocketClientTarget = KtorWebSocketClientTargetImpl(
coroutineContext = coroutineContext,
httpClient = httpClient,
request = request
)

override fun target(
urlString: String,
request: HttpRequestBuilder.() -> Unit,
): RSocketClientTarget = target(
method = HttpMethod.Get, host = null, port = null, path = null,
request = {
url.protocol = URLProtocol.WS
url.port = port

url.takeFrom(urlString)
request()
},
)

override fun target(
method: HttpMethod,
host: String?,
port: Int?,
path: String?,
request: HttpRequestBuilder.() -> Unit,
): RSocketClientTarget = target {
this.method = method
url("ws", host, port, path)
request()
}
}

private class KtorWebSocketClientTargetImpl(
override val coroutineContext: CoroutineContext,
private val httpClient: HttpClient,
private val request: HttpRequestBuilder.() -> Unit,
) : RSocketClientTarget {

@RSocketTransportApi
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
httpClient.webSocket(request) {
handler.handleKtorWebSocketConnection(this)
}
}
}

private sealed class HttpClientFactory {
abstract fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient

object Default : HttpClientFactory() {
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(block)
}

class FromConfiguration(
private val configure: HttpClientEngineConfig.() -> Unit,
) : HttpClientFactory() {
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient {
engine(configure)
block()
}
}

class FromEngine(
private val engine: HttpClientEngine,
private val configure: HttpClientEngineConfig.() -> Unit,
) : HttpClientFactory() {
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(engine) {
engine(configure)
block()
}
}

class FromFactory<T : HttpClientEngineConfig>(
private val factory: HttpClientEngineFactory<T>,
private val configure: T.() -> Unit,
) : HttpClientFactory() {
override fun createHttpClient(block: HttpClientConfig<*>.() -> Unit): HttpClient = HttpClient(factory) {
engine(configure)
block()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
public final class io/rsocket/kotlin/transport/ktor/websocket/internal/KtorWebSocketConnectionKt {
public static final fun handleKtorWebSocketConnection (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;Lio/ktor/websocket/WebSocketSession;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/transport/ktor/websocket/internal/WebSocketConnection : io/rsocket/kotlin/Connection, kotlinx/coroutines/CoroutineScope {
public fun <init> (Lio/ktor/websocket/WebSocketSession;)V
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ kotlin {

sourceSets {
commonMain.dependencies {
implementation(projects.rsocketInternalIo)
api(projects.rsocketCore)
api(libs.ktor.websockets)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.internal

import io.ktor.utils.io.core.*
import io.ktor.websocket.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.internal.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

@RSocketTransportApi
public suspend fun RSocketConnectionHandler.handleKtorWebSocketConnection(webSocketSession: WebSocketSession): Unit = coroutineScope {
val outboundQueue = PrioritizationFrameQueue(Channel.BUFFERED)

val senderJob = launch {
while (true) webSocketSession.send(outboundQueue.dequeueFrame()?.readBytes() ?: break)
}.onCompletion { outboundQueue.cancel() }

try {
handleConnection(KtorWebSocketConnection(outboundQueue, webSocketSession.incoming))
} finally {
webSocketSession.incoming.cancel()
outboundQueue.close()
withContext(NonCancellable) {
senderJob.join() // await all frames sent
webSocketSession.close()
webSocketSession.coroutineContext.job.join()
}
}
}

@RSocketTransportApi
private class KtorWebSocketConnection(
private val outboundQueue: PrioritizationFrameQueue,
private val inbound: ReceiveChannel<Frame>,
) : RSocketSequentialConnection {
override val isClosedForSend: Boolean get() = outboundQueue.isClosedForSend

override suspend fun sendFrame(streamId: Int, frame: ByteReadPacket) {
return outboundQueue.enqueueFrame(streamId, frame)
}

override suspend fun receiveFrame(): ByteReadPacket? {
val frame = inbound.receiveCatching().getOrNull() ?: return null
return ByteReadPacket(frame.data)
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,30 @@
public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
public abstract fun getConnectors ()Ljava/util/List;
public abstract fun getPath ()Ljava/lang/String;
public abstract fun getProtocol ()Ljava/lang/String;
}

public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport : io/rsocket/kotlin/transport/RSocketTransport {
public static final field Factory Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory;
public abstract fun target (Lio/ktor/server/engine/EngineConnectorConfig;Ljava/lang/String;Ljava/lang/String;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public abstract fun target (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public abstract fun target (Ljava/lang/String;Ljava/lang/String;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public abstract fun target (Ljava/util/List;Ljava/lang/String;Ljava/lang/String;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;Lio/ktor/server/engine/EngineConnectorConfig;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;Ljava/lang/String;Ljava/lang/String;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport;Ljava/util/List;Ljava/lang/String;Ljava/lang/String;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
}

public final class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
}

public abstract interface class io/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun httpEngine (Lio/ktor/server/engine/ApplicationEngineFactory;Lkotlin/jvm/functions/Function1;)V
public static synthetic fun httpEngine$default (Lio/rsocket/kotlin/transport/ktor/websocket/server/KtorWebSocketServerTransportBuilder;Lio/ktor/server/engine/ApplicationEngineFactory;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)V
public abstract fun webSocketsConfig (Lkotlin/jvm/functions/Function1;)V
}

public final class io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransportKt {
public static final fun WebSocketServerTransport (Lio/ktor/server/engine/ApplicationEngineFactory;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ServerTransport;
public static final fun WebSocketServerTransport (Lio/ktor/server/engine/ApplicationEngineFactory;[Lio/ktor/server/engine/EngineConnectorConfig;Ljava/lang/String;Ljava/lang/String;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/ServerTransport;
Expand Down
1 change: 1 addition & 0 deletions rsocket-transports/ktor-websocket-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ kotlin {
sourceSets {
commonMain.dependencies {
implementation(projects.rsocketTransportKtorWebsocketInternal)
implementation(projects.rsocketInternalIo)
api(projects.rsocketCore)
api(libs.ktor.server.host.common)
api(libs.ktor.server.websockets)
Expand Down
Loading

0 comments on commit f664dc9

Please sign in to comment.