Skip to content

Commit

Permalink
KTOR-1166 Fix UDP socket send exception (#2164)
Browse files Browse the repository at this point in the history
* KTOR-1166 Fix UDP socket send exception
  • Loading branch information
e5l authored Jan 25, 2021
1 parent 2dd2ed7 commit 2ec366d
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class Datagram(
*/
public interface DatagramWriteChannel {
/**
* Datagram outgoing channel
* Datagram outgoing channel.
*/
public val outgoing: SendChannel<Datagram>

Expand Down
15 changes: 9 additions & 6 deletions ktor-network/common/src/io/ktor/network/sockets/SocketOptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public sealed class SocketOptions(
}
}

private class GeneralSocketOptions constructor(customOptions: MutableMap<Any, Any?>) :
SocketOptions(customOptions) {
private class GeneralSocketOptions constructor(
customOptions: MutableMap<Any, Any?>
) : SocketOptions(customOptions) {
override fun copy(): GeneralSocketOptions = GeneralSocketOptions(HashMap(customOptions)).apply {
copyCommon(this@GeneralSocketOptions)
}
Expand Down Expand Up @@ -120,8 +121,9 @@ public sealed class SocketOptions(
/**
* Represents UDP socket options
*/
public class UDPSocketOptions internal constructor(customOptions: MutableMap<Any, Any?>) :
PeerSocketOptions(customOptions) {
public class UDPSocketOptions internal constructor(
customOptions: MutableMap<Any, Any?>
) : PeerSocketOptions(customOptions) {
override fun copy(): UDPSocketOptions {
return UDPSocketOptions(HashMap(customOptions)).apply {
copyCommon(this@UDPSocketOptions)
Expand All @@ -132,8 +134,9 @@ public sealed class SocketOptions(
/**
* Represents TCP client socket options
*/
public class TCPClientSocketOptions internal constructor(customOptions: MutableMap<Any, Any?>) :
PeerSocketOptions(customOptions) {
public class TCPClientSocketOptions internal constructor(
customOptions: MutableMap<Any, Any?>
) : PeerSocketOptions(customOptions) {
/**
* TCP_NODELAY socket option, useful to disable Nagle
*/
Expand Down
157 changes: 157 additions & 0 deletions ktor-network/jvm/src/io/ktor/network/sockets/DatagramSendChannel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2014-2020 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets

import io.ktor.network.selector.*
import io.ktor.network.util.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.pool.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import java.net.*
import java.nio.*
import java.nio.channels.*
import kotlin.coroutines.*

private val CLOSED: (Throwable?) -> Unit = {}
private val CLOSED_INVOKED: (Throwable?) -> Unit = {}

internal class DatagramSendChannel(
val channel: DatagramChannel,
val socket: DatagramSocketImpl
) : SendChannel<Datagram> {
private val onCloseHandler = atomic<((Throwable?) -> Unit)?>(null)
private val closed = atomic(false)
private val closedCause = atomic<Throwable?>(null)

@ExperimentalCoroutinesApi
override val isClosedForSend: Boolean
get() = socket.isClosed

@ExperimentalCoroutinesApi
override val isFull: Boolean
get() = if (isClosedForSend) false else lock.isLocked

private val lock = Mutex()

override fun close(cause: Throwable?): Boolean {
if (!closed.compareAndSet(false, true)) {
return false
}

closedCause.value = cause

if (!socket.isClosed) {
socket.close()
}

closeAndCheckHandler()

return true
}


override fun offer(element: Datagram): Boolean {
if (!lock.tryLock()) return false

var result = false

try {
DefaultDatagramByteBufferPool.useInstance { buffer ->
element.packet.copy().readAvailable(buffer)
result = channel.send(buffer, element.address) == 0
}
} finally {
lock.unlock()
}

if (result) {
element.packet.release()
}

return result
}

override suspend fun send(element: Datagram) {
lock.withLock {
withContext(Dispatchers.IO) {
DefaultDatagramByteBufferPool.useInstance { buffer ->
element.writeMessageTo(buffer)

val rc = channel.send(buffer, element.address)
if (rc != 0) {
socket.interestOp(SelectInterest.WRITE, false)
return@useInstance
}

sendSuspend(buffer, element.address)
}
}
}
}

private suspend fun sendSuspend(buffer: ByteBuffer, address: SocketAddress) {
while (true) {
socket.interestOp(SelectInterest.WRITE, true)
socket.selector.select(socket, SelectInterest.WRITE)

if (channel.send(buffer, address) != 0) {
socket.interestOp(SelectInterest.WRITE, false)
break
}
}
}

override val onSend: SelectClause2<Datagram, SendChannel<Datagram>>
get() = TODO("[DatagramSendChannel] doesn't support [onSend] select clause")

@ExperimentalCoroutinesApi
override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) {
if (onCloseHandler.compareAndSet(null, handler)) {
return
}

if (onCloseHandler.value === CLOSED) {
require(onCloseHandler.compareAndSet(CLOSED, CLOSED_INVOKED))
handler(closedCause.value)
return
}

failInvokeOnClose(onCloseHandler.value)
}

private fun closeAndCheckHandler() {
while (true) {
val handler = onCloseHandler.value
if (handler === CLOSED_INVOKED) break
if (handler == null) {
if (onCloseHandler.compareAndSet(null, CLOSED)) break
continue
}

require(onCloseHandler.compareAndSet(handler, CLOSED_INVOKED))
handler(closedCause.value)
break
}
}
}

private fun failInvokeOnClose(handler: ((cause: Throwable?) -> Unit)?) {
val message = if (handler === CLOSED_INVOKED) {
"Another handler was already registered and successfully invoked"
} else {
"Another handler was already registered: $handler"
}

throw IllegalStateException(message)
}

private fun Datagram.writeMessageTo(buffer: ByteBuffer) {
packet.readAvailable(buffer)
buffer.flip()
}
66 changes: 22 additions & 44 deletions ktor-network/jvm/src/io/ktor/network/sockets/DatagramSocketImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,34 @@ import io.ktor.util.network.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import io.ktor.utils.io.core.*
import java.net.*
import java.nio.*
import java.nio.channels.*

@Suppress("BlockingMethodInNonBlockingContext")
internal class DatagramSocketImpl(override val channel: DatagramChannel, selector: SelectorManager)
: BoundDatagramSocket, ConnectedDatagramSocket, NIOSocketImpl<DatagramChannel>(channel, selector, DefaultDatagramByteBufferPool) {

internal class DatagramSocketImpl(
override val channel: DatagramChannel,
selector: SelectorManager
) : BoundDatagramSocket, ConnectedDatagramSocket, NIOSocketImpl<DatagramChannel>(
channel, selector, DefaultDatagramByteBufferPool
) {
private val socket = channel.socket()!!

override val localAddress: NetworkAddress
get() = socket.localSocketAddress as? NetworkAddress
get() = socket.localSocketAddress
?: throw IllegalStateException("Channel is not yet bound")

override val remoteAddress: NetworkAddress
get() = socket.remoteSocketAddress as? NetworkAddress
get() = socket.remoteSocketAddress
?: throw IllegalStateException("Channel is not yet connected")

@OptIn(ObsoleteCoroutinesApi::class, ExperimentalCoroutinesApi::class)
private val sender = actor<Datagram>(Dispatchers.IO) {
consumeEach { datagram ->
sendImpl(datagram)
}
}
private val sender: SendChannel<Datagram> = DatagramSendChannel(channel, this)

@OptIn(ExperimentalCoroutinesApi::class)
private val receiver = produce<Datagram>(Dispatchers.IO) {
while (true) {
channel.send(receiveImpl())
private val receiver: ReceiveChannel<Datagram> = produce(Dispatchers.IO) {
try {
while (true) {
channel.send(receiveImpl())
}
} catch (_: ClosedChannelException) {
}
}

Expand All @@ -50,17 +49,18 @@ internal class DatagramSocketImpl(override val channel: DatagramChannel, selecto

override fun close() {
receiver.cancel()
sender.close()
super.close()
sender.close()
}

@Suppress("BlockingMethodInNonBlockingContext")
private suspend fun receiveImpl(): Datagram {
val buffer = DefaultDatagramByteBufferPool.borrow()
val address = try {
channel.receive(buffer)
} catch (t: Throwable) {
} catch (cause: Throwable) {
DefaultDatagramByteBufferPool.recycle(buffer)
throw t
throw cause
} ?: return receiveSuspend(buffer)

interestOp(SelectInterest.READ, false)
Expand All @@ -76,12 +76,10 @@ internal class DatagramSocketImpl(override val channel: DatagramChannel, selecto

val address = try {
channel.receive(buffer)
} catch (t: Throwable) {
} catch (cause: Throwable) {
DefaultDatagramByteBufferPool.recycle(buffer)
throw t
}

if (address == null) return receiveSuspend(buffer)
throw cause
} ?: return receiveSuspend(buffer)

interestOp(SelectInterest.READ, false)
buffer.flip()
Expand All @@ -90,24 +88,4 @@ internal class DatagramSocketImpl(override val channel: DatagramChannel, selecto
return datagram
}

private suspend fun sendImpl(datagram: Datagram) {
val buffer = ByteBuffer.allocateDirect(datagram.packet.remaining.toInt())
datagram.packet.readAvailable(buffer)
buffer.flip()

val rc = channel.send(buffer, datagram.address)
if (rc == 0) {
sendSuspend(buffer, datagram.address)
} else {
interestOp(SelectInterest.WRITE, false)
}
}

private tailrec suspend fun sendSuspend(buffer: ByteBuffer, address: SocketAddress) {
interestOp(SelectInterest.WRITE, true)
selector.select(this, SelectInterest.WRITE)

if (channel.send(buffer, address) == 0) sendSuspend(buffer, address)
else interestOp(SelectInterest.WRITE, false)
}
}
6 changes: 3 additions & 3 deletions ktor-network/jvm/src/io/ktor/network/sockets/NIOSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ internal abstract class NIOSocketImpl<out S>(
val pool: ObjectPool<ByteBuffer>?,
private val socketOptions: SocketOptions.TCPClientSocketOptions? = null
) : ReadWriteSocket, SelectableBase(channel), CoroutineScope
where S : java.nio.channels.ByteChannel, S : java.nio.channels.SelectableChannel {
where S : java.nio.channels.ByteChannel, S : SelectableChannel {

private val closeFlag = AtomicBoolean()
private val readerJob = AtomicReference<ReaderJob?>()
Expand Down Expand Up @@ -105,8 +105,8 @@ internal abstract class NIOSocketImpl<out S>(
channel.close()
super.close()
null
} catch (t: Throwable) {
t
} catch (cause: Throwable) {
cause
} finally {
selector.notifyClosed(this)
}
Expand Down
Loading

0 comments on commit 2ec366d

Please sign in to comment.