diff --git a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala index 30594e6c20..b594697c7c 100644 --- a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala +++ b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala @@ -18,6 +18,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue,CountDownLatch} * * Each `AsynchronousSocketGroup` is assigned a single daemon thread that performs all read/write operations. */ + sealed trait AsynchronousSocketGroup { private[udp] type Context private[udp] def register(channel: DatagramChannel): Context @@ -32,6 +33,11 @@ sealed trait AsynchronousSocketGroup { } object AsynchronousSocketGroup { + /* + * Used to avoid copying between Chunk[Byte] and ByteBuffer during writes within the selector thread, + * as it can be expensive depending on particular implementation of Chunk. + */ + private class WriterPacket(val remote: InetSocketAddress, val bytes: ByteBuffer) def apply(): AsynchronousSocketGroup = new AsynchronousSocketGroup { @@ -51,9 +57,9 @@ object AsynchronousSocketGroup { implicit val ordTimeout: Ordering[Timeout] = Ordering.by[Timeout, Long](_.expiry) } - class Attachment( + private class Attachment( readers: ArrayDeque[(Either[Throwable,Packet] => Unit,Option[Timeout])] = new ArrayDeque(), - writers: ArrayDeque[((Packet,Option[Throwable] => Unit),Option[Timeout])] = new ArrayDeque() + writers: ArrayDeque[((WriterPacket,Option[Throwable] => Unit),Option[Timeout])] = new ArrayDeque() ) { def hasReaders: Boolean = !readers.isEmpty @@ -86,12 +92,12 @@ object AsynchronousSocketGroup { def hasWriters: Boolean = !writers.isEmpty - def peekWriter: Option[(Packet,Option[Throwable] => Unit)] = { + def peekWriter: Option[(WriterPacket,Option[Throwable] => Unit)] = { if (writers.isEmpty) None else Some(writers.peek()._1) } - def dequeueWriter: Option[(Packet,Option[Throwable] => Unit)] = { + def dequeueWriter: Option[(WriterPacket,Option[Throwable] => Unit)] = { if (writers.isEmpty) None else { val (w, timeout) = writers.pop() @@ -100,7 +106,7 @@ object AsynchronousSocketGroup { } } - def queueWriter(writer: (Packet,Option[Throwable] => Unit), timeout: Option[Timeout]): () => Unit = { + def queueWriter(writer: (WriterPacket,Option[Throwable] => Unit), timeout: Option[Timeout]): () => Unit = { if (closed) { writer._2(Some(new ClosedChannelException)) timeout.foreach(_.cancel) @@ -173,14 +179,13 @@ object AsynchronousSocketGroup { } private def read1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, reader: Either[Throwable,Packet] => Unit): Boolean = { - readBuffer.clear try { val src = channel.receive(readBuffer).asInstanceOf[InetSocketAddress] if (src eq null) { false } else { readBuffer.flip - val bytes = Array.ofDim[Byte](readBuffer.remaining) + val bytes = new Array[Byte](readBuffer.remaining) readBuffer.get(bytes) readBuffer.clear reader(Right(new Packet(src, Chunk.bytes(bytes)))) @@ -193,6 +198,17 @@ object AsynchronousSocketGroup { } override def write(key: SelectionKey, packet: Packet, timeout: Option[FiniteDuration], cb: Option[Throwable] => Unit): Unit = { + val writerPacket = { + val bytes = { + val srcBytes = packet.bytes.toBytes + if (srcBytes.size == srcBytes.values.size) srcBytes.values else { + val destBytes = new Array[Byte](srcBytes.size) + Array.copy(srcBytes.values, 0, destBytes, srcBytes.offset, srcBytes.size) + destBytes + } + } + new WriterPacket(packet.remote, ByteBuffer.wrap(bytes)) + } onSelectorThread { val channel = key.channel.asInstanceOf[DatagramChannel] val attachment = key.attachment.asInstanceOf[Attachment] @@ -202,11 +218,11 @@ object AsynchronousSocketGroup { if (cancelWriter ne null) cancelWriter() }} if (attachment.hasWriters) { - cancelWriter = attachment.queueWriter((packet, cb), t) + cancelWriter = attachment.queueWriter((writerPacket, cb), t) t.foreach { t => pendingTimeouts += t } } else { - if (!write1(key, channel, attachment, packet, cb)) { - cancelWriter = attachment.queueWriter((packet, cb), t) + if (!write1(key, channel, attachment, writerPacket, cb)) { + cancelWriter = attachment.queueWriter((writerPacket, cb), t) t.foreach { t => pendingTimeouts += t } try { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); () } catch { case t: CancelledKeyException => /* Ignore; key was closed */ } @@ -215,9 +231,9 @@ object AsynchronousSocketGroup { } { cb(Some(new ClosedChannelException)) } } - private def write1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, p: Packet, cb: Option[Throwable] => Unit): Boolean = { + private def write1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, packet: WriterPacket, cb: Option[Throwable] => Unit): Boolean = { try { - val sent = channel.send(ByteBuffer.wrap(p.bytes.toArray), p.remote) + val sent = channel.send(packet.bytes, packet.remote) if (sent > 0) { cb(None) true