From e72242f44b116b238761250134867f4ed4ecff65 Mon Sep 17 00:00:00 2001 From: Victor Vu Date: Mon, 20 Mar 2017 22:50:14 -0600 Subject: [PATCH 1/2] Attempt to fix the backpressure in 2.x. In certain cases, consuming a stream that has been resumed may cause a stream generator/consume handler to be called twice without next() ever being called. This is mostly relevant for .each(...).done(...) use cases. --- lib/index.js | 79 +++++++++++++++++++++++++++++++++------------------- test/test.js | 35 +++++++++++++++++++++++ 2 files changed, 86 insertions(+), 28 deletions(-) diff --git a/lib/index.js b/lib/index.js index 2259bb9..6b13ffe 100755 --- a/lib/index.js +++ b/lib/index.js @@ -704,6 +704,12 @@ function Stream(/*optional*/xs, /*optional*/secondArg, /*optional*/mappingHint) this._is_observer = false; this._in_consume_cb = false; this._repeat_resume = false; + + // Used by consume() to signal that next() hasn't been called, so resume() + // shouldn't ask for more data. Backpressure handling is getting fairly + // complicated, and this is very much a hack to get consume() backpressure + // to work correctly. + this._consume_waiting_for_next = false; this.source = null; // Old-style node Stream.pipe() checks for this @@ -759,14 +765,14 @@ function Stream(/*optional*/xs, /*optional*/secondArg, /*optional*/mappingHint) } self.write(new StreamRedirect(s)); if (!_paused) { - self.resume(); + self._resume(false); } } else { self._generator_running = false; } if (!self.paused) { - self.resume(); + self._resume(false); } }; @@ -1036,15 +1042,20 @@ Stream.prototype.pause = function () { Stream.prototype._checkBackPressure = function () { if (!this._consumers.length) { this._repeat_resume = false; - return this.pause(); + this.pause(); + return; } for (var i = 0, len = this._consumers.length; i < len; i++) { if (this._consumers[i].paused) { this._repeat_resume = false; - return this.pause(); + this.pause(); + return; } } - return this.resume(); + + if (this.paused) { + this._resume(false); + } }; /** @@ -1099,22 +1110,7 @@ Stream.prototype._sendOutgoing = function () { this._outgoing.splice(0, i); }; -/** - * Resumes a paused Stream. This will either read from the Stream's incoming - * buffer or request more data from an upstream source. Never call this method - * on a stream that has been consumed (via a call to [consume](#consume) or any - * other transform). - * - * @id resume - * @section Stream Objects - * @name Stream.resume() - * @api public - * - * var xs = _(generator); - * xs.resume(); - */ - -Stream.prototype.resume = function () { +Stream.prototype._resume = function (forceResumeSource) { //console.log(['resume', this.id]); if (this._resume_running || this._in_consume_cb) { //console.log(['resume already processing _incoming buffer, ignore resume call']); @@ -1138,8 +1134,10 @@ Stream.prototype.resume = function () { if (!this.paused && !this._is_observer) { // ask parent for more data if (this.source) { - //console.log(['ask parent for more data']); - this.source._checkBackPressure(); + if (!this._consume_waiting_for_next || forceResumeSource) { + //console.log(['ask parent for more data']); + this.source._checkBackPressure(); + } } // run _generator to fill up _incoming buffer else if (this._generator) { @@ -1155,6 +1153,25 @@ Stream.prototype.resume = function () { this._resume_running = false; }; +/** + * Resumes a paused Stream. This will either read from the Stream's incoming + * buffer or request more data from an upstream source. Never call this method + * on a stream that has been consumed (via a call to [consume](#consume) or any + * other transform). + * + * @id resume + * @section Stream Objects + * @name Stream.resume() + * @api public + * + * var xs = _(generator); + * xs.resume(); + */ + +Stream.prototype.resume = function () { + this._resume(true); +}; + /** * Ends a Stream. This is the same as sending a [nil](#nil) value as data. * You shouldn't need to call this directly, rather it will be called by @@ -1351,7 +1368,10 @@ Stream.prototype._addConsumer = function (s) { } s.source = this; this._consumers.push(s); - this._checkBackPressure(); + + if (this.paused && !this._consume_waiting_for_next) { + this._checkBackPressure(); + } }; /** @@ -1442,12 +1462,13 @@ Stream.prototype.consume = function (f) { if (x === nil) { // ended, remove consumer from source s._nil_pushed = true; + s._consume_waiting_for_next = false; self._removeConsumer(s); // We previously paused the stream, but since a nil was pushed, // next won't be called and we must manually resume. if (async) { - s.resume(); + s._resume(false); } } if (s.paused) { @@ -1464,6 +1485,7 @@ Stream.prototype.consume = function (f) { }; var next = function (s2) { //console.log(['next', async]); + s._consume_waiting_for_next = false; if (s._nil_pushed) { throw new Error('Cannot call next after nil'); } @@ -1477,11 +1499,11 @@ Stream.prototype.consume = function (f) { } s.write(new StreamRedirect(s2)); if (!_paused) { - s.resume(); + s._resume(false); } } else if (async) { - s.resume(); + s._resume(false); } else { next_called = true; @@ -1499,12 +1521,13 @@ Stream.prototype.consume = function (f) { // Don't pause if x is nil -- as next will never be called after if (!next_called && x !== nil) { + s._consume_waiting_for_next = true; s.pause(); } if (s._repeat_resume) { s._repeat_resume = false; - s.resume(); + s._resume(false); } }; self._addConsumer(s); diff --git a/test/test.js b/test/test.js index 8e2c296..233b80d 100755 --- a/test/test.js +++ b/test/test.js @@ -1330,6 +1330,41 @@ exports['consume - throws error if next called after nil'] = function (test) { test.done(); }; +exports['consume - call handler once without next() (issue #570)'] = function (test) { + test.expect(1); + var clock = sinon.useFakeTimers(); + var consumedCalledNum = 0; + _([1, 2, 3]) + .consume(function (err, x, push, next) { + consumedCalledNum++; + }) + .resume(); + clock.tick(10000); + clock.restore(); + test.equal(consumedCalledNum, 1); + test.done(); +}; + +exports['consume - consume resumed stream - call handler once without next() (issue #570)'] = function (test) { + test.expect(1); + var clock = sinon.useFakeTimers(); + var consumedCalledNum = 0; + var s = _([1, 2, 3]) + .consume(function (err, x, push, next) { + consumedCalledNum++; + }); + s.resume(); + s.consume(function (err, x, push, next) { + if (x !== _.nil) { + next(); + } + }).resume(); + clock.tick(10000); + clock.restore(); + test.equal(consumedCalledNum, 1); + test.done(); +}; + exports.errors = function (test) { var errs = []; var err1 = new Error('one'); From 030e03b3b5af119d11e2083dab498eed1b3bc344 Mon Sep 17 00:00:00 2001 From: Victor Vu Date: Mon, 20 Mar 2017 23:27:14 -0600 Subject: [PATCH 2/2] Add changelog entry. --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec6c204..d36c214 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ This file does not aim to be comprehensive (you have git history for that), rather it lists changes that might impact your own code as a consumer of this library. +2.10.3 +------ +### Bugfix +* In certain cases, consuming a stream that has been resumed may cause a stream + generator/consume handler to be called twice without next() ever being called. + This is mostly relevant for .each(...).done(...) use cases. + Noticed in [#570 (comment)](https://github.com/caolan/highland/issues/570#issuecomment-287980514). + [#608](https://github.com/caolan/highland/issues/608). + 2.10.2 ------ ### Bugfix