Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

stream: Throw on 'error' if listeners removed #6075

Merged
merged 1 commit into from
Aug 19, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -511,14 +511,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {

// if the dest has an error, then stop piping into it.
// however, don't suppress the throwing behavior for this.
// check for listeners before emit removes one-time listeners.
var errListeners = EE.listenerCount(dest, 'error');
function onerror(er) {
unpipe();
if (errListeners === 0 && EE.listenerCount(dest, 'error') === 0)
dest.removeListener('error', onerror);
if (EE.listenerCount(dest, 'error') === 0)
dest.emit('error', er);
}
dest.once('error', onerror);
// This is a brutally ugly hack to make sure that our error handler
// is attached before any userland ones. NEVER DO THIS.
if (!dest._events.error)
dest.on('error', onerror);
else if (Array.isArray(dest._events.error))
dest._events.error.unshift(onerror);
else
dest._events.error = [onerror, dest._events.error];



// Both close and finish should trigger unpipe, but only once.
function onclose() {
Expand Down
73 changes: 73 additions & 0 deletions test/simple/test-stream-pipe-error-handling.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,76 @@ var Stream = require('stream').Stream;

assert.strictEqual(gotErr, err);
})();

(function testErrorWithRemovedListenerThrows() {
var EE = require('events').EventEmitter;
var R = Stream.Readable;
var W = Stream.Writable;

var r = new R;
var w = new W;
var removed = false;
var didTest = false;

process.on('exit', function() {
assert(didTest);
console.log('ok');
});

r._read = function() {
setTimeout(function() {
assert(removed);
assert.throws(function() {
w.emit('error', new Error('fail'));
});
didTest = true;
});
};

w.on('error', myOnError);
r.pipe(w);
w.removeListener('error', myOnError);
removed = true;

function myOnError(er) {
throw new Error('this should not happen');
}
})();

(function testErrorWithRemovedListenerThrows() {
var EE = require('events').EventEmitter;
var R = Stream.Readable;
var W = Stream.Writable;

var r = new R;
var w = new W;
var removed = false;
var didTest = false;
var caught = false;

process.on('exit', function() {
assert(didTest);
console.log('ok');
});

r._read = function() {
setTimeout(function() {
assert(removed);
w.emit('error', new Error('fail'));
didTest = true;
});
};

w.on('error', myOnError);
w._write = function() {};

r.pipe(w);
// Removing some OTHER random listener should not do anything
w.removeListener('error', function() {});
removed = true;

function myOnError(er) {
assert(!caught);
caught = true;
}
})();