Skip to content

Commit 663a6b4

Browse files
mcollinaMylesBorins
authored andcommitted
stream: make all streams error in a pipeline
This changes makes all stream in a pipeline emit 'error' in case of an abnormal termination of the pipeline. If the last stream is currently being async iterated, this change will make the iteration reject accordingly. See: #30861 Fixes: #28194 PR-URL: #30869 Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent c43461a commit 663a6b4

File tree

3 files changed

+55
-6
lines changed

3 files changed

+55
-6
lines changed

lib/internal/streams/pipeline.js

+18-6
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,21 @@ function destroyer(stream, reading, writing, callback) {
4343

4444
// request.destroy just do .end - .abort is what we want
4545
if (isRequest(stream)) return stream.abort();
46-
if (typeof stream.destroy === 'function') return stream.destroy();
46+
if (typeof stream.destroy === 'function') {
47+
if (stream.req && stream._writableState === undefined) {
48+
// This is a ClientRequest
49+
// TODO(mcollina): backward compatible fix to avoid crashing.
50+
// Possibly remove in a later semver-major change.
51+
stream.req.on('error', noop);
52+
}
53+
return stream.destroy(err);
54+
}
4755

4856
callback(err || new ERR_STREAM_DESTROYED('pipe'));
4957
};
5058
}
5159

52-
function call(fn) {
53-
fn();
54-
}
60+
function noop() {}
5561

5662
function pipe(from, to) {
5763
return from.pipe(to);
@@ -81,9 +87,15 @@ function pipeline(...streams) {
8187
const writing = i > 0;
8288
return destroyer(stream, reading, writing, function(err) {
8389
if (!error) error = err;
84-
if (err) destroys.forEach(call);
90+
if (err) {
91+
for (const destroy of destroys) {
92+
destroy(err);
93+
}
94+
}
8595
if (reading) return;
86-
destroys.forEach(call);
96+
for (const destroy of destroys) {
97+
destroy();
98+
}
8799
callback(error);
88100
});
89101
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Readable, PassThrough, pipeline } = require('stream');
5+
const assert = require('assert');
6+
7+
const _err = new Error('kaboom');
8+
9+
async function run() {
10+
const source = new Readable({
11+
read() {
12+
}
13+
});
14+
source.push('hello');
15+
source.push('world');
16+
17+
setImmediate(() => { source.destroy(_err); });
18+
19+
const iterator = pipeline(
20+
source,
21+
new PassThrough(),
22+
() => {});
23+
24+
iterator.setEncoding('utf8');
25+
26+
for await (const k of iterator) {
27+
assert.strictEqual(k, 'helloworld');
28+
}
29+
}
30+
31+
run().catch(common.mustCall((err) => assert.strictEqual(err, _err)));

test/parallel/test-stream-pipeline.js

+6
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ const { promisify } = require('util');
119119
transform.on('close', common.mustCall());
120120
write.on('close', common.mustCall());
121121

122+
[read, transform, write].forEach((stream) => {
123+
stream.on('error', common.mustCall((err) => {
124+
assert.deepStrictEqual(err, new Error('kaboom'));
125+
}));
126+
});
127+
122128
const dst = pipeline(read, transform, write, common.mustCall((err) => {
123129
assert.deepStrictEqual(err, new Error('kaboom'));
124130
}));

0 commit comments

Comments
 (0)