Skip to content

Commit

Permalink
Merge pull request #927 from sbuzzard/avoid-chunk-array-creation
Browse files Browse the repository at this point in the history
Avoids converting from/to byte array from Chunk within UDP ASG single…
  • Loading branch information
mpilquist authored Aug 28, 2017
2 parents 731acf6 + ca6e66c commit 6cddf8b
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions io/src/main/scala/fs2/io/udp/AsynchronousSocketGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import java.util.concurrent.{ConcurrentLinkedQueue,CountDownLatch}
*
* Each `AsynchronousSocketGroup` is assigned a single daemon thread that performs all read/write operations.
*/

sealed trait AsynchronousSocketGroup {
private[udp] type Context
private[udp] def register(channel: DatagramChannel): Context
Expand All @@ -32,6 +33,11 @@ sealed trait AsynchronousSocketGroup {
}

object AsynchronousSocketGroup {
/*
* Used to avoid copying between Chunk[Byte] and ByteBuffer during writes within the selector thread,
* as it can be expensive depending on particular implementation of Chunk.
*/
private class WriterPacket(val remote: InetSocketAddress, val bytes: ByteBuffer)

def apply(): AsynchronousSocketGroup = new AsynchronousSocketGroup {

Expand All @@ -51,9 +57,9 @@ object AsynchronousSocketGroup {
implicit val ordTimeout: Ordering[Timeout] = Ordering.by[Timeout, Long](_.expiry)
}

class Attachment(
private class Attachment(
readers: ArrayDeque[(Either[Throwable,Packet] => Unit,Option[Timeout])] = new ArrayDeque(),
writers: ArrayDeque[((Packet,Option[Throwable] => Unit),Option[Timeout])] = new ArrayDeque()
writers: ArrayDeque[((WriterPacket,Option[Throwable] => Unit),Option[Timeout])] = new ArrayDeque()
) {

def hasReaders: Boolean = !readers.isEmpty
Expand Down Expand Up @@ -86,12 +92,12 @@ object AsynchronousSocketGroup {

def hasWriters: Boolean = !writers.isEmpty

def peekWriter: Option[(Packet,Option[Throwable] => Unit)] = {
def peekWriter: Option[(WriterPacket,Option[Throwable] => Unit)] = {
if (writers.isEmpty) None
else Some(writers.peek()._1)
}

def dequeueWriter: Option[(Packet,Option[Throwable] => Unit)] = {
def dequeueWriter: Option[(WriterPacket,Option[Throwable] => Unit)] = {
if (writers.isEmpty) None
else {
val (w, timeout) = writers.pop()
Expand All @@ -100,7 +106,7 @@ object AsynchronousSocketGroup {
}
}

def queueWriter(writer: (Packet,Option[Throwable] => Unit), timeout: Option[Timeout]): () => Unit = {
def queueWriter(writer: (WriterPacket,Option[Throwable] => Unit), timeout: Option[Timeout]): () => Unit = {
if (closed) {
writer._2(Some(new ClosedChannelException))
timeout.foreach(_.cancel)
Expand Down Expand Up @@ -173,14 +179,13 @@ object AsynchronousSocketGroup {
}

private def read1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, reader: Either[Throwable,Packet] => Unit): Boolean = {
readBuffer.clear
try {
val src = channel.receive(readBuffer).asInstanceOf[InetSocketAddress]
if (src eq null) {
false
} else {
readBuffer.flip
val bytes = Array.ofDim[Byte](readBuffer.remaining)
val bytes = new Array[Byte](readBuffer.remaining)
readBuffer.get(bytes)
readBuffer.clear
reader(Right(new Packet(src, Chunk.bytes(bytes))))
Expand All @@ -193,6 +198,17 @@ object AsynchronousSocketGroup {
}

override def write(key: SelectionKey, packet: Packet, timeout: Option[FiniteDuration], cb: Option[Throwable] => Unit): Unit = {
val writerPacket = {
val bytes = {
val srcBytes = packet.bytes.toBytes
if (srcBytes.size == srcBytes.values.size) srcBytes.values else {
val destBytes = new Array[Byte](srcBytes.size)
Array.copy(srcBytes.values, 0, destBytes, srcBytes.offset, srcBytes.size)
destBytes
}
}
new WriterPacket(packet.remote, ByteBuffer.wrap(bytes))
}
onSelectorThread {
val channel = key.channel.asInstanceOf[DatagramChannel]
val attachment = key.attachment.asInstanceOf[Attachment]
Expand All @@ -202,11 +218,11 @@ object AsynchronousSocketGroup {
if (cancelWriter ne null) cancelWriter()
}}
if (attachment.hasWriters) {
cancelWriter = attachment.queueWriter((packet, cb), t)
cancelWriter = attachment.queueWriter((writerPacket, cb), t)
t.foreach { t => pendingTimeouts += t }
} else {
if (!write1(key, channel, attachment, packet, cb)) {
cancelWriter = attachment.queueWriter((packet, cb), t)
if (!write1(key, channel, attachment, writerPacket, cb)) {
cancelWriter = attachment.queueWriter((writerPacket, cb), t)
t.foreach { t => pendingTimeouts += t }
try { key.interestOps(key.interestOps | SelectionKey.OP_WRITE); () }
catch { case t: CancelledKeyException => /* Ignore; key was closed */ }
Expand All @@ -215,9 +231,9 @@ object AsynchronousSocketGroup {
} { cb(Some(new ClosedChannelException)) }
}

private def write1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, p: Packet, cb: Option[Throwable] => Unit): Boolean = {
private def write1(key: SelectionKey, channel: DatagramChannel, attachment: Attachment, packet: WriterPacket, cb: Option[Throwable] => Unit): Boolean = {
try {
val sent = channel.send(ByteBuffer.wrap(p.bytes.toArray), p.remote)
val sent = channel.send(packet.bytes, packet.remote)
if (sent > 0) {
cb(None)
true
Expand Down

0 comments on commit 6cddf8b

Please sign in to comment.