Skip to content

Commit

Permalink
Removing a readable listener now updates the readable state i.e.
Browse files Browse the repository at this point in the history
readableListening, needReadable and emittedReadable are set to false.
Then, if a readable listener is added at a later time, the stream will
know none are attached and will take proper action to get the stream
going back again.

Fixes nodejs#7678
  • Loading branch information
plafer committed Aug 30, 2015
1 parent 2860c53 commit 3afaa0b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
13 changes: 12 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ Readable.prototype.on = function(ev, fn) {
self.read(0);
});
} else if (state.length) {
emitReadable(this, state);
setImmediate(emitReadable.bind(null, this));
}
}
}
Expand All @@ -698,6 +698,17 @@ Readable.prototype.on = function(ev, fn) {
};
Readable.prototype.addListener = Readable.prototype.on;

Readable.prototype.removeListener = function(type, listener) {
Stream.prototype.removeListener.call(this, type, listener);

if (type === 'readable' && EE.listenerCount(this, type) === 0) {
var state = this._readableState;
state.readableListening = false;
state.emittedReadable = false;
state.needReadable = false;
}
};

// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
Expand Down
55 changes: 55 additions & 0 deletions test/simple/test-stream-readable-removeListener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var assert = require('assert');
var EE = require('events').EventEmitter;
var Readable = require('stream').Readable;

var buf = new Buffer(65536);
buf.fill(0);

var packetsToPush = 10;
var toUnshift = 16*1024;

Readable.prototype._read = function() {
var chunk = --packetsToPush > 0 ? buf.slice() : null;
setImmediate(this.push.bind(this, chunk));
}

var rStream = new Readable();

var first = true;
rStream.on('readable', function onReadable() {
var data = rStream.read();
if (first) {
first = false;
rStream.removeListener('readable', onReadable);
assert.equal(EE.listenerCount(rStream, 'readable'), 0);
rStream.unshift(buf.slice(0, toUnshift));
rStream.on('readable', onReadable);
}
});

process.on('exit', function() {
assert.equal(packetsToPush, 0);
});

rStream.read(0);

0 comments on commit 3afaa0b

Please sign in to comment.