Skip to content

Commit

Permalink
stream: reset flowing state if no 'readable' or 'data' listeners
Browse files Browse the repository at this point in the history
If we don't have any 'readable' or 'data' listeners and we are
not about to resume. Then reset flowing state to initial null state.

PR-URL: #31036
Fixes: #24474
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
ronag authored and BridgeAR committed Jan 3, 2020
1 parent 1463214 commit bca23b9
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 5 deletions.
26 changes: 21 additions & 5 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const {
ObjectDefineProperty,
ObjectSetPrototypeOf,
SymbolAsyncIterator,
Symbol
} = primordials;

module.exports = Readable;
Expand All @@ -51,6 +52,8 @@ const {
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
} = require('internal/errors').codes;

const kPaused = Symbol('kPaused');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let createReadableStreamAsyncIterator;
Expand Down Expand Up @@ -126,7 +129,7 @@ function ReadableState(options, stream, isDuplex) {
this.emittedReadable = false;
this.readableListening = false;
this.resumeScheduled = false;
this.paused = true;
this[kPaused] = null;

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
Expand Down Expand Up @@ -170,6 +173,16 @@ ObjectDefineProperty(ReadableState.prototype, 'pipesCount', {
}
});

// Legacy property for `paused`
ObjectDefineProperty(ReadableState.prototype, 'paused', {
get() {
return this[kPaused] !== false;
},
set(value) {
this[kPaused] = !!value;
}
});

function Readable(options) {
if (!(this instanceof Readable))
return new Readable(options);
Expand Down Expand Up @@ -365,7 +378,8 @@ function chunkInvalid(state, chunk) {


Readable.prototype.isPaused = function() {
return this._readableState.flowing === false;
const state = this._readableState;
return state[kPaused] === true || state.flowing === false;
};

// Backwards compatibility.
Expand Down Expand Up @@ -962,14 +976,16 @@ function updateReadableListening(self) {
const state = self._readableState;
state.readableListening = self.listenerCount('readable') > 0;

if (state.resumeScheduled && !state.paused) {
if (state.resumeScheduled && state[kPaused] === false) {
// Flowing needs to be set to true now, otherwise
// the upcoming resume will not flow.
state.flowing = true;

// Crude way to check if we should resume
} else if (self.listenerCount('data') > 0) {
self.resume();
} else if (!state.readableListening) {
state.flowing = null;
}
}

Expand All @@ -990,7 +1006,7 @@ Readable.prototype.resume = function() {
state.flowing = !state.readableListening;
resume(this, state);
}
state.paused = false;
state[kPaused] = false;
return this;
};

Expand Down Expand Up @@ -1021,7 +1037,7 @@ Readable.prototype.pause = function() {
this._readableState.flowing = false;
this.emit('pause');
}
this._readableState.paused = true;
this._readableState[kPaused] = true;
return this;
};

Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-stream-readable-data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict';
const common = require('../common');

const { Readable } = require('stream');

const readable = new Readable({
read() {}
});

function read() {}

readable.setEncoding('utf8');
readable.on('readable', read);
readable.removeListener('readable', read);

process.nextTick(function() {
readable.on('data', common.mustCall());
readable.push('hello');
});
18 changes: 18 additions & 0 deletions test/parallel/test-stream-readable-pause-and-resume.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');

let ticks = 18;
Expand Down Expand Up @@ -38,3 +39,20 @@ function readAndPause() {

rs.on('data', ondata);
}

{
const readable = new Readable({
read() {}
});

function read() {}

readable.setEncoding('utf8');
readable.on('readable', read);
readable.removeListener('readable', read);
readable.pause();

process.nextTick(function() {
assert(readable.isPaused());
});
}

0 comments on commit bca23b9

Please sign in to comment.