diff --git a/io/js/src/main/scala/fs2/io/NodeStream.scala b/io/js/src/main/scala/fs2/io/NodeStream.scala index a0343882e9..a6f34c4252 100644 --- a/io/js/src/main/scala/fs2/io/NodeStream.scala +++ b/io/js/src/main/scala/fs2/io/NodeStream.scala @@ -37,8 +37,6 @@ trait Readable extends EventEmitter { protected[io] def destroy(): this.type = js.native - protected[io] def destroy(error: js.Error): this.type = js.native - protected[io] def push(chunk: js.typedarray.Uint8Array): Boolean = js.native protected[io] def readableEnded: Boolean = js.native @@ -52,7 +50,7 @@ trait Readable extends EventEmitter { @nowarn trait Writable extends EventEmitter { - protected[io] def destroy(error: js.Error): this.type = js.native + protected[io] def destroy(): this.type = js.native protected[io] def write( chunk: js.typedarray.Uint8Array, @@ -70,7 +68,7 @@ trait Writable extends EventEmitter { */ @js.native trait Duplex extends Readable with Writable { - protected[io] override def destroy(error: js.Error): this.type = js.native + protected[io] override def destroy(): this.type = js.native } final class StreamDestroyedException private[io] () extends IOException diff --git a/io/js/src/main/scala/fs2/io/ioplatform.scala b/io/js/src/main/scala/fs2/io/ioplatform.scala index 7fcb24bdc6..daad3890c5 100644 --- a/io/js/src/main/scala/fs2/io/ioplatform.scala +++ b/io/js/src/main/scala/fs2/io/ioplatform.scala @@ -31,7 +31,6 @@ import cats.effect.std.Queue import cats.effect.syntax.all._ import cats.syntax.all._ import fs2.io.internal.MicrotaskExecutor -import fs2.io.internal.ThrowableOps._ import fs2.io.internal.facade import java.nio.charset.Charset @@ -70,8 +69,10 @@ private[fs2] trait ioplatform { if (!readable.readableEnded & destroyIfNotEnded) readable.destroy() } - case (readable, Resource.ExitCase.Errored(ex)) => - F.delay(readable.destroy(ex.toJSError)) + case (readable, Resource.ExitCase.Errored(_)) => + // tempting, but don't propagate the error! + // that would trigger a unhandled Node.js error that circumvents FS2/CE error channels + F.delay(readable.destroy()) case (readable, Resource.ExitCase.Canceled) => if (destroyIfCanceled) F.delay(readable.destroy()) @@ -154,20 +155,21 @@ private[fs2] trait ioplatform { () } } >> go(tail) - case None => - if (endAfterUse) - Pull.eval( - F.async_[Unit] { cb => - writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException))) - } - ) - else - Pull.done + case None => Pull.done } - go(in).stream.handleErrorWith { ex => - Stream.eval(F.delay(writable.destroy(ex.toJSError))) - }.drain + go(in).stream.onFinalizeCase[F] { + case Resource.ExitCase.Succeeded => + if (endAfterUse) + F.async_[Unit] { cb => + writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException))) + } + else F.unit + case Resource.ExitCase.Errored(_) | Resource.ExitCase.Canceled => + // tempting, but don't propagate the error! + // that would trigger a unhandled Node.js error that circumvents FS2/CE error channels + F.delay(writable.destroy()) + } } .adaptError { case IOException(ex) => ex } @@ -238,7 +240,7 @@ private[fs2] trait ioplatform { } { duplex => F.delay { if (!duplex.readableEnded | !duplex.writableEnded) - duplex.destroy(null) + duplex.destroy() } } drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain diff --git a/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala b/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala index 7ef07df274..47e023041f 100644 --- a/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala +++ b/io/js/src/test/scala/fs2/io/IoPlatformSuite.scala @@ -24,6 +24,7 @@ package io import cats.effect.IO import fs2.Fs2Suite +import fs2.io.internal.facade import org.scalacheck.effect.PropF.forAllF class IoPlatformSuite extends Fs2Suite { @@ -69,4 +70,26 @@ class IoPlatformSuite extends Fs2Suite { } } + test("Doesn't cause an unhandled error event") { + suspendReadableAndRead[IO, Readable]()( + facade.fs.createReadStream( + "README.md", + new facade.fs.ReadStreamOptions { + highWaterMark = 1 + } + ) + ).use { case (_, stream) => + IO.deferred[Unit].flatMap { gate => + stream + .evalTap(_ => gate.complete(()) *> IO.never) + .compile + .drain + .background + .use { _ => + gate.get *> IO.raiseError(new Exception("too hot to handle!")) + } + } + }.attempt + } + }