From 0b80b1dfb29dcb36cc609de3500b61288bd3cd20 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 28 Jun 2020 18:38:20 +0200 Subject: [PATCH] stream: destroy wrapped streams on error Stream should be destroyed and update state accordingly when the wrapped stream emits error. Does some additional cleanup with future TODOs that might be worth looking into. --- lib/_stream_readable.js | 28 +++++++++++++--- .../test-stream2-readable-wrap-error.js | 32 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream2-readable-wrap-error.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 952f2d75b84839..d33d53de2b23a4 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -66,7 +66,6 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); const { errorOrDestroy } = destroyImpl; -const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { // Sadly this is not cacheable as some libraries bundle their own @@ -1051,10 +1050,29 @@ Readable.prototype.wrap = function(stream) { } } - // Proxy certain important events. - for (const kProxyEvent of kProxyEvents) { - stream.on(kProxyEvent, this.emit.bind(this, kProxyEvent)); - } + stream.on('error', (err) => { + errorOrDestroy(this, err); + }); + + stream.on('close', () => { + // TODO(ronag): Update readable state? + this.emit('close'); + }); + + stream.on('destroy', () => { + // TODO(ronag): this.destroy()? + this.emit('destroy'); + }); + + stream.on('pause', () => { + // TODO(ronag): this.pause()? + this.emit('pause'); + }); + + stream.on('resume', () => { + // TODO(ronag): this.resume()? + this.emit('resume'); + }); // When we try to consume some more bytes, simply unpause the // underlying stream. diff --git a/test/parallel/test-stream2-readable-wrap-error.js b/test/parallel/test-stream2-readable-wrap-error.js new file mode 100644 index 00000000000000..b56b9bc41c7527 --- /dev/null +++ b/test/parallel/test-stream2-readable-wrap-error.js @@ -0,0 +1,32 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const Readable = require('_stream_readable'); +const EE = require('events').EventEmitter; + +const oldStream = new EE(); +oldStream.pause = () => {}; +oldStream.resume = () => {}; + +{ + const r = new Readable({ autoDestroy: true }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, true); + })); + oldStream.emit('error', new Error()); +} + +{ + const r = new Readable({ autoDestroy: false }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, false); + })); + oldStream.emit('error', new Error()); +}