Skip to content

Commit

Permalink
fix liveness bug and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stefano committed May 19, 2015
1 parent 1010c92 commit 7de1562
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
8 changes: 4 additions & 4 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2933,6 +2933,7 @@ Stream.prototype.parallel = function (n) {
if (running.length && running[0].buffer.length) {
// send buffered data
flushBuffer();
next();
// still waiting for more data before we can shift
// the running array...
}
Expand Down Expand Up @@ -2961,9 +2962,7 @@ Stream.prototype.parallel = function (n) {
if (running.length && running[0].buffer.length) {
flushBuffer();
}
else {
next();
}
next();

}
else {
Expand Down Expand Up @@ -2996,13 +2995,14 @@ Stream.prototype.parallel = function (n) {
if (buf[i][1] === nil) {
// this stream has ended
running.shift();
return next();
return;
}
else {
// send the buffered output
push.apply(null, buf[i]);
}
}
buf.length = 0;
}
// else wait for more data to arrive from running streams
});
Expand Down
42 changes: 41 additions & 1 deletion test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4770,7 +4770,7 @@ exports['parallel consume from async generator'] = function (test) {
});
};

exports['parallel - behaviour of parallel With fork() - issue #234'] = function (test) {
exports['parallel - behaviour of parallel with fork() - issue #234'] = function (test) {
test.expect(1);

function addTen(a) {
Expand Down Expand Up @@ -4799,6 +4799,46 @@ exports['parallel - behaviour of parallel With fork() - issue #234'] = function
});
};

exports['parallel - behaviour of parallel with fork() - invariant violation - issue #234'] = function (test) {
test.expect(1);
var clock = sinon.useFakeTimers(),
expected = [1, 2, 10, 20, 30, 40, 100];

function delay(push, ms, x) {
setTimeout(function () {
push(null, x);
}, ms);
}

var s1 = _(function (push) {
delay(push, 1, 1);
delay(push, 2, 2);
delay(push, 3, _.nil);
});

var s2 = _(function (push) {
delay(push, 1, 10);
delay(push, 2, 20);
delay(push, 3, 30);
delay(push, 4, 40);
delay(push, 5, _.nil);
});

var s3 = _(function (push, next) {
push(null, 100);
push(null, _.nil);
});

_([s1, s2, s3]).parallel(2)
.toArray(function (xs) {
test.same(xs, expected);
clock.restore();
test.done();
});

clock.tick(10);
};

exports['throttle'] = {
setUp: function (callback) {
this.clock = sinon.useFakeTimers();
Expand Down

0 comments on commit 7de1562

Please sign in to comment.