Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Node.js Duplex interop #3365

Merged
merged 4 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 152 additions & 52 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ import cats.Show
import cats.effect.kernel.Async
import cats.effect.kernel.Resource
import cats.effect.kernel.Sync
import cats.effect.std.Dispatcher
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.concurrent.Channel
import fs2.io.internal.facade

import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import scala.annotation.nowarn
import scala.scalajs.js
import scala.scalajs.js.typedarray.Uint8Array

private[fs2] trait ioplatform {

Expand Down Expand Up @@ -226,8 +224,10 @@ private[fs2] trait ioplatform {
* and return a stream which, when run, will perform that function and emit
* the bytes recorded in the `Writable` as an fs2.Stream
*/
def readWritable[F[_]: Async](f: Writable => F[Unit]): Stream[F, Byte] =
Stream.empty.through(toDuplexAndRead(f))
def readWritable[F[_]](f: Writable => F[Unit])(implicit F: Async[F]): Stream[F, Byte] =
Stream.empty.through(toDuplexAndRead { duplex =>
F.delay(duplex.on("data", () => ())) *> f(duplex)
})

/** Take a function that reads and writes from a `Duplex` effectfully,
* and return a pipe which, when run, will perform that function,
Expand All @@ -241,64 +241,164 @@ private[fs2] trait ioplatform {

private[io] def mkDuplex[F[_]](
in: Stream[F, Byte]
)(implicit F: Async[F]): Resource[F, (Duplex, Stream[F, Byte])] =
for {
readDispatcher <- Dispatcher.sequential[F]
writeDispatcher <- Dispatcher.sequential[F]
errorDispatcher <- Dispatcher.sequential[F]
readQueue <- Queue.bounded[F, Option[Chunk[Byte]]](1).toResource
writeChannel <- Channel.synchronous[F, Chunk[Byte]].toResource
interrupt <- F.deferred[Either[Throwable, Unit]].toResource
duplex <- Resource.make {
)(implicit F: Async[F]): Resource[F, (Duplex, Stream[F, Byte])] = {

type ReadCallback = Uint8Array => Unit
type ReadReadyCallback = Either[Throwable, ReadCallback] => Unit

type WriteCallback = Either[Throwable, Option[Chunk[Byte]]] => Unit
type WriteReadyCallback = WriteCallback => Unit

final class Listener {
// implementation note:
// we always null the callback vars *before* invoking the callback
// this is b/c a new callback may be installed during the invocation of the current callback
// nulling *after* invoking the current callback would clobber this new value

private[this] var readCallback: ReadCallback = null
private[this] var readReadyCallback: ReadReadyCallback = null

private[this] var writeCallback: WriteCallback = null
private[this] var writeReadyCallback: WriteReadyCallback = null

private[this] var destroy: Either[Throwable, Unit] = null
private[this] var destroyCallback: Either[Throwable, Unit] => Unit = null

def handleRead(rcb: ReadCallback): Unit =
if (readReadyCallback ne null) {
val rrcb = readReadyCallback
readReadyCallback = null
rrcb(Right(rcb))
} else {
readCallback = rcb
}

private[this] def readReady: F[ReadCallback] = F.async { rrcb =>
F.delay {
if (readCallback ne null) {
val rcb = readCallback
readCallback = null
rrcb(Right(rcb))
None
} else {
readReadyCallback = rrcb
Some(F.delay { readReadyCallback = null })
}
}
}

def reads: Pipe[F, Byte, Nothing] = {
def go(in: Stream[F, Byte]): Pull[F, Nothing, Unit] =
Pull.eval(readReady).flatMap { cb =>
in.pull.uncons.flatMap {
case Some((head, tail)) =>
Pull.eval(F.delay(cb(head.toUint8Array))) >> go(tail)
case None => Pull.eval(F.delay(cb(null)))
}
}

go(_).stream
}

private[this] def onWrite: F[Option[Chunk[Byte]]] = F.async { cb =>
F.delay {
if (writeReadyCallback ne null) {
val wrcb = writeReadyCallback
writeReadyCallback = null
wrcb(cb)
None
} else {
writeCallback = cb
Some(F.delay { writeCallback = null })
}
}
}

def writes: Stream[F, Byte] =
Stream.repeatEval(onWrite).unNoneTerminate.unchunks

def handleWrite(wrcb: WriteReadyCallback): Unit =
if (writeCallback ne null) {
val wcb = writeCallback
writeCallback = null
wrcb(wcb)
} else {
writeReadyCallback = wrcb
}

def handleDestroy(e: js.Error): Unit = {
destroy = Option(e).map(js.JavaScriptException(_)).toLeft(())
if (destroyCallback ne null) {
val dcb = destroyCallback
destroyCallback = null
dcb(destroy)
}
}

def onDestroy: F[Unit] = F.async { cb =>
F.delay {
new facade.stream.Duplex(
new facade.stream.DuplexOptions {
if (destroy ne null) {
cb(destroy)
None
} else {
destroyCallback = cb
Some(F.delay { destroyCallback = null })
}
}
}
}

Resource.eval(F.delay(new Listener)).flatMap { listener =>
Resource
.make {
F.delay {
new facade.stream.Duplex(
new facade.stream.DuplexOptions {

var autoDestroy = false
var autoDestroy = false

var read = { readable =>
readDispatcher.unsafeRunAndForget(
readQueue.take.flatMap { chunk =>
F.delay(readable.push(chunk.map(_.toUint8Array).orNull)).void
var read = { readable =>
listener.handleRead { c =>
readable.push(c)
()
}
)
}
}

var write = { (_, chunk, _, cb) =>
writeDispatcher.unsafeRunAndForget(
writeChannel.send(Chunk.uint8Array(chunk)) *> F.delay(cb(null))
)
}
var write = { (_, chunk, _, cb) =>
listener.handleWrite { write =>
write(Right(Some(Chunk.uint8Array(chunk))))
cb(null)
}
}

var `final` = { (_, cb) =>
writeDispatcher.unsafeRunAndForget(
writeChannel.close *> F.delay(cb(null))
)
}
var `final` = { (_, cb) =>
listener.handleWrite { write =>
write(Right(None))
cb(null)
}
}

var destroy = { (_, err, cb) =>
errorDispatcher.unsafeRunAndForget {
interrupt
.complete(
Option(err).map(js.JavaScriptException(_)).toLeft(())
) *> F.delay(cb(null))
var destroy = { (_, err, cb) =>
listener.handleDestroy(err)
cb(null)
}
}
}
)
)
}
} { duplex =>
F.delay {
if (!duplex.readableEnded | !duplex.writableEnded)
duplex.destroy()
}
}
} { duplex =>
F.delay {
if (!duplex.readableEnded | !duplex.writableEnded)
duplex.destroy()
.tupleRight {
in.through(listener.reads)
.merge(listener.writes)
.interruptWhen(listener.onDestroy.attempt)
.adaptError { case IOException(ex) => ex }
}
}
drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain
out = writeChannel.stream.unchunks
} yield (
duplex,
drainIn.merge(out).interruptWhen(interrupt).adaptError { case IOException(ex) => ex }
)
}
}

/** Stream of bytes read asynchronously from standard input. */
def stdin[F[_]: Async]: Stream[F, Byte] = stdinAsync
Expand Down
23 changes: 23 additions & 0 deletions io/js/src/test/scala/fs2/io/IoPlatformSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,27 @@ class IoPlatformSuite extends Fs2Suite {
.drain
}

test("write in readWritable write callback does not hang") {
readWritable { writable =>
IO.async_[Unit] { cb =>
writable.write(
Chunk[Byte](0).toUint8Array,
_ => { // start the next write in the callback
writable.write(Chunk[Byte](1).toUint8Array, _ => cb(Right(())))
()
}
)
()
}
}.take(2).compile.toList.assertEquals(List[Byte](0, 1))
}

test("toReadable does not start input stream eagerly") {
IO.ref(true).flatMap { notStarted =>
toReadableResource(Stream.exec(notStarted.set(false))).surround {
IO.sleep(100.millis) *> notStarted.get.assert
}
}
}

}