Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#691] Propagate destroy from consumers to sources #692

Open
wants to merge 3 commits into
base: 2.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ function pipeReadable(xs, onFinish, stream) {
xs.pipe(stream);

// TODO: Replace with onDestroy in v3.
// ...although it appears `_destructors` are called `onEnd`, not just `onDestroy`?
stream._destructors.push(unbind);

function streamEndCb(error) {
Expand All @@ -495,12 +496,12 @@ function pipeReadable(xs, onFinish, stream) {
}

if (error == null || endOnError) {
unbind();
unbind(error);
stream.end();
}
}

function unbind() {
function unbind(error) {
if (unbound) {
return;
}
Expand All @@ -514,6 +515,16 @@ function pipeReadable(xs, onFinish, stream) {
if (xs.unpipe) {
xs.unpipe(stream);
}

// Destroy the wrapped `Readable` stream if it's not yet ended.
// i.e. this was ended externally, perhaps by a consumer.
// TODO: Use something other than `readableEnded` as it was introduced recently (Node v12.9.0)
if (!xs.readableEnded) {
// NOTE: Not sure whether `error` is necessary here as it'd only ever come from
// the unbind call in `streamEndCb` which originates from the `xs` in the
// first place, so it'd prob be able to handle itself?
xs.destroy(error);
}
}
}

Expand Down Expand Up @@ -1327,6 +1338,8 @@ Stream.prototype.pipe = function (dest, options) {
*/

Stream.prototype.destroy = function () {
var source = this.source;

if (this.ended) {
return;
}
Expand All @@ -1336,6 +1349,19 @@ Stream.prototype.destroy = function () {
}

this._onEnd();

if (!this._is_observer && source) {
// TODO: Should we only destroy the source if nothing else is consuming it, i.e. all sibling `_consumers` are already
// destroyed / ended?
// i.e.
// var s = _()
// var s1 = _().fork()
// var s2 = _().fork()
// s1.destroy() // s still alive
// s2.destroy() // s now destroyed
// and ignore `_observers`?
source.destroy();
}
};

/**
Expand Down Expand Up @@ -1618,7 +1644,7 @@ Stream.prototype.pull = function (f) {
*
* Only call this function on streams that were constructed with no source
* (i.e., with `_()`).

* @id write
* @section Stream Objects
* @name Stream.write(x)
Expand Down Expand Up @@ -2426,7 +2452,7 @@ var objectOnly = _.curry(function(strategy, x) {
* {breed: 'labrador', name: 'Rocky', age: 3},
* {breed: 'german-shepherd', name: 'Waffles', age: 9}
* ];

* _(dogs).pickBy(function (key, value) {
* return value > 4;
* }).toArray(function (xs) {
Expand Down