Skip to content

Commit

Permalink
stream: ensure pipeline always destroys streams
Browse files Browse the repository at this point in the history
There was an edge case where an incorrect assumption was made
in regardos whether eos/finished means that the stream is
actually destroyed or not.

Backport-PR-URL: nodejs#31975
PR-URL: nodejs#31940
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
  • Loading branch information
ronag committed Feb 27, 2020
1 parent 21a0dd0 commit 23011b9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
17 changes: 5 additions & 12 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,20 @@ function destroyStream(stream, err) {

function destroyer(stream, reading, writing, callback) {
callback = once(callback);

let closed = false;
stream.on('close', () => {
closed = true;
});
let destroyed = false;

if (eos === undefined) eos = require('internal/streams/end-of-stream');
eos(stream, { readable: reading, writable: writing }, (err) => {
if (err) return callback(err);
closed = true;
callback();
if (destroyed) return;
destroyed = true;
destroyStream(stream, err);
callback(err);
});

let destroyed = false;
return (err) => {
if (closed) return;
if (destroyed) return;
destroyed = true;

destroyStream(stream, err);

callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}
Expand Down
15 changes: 14 additions & 1 deletion test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,10 @@ const { promisify } = require('util');
s.emit('data', 'asd');
s.emit('end');
});
s.close = common.mustCall();
// 'destroyer' can be called multiple times,
// once from stream wrapper and
// once from iterator wrapper.
s.close = common.mustCallAtLeast(1);
let ret = '';
pipeline(s, async function(source) {
for await (const chunk of source) {
Expand Down Expand Up @@ -909,3 +912,13 @@ const { promisify } = require('util');
assert.strictEqual(err.message, 'kaboom');
}));
}

{
const src = new PassThrough({ autoDestroy: false });
const dst = new PassThrough({ autoDestroy: false });
pipeline(src, dst, common.mustCall(() => {
assert.strictEqual(src.destroyed, true);
assert.strictEqual(dst.destroyed, true);
}));
src.end();
}

0 comments on commit 23011b9

Please sign in to comment.