diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 879e5ddb53fe40..a7f861eb9f18fc 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -518,10 +518,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.once('end', endFn); dest.on('unpipe', onunpipe); - function onunpipe(readable) { + function onunpipe(readable, unpipeInfo) { debug('onunpipe'); if (readable === src) { - cleanup(); + if (unpipeInfo && unpipeInfo.hasUnpiped === false) { + unpipeInfo.hasUnpiped = true; + cleanup(); + } } } @@ -647,6 +650,7 @@ function pipeOnDrain(src) { Readable.prototype.unpipe = function(dest) { var state = this._readableState; + var unpipeInfo = { hasUnpiped: false }; // if we're not piping anywhere, then do nothing. if (state.pipesCount === 0) @@ -666,7 +670,7 @@ Readable.prototype.unpipe = function(dest) { state.pipesCount = 0; state.flowing = false; if (dest) - dest.emit('unpipe', this); + dest.emit('unpipe', this, unpipeInfo); return this; } @@ -681,7 +685,7 @@ Readable.prototype.unpipe = function(dest) { state.flowing = false; for (var i = 0; i < len; i++) - dests[i].emit('unpipe', this); + dests[i].emit('unpipe', this, unpipeInfo); return this; } @@ -695,7 +699,7 @@ Readable.prototype.unpipe = function(dest) { if (state.pipesCount === 1) state.pipes = state.pipes[0]; - dest.emit('unpipe', this); + dest.emit('unpipe', this, unpipeInfo); return this; }; diff --git a/test/parallel/test-stream-pipe-same-destination-twice.js b/test/parallel/test-stream-pipe-same-destination-twice.js new file mode 100644 index 00000000000000..1824c0606451a2 --- /dev/null +++ b/test/parallel/test-stream-pipe-same-destination-twice.js @@ -0,0 +1,78 @@ +'use strict'; +const common = require('../common'); + +// Regression test for https://github.com/nodejs/node/issues/12718. +// Tests that piping a source stream twice to the same destination stream +// works, and that a subsequent unpipe() call only removes the pipe *once*. +const assert = require('assert'); +const { PassThrough, Writable } = require('stream'); + +{ + const passThrough = new PassThrough(); + const dest = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert.strictEqual(`${chunk}`, 'foobar'); + cb(); + }) + }); + + passThrough.pipe(dest); + passThrough.pipe(dest); + + assert.strictEqual(passThrough._events.data.length, 2); + assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes[0], dest); + assert.strictEqual(passThrough._readableState.pipes[1], dest); + + passThrough.unpipe(dest); + + assert.strictEqual(passThrough._events.data.length, 1); + assert.strictEqual(passThrough._readableState.pipesCount, 1); + assert.strictEqual(passThrough._readableState.pipes, dest); + + passThrough.write('foobar'); + passThrough.pipe(dest); +} + +{ + const passThrough = new PassThrough(); + const dest = new Writable({ + write: common.mustCall((chunk, encoding, cb) => { + assert.strictEqual(`${chunk}`, 'foobar'); + cb(); + }, 2) + }); + + passThrough.pipe(dest); + passThrough.pipe(dest); + + assert.strictEqual(passThrough._events.data.length, 2); + assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes[0], dest); + assert.strictEqual(passThrough._readableState.pipes[1], dest); + + passThrough.write('foobar'); +} + +{ + const passThrough = new PassThrough(); + const dest = new Writable({ + write: common.mustNotCall() + }); + + passThrough.pipe(dest); + passThrough.pipe(dest); + + assert.strictEqual(passThrough._events.data.length, 2); + assert.strictEqual(passThrough._readableState.pipesCount, 2); + assert.strictEqual(passThrough._readableState.pipes[0], dest); + assert.strictEqual(passThrough._readableState.pipes[1], dest); + + passThrough.unpipe(dest); + passThrough.unpipe(dest); + + assert.strictEqual(passThrough._events.data, undefined); + assert.strictEqual(passThrough._readableState.pipesCount, 0); + + passThrough.write('foobar'); +}