diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 5d0e8aa243e9d6..b2859ea3d28af8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -511,7 +511,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stdout && dest !== process.stderr; - var endFn = doEnd ? onend : cleanup; + var endFn = doEnd ? onend : unpipe; if (state.endEmitted) process.nextTick(endFn); else @@ -547,7 +547,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.removeListener('error', onerror); dest.removeListener('unpipe', onunpipe); src.removeListener('end', onend); - src.removeListener('end', cleanup); + src.removeListener('end', unpipe); src.removeListener('data', ondata); cleanedUp = true; diff --git a/test/parallel/test-stream-unpipe-event.js b/test/parallel/test-stream-unpipe-event.js new file mode 100644 index 00000000000000..befa24253d33f5 --- /dev/null +++ b/test/parallel/test-stream-unpipe-event.js @@ -0,0 +1,85 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const {Writable, Readable} = require('stream'); +class NullWriteable extends Writable { + _write(chunk, encoding, callback) { + return callback(); + } +} +class QuickEndReadable extends Readable { + _read() { + this.push(null); + } +} +class NeverEndReadable extends Readable { + _read() {} +} + +{ + const dest = new NullWriteable(); + const src = new QuickEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); + src.pipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 1); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest); + src.unpipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new QuickEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest, {end: false}); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); + src.pipe(dest, {end: false}); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 1); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest, {end: false}); + src.unpipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +}