diff --git a/io/js/src/main/scala/fs2/io/ioplatform.scala b/io/js/src/main/scala/fs2/io/ioplatform.scala index daad3890c5..424efcb2b6 100644 --- a/io/js/src/main/scala/fs2/io/ioplatform.scala +++ b/io/js/src/main/scala/fs2/io/ioplatform.scala @@ -158,13 +158,18 @@ private[fs2] trait ioplatform { case None => Pull.done } - go(in).stream.onFinalizeCase[F] { - case Resource.ExitCase.Succeeded => - if (endAfterUse) + val end = + if (endAfterUse) + Stream.exec { F.async_[Unit] { cb => writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException))) } - else F.unit + } + else Stream.empty + + (go(in).stream ++ end).onFinalizeCase[F] { + case Resource.ExitCase.Succeeded => + F.unit case Resource.ExitCase.Errored(_) | Resource.ExitCase.Canceled => // tempting, but don't propagate the error! // that would trigger a unhandled Node.js error that circumvents FS2/CE error channels diff --git a/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala b/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala index 47e023041f..2c64498e72 100644 --- a/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala +++ b/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala @@ -27,6 +27,8 @@ import fs2.Fs2Suite import fs2.io.internal.facade import org.scalacheck.effect.PropF.forAllF +import scala.concurrent.duration._ + class IoPlatformSuite extends Fs2Suite { test("to/read Readable") { @@ -92,4 +94,25 @@ class IoPlatformSuite extends Fs2Suite { }.attempt } + test("unacknowledged 'end' does not prevent writeWritable cancelation") { + val writable = IO { + new facade.stream.Duplex( + new facade.stream.DuplexOptions { + var autoDestroy = false + var read = _ => () + var write = (_, _, _, _) => () + var `final` = (_, _) => () + var destroy = (_, _, _) => () + } + ) + } + + Stream + .empty[IO] + .through(writeWritable[IO](writable)) + .compile + .drain + .timeoutTo(100.millis, IO.unit) + } + }