From f54a59176d21e732dc4549e1a2151ce1bede5185 Mon Sep 17 00:00:00 2001 From: Victor Vu Date: Mon, 7 Sep 2015 16:48:32 -0600 Subject: [PATCH] Keep a reference to the correct consumer/observer array in _send. --- lib/index.js | 14 ++++++++---- test/test.js | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/lib/index.js b/lib/index.js index ec8f949..081f8c6 100755 --- a/lib/index.js +++ b/lib/index.js @@ -688,14 +688,20 @@ Stream.prototype._send = function (err, x) { } if (this._consumers.length) { token = err ? new StreamError(err) : x; - for (var i = 0, len = this._consumers.length; i < len; i++) { - this._consumers[i].write(token); + // this._consumers may be changed from under us, + // so we keep a copy. + var consumers = this._consumers; + for (var i = 0, len = consumers.length; i < len; i++) { + consumers[i].write(token); } } if (this._observers.length) { token = err ? new StreamError(err) : x; - for (var j = 0, len2 = this._observers.length; j < len2; j++) { - this._observers[j].write(token); + // this._observers may be changed from under us, + // so we keep a copy. + var observers = this._observers; + for (var j = 0, len2 = observers.length; j < len2; j++) { + observers[j].write(token); } } if (this._send_events) { diff --git a/test/test.js b/test/test.js index 82ad22c..a26430a 100755 --- a/test/test.js +++ b/test/test.js @@ -369,6 +369,34 @@ exports['consume - push nil async (issue #173)'] = function (test) { }); }; +exports['consume - fork after consume should not throw (issue #366)'] = function (test) { + test.expect(2); + var arr1, arr2; + + var s = _(); + var s1 = s.toArray(function (a) { + arr1 = a; + if (arr1 && arr2) { + runTest(); + } + }); + var s2 = s.fork().toArray(function (a) { + arr2 = a; + if (arr1 && arr2) { + runTest(); + } + }); + + s.write(1); + s.end(); + + function runTest() { + test.same(arr1, [1]); + test.same(arr2, [1]); + test.done(); + } +} + exports['constructor'] = { setUp: function (callback) { this.clock = sinon.useFakeTimers(); @@ -748,7 +776,7 @@ exports['generator throws error if push called after nil'] = function (test) { test.done(); }; -exports['consume throws error if push called after nil'] = function (test) { +exports['consume - throws error if push called after nil'] = function (test) { test.expect(1); var s = _([1,2,3]); var s2 = s.consume(function (err, x, push, next) { @@ -765,7 +793,7 @@ exports['consume throws error if push called after nil'] = function (test) { test.done(); }; -exports['consume throws error if next called after nil'] = function (test) { +exports['consume - throws error if next called after nil'] = function (test) { test.expect(1); var s = _([1,2,3]); var nil_seen = false; @@ -2003,6 +2031,35 @@ exports['observe - observers should be destroyed (issue #208)'] = function (test test.done(); }; +exports['observe - observe consume before source emit should not throw'] = function (test) { + test.expect(2); + var arr1, arr2; + + var s = _(); + var s1 = s.observe().toArray(function (a) { + arr1 = a; + if (arr1 && arr2) { + runTest(); + } + }); + var s2 = s.observe().toArray(function (a) { + arr2 = a; + if (arr1 && arr2) { + runTest(); + } + }); + + s.write(1); + s.end(); + s.resume(); + + function runTest() { + test.same(arr1, [1]); + test.same(arr2, [1]); + test.done(); + } +}; + // TODO: test redirect after fork, forked streams should transfer over // TODO: test redirect after observe, observed streams should transfer over