Skip to content

Commit

Permalink
We also need to emit nil on error.
Browse files Browse the repository at this point in the history
  • Loading branch information
vqvu committed Apr 15, 2016
1 parent b921a60 commit 2551b45
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 14 deletions.
27 changes: 17 additions & 10 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -379,44 +379,51 @@ function nop() {
}

function pipeReadable(xs, stream) {
var streamEnded = false;

// write any errors into the stream
xs.on('error', writeStreamError);

// We need to bind to 'close' because not all streams will emit 'end' when
// they are done. For example, FS streams that try to read a non-existant
// file.
xs.once('close', emitEnd);
xs.once('close', tryEndStream);

// We need to bind to 'end' because some streams will emit *both* 'end'
// and 'close'. For example, FS streams that complete successfully. Such
// streams should not cause Stream#end() to be called twice. Well-behaved
// streams should call 'close' after 'end', so we don't have to worry
// about finding out about the end of the stream after emitEnd has been
// executed.
xs.once('end', unbindCloseHandler);
xs.once('end', recordStreamEnded);

xs.pipe(stream);

// TODO: Replace with onDestroy in v3.
stream._destructors.push(function () {
xs.removeListener('error', writeStreamError);
xs.removeListener('close', tryEndStream);
xs.removeListener('end', recordStreamEnded);

if (xs.unpipe) {
xs.unpipe(stream);
}

xs.removeListener('error', writeStreamError);
xs.removeListener('close', emitEnd);
xs.removeListener('end', unbindCloseHandler);
});

function writeStreamError(err) {
stream.write(new StreamError(err));
tryEndStream();
}

function emitEnd() {
stream.end();
function recordStreamEnded() {
streamEnded = true;
}

function unbindCloseHandler() {
xs.removeListener('close', unbindCloseHandler);
function tryEndStream() {
if (!streamEnded) {
streamEnded = true;
stream.end();
}
}
}

Expand Down
20 changes: 16 additions & 4 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -574,22 +574,22 @@ exports['constructor'] = {
test.ok(!writtenTo, 'Drain should not cause write to be called.');
test.done();
},
"from Readable that emits 'close' not 'end' - issue #478": function (test) {
"from Readable - emits 'close' not 'end' - issue #478": function (test) {
test.expect(1);
var rs = new Stream.Readable();
rs._read = function (size) {
this.emit('close');
}
};
var s = _(rs);
s.pull(valueEquals(test, _.nil));
test.done();
},
"from Readable that emits 'close' and 'end' - issue #478": function (test) {
"from Readable - emits 'close' and 'end' - issue #478": function (test) {
test.expect(2);
var rs = new Stream.Readable();
rs._read = function (size) {
this.push(null);
}
};
rs.on('end', function () {
_.setImmediate(function () {
rs.emit('close');
Expand All @@ -612,6 +612,18 @@ exports['constructor'] = {
});
s.pull(valueEquals(test, _.nil));
},
"from Readable - emits 'error' - issue #478": function (test) {
test.expect(2);
var rs = new Stream.Readable();
rs._read = function (size) {
// Infinite stream!
};
var s = _(rs);
rs.emit('error', new Error('error'));
s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, _.nil));
test.done();
},
'throws error for unsupported object': function (test) {
test.expect(1);
test.throws(function () {
Expand Down

0 comments on commit 2551b45

Please sign in to comment.