Skip to content

Commit

Permalink
Merge pull request #2926 from armanbilge/fix/js-stream-destroy-error
Browse files Browse the repository at this point in the history
Don't propagate errors into Node.js
  • Loading branch information
mpilquist authored Jul 1, 2022
2 parents 620c277 + 7cfb4de commit 7f26024
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 20 deletions.
6 changes: 2 additions & 4 deletions io/js/src/main/scala/fs2/io/NodeStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ trait Readable extends EventEmitter {

protected[io] def destroy(): this.type = js.native

protected[io] def destroy(error: js.Error): this.type = js.native

protected[io] def push(chunk: js.typedarray.Uint8Array): Boolean = js.native

protected[io] def readableEnded: Boolean = js.native
Expand All @@ -52,7 +50,7 @@ trait Readable extends EventEmitter {
@nowarn
trait Writable extends EventEmitter {

protected[io] def destroy(error: js.Error): this.type = js.native
protected[io] def destroy(): this.type = js.native

protected[io] def write(
chunk: js.typedarray.Uint8Array,
Expand All @@ -70,7 +68,7 @@ trait Writable extends EventEmitter {
*/
@js.native
trait Duplex extends Readable with Writable {
protected[io] override def destroy(error: js.Error): this.type = js.native
protected[io] override def destroy(): this.type = js.native
}

final class StreamDestroyedException private[io] () extends IOException
34 changes: 18 additions & 16 deletions io/js/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2.io.internal.MicrotaskExecutor
import fs2.io.internal.ThrowableOps._
import fs2.io.internal.facade

import java.nio.charset.Charset
Expand Down Expand Up @@ -70,8 +69,10 @@ private[fs2] trait ioplatform {
if (!readable.readableEnded & destroyIfNotEnded)
readable.destroy()
}
case (readable, Resource.ExitCase.Errored(ex)) =>
F.delay(readable.destroy(ex.toJSError))
case (readable, Resource.ExitCase.Errored(_)) =>
// tempting, but don't propagate the error!
// that would trigger a unhandled Node.js error that circumvents FS2/CE error channels
F.delay(readable.destroy())
case (readable, Resource.ExitCase.Canceled) =>
if (destroyIfCanceled)
F.delay(readable.destroy())
Expand Down Expand Up @@ -154,20 +155,21 @@ private[fs2] trait ioplatform {
()
}
} >> go(tail)
case None =>
if (endAfterUse)
Pull.eval(
F.async_[Unit] { cb =>
writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))
}
)
else
Pull.done
case None => Pull.done
}

go(in).stream.handleErrorWith { ex =>
Stream.eval(F.delay(writable.destroy(ex.toJSError)))
}.drain
go(in).stream.onFinalizeCase[F] {
case Resource.ExitCase.Succeeded =>
if (endAfterUse)
F.async_[Unit] { cb =>
writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))
}
else F.unit
case Resource.ExitCase.Errored(_) | Resource.ExitCase.Canceled =>
// tempting, but don't propagate the error!
// that would trigger a unhandled Node.js error that circumvents FS2/CE error channels
F.delay(writable.destroy())
}
}
.adaptError { case IOException(ex) => ex }

Expand Down Expand Up @@ -238,7 +240,7 @@ private[fs2] trait ioplatform {
} { duplex =>
F.delay {
if (!duplex.readableEnded | !duplex.writableEnded)
duplex.destroy(null)
duplex.destroy()
}
}
drainIn = in.enqueueNoneTerminatedChunks(readQueue).drain
Expand Down
23 changes: 23 additions & 0 deletions io/js/src/test/scala/fs2/io/IoPlatformSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package io

import cats.effect.IO
import fs2.Fs2Suite
import fs2.io.internal.facade
import org.scalacheck.effect.PropF.forAllF

class IoPlatformSuite extends Fs2Suite {
Expand Down Expand Up @@ -69,4 +70,26 @@ class IoPlatformSuite extends Fs2Suite {
}
}

test("Doesn't cause an unhandled error event") {
suspendReadableAndRead[IO, Readable]()(
facade.fs.createReadStream(
"README.md",
new facade.fs.ReadStreamOptions {
highWaterMark = 1
}
)
).use { case (_, stream) =>
IO.deferred[Unit].flatMap { gate =>
stream
.evalTap(_ => gate.complete(()) *> IO.never)
.compile
.drain
.background
.use { _ =>
gate.get *> IO.raiseError(new Exception("too hot to handle!"))
}
}
}.attempt
}

}

0 comments on commit 7f26024

Please sign in to comment.