Skip to content

Commit

Permalink
fix Netty WS
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Apr 12, 2024
1 parent d80c65c commit 3a68fb4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import io.netty.handler.codec.http.*
import io.netty.handler.ssl.*
import java.net.*

internal open class NettyWebSocketChannelInitializer(
internal abstract class NettyWebSocketChannelInitializer(
private val sslContext: SslContext?,
private val remoteAddress: InetSocketAddress?,
private val httpHandler: () -> ChannelHandler,
private val webSocketHandler: () -> ChannelHandler,
) : ChannelInitializer<DuplexChannel>() {
abstract fun createHttpHandler(): ChannelHandler
abstract fun createWebSocketHandler(): ChannelHandler

override fun initChannel(ch: DuplexChannel): Unit = with(ch.pipeline()) {
//addLast(LoggingHandler(if (remoteAddress == null) "server" else "client"))
if (sslContext != null) {
addLast(
"ssl",
Expand All @@ -40,8 +42,8 @@ internal open class NettyWebSocketChannelInitializer(
}
// TODO: should those handlers be configurable?
// what is the the good defaults here and for HttpObjectAggregator
addLast("http", httpHandler())
addLast("http", createHttpHandler())
addLast(HttpObjectAggregator(65536))
addLast("websocket", webSocketHandler())
addLast("websocket", createWebSocketHandler())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,27 +127,18 @@ private class NettyWebSocketClientTransportImpl(
}
}

override fun target(configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget {
val config = WebSocketClientProtocolConfig.newBuilder().apply {
// transport config first
webSocketProtocolConfig?.invoke(this)
// target config
configure.invoke(this)
}.build()

val remoteAddress = InetSocketAddress(config.webSocketUri().host, config.webSocketUri().port)
return NettyWebSocketClientTransportTargetImpl(
override fun target(configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget =
NettyWebSocketClientTransportTargetImpl(
coroutineContext = coroutineContext.supervisorContext(),
bootstrap = bootstrap.clone().remoteAddress(remoteAddress).handler(
NettyWebSocketChannelInitializer(
sslContext = sslContext,
remoteAddress = remoteAddress,
httpHandler = { HttpClientCodec() },
webSocketHandler = { WebSocketClientProtocolHandler(config) }
)
),
bootstrap = bootstrap,
sslContext = sslContext,
webSocketProtocolConfig = WebSocketClientProtocolConfig.newBuilder().apply {
// transport config first
webSocketProtocolConfig?.invoke(this)
// target config
configure.invoke(this)
}.build(),
)
}

override fun target(uri: URI, configure: WebSocketClientProtocolConfig.Builder.() -> Unit): RSocketClientTarget = target {
webSocketUri(uri)
Expand Down Expand Up @@ -177,15 +168,44 @@ private class NettyWebSocketClientTransportImpl(
}
}

// TODO: make TCP use the same IDEA
private class NettyWebSocketClientTransportTargetImpl(
override val coroutineContext: CoroutineContext,
private val bootstrap: Bootstrap,
private val sslContext: SslContext?,
private val webSocketProtocolConfig: WebSocketClientProtocolConfig,
) : RSocketClientTarget {

@RSocketTransportApi
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
val future = bootstrap.connect()
val remoteAddress = InetSocketAddress(webSocketProtocolConfig.webSocketUri().host, webSocketProtocolConfig.webSocketUri().port)
val future = bootstrap.clone().handler(
NettyWebSocketClientChannelInitializer(
sslContext = sslContext,
remoteAddress = remoteAddress,
webSocketProtocolConfig,
coroutineContext,
handler
)
).connect(remoteAddress)
future.awaitFuture()
handler.handleNettyWebSocketConnection(future.channel() as DuplexChannel)
}
}

@RSocketTransportApi
private class NettyWebSocketClientChannelInitializer(
sslContext: SslContext?,
remoteAddress: InetSocketAddress?,
private val webSocketProtocolConfig: WebSocketClientProtocolConfig,
override val coroutineContext: CoroutineContext,
private val handler: RSocketConnectionHandler,
) : NettyWebSocketChannelInitializer(sslContext, remoteAddress), CoroutineScope {
override fun createHttpHandler(): ChannelHandler = HttpClientCodec()

override fun createWebSocketHandler(): ChannelHandler = WebSocketClientProtocolHandler(webSocketProtocolConfig)

override fun initChannel(ch: DuplexChannel) {
super.initChannel(ch)
launch(start = CoroutineStart.UNDISPATCHED) { handler.handleNettyWebSocketConnection(ch) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ internal suspend fun RSocketConnectionHandler.handleNettyWebSocketConnection(cha
val outboundQueue = PrioritizationFrameQueue(Channel.BUFFERED)
val inbound = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED) // TODO
val handshakeDeferred = CompletableDeferred<Unit>()
channel.pipeline().addLast(InboundHandler(inbound, handshakeDeferred))

with(channel.pipeline()) {
addLast(InboundHandler(inbound, handshakeDeferred))
}

try {
handshakeDeferred.await()
} catch (cause: Throwable) {
println(cause)
withContext(NonCancellable) {
// TODO: do both calls needed?
channel.shutdown().awaitFuture()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,13 @@ private class NettyWebSocketServerTargetImpl(
@RSocketTransportApi
private class NettyWebSocketServerHandler(
sslContext: SslContext?,
webSocketProtocolConfig: WebSocketServerProtocolConfig,
private val webSocketProtocolConfig: WebSocketServerProtocolConfig,
override val coroutineContext: CoroutineContext,
private val handler: RSocketConnectionHandler,
) : NettyWebSocketChannelInitializer(
sslContext,
null,
{ HttpServerCodec() },
{ WebSocketServerProtocolHandler(webSocketProtocolConfig) }
), CoroutineScope {
) : NettyWebSocketChannelInitializer(sslContext, null), CoroutineScope {
override fun createHttpHandler(): ChannelHandler = HttpServerCodec()
override fun createWebSocketHandler(): ChannelHandler = WebSocketServerProtocolHandler(webSocketProtocolConfig)

override fun initChannel(ch: DuplexChannel) {
super.initChannel(ch)
// TODO: connection should be configured in place - specifically handler should be installed in place
Expand Down

0 comments on commit 3a68fb4

Please sign in to comment.