Skip to content

Commit

Permalink
netty TCP and WS
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Apr 12, 2024
1 parent ad2f644 commit d80c65c
Show file tree
Hide file tree
Showing 19 changed files with 1,486 additions and 0 deletions.
12 changes: 12 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ kotlinx-bcv = "0.14.0"

ktor = "2.3.8"

netty = "4.1.107.Final"
netty-quic = "0.0.60.Final"

# for netty TLS tests
bouncycastle = "1.77"

turbine = "1.0.0"

rsocket-java = "1.1.3"
Expand Down Expand Up @@ -44,6 +50,12 @@ ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" }
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
ktor-server-jetty = { module = "io.ktor:ktor-server-jetty", version.ref = "ktor" }

netty-handler = { module = "io.netty:netty-handler", version.ref = "netty" }
netty-codec-http = { module = "io.netty:netty-codec-http", version.ref = "netty" }
netty-codec-quic = { module = "io.netty.incubator:netty-incubator-codec-native-quic", version.ref = "netty-quic" }

bouncycastle = { module = "org.bouncycastle:bcpkix-jdk18on", version.ref = "bouncycastle" }

turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" }

rsocket-java-core = { module = 'io.rsocket:rsocket-core', version.ref = "rsocket-java" }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
public final class io/rsocket/kotlin/transport/netty/internal/CoroutinesKt {
public static final fun awaitFuture (Lio/netty/util/concurrent/Future;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun callOnCancellation (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function1;)V
}

35 changes: 35 additions & 0 deletions rsocket-transports/netty-internal/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import rsocketbuild.*

plugins {
id("rsocketbuild.multiplatform-library")
}

description = "rsocket-kotlin Netty transport utils"

kotlin {
jvmTarget()

sourceSets {
jvmMain.dependencies {
implementation(projects.rsocketInternalIo)
api(projects.rsocketCore)
api(libs.netty.handler)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.netty.internal

import io.netty.util.concurrent.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

@Suppress("UNCHECKED_CAST")
public suspend inline fun <T> Future<T>.awaitFuture(): T = suspendCancellableCoroutine { cont ->
addListener {
when {
it.isSuccess -> cont.resume(it.now as T)
else -> cont.resumeWithException(it.cause())
}
}
cont.invokeOnCancellation {
cancel(true)
}
}

// it should be used only for cleanup and so should not really block, only suspend
public inline fun CoroutineScope.callOnCancellation(crossinline block: suspend () -> Unit) {
launch(Dispatchers.Unconfined) {
try {
awaitCancellation()
} catch (cause: Throwable) {
withContext(NonCancellable) {
try {
block()
} catch (suppressed: Throwable) {
cause.addSuppressed(suppressed)
}
}
throw cause
}
}
}
41 changes: 41 additions & 0 deletions rsocket-transports/netty-tcp/api/rsocket-transport-netty-tcp.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport : io/rsocket/kotlin/transport/RSocketTransport {
public static final field Factory Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport$Factory;
public abstract fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/RSocketClientTarget;
public abstract fun target (Ljava/net/SocketAddress;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
}

public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
public abstract fun channel (Lkotlin/reflect/KClass;)V
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
public abstract fun getLocalAddress ()Ljava/net/SocketAddress;
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport : io/rsocket/kotlin/transport/RSocketTransport {
public static final field Factory Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport$Factory;
public abstract fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public abstract fun target (Ljava/net/SocketAddress;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;Ljava/lang/String;IILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport;Ljava/net/SocketAddress;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
}

public final class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
}

public abstract interface class io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
public abstract fun channel (Lkotlin/reflect/KClass;)V
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
}

40 changes: 40 additions & 0 deletions rsocket-transports/netty-tcp/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import rsocketbuild.*

plugins {
id("rsocketbuild.multiplatform-library")
}

description = "rsocket-kotlin Netty TCP client/server transport implementation"

kotlin {
jvmTarget()

sourceSets {
jvmMain.dependencies {
implementation(projects.rsocketTransportNettyInternal)
implementation(projects.rsocketInternalIo)
api(projects.rsocketCore)
api(libs.netty.handler)
}
jvmTest.dependencies {
implementation(projects.rsocketTransportTests)
implementation(libs.bouncycastle)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.netty.tcp

import io.netty.channel.*
import io.netty.channel.socket.*
import io.netty.handler.codec.*
import io.netty.handler.ssl.*
import java.net.*

internal open class NettyTcpChannelInitializer(
private val sslContext: SslContext?,
private val remoteAddress: InetSocketAddress?,
) : ChannelInitializer<DuplexChannel>() {
override fun initChannel(ch: DuplexChannel): Unit = with(ch.pipeline()) {
if (sslContext != null) {
addLast(
"ssl",
when {
remoteAddress != null -> sslContext.newHandler(ch.alloc(), remoteAddress.hostName, remoteAddress.port)
else -> sslContext.newHandler(ch.alloc())
}
)
}
addLast(
"rsocket-length-encoder",
LengthFieldPrepender(
/* lengthFieldLength = */ 3
)
)
addLast(
"rsocket-length-decoder",
LengthFieldBasedFrameDecoder(
/* maxFrameLength = */ kotlin.Int.MAX_VALUE,
/* lengthFieldOffset = */ 0,
/* lengthFieldLength = */ 3,
/* lengthAdjustment = */ 0,
/* initialBytesToStrip = */ 3
)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.netty.tcp

import io.netty.bootstrap.*
import io.netty.channel.*
import io.netty.channel.ChannelFactory
import io.netty.channel.nio.*
import io.netty.channel.socket.*
import io.netty.channel.socket.nio.*
import io.netty.handler.ssl.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.netty.internal.*
import kotlinx.coroutines.*
import java.net.*
import kotlin.coroutines.*
import kotlin.reflect.*

public sealed interface NettyTcpClientTransport : RSocketTransport {
public fun target(remoteAddress: SocketAddress): RSocketClientTarget
public fun target(host: String, port: Int): RSocketClientTarget

public companion object Factory :
RSocketTransportFactory<NettyTcpClientTransport, NettyTcpClientTransportBuilder>(::NettyTcpClientTransportBuilderImpl)
}

public sealed interface NettyTcpClientTransportBuilder : RSocketTransportBuilder<NettyTcpClientTransport> {
public fun channel(cls: KClass<out DuplexChannel>)
public fun channelFactory(factory: ChannelFactory<out DuplexChannel>)
public fun eventLoopGroup(group: EventLoopGroup, manage: Boolean)

public fun bootstrap(block: Bootstrap.() -> Unit)
public fun ssl(block: SslContextBuilder.() -> Unit)
}

private class NettyTcpClientTransportBuilderImpl : NettyTcpClientTransportBuilder {
private var channelFactory: ChannelFactory<out DuplexChannel>? = null
private var eventLoopGroup: EventLoopGroup? = null
private var manageEventLoopGroup: Boolean = true
private var bootstrap: (Bootstrap.() -> Unit)? = null
private var ssl: (SslContextBuilder.() -> Unit)? = null

override fun channel(cls: KClass<out DuplexChannel>) {
this.channelFactory = ReflectiveChannelFactory(cls.java)
}

override fun channelFactory(factory: ChannelFactory<out DuplexChannel>) {
this.channelFactory = factory
}

override fun eventLoopGroup(group: EventLoopGroup, manage: Boolean) {
this.eventLoopGroup = group
this.manageEventLoopGroup = manage
}

override fun bootstrap(block: Bootstrap.() -> Unit) {
bootstrap = block
}

override fun ssl(block: SslContextBuilder.() -> Unit) {
ssl = block
}

@RSocketTransportApi
override fun buildTransport(context: CoroutineContext): NettyTcpClientTransport {
val sslContext = ssl?.let {
SslContextBuilder.forClient().apply(it).build()
}

val bootstrap = Bootstrap().apply {
bootstrap?.invoke(this)
channelFactory(channelFactory ?: ReflectiveChannelFactory(NioSocketChannel::class.java))
group(eventLoopGroup ?: NioEventLoopGroup())
}

return NettyTcpClientTransportImpl(
coroutineContext = context.supervisorContext() + bootstrap.config().group().asCoroutineDispatcher(),
sslContext = sslContext,
bootstrap = bootstrap,
manageBootstrap = manageEventLoopGroup
)
}
}

private class NettyTcpClientTransportImpl(
override val coroutineContext: CoroutineContext,
private val sslContext: SslContext?,
private val bootstrap: Bootstrap,
manageBootstrap: Boolean,
) : NettyTcpClientTransport {
init {
if (manageBootstrap) callOnCancellation {
bootstrap.config().group().shutdownGracefully().awaitFuture()
}
}

override fun target(remoteAddress: SocketAddress): NettyTcpClientTargetImpl = NettyTcpClientTargetImpl(
coroutineContext = coroutineContext.supervisorContext(),
bootstrap = bootstrap.clone().remoteAddress(remoteAddress).handler(
NettyTcpChannelInitializer(sslContext, remoteAddress as? InetSocketAddress)
)
)

override fun target(host: String, port: Int): RSocketClientTarget = target(InetSocketAddress(host, port))
}

private class NettyTcpClientTargetImpl(
override val coroutineContext: CoroutineContext,
private val bootstrap: Bootstrap,
) : RSocketClientTarget {
@RSocketTransportApi
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
val future = bootstrap.connect()
future.awaitFuture()
handler.handleNettyTcpConnection(future.channel() as DuplexChannel)
}
}
Loading

0 comments on commit d80c65c

Please sign in to comment.