From ffae5f3809c99f8356006fadb5cbe9e57bd260a4 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 28 Jun 2020 18:43:59 +0200 Subject: [PATCH] stream: save error in state Useful for future PR's to resolve situations where e.g. finished() is invoked on an already errored streams. PR-URL: https://github.com/nodejs/node/pull/34103 Backport-PR-URL: https://github.com/nodejs/node/pull/34887 Refs: https://github.com/nodejs/node/issues/34680 Reviewed-By: Matteo Collina Reviewed-By: Denys Otrishko Reviewed-By: Anna Henningsen Reviewed-By: Luigi Pinca --- lib/_stream_readable.js | 7 +++- lib/_stream_writable.js | 16 +++++++- lib/internal/streams/destroy.js | 37 ++++++++++++------- test/parallel/test-stream-readable-destroy.js | 29 +++++++++++---- test/parallel/test-stream-writable-destroy.js | 24 ++++++++++-- .../test-stream2-readable-wrap-error.js | 10 +++-- 6 files changed, 90 insertions(+), 33 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index c15850f9aa2064..af8786198674cb 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -145,8 +145,11 @@ function ReadableState(options, stream, isDuplex) { // Has it been destroyed. this.destroyed = false; - // Indicates whether the stream has errored. - this.errored = false; + // Indicates whether the stream has errored. When true no further + // _read calls, 'data' or 'readable' events should occur. This is needed + // since when autoDestroy is disabled we need a way to tell whether the + // stream has failed. + this.errored = null; // Indicates whether the stream has finished destroying. this.closed = false; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 236e236aa32459..4d42e5cd6b660a 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -171,7 +171,7 @@ function WritableState(options, stream, isDuplex) { // Indicates whether the stream has errored. When true all write() calls // should return false. This is needed since when autoDestroy // is disabled we need a way to tell whether the stream has failed. - this.errored = false; + this.errored = null; // Indicates whether the stream has finished destroying. this.closed = false; @@ -407,7 +407,19 @@ function onwrite(stream, er) { state.writelen = 0; if (er) { - state.errored = true; + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + er.stack; + + if (!state.errored) { + state.errored = er; + } + + // In case of duplex streams we need to notify the readable side of the + // error. + if (stream._readableState && !stream._readableState.errored) { + stream._readableState.errored = er; + } + if (sync) { process.nextTick(onwriteError, stream, state, er, cb); } else { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index f7f282ec5079f3..8cb7509ea66aa2 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -15,11 +15,14 @@ function destroy(err, cb) { } if (err) { - if (w) { - w.errored = true; + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (w && !w.errored) { + w.errored = err; } - if (r) { - r.errored = true; + if (r && !r.errored) { + r.errored = err; } } @@ -35,11 +38,14 @@ function destroy(err, cb) { this._destroy(err || null, (err) => { if (err) { - if (w) { - w.errored = true; + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (w && !w.errored) { + w.errored = err; } - if (r) { - r.errored = true; + if (r && !r.errored) { + r.errored = err; } } @@ -108,7 +114,7 @@ function undestroy() { r.closed = false; r.closeEmitted = false; r.destroyed = false; - r.errored = false; + r.errored = null; r.errorEmitted = false; r.reading = false; r.ended = false; @@ -118,7 +124,7 @@ function undestroy() { if (w) { w.closed = false; w.destroyed = false; - w.errored = false; + w.errored = null; w.ended = false; w.ending = false; w.finalCalled = false; @@ -145,11 +151,14 @@ function errorOrDestroy(stream, err, sync) { if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err); else if (err) { - if (w) { - w.errored = true; + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (w && !w.errored) { + w.errored = err; } - if (r) { - r.errored = true; + if (r && !r.errored) { + r.errored = err; } if (sync) { diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index 3d1ac8c92f9bd3..8ab78ec8ccec35 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -134,13 +134,13 @@ const assert = require('assert'); read.on('error', common.mustCall((err) => { assert.strictEqual(ticked, true); assert.strictEqual(read._readableState.errorEmitted, true); - assert.strictEqual(read._readableState.errored, true); + assert.strictEqual(read._readableState.errored, expected); assert.strictEqual(err, expected); })); read.destroy(); assert.strictEqual(read._readableState.errorEmitted, false); - assert.strictEqual(read._readableState.errored, true); + assert.strictEqual(read._readableState.errored, expected); assert.strictEqual(read.destroyed, true); ticked = true; } @@ -190,15 +190,15 @@ const assert = require('assert'); assert.strictEqual(err, expected); })); - assert.strictEqual(read._readableState.errored, false); + assert.strictEqual(read._readableState.errored, null); assert.strictEqual(read._readableState.errorEmitted, false); read.destroy(expected, common.mustCall(function(err) { - assert.strictEqual(read._readableState.errored, true); + assert.strictEqual(read._readableState.errored, expected); assert.strictEqual(err, expected); })); assert.strictEqual(read._readableState.errorEmitted, false); - assert.strictEqual(read._readableState.errored, true); + assert.strictEqual(read._readableState.errored, expected); ticked = true; } @@ -223,14 +223,14 @@ const assert = require('assert'); readable.destroy(); assert.strictEqual(readable.destroyed, true); - assert.strictEqual(readable._readableState.errored, false); + assert.strictEqual(readable._readableState.errored, null); assert.strictEqual(readable._readableState.errorEmitted, false); // Test case where `readable.destroy()` is called again with an error before // the `_destroy()` callback is called. readable.destroy(new Error('kaboom 2')); assert.strictEqual(readable._readableState.errorEmitted, false); - assert.strictEqual(readable._readableState.errored, false); + assert.strictEqual(readable._readableState.errored, null); ticked = true; } @@ -253,3 +253,18 @@ const assert = require('assert'); assert.strictEqual(read.destroyed, true); read.read(); } + +{ + const read = new Readable({ + autoDestroy: false, + read() { + this.push(null); + this.push('asd'); + } + }); + + read.on('error', common.mustCall(() => { + assert(read._readableState.errored); + })); + read.resume(); +} diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 706847a8582d0c..9f6136923ee176 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -167,9 +167,10 @@ const assert = require('assert'); assert.strictEqual(write._writableState.errorEmitted, true); })); - write.destroy(new Error('kaboom 1')); + const expected = new Error('kaboom 1'); + write.destroy(expected); write.destroy(new Error('kaboom 2')); - assert.strictEqual(write._writableState.errored, true); + assert.strictEqual(write._writableState.errored, expected); assert.strictEqual(write._writableState.errorEmitted, false); assert.strictEqual(write.destroyed, true); ticked = true; @@ -200,14 +201,14 @@ const assert = require('assert'); writable.destroy(); assert.strictEqual(writable.destroyed, true); - assert.strictEqual(writable._writableState.errored, false); + assert.strictEqual(writable._writableState.errored, null); assert.strictEqual(writable._writableState.errorEmitted, false); // Test case where `writable.destroy()` is called again with an error before // the `_destroy()` callback is called. writable.destroy(new Error('kaboom 2')); assert.strictEqual(writable._writableState.errorEmitted, false); - assert.strictEqual(writable._writableState.errored, false); + assert.strictEqual(writable._writableState.errored, null); ticked = true; } @@ -401,3 +402,18 @@ const assert = require('assert'); })); write.destroy(); } + +{ + const write = new Writable({ + autoDestroy: false, + write(chunk, enc, cb) { + cb(); + cb(); + } + }); + + write.on('error', common.mustCall(() => { + assert(write._writableState.errored); + })); + write.write('asd'); +} diff --git a/test/parallel/test-stream2-readable-wrap-error.js b/test/parallel/test-stream2-readable-wrap-error.js index b56b9bc41c7527..ac1f64f69810f0 100644 --- a/test/parallel/test-stream2-readable-wrap-error.js +++ b/test/parallel/test-stream2-readable-wrap-error.js @@ -10,23 +10,25 @@ oldStream.pause = () => {}; oldStream.resume = () => {}; { + const err = new Error(); const r = new Readable({ autoDestroy: true }) .wrap(oldStream) .on('error', common.mustCall(() => { assert.strictEqual(r._readableState.errorEmitted, true); - assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r._readableState.errored, err); assert.strictEqual(r.destroyed, true); })); - oldStream.emit('error', new Error()); + oldStream.emit('error', err); } { + const err = new Error(); const r = new Readable({ autoDestroy: false }) .wrap(oldStream) .on('error', common.mustCall(() => { assert.strictEqual(r._readableState.errorEmitted, true); - assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r._readableState.errored, err); assert.strictEqual(r.destroyed, false); })); - oldStream.emit('error', new Error()); + oldStream.emit('error', err); }