From fa60eb83bed6b582260db4a503ea51fdb14b679b Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 14 Nov 2018 17:23:28 +0100 Subject: [PATCH] stream: correctly pause and resume after once('readable') Fixes: https://github.com/nodejs/node/issues/24281 PR-URL: https://github.com/nodejs/node/pull/24366 Reviewed-By: Anna Henningsen Reviewed-By: Franziska Hinkelmann --- lib/_stream_readable.js | 15 ++++++++-- ...st-stream-readable-readable-then-resume.js | 29 +++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) create mode 100644 test/parallel/test-stream-readable-readable-then-resume.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 488d10a10b5bbd..3534557ab4644a 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -113,6 +113,7 @@ function ReadableState(options, stream, isDuplex) { this.emittedReadable = false; this.readableListening = false; this.resumeScheduled = false; + this.paused = true; // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; @@ -858,10 +859,16 @@ Readable.prototype.removeAllListeners = function(ev) { }; function updateReadableListening(self) { - self._readableState.readableListening = self.listenerCount('readable') > 0; + const state = self._readableState; + state.readableListening = self.listenerCount('readable') > 0; - // crude way to check if we should resume - if (self.listenerCount('data') > 0) { + if (state.resumeScheduled && !state.paused) { + // flowing needs to be set to true now, otherwise + // the upcoming resume will not flow. + state.flowing = true; + + // crude way to check if we should resume + } else if (self.listenerCount('data') > 0) { self.resume(); } } @@ -883,6 +890,7 @@ Readable.prototype.resume = function() { state.flowing = !state.readableListening; resume(this, state); } + state.paused = false; return this; }; @@ -913,6 +921,7 @@ Readable.prototype.pause = function() { this._readableState.flowing = false; this.emit('pause'); } + this._readableState.paused = true; return this; }; diff --git a/test/parallel/test-stream-readable-readable-then-resume.js b/test/parallel/test-stream-readable-readable-then-resume.js new file mode 100644 index 00000000000000..83cf49333a8d83 --- /dev/null +++ b/test/parallel/test-stream-readable-readable-then-resume.js @@ -0,0 +1,29 @@ +'use strict'; + +const common = require('../common'); +const { Readable } = require('stream'); + +// This test verifies that a stream could be resumed after +// removing the readable event in the same tick + +check(new Readable({ + objectMode: true, + highWaterMark: 1, + read() { + if (!this.first) { + this.push('hello'); + this.first = true; + return; + } + + this.push(null); + } +})); + +function check(s) { + const readableListener = common.mustNotCall(); + s.on('readable', readableListener); + s.on('end', common.mustCall()); + s.removeListener('readable', readableListener); + s.resume(); +}