From ca6e66c1ad69c5332e2e97d6a4a0c8a263d58a0e Mon Sep 17 00:00:00 2001 From: sbuzzard Date: Fri, 25 Aug 2017 12:15:19 -0400 Subject: [PATCH] Avoids converting from Chunk to ByteBuffer within UDP ASG singleton socket selector thread when doing writes by introducing an ASG private analog of the public Packet data type for the ASG writer which holds a ByteBuffer. The conversion from Chunk is done in the ASG write implementation prior to context switching to selector thread. --- .../fs2/io/udp/AsynchronousSocketGroup.scala | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala index 30594e6c20..b594697c7c 100644 --- a/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala +++ b/io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala @@ -18,6 +18,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue,CountDownLatch} * * Each `AsynchronousSocketGroup` is assigned a single daemon thread that performs all read/write operations. */ + sealed trait AsynchronousSocketGroup { private[udp] type Context private[udp] def register(channel: DatagramChannel): Context @@ -32,6 +33,11 @@ sealed trait AsynchronousSocketGroup { } object AsynchronousSocketGroup { + /* + * Used to avoid copying between Chunk[Byte] and ByteBuffer during writes within the selector thread, + * as it can be expensive depending on particular implementation of Chunk. + */ + private class WriterPacket(val remote: InetSocketAddress, val bytes: ByteBuffer) def apply(): AsynchronousSocketGroup = new AsynchronousSocketGroup { @@ -51,9 +57,9 @@ object AsynchronousSocketGroup { implicit val ordTimeout: Ordering[Timeout] = Ordering.by[Timeout, Long](_.expiry) } - class Attachment( + private class Attachment( readers: ArrayDeque[(Either[Throwable,Packet] => Unit,Option[Timeout])] = new ArrayDeque(), - writers: ArrayDeque[((Packet,Option[Throwable] => Unit),Option[Timeout])] = new ArrayDeque() + writers: ArrayDeque[((WriterPacket,Option[Throwable] => Unit),Option[Timeout])] = new ArrayDeque() ) { def hasReaders: Boolean = !readers.isEmpty @@ -86,12 +92,12 @@ object AsynchronousSocketGroup { def hasWriters: Boolean = !writers.isEmpty - def peekWriter: Option[(Packet,Option[Throwable] => Unit)] = { + def peekWriter: Option[(WriterPacket,Option[Throwable] => Unit)] = { if (writers.isEmpty) None else Some(writers.peek()._1) } - def dequeueWriter: Option[(Packet,Option[Throwable] => Unit)] = { + def dequeueWriter: Option[(WriterPacket,Option[Throwable] => Unit)] = { if (writers.isEmpty) None else { val (w, timeout) = writers.pop() @@ -100,7 +106,7 @@ object AsynchronousSocketGroup { } } - def queueWriter(writer: (Packet,Option[Throwable] => Unit), timeout: Option[Timeout]): () => Unit = { + def queueWriter(writer: (WriterPacket,Option[Throwable] => Unit), timeout: Option[Timeout]): () => Unit = { if (closed) { writer._2(Some(new ClosedChannelException)) timeout.foreach(_.cancel) @@ -173,14 +179,13 @@ object AsynchronousSocketGroup { } private def read1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, reader: Either[Throwable,Packet] => Unit): Boolean = { - readBuffer.clear try { val src = channel.receive(readBuffer).asInstanceOf[InetSocketAddress] if (src eq null) { false } else { readBuffer.flip - val bytes = Array.ofDim[Byte](readBuffer.remaining) + val bytes = new Array[Byte](readBuffer.remaining) readBuffer.get(bytes) readBuffer.clear reader(Right(new Packet(src, Chunk.bytes(bytes)))) @@ -193,6 +198,17 @@ object AsynchronousSocketGroup { } override def write(key: SelectionKey, packet: Packet, timeout: Option[FiniteDuration], cb: Option[Throwable] => Unit): Unit = { + val writerPacket = { + val bytes = { + val srcBytes = packet.bytes.toBytes + if (srcBytes.size == srcBytes.values.size) srcBytes.values else { + val destBytes = new Array[Byte](srcBytes.size) + Array.copy(srcBytes.values, 0, destBytes, srcBytes.offset, srcBytes.size) + destBytes + } + } + new WriterPacket(packet.remote, ByteBuffer.wrap(bytes)) + } onSelectorThread { val channel = key.channel.asInstanceOf[DatagramChannel] val attachment = key.attachment.asInstanceOf[Attachment] @@ -202,11 +218,11 @@ object AsynchronousSocketGroup { if (cancelWriter ne null) cancelWriter() }} if (attachment.hasWriters) { - cancelWriter = attachment.queueWriter((packet, cb), t) + cancelWriter = attachment.queueWriter((writerPacket, cb), t) t.foreach { t => pendingTimeouts += t } } else { - if (!write1(key, channel, attachment, packet, cb)) { - cancelWriter = attachment.queueWriter((packet, cb), t) + if (!write1(key, channel, attachment, writerPacket, cb)) { + cancelWriter = attachment.queueWriter((writerPacket, cb), t) t.foreach { t => pendingTimeouts += t } try { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); () } catch { case t: CancelledKeyException => /* Ignore; key was closed */ } @@ -215,9 +231,9 @@ object AsynchronousSocketGroup { } { cb(Some(new ClosedChannelException)) } } - private def write1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, p: Packet, cb: Option[Throwable] => Unit): Boolean = { + private def write1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, packet: WriterPacket, cb: Option[Throwable] => Unit): Boolean = { try { - val sent = channel.send(ByteBuffer.wrap(p.bytes.toArray), p.remote) + val sent = channel.send(packet.bytes, packet.remote) if (sent > 0) { cb(None) true