Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the Readable constructor handle 'close' events without 'end'. Fixes #478. #479

Merged
merged 5 commits into from
Apr 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,24 @@ This file does not aim to be comprehensive (you have git history for that),
rather it lists changes that might impact your own code as a consumer of
this library.

2.7.5
-----
### Bugfix
* A Highland Stream that wraps `Readable` now properly handles the case where
the `Readable` emits the `close` event but not the `end` event (this can
happen with an `fs` read stream when it encounters an error). It will also end
the wrapper stream when it encounters an error (this happens when reading from
a non-existent file). Before, such streams would simply never end.
[#479](https://github.com/caolan/highland/pull/479).
Fixes [#478](https://github.com/caolan/highland/issues/478).

2.7.4
-----
### Bugfix
* `mergeOnError` no longer causes an `// Unhandled 'error' event` error when one
of its sources emits an error.
[#476](https://github.com/caolan/highland/pull/476).
Fixes [#475](https://github.com/caolan/highland/issues/475).

2.7.3
-----
Expand Down
33 changes: 32 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -379,20 +379,51 @@ function nop() {
}

function pipeReadable(xs, stream) {
var streamEnded = false;

// write any errors into the stream
xs.on('error', writeStreamError);

// We need to bind to 'close' because not all streams will emit 'end' when
// they are done. For example, FS streams that try to read a non-existant
// file.
xs.once('close', tryEndStream);

// We need to bind to 'end' because some streams will emit *both* 'end'
// and 'close'. For example, FS streams that complete successfully. Such
// streams should not cause Stream#end() to be called twice. Well-behaved
// streams should call 'close' after 'end', so we don't have to worry
// about finding out about the end of the stream after emitEnd has been
// executed.
xs.once('end', recordStreamEnded);

xs.pipe(stream);

// TODO: Replace with onDestroy in v3.
stream._destructors.push(function () {
xs.removeListener('error', writeStreamError);
xs.removeListener('close', tryEndStream);
xs.removeListener('end', recordStreamEnded);

if (xs.unpipe) {
xs.unpipe(stream);
}
xs.removeListener('error', writeStreamError);
});

function writeStreamError(err) {
stream.write(new StreamError(err));
tryEndStream();
}

function recordStreamEnded() {
streamEnded = true;
}

function tryEndStream() {
if (!streamEnded) {
streamEnded = true;
stream.end();
}
}
}

Expand Down
77 changes: 67 additions & 10 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -531,16 +531,19 @@ exports['constructor'] = {
this.tester = function (expected, test) {
return function (xs) {
test.same(xs, expected);
test.done();
};
};
callback();
},
'passing Stream to constructor returns original': function (test) {
test.expect(1);
var s = _([1,2,3]);
test.strictEqual(s, _(s));
test.done();
},
'from Readable with next function - issue #303': function (test) {
test.expect(1);
var Readable = Stream.Readable;

var rs = new Readable;
Expand All @@ -551,9 +554,9 @@ exports['constructor'] = {
rs.push(null);
_(rs).invoke('toString', ['utf8'])
.toArray(this.tester(['a', 'b', 'c'], test));
test.done();
},
'from Readable - unpipes on destroy': function (test) {
test.expect(2);
var rs = streamify([1, 2, 3]);

var s = _(rs);
Expand All @@ -571,15 +574,67 @@ exports['constructor'] = {
test.ok(!writtenTo, 'Drain should not cause write to be called.');
test.done();
},
"from Readable - emits 'close' not 'end' - issue #478": function (test) {
test.expect(1);
var rs = new Stream.Readable();
rs._read = function (size) {
this.emit('close');
};
var s = _(rs);
s.pull(valueEquals(test, _.nil));
test.done();
},
"from Readable - emits 'close' and 'end' - issue #478": function (test) {
test.expect(2);
var rs = new Stream.Readable();
rs._read = function (size) {
this.push(null);
};
rs.on('end', function () {
_.setImmediate(function () {
rs.emit('close');
});
});
var s = _(rs);
var oldEnd = s.end;
var numTimesEndCalled = 0;
s.end = function () {
numTimesEndCalled++;
oldEnd.call(s);
};
rs.on('close', function () {
// Wait for the rest of the close handlers to be called before
// checking.
_.setImmediate(function () {
test.equal(numTimesEndCalled, 1, 'end() should only be called once.');
test.done();
});
});
s.pull(valueEquals(test, _.nil));
},
"from Readable - emits 'error' - issue #478": function (test) {
test.expect(2);
var rs = new Stream.Readable();
rs._read = function (size) {
// Infinite stream!
};
var s = _(rs);
rs.emit('error', new Error('error'));
s.pull(errorEquals(test, 'error'));
s.pull(valueEquals(test, _.nil));
test.done();
},
'throws error for unsupported object': function (test) {
test.expect(1);
test.throws(function () {
_({}).done(function () {});
}, Error, 'Object was not a stream, promise, iterator or iterable: object');
test.done();
},
'from promise': function (test) {
_(Promise.resolve(3)).toArray(this.tester([3], test));
test.done();
test.expect(1);
_(Promise.resolve(3))
.toArray(this.tester([3], test));
},
'from promise - errors': function (test) {
test.expect(3);
Expand All @@ -596,20 +651,22 @@ exports['constructor'] = {
});
},
'from iterator': function (test) {
test.expect(1);
_(this.createTestIterator([1, 2, 3, 4, 5]))
.toArray(this.tester([1, 2, 3, 4, 5], test));
test.done();
},
'from iterator - error': function (test) {
_(this.createTestIterator([1, 2, 3, 4, 5], new Error('Error at index 2'))).errors(function (err) {
test.equals(err.message, 'Error at index 2');
}).toArray(this.tester([1, 2], test));
test.done();
test.expect(2);
_(this.createTestIterator([1, 2, 3, 4, 5], new Error('Error at index 2')))
.errors(function (err) {
test.equals(err.message, 'Error at index 2');
})
.toArray(this.tester([1, 2], test));
},
'from iterator - final return falsy': function (test) {
test.expect(1);
_(this.createTestIterator([1, 2, 3, 4, 5], void 0, 0)).toArray(this.tester([1, 2, 3, 4, 5, 0], test));
test.done();
_(this.createTestIterator([1, 2, 3, 4, 5], void 0, 0))
.toArray(this.tester([1, 2, 3, 4, 5, 0], test));
}
};

Expand Down