From 9dcf18a5c0449700598d26dd8b0d252c1eb6ce5d Mon Sep 17 00:00:00 2001 From: Christopher Luke Date: Wed, 15 Mar 2017 22:53:35 -0700 Subject: [PATCH] stream: Fixes missing 'unpipe' event Currently when the destination emits an 'error', 'finish' or 'close' event the pipe calls unpipe to emit 'unpipe' and trigger the clean up of all it's listeners. When the source emits an 'end' event without {end: false} it calls end() on the destination leading it to emit a 'close', this will again lead to the pipe calling unpipe. However the source emitting an 'end' event along side {end: false} is the only time the cleanup gets ran directly without unpipe being called. This fixes that so the 'unpipe' event does get emitted and cleanup in turn gets ran by that event. Fixes: https://github.com/nodejs/node/issues/11837 PR-URL: https://github.com/nodejs/node/pull/11876 Reviewed-By: Matteo Collina Reviewed-By: Colin Ihrig --- lib/_stream_readable.js | 4 +- test/parallel/test-stream-unpipe-event.js | 85 +++++++++++++++++++++++ 2 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 test/parallel/test-stream-unpipe-event.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 5d0e8aa243e9d6..b2859ea3d28af8 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -511,7 +511,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest !== process.stdout && dest !== process.stderr; - var endFn = doEnd ? onend : cleanup; + var endFn = doEnd ? onend : unpipe; if (state.endEmitted) process.nextTick(endFn); else @@ -547,7 +547,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { dest.removeListener('error', onerror); dest.removeListener('unpipe', onunpipe); src.removeListener('end', onend); - src.removeListener('end', cleanup); + src.removeListener('end', unpipe); src.removeListener('data', ondata); cleanedUp = true; diff --git a/test/parallel/test-stream-unpipe-event.js b/test/parallel/test-stream-unpipe-event.js new file mode 100644 index 00000000000000..befa24253d33f5 --- /dev/null +++ b/test/parallel/test-stream-unpipe-event.js @@ -0,0 +1,85 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const {Writable, Readable} = require('stream'); +class NullWriteable extends Writable { + _write(chunk, encoding, callback) { + return callback(); + } +} +class QuickEndReadable extends Readable { + _read() { + this.push(null); + } +} +class NeverEndReadable extends Readable { + _read() {} +} + +{ + const dest = new NullWriteable(); + const src = new QuickEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); + src.pipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 1); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest); + src.unpipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new QuickEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest, {end: false}); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted')); + src.pipe(dest, {end: false}); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 1); + }); +} + +{ + const dest = new NullWriteable(); + const src = new NeverEndReadable(); + dest.on('pipe', common.mustCall()); + dest.on('unpipe', common.mustCall()); + src.pipe(dest, {end: false}); + src.unpipe(dest); + setImmediate(() => { + assert.strictEqual(src._readableState.pipesCount, 0); + }); +}