Skip to content

Commit

Permalink
stream: pipeline wait for close before calling the callback
Browse files Browse the repository at this point in the history
The pipeline should wait for close event to finish before calling
the callback.

The `finishCount` should not below 0 when calling finish function.

Fixes: #51540

Co-authored-by: wh0 <wh0@users.noreply.github.com>
PR-URL: #53462
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
jakecastelli and wh0 authored Jun 27, 2024
1 parent 27f1306 commit 8e5d88b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
12 changes: 8 additions & 4 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ function pipelineImpl(streams, callback, opts) {
finishImpl(err, --finishCount === 0);
}

function finishOnlyHandleError(err) {
finishImpl(err, false);
}

function finishImpl(err, final) {
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
error = err;
Expand Down Expand Up @@ -279,7 +283,7 @@ function pipelineImpl(streams, callback, opts) {
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
finish(err);
finishOnlyHandleError(err);
}
}
stream.on('error', onError);
Expand Down Expand Up @@ -372,7 +376,7 @@ function pipelineImpl(streams, callback, opts) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
const cleanup = pipe(ret, stream, finish, { end });
const cleanup = pipe(ret, stream, finish, finishOnlyHandleError, { end });
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
Expand Down Expand Up @@ -415,12 +419,12 @@ function pipelineImpl(streams, callback, opts) {
return ret;
}

function pipe(src, dst, finish, { end }) {
function pipe(src, dst, finish, finishOnlyHandleError, { end }) {
let ended = false;
dst.on('close', () => {
if (!ended) {
// Finish if the destination closes before the source has completed.
finish(new ERR_STREAM_PREMATURE_CLOSE());
finishOnlyHandleError(new ERR_STREAM_PREMATURE_CLOSE());
}
});

Expand Down
42 changes: 42 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1677,5 +1677,47 @@ tmpdir.refresh();
pipeline(r, w, common.mustCall((err) => {
assert.strictEqual(err, undefined);
}));
}

{
// See https://github.com/nodejs/node/issues/51540 for the following 2 tests
const src = new Readable();
const dst = new Writable({
destroy(error, cb) {
// Takes a while to destroy
setImmediate(cb);
},
});

pipeline(src, dst, (err) => {
assert.strictEqual(src.closed, true);
assert.strictEqual(dst.closed, true);
assert.strictEqual(err.message, 'problem');
});
src.destroy(new Error('problem'));
}

{
const src = new Readable();
const dst = new Writable({
destroy(error, cb) {
// Takes a while to destroy
setImmediate(cb);
},
});
const passThroughs = [];
for (let i = 0; i < 10; i++) {
passThroughs.push(new PassThrough());
}

pipeline(src, ...passThroughs, dst, (err) => {
assert.strictEqual(src.closed, true);
assert.strictEqual(dst.closed, true);
assert.strictEqual(err.message, 'problem');

for (let i = 0; i < passThroughs.length; i++) {
assert.strictEqual(passThroughs[i].closed, true);
}
});
src.destroy(new Error('problem'));
}

0 comments on commit 8e5d88b

Please sign in to comment.