diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 22f010b9834385..1b04a0f70a1e62 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -238,13 +238,14 @@ module.exports = function compose(...streams) { ondrain = null; onfinish = null; + if (isNodeStream(tail)) { + destroyer(tail, err); + } + if (onclose === null) { callback(err); } else { onclose = callback; - if (isNodeStream(tail)) { - destroyer(tail, err); - } } }; diff --git a/test/parallel/test-stream-compose.js b/test/parallel/test-stream-compose.js index cba7f9519d87eb..1ff8c39b7a2234 100644 --- a/test/parallel/test-stream-compose.js +++ b/test/parallel/test-stream-compose.js @@ -4,6 +4,7 @@ const common = require('../common'); const { + Duplex, Readable, Transform, Writable, @@ -494,3 +495,47 @@ const assert = require('assert'); assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]); })().then(common.mustCall()); } + +{ + class DuplexProcess extends Duplex { + constructor(options) { + super({ ...options, objectMode: true }); + this.stuff = []; + } + + _write(message, _, callback) { + this.stuff.push(message); + callback(); + } + + _destroy(err, cb) { + cb(err); + } + + _read() { + if (this.stuff.length) { + this.push(this.stuff.shift()); + } else if (this.writableEnded) { + this.push(null); + } else { + this._read(); + } + } + } + + const pass = new PassThrough({ objectMode: true }); + const duplex = new DuplexProcess(); + + const composed = compose( + pass, + duplex + ).on('error', () => {}); + + composed.write('hello'); + composed.write('world'); + composed.end(); + + composed.destroy(new Error('an unexpected error')); + assert.strictEqual(duplex.destroyed, true); + +}