Skip to content

Commit

Permalink
Merge pull request #367 from vqvu/consume-mutates-consumers
Browse files Browse the repository at this point in the history
Keep a reference to the correct consumer/observer array in _send (resolves  #366).
  • Loading branch information
vqvu committed Sep 7, 2015
2 parents 746b1fa + f54a591 commit 02e4270
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 6 deletions.
14 changes: 10 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,20 @@ Stream.prototype._send = function (err, x) {
}
if (this._consumers.length) {
token = err ? new StreamError(err) : x;
for (var i = 0, len = this._consumers.length; i < len; i++) {
this._consumers[i].write(token);
// this._consumers may be changed from under us,
// so we keep a copy.
var consumers = this._consumers;

This comment has been minimized.

Copy link
@jgrund

jgrund Sep 7, 2015

Collaborator

A note on this, you are relying on a reference still, if someone mutates that reference you will still get the changes.

The reason this works right now is because the _removeConsumer does a reassignment. Cloning is still a safer way to go.

This comment has been minimized.

Copy link
@vqvu

vqvu Sep 7, 2015

Author Collaborator

I know, but cloning causes memory pressure, and I'd rather not do that in a function that is called every time an element is emitted. It's better to just make sure that changes to those arrays does a copy than to defensively copy all the time.

This comment has been minimized.

Copy link
@jgrund

jgrund Sep 7, 2015

Collaborator

I'd be interested to see the actual memory ramifications of creating a new array with existing objects inside it, but overall I can buy that argument.

for (var i = 0, len = consumers.length; i < len; i++) {
consumers[i].write(token);
}
}
if (this._observers.length) {
token = err ? new StreamError(err) : x;
for (var j = 0, len2 = this._observers.length; j < len2; j++) {
this._observers[j].write(token);
// this._observers may be changed from under us,
// so we keep a copy.
var observers = this._observers;
for (var j = 0, len2 = observers.length; j < len2; j++) {
observers[j].write(token);
}
}
if (this._send_events) {
Expand Down
61 changes: 59 additions & 2 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,34 @@ exports['consume - push nil async (issue #173)'] = function (test) {
});
};

exports['consume - fork after consume should not throw (issue #366)'] = function (test) {
test.expect(2);
var arr1, arr2;

var s = _();
var s1 = s.toArray(function (a) {
arr1 = a;
if (arr1 && arr2) {
runTest();
}
});
var s2 = s.fork().toArray(function (a) {
arr2 = a;
if (arr1 && arr2) {
runTest();
}
});

s.write(1);
s.end();

function runTest() {
test.same(arr1, [1]);
test.same(arr2, [1]);
test.done();
}
}

exports['constructor'] = {
setUp: function (callback) {
this.clock = sinon.useFakeTimers();
Expand Down Expand Up @@ -748,7 +776,7 @@ exports['generator throws error if push called after nil'] = function (test) {
test.done();
};

exports['consume throws error if push called after nil'] = function (test) {
exports['consume - throws error if push called after nil'] = function (test) {
test.expect(1);
var s = _([1,2,3]);
var s2 = s.consume(function (err, x, push, next) {
Expand All @@ -765,7 +793,7 @@ exports['consume throws error if push called after nil'] = function (test) {
test.done();
};

exports['consume throws error if next called after nil'] = function (test) {
exports['consume - throws error if next called after nil'] = function (test) {
test.expect(1);
var s = _([1,2,3]);
var nil_seen = false;
Expand Down Expand Up @@ -2003,6 +2031,35 @@ exports['observe - observers should be destroyed (issue #208)'] = function (test
test.done();
};

exports['observe - observe consume before source emit should not throw'] = function (test) {
test.expect(2);
var arr1, arr2;

var s = _();
var s1 = s.observe().toArray(function (a) {
arr1 = a;
if (arr1 && arr2) {
runTest();
}
});
var s2 = s.observe().toArray(function (a) {
arr2 = a;
if (arr1 && arr2) {
runTest();
}
});

s.write(1);
s.end();
s.resume();

function runTest() {
test.same(arr1, [1]);
test.same(arr2, [1]);
test.done();
}
};

// TODO: test redirect after fork, forked streams should transfer over
// TODO: test redirect after observe, observed streams should transfer over

Expand Down

0 comments on commit 02e4270

Please sign in to comment.