From 3a68fb4dce82bc4400b70e47e4a976e38fe63d12 Mon Sep 17 00:00:00 2001 From: Oleg Yukhnevich Date: Fri, 12 Apr 2024 22:02:03 +0300 Subject: [PATCH] fix Netty WS --- .../NettyWebSocketChannelInitializer.kt | 12 ++-- .../NettyWebSocketClientTransport.kt | 62 ++++++++++++------- .../websocket/NettyWebSocketConnection.kt | 6 +- .../NettyWebSocketServerTransport.kt | 12 ++-- 4 files changed, 57 insertions(+), 35 deletions(-) diff --git a/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketChannelInitializer.kt b/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketChannelInitializer.kt index e0b7b0c1..d3c641fc 100644 --- a/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketChannelInitializer.kt +++ b/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketChannelInitializer.kt @@ -22,13 +22,15 @@ import io.netty.handler.codec.http.* import io.netty.handler.ssl.* import java.net.* -internal open class NettyWebSocketChannelInitializer( +internal abstract class NettyWebSocketChannelInitializer( private val sslContext: SslContext?, private val remoteAddress: InetSocketAddress?, - private val httpHandler: () -> ChannelHandler, - private val webSocketHandler: () -> ChannelHandler, ) : ChannelInitializer() { + abstract fun createHttpHandler(): ChannelHandler + abstract fun createWebSocketHandler(): ChannelHandler + override fun initChannel(ch: DuplexChannel): Unit = with(ch.pipeline()) { + //addLast(LoggingHandler(if (remoteAddress == null) "server" else "client")) if (sslContext != null) { addLast( "ssl", @@ -40,8 +42,8 @@ internal open class NettyWebSocketChannelInitializer( } // TODO: should those handlers be configurable? // what is the the good defaults here and for HttpObjectAggregator - addLast("http", httpHandler()) + addLast("http", createHttpHandler()) addLast(HttpObjectAggregator(65536)) - addLast("websocket", webSocketHandler()) + addLast("websocket", createWebSocketHandler()) } } diff --git a/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketClientTransport.kt b/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketClientTransport.kt index 4b4f2cbd..36125181 100644 --- a/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketClientTransport.kt +++ b/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketClientTransport.kt @@ -127,27 +127,18 @@ private class NettyWebSocketClientTransportImpl( } } - override fun target(configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget { - val config = WebSocketClientProtocolConfig.newBuilder().apply { - // transport config first - webSocketProtocolConfig?.invoke(this) - // target config - configure.invoke(this) - }.build() - - val remoteAddress = InetSocketAddress(config.webSocketUri().host, config.webSocketUri().port) - return NettyWebSocketClientTransportTargetImpl( + override fun target(configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget = + NettyWebSocketClientTransportTargetImpl( coroutineContext = coroutineContext.supervisorContext(), - bootstrap = bootstrap.clone().remoteAddress(remoteAddress).handler( - NettyWebSocketChannelInitializer( - sslContext = sslContext, - remoteAddress = remoteAddress, - httpHandler = { HttpClientCodec() }, - webSocketHandler = { WebSocketClientProtocolHandler(config) } - ) - ), + bootstrap = bootstrap, + sslContext = sslContext, + webSocketProtocolConfig = WebSocketClientProtocolConfig.newBuilder().apply { + // transport config first + webSocketProtocolConfig?.invoke(this) + // target config + configure.invoke(this) + }.build(), ) - } override fun target(uri: URI, configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget = target { webSocketUri(uri) @@ -177,15 +168,44 @@ private class NettyWebSocketClientTransportImpl( } } +// TODO: make TCP use the same IDEA private class NettyWebSocketClientTransportTargetImpl( override val coroutineContext: CoroutineContext, private val bootstrap: Bootstrap, + private val sslContext: SslContext?, + private val webSocketProtocolConfig: WebSocketClientProtocolConfig, ) : RSocketClientTarget { @RSocketTransportApi override fun connectClient(handler: RSocketConnectionHandler): Job = launch { - val future = bootstrap.connect() + val remoteAddress = InetSocketAddress(webSocketProtocolConfig.webSocketUri().host, webSocketProtocolConfig.webSocketUri().port) + val future = bootstrap.clone().handler( + NettyWebSocketClientChannelInitializer( + sslContext = sslContext, + remoteAddress = remoteAddress, + webSocketProtocolConfig, + coroutineContext, + handler + ) + ).connect(remoteAddress) future.awaitFuture() - handler.handleNettyWebSocketConnection(future.channel() as DuplexChannel) + } +} + +@RSocketTransportApi +private class NettyWebSocketClientChannelInitializer( + sslContext: SslContext?, + remoteAddress: InetSocketAddress?, + private val webSocketProtocolConfig: WebSocketClientProtocolConfig, + override val coroutineContext: CoroutineContext, + private val handler: RSocketConnectionHandler, +) : NettyWebSocketChannelInitializer(sslContext, remoteAddress), CoroutineScope { + override fun createHttpHandler(): ChannelHandler = HttpClientCodec() + + override fun createWebSocketHandler(): ChannelHandler = WebSocketClientProtocolHandler(webSocketProtocolConfig) + + override fun initChannel(ch: DuplexChannel) { + super.initChannel(ch) + launch(start = CoroutineStart.UNDISPATCHED) { handler.handleNettyWebSocketConnection(ch) } } } diff --git a/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketConnection.kt b/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketConnection.kt index e1fae68d..8864c32a 100644 --- a/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketConnection.kt +++ b/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketConnection.kt @@ -34,12 +34,14 @@ internal suspend fun RSocketConnectionHandler.handleNettyWebSocketConnection(cha val outboundQueue = PrioritizationFrameQueue(Channel.BUFFERED) val inbound = channelForCloseable(Channel.UNLIMITED) // TODO val handshakeDeferred = CompletableDeferred() - channel.pipeline().addLast(InboundHandler(inbound, handshakeDeferred)) + + with(channel.pipeline()) { + addLast(InboundHandler(inbound, handshakeDeferred)) + } try { handshakeDeferred.await() } catch (cause: Throwable) { - println(cause) withContext(NonCancellable) { // TODO: do both calls needed? channel.shutdown().awaitFuture() diff --git a/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketServerTransport.kt b/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketServerTransport.kt index 08f1b064..d7b00165 100644 --- a/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketServerTransport.kt +++ b/rsocket-transports/netty-websocket/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/websocket/NettyWebSocketServerTransport.kt @@ -205,15 +205,13 @@ private class NettyWebSocketServerTargetImpl( @RSocketTransportApi private class NettyWebSocketServerHandler( sslContext: SslContext?, - webSocketProtocolConfig: WebSocketServerProtocolConfig, + private val webSocketProtocolConfig: WebSocketServerProtocolConfig, override val coroutineContext: CoroutineContext, private val handler: RSocketConnectionHandler, -) : NettyWebSocketChannelInitializer( - sslContext, - null, - { HttpServerCodec() }, - { WebSocketServerProtocolHandler(webSocketProtocolConfig) } -), CoroutineScope { +) : NettyWebSocketChannelInitializer(sslContext, null), CoroutineScope { + override fun createHttpHandler(): ChannelHandler = HttpServerCodec() + override fun createWebSocketHandler(): ChannelHandler = WebSocketServerProtocolHandler(webSocketProtocolConfig) + override fun initChannel(ch: DuplexChannel) { super.initChannel(ch) // TODO: connection should be configured in place - specifically handler should be installed in place