Skip to content

Commit

Permalink
stream: fix pipeline pump
Browse files Browse the repository at this point in the history
Refs: #39005

PR-URL: #39006
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
ronag authored and danielleadams committed Jun 15, 2021
1 parent 4c6193f commit c20e28e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 14 deletions.
72 changes: 58 additions & 14 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

const {
ArrayIsArray,
Promise,
SymbolAsyncIterator,
} = primordials;

Expand All @@ -13,21 +14,27 @@ const eos = require('internal/streams/end-of-stream');
const { once } = require('internal/util');
const destroyImpl = require('internal/streams/destroy');
const {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;
aggregateTwoErrors,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
ERR_STREAM_PREMATURE_CLOSE,
},
} = require('internal/errors');

const { validateCallback } = require('internal/validators');

function noop() {}

const {
isIterable,
isReadable,
isStream,
} = require('internal/streams/utils');
const assert = require('internal/assert');

let EE;
let PassThrough;
let Readable;

Expand Down Expand Up @@ -101,25 +108,62 @@ async function* fromReadable(val) {
}

async function pump(iterable, writable, finish) {
if (!EE) {
EE = require('events');
}
let error;
let callback = noop;
const resume = (err) => {
error = aggregateTwoErrors(error, err);
const _callback = callback;
callback = noop;
_callback();
};
const onClose = () => {
resume(new ERR_STREAM_PREMATURE_CLOSE());
};

const waitForDrain = () => new Promise((resolve) => {
assert(callback === noop);
if (error || writable.destroyed) {
resolve();
} else {
callback = resolve;
}
});

writable
.on('drain', resume)
.on('error', resume)
.on('close', onClose);

try {
if (writable.writableNeedDrain === true) {
await EE.once(writable, 'drain');
if (writable.writableNeedDrain) {
await waitForDrain();
}

if (error) {
return;
}

for await (const chunk of iterable) {
if (!writable.write(chunk)) {
if (writable.destroyed) return;
await EE.once(writable, 'drain');
await waitForDrain();
}
if (error) {
return;
}
}

if (error) {
return;
}

writable.end();
} catch (err) {
error = err;
error = aggregateTwoErrors(error, err);
} finally {
writable
.off('drain', resume)
.off('error', resume)
.off('close', onClose);
finish(error);
}
}
Expand Down
33 changes: 33 additions & 0 deletions test/parallel/test-stream-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -1387,3 +1387,36 @@ const net = require('net');
assert.strictEqual(res, content);
}));
}

{
const writableLike = new Stream();
writableLike.writableNeedDrain = true;

pipeline(
async function *() {},
writableLike,
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
})
);

writableLike.emit('close');
}

{
const writableLike = new Stream();
writableLike.write = () => false;

pipeline(
async function *() {
yield null;
yield null;
},
writableLike,
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
})
);

writableLike.emit('close');
}

0 comments on commit c20e28e

Please sign in to comment.