Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to fix the backpressure in 2.x. #608

Merged
merged 2 commits into from
Mar 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ This file does not aim to be comprehensive (you have git history for that),
rather it lists changes that might impact your own code as a consumer of
this library.

2.10.3
------
### Bugfix
* In certain cases, consuming a stream that has been resumed may cause a stream
generator/consume handler to be called twice without next() ever being called.
This is mostly relevant for .each(...).done(...) use cases.
Noticed in [#570 (comment)](https://github.com/caolan/highland/issues/570#issuecomment-287980514).
[#608](https://github.com/caolan/highland/issues/608).

2.10.2
------
### Bugfix
Expand Down
79 changes: 51 additions & 28 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,12 @@ function Stream(/*optional*/xs, /*optional*/secondArg, /*optional*/mappingHint)
this._is_observer = false;
this._in_consume_cb = false;
this._repeat_resume = false;

// Used by consume() to signal that next() hasn't been called, so resume()
// shouldn't ask for more data. Backpressure handling is getting fairly
// complicated, and this is very much a hack to get consume() backpressure
// to work correctly.
this._consume_waiting_for_next = false;
this.source = null;

// Old-style node Stream.pipe() checks for this
Expand Down Expand Up @@ -759,14 +765,14 @@ function Stream(/*optional*/xs, /*optional*/secondArg, /*optional*/mappingHint)
}
self.write(new StreamRedirect(s));
if (!_paused) {
self.resume();
self._resume(false);
}
}
else {
self._generator_running = false;
}
if (!self.paused) {
self.resume();
self._resume(false);
}
};

Expand Down Expand Up @@ -1036,15 +1042,20 @@ Stream.prototype.pause = function () {
Stream.prototype._checkBackPressure = function () {
if (!this._consumers.length) {
this._repeat_resume = false;
return this.pause();
this.pause();
return;
}
for (var i = 0, len = this._consumers.length; i < len; i++) {
if (this._consumers[i].paused) {
this._repeat_resume = false;
return this.pause();
this.pause();
return;
}
}
return this.resume();

if (this.paused) {
this._resume(false);
}
};

/**
Expand Down Expand Up @@ -1099,22 +1110,7 @@ Stream.prototype._sendOutgoing = function () {
this._outgoing.splice(0, i);
};

/**
* Resumes a paused Stream. This will either read from the Stream's incoming
* buffer or request more data from an upstream source. Never call this method
* on a stream that has been consumed (via a call to [consume](#consume) or any
* other transform).
*
* @id resume
* @section Stream Objects
* @name Stream.resume()
* @api public
*
* var xs = _(generator);
* xs.resume();
*/

Stream.prototype.resume = function () {
Stream.prototype._resume = function (forceResumeSource) {
//console.log(['resume', this.id]);
if (this._resume_running || this._in_consume_cb) {
//console.log(['resume already processing _incoming buffer, ignore resume call']);
Expand All @@ -1138,8 +1134,10 @@ Stream.prototype.resume = function () {
if (!this.paused && !this._is_observer) {
// ask parent for more data
if (this.source) {
//console.log(['ask parent for more data']);
this.source._checkBackPressure();
if (!this._consume_waiting_for_next || forceResumeSource) {
//console.log(['ask parent for more data']);
this.source._checkBackPressure();
}
}
// run _generator to fill up _incoming buffer
else if (this._generator) {
Expand All @@ -1155,6 +1153,25 @@ Stream.prototype.resume = function () {
this._resume_running = false;
};

/**
* Resumes a paused Stream. This will either read from the Stream's incoming
* buffer or request more data from an upstream source. Never call this method
* on a stream that has been consumed (via a call to [consume](#consume) or any
* other transform).
*
* @id resume
* @section Stream Objects
* @name Stream.resume()
* @api public
*
* var xs = _(generator);
* xs.resume();
*/

Stream.prototype.resume = function () {
this._resume(true);
};

/**
* Ends a Stream. This is the same as sending a [nil](#nil) value as data.
* You shouldn't need to call this directly, rather it will be called by
Expand Down Expand Up @@ -1351,7 +1368,10 @@ Stream.prototype._addConsumer = function (s) {
}
s.source = this;
this._consumers.push(s);
this._checkBackPressure();

if (this.paused && !this._consume_waiting_for_next) {
this._checkBackPressure();
}
};

/**
Expand Down Expand Up @@ -1442,12 +1462,13 @@ Stream.prototype.consume = function (f) {
if (x === nil) {
// ended, remove consumer from source
s._nil_pushed = true;
s._consume_waiting_for_next = false;
self._removeConsumer(s);

// We previously paused the stream, but since a nil was pushed,
// next won't be called and we must manually resume.
if (async) {
s.resume();
s._resume(false);
}
}
if (s.paused) {
Expand All @@ -1464,6 +1485,7 @@ Stream.prototype.consume = function (f) {
};
var next = function (s2) {
//console.log(['next', async]);
s._consume_waiting_for_next = false;
if (s._nil_pushed) {
throw new Error('Cannot call next after nil');
}
Expand All @@ -1477,11 +1499,11 @@ Stream.prototype.consume = function (f) {
}
s.write(new StreamRedirect(s2));
if (!_paused) {
s.resume();
s._resume(false);
}
}
else if (async) {
s.resume();
s._resume(false);
}
else {
next_called = true;
Expand All @@ -1499,12 +1521,13 @@ Stream.prototype.consume = function (f) {

// Don't pause if x is nil -- as next will never be called after
if (!next_called && x !== nil) {
s._consume_waiting_for_next = true;
s.pause();
}

if (s._repeat_resume) {
s._repeat_resume = false;
s.resume();
s._resume(false);
}
};
self._addConsumer(s);
Expand Down
35 changes: 35 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,41 @@ exports['consume - throws error if next called after nil'] = function (test) {
test.done();
};

exports['consume - call handler once without next() (issue #570)'] = function (test) {
test.expect(1);
var clock = sinon.useFakeTimers();
var consumedCalledNum = 0;
_([1, 2, 3])
.consume(function (err, x, push, next) {
consumedCalledNum++;
})
.resume();
clock.tick(10000);
clock.restore();
test.equal(consumedCalledNum, 1);
test.done();
};

exports['consume - consume resumed stream - call handler once without next() (issue #570)'] = function (test) {
test.expect(1);
var clock = sinon.useFakeTimers();
var consumedCalledNum = 0;
var s = _([1, 2, 3])
.consume(function (err, x, push, next) {
consumedCalledNum++;
});
s.resume();
s.consume(function (err, x, push, next) {
if (x !== _.nil) {
next();
}
}).resume();
clock.tick(10000);
clock.restore();
test.equal(consumedCalledNum, 1);
test.done();
};

exports.errors = function (test) {
var errs = [];
var err1 = new Error('one');
Expand Down