Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Buffering Issue With parallel() #302

Merged
merged 1 commit into from
May 21, 2015

Conversation

svozza
Copy link
Collaborator

@svozza svozza commented May 18, 2015

Resolves #234. I finally got round to looking at this and although I've managed to fix it I'm still a bit hazy as to why what I've done works and wouldn't mind if someone could explain a bit as to what exactly is happening in the function. There's a lot of state and mutation going on and I find it very hard to follow.

@vqvu
Copy link
Collaborator

vqvu commented May 19, 2015

Yep. parallel as is is fairly complicated with lots of bookkeeping. Part of this is because it wants to be lazy and won't try to consume streams whose values you never end up needing. The implementation can be simplified by being a bit more eager on that front.

Here's the intent of the algorithm

Some definitions:

  • running - an array of buffered substream state. Ordered by the same order that their values should be emitted in.
  • running.length === 0 - No stream being consumed.
  • running[0].buffer - The buffer for the "top" stream (the one that is emitting downstream).
  • running[0].buffer.length === 0 - The top stream is currently running.
  • ended === true - No more substreams to run.
  • reading_source === true - In the process of pulling the next stream. Don't try to pull some more.

It goes like this:

  1. If there is data that can be emitted.
    1. Emit the data (flush the buffer).
    2. If the current buffer contain a nil, then the current stream is done, and we can drop its state.
    3. This step essentially clears out any streams at the top that are already completed. The fact that this block will never run when the top stream in running has not completed is a direct consequence of the property below.
  2. If we are not at the limit of currently running stream, and there are more streams, and we are not already pulling a stream.
    1. Pull the next stream.
    2. Once we get the stream, consume it.
    3. While consuming the stream,
      1. If the stream being consumed is the top stream
        1. If we have a value or error, short-circuit and push it directly.
        2. If we have a nil, drop our buffer and flush the next one. This is critical for making the behavior in (1) safe, and its omission was the reason for the bug in Behaviour Of _.parallel With fork() #234.
      2. Otherwise, buffer the items emitted.
  3. If we have no more stream and there are no more running streams, we stop.

Here's the property that makes (2)(iii)(a)(a) work and its proof.

At the start of the x.consume callback, if x is the top stream, then running[0].buffer.length === 0.

Proof by induction:
Top streams always directly push their data. This means the first stream never pushes its data into the buffer and will throw it away when it completes, so the property is trivially true.

Assume true for the first n streams. If the n+1th stream completes before the nth one, then the property is trivially true. Otherwise, when the nth stream ends, and it immediately flushes the buffer for the n+1th stream. The n+1th stream cannot emit an item during the call to push (which would cause an out-of-order emit downstream). This is because:

  1. If the n+1th stream has not been started, it can only start when the _(function (push, next) { callback is executed again, and that cannot happen in a call to push (this is an assumption that the Highland engine must guarantee).
  2. If the n+1th stream has started, then you cannot cause it (an upstream stream) to emit an item by pushing downstream (again, another assumption that the Highland engine must guarantee).

Thus, the next time that the n+1th stream runs, it will do so with an empty buffer. The _(function (push, next) { may run before that, but it won't do anything to running because the top buffer is empty.

@vqvu
Copy link
Collaborator

vqvu commented May 19, 2015

As it turns out, doesn't satisfy the invariant. flushBuffer doesn't clear running[0].buffer when it completes. The reason why you don't notice this is because it synergizes with a subtle liveness bug. Here's the test case for that bug.

var _ = require('./lib/index.js'),
    sinon = require('sinon');

var clock = sinon.useFakeTimers();

function delay(push, ms, x) {
    setTimeout(function () {
        push(null, x);
    }, ms);
}

var s1 = _(function (push) {
    delay(push, 1, 1);
    delay(push, 2, 2);
    delay(push, 3, _.nil);
});

var s2 = _(function (push) {
    delay(push, 1, 10);
    delay(push, 2, 20);
    delay(push, 3, 30);
    delay(push, 4, 40);
    delay(push, 5, _.nil);
});

var s3 = _(function (push, next) {
    console.log("S3 generator");
    push(null, 100);
    push(null, _.nil);
});

var ss = _([s1, s2, s3]).parallel(2).each(_.log);

clock.tick(10);

You expect s3 to be consumed as soon as s1 is done, so the output should be

1
2
10
20
S3 generator
30
40
100

What actually happens is

1
2
10
20
30
40
S3 generator
100

The reason is that flushBuffer only calls next when running[0].buffer contains a nil. This means that if one stream ends and the next is still going, next is never called. Since you must call next to trigger another consume, it essentially means that there won't be any replacement for ended streams until

  1. all running streams are ended or
  2. a running stream ends after the one following it ends.

The solution is to call next always.

diff --git a/lib/index.js b/lib/index.js
index a50247e..8b728db 100755
--- a/lib/index.js
+++ b/lib/index.js
@@ -2933,6 +2933,7 @@ Stream.prototype.parallel = function (n) {
         if (running.length && running[0].buffer.length) {
             // send buffered data
             flushBuffer();
+            next();
             // still waiting for more data before we can shift
             // the running array...
         }
@@ -2961,9 +2962,7 @@ Stream.prototype.parallel = function (n) {
                                 if (running.length && running[0].buffer.length) {
                                     flushBuffer();
                                 }
-                                else {
-                                    next();
-                                }
+                                next();

                             }
                             else {
@@ -2996,7 +2995,6 @@ Stream.prototype.parallel = function (n) {
                 if (buf[i][1] === nil) {
                     // this stream has ended
                     running.shift();
-                    return next();
                 }
                 else {
                     // send the buffered output

If you apply this patch, you'll then notice the true consequence of the code not following the invariant I described. The test case above will continually repeat

10
20

The real fix, of course, is to also clear the buffer in flushBuffer.

diff --git a/lib/index.js b/lib/index.js
index a50247e..6d8f10b 100755
--- a/lib/index.js
+++ b/lib/index.js
@@ -2933,6 +2933,7 @@ Stream.prototype.parallel = function (n) {
         if (running.length && running[0].buffer.length) {
             // send buffered data
             flushBuffer();
+            next();
             // still waiting for more data before we can shift
             // the running array...
         }
@@ -2961,9 +2962,7 @@ Stream.prototype.parallel = function (n) {
                                 if (running.length && running[0].buffer.length) {
                                     flushBuffer();
                                 }
-                                else {
-                                    next();
-                                }
+                                next();

                             }
                             else {
@@ -2996,13 +2995,14 @@ Stream.prototype.parallel = function (n) {
                 if (buf[i][1] === nil) {
                     // this stream has ended
                     running.shift();
-                    return next();
+                    return;
                 }
                 else {
                     // send the buffered output
                     push.apply(null, buf[i]);
                 }
             }
+            buf.length = 0;
         }
         // else wait for more data to arrive from running streams
     });

@vqvu
Copy link
Collaborator

vqvu commented May 19, 2015

By the way, thanks for asking for an explanation. I only noticed these bugs when writing the proof for why parallel works.

@svozza
Copy link
Collaborator Author

svozza commented May 19, 2015

Thanks @vqvu for such a thorough explanation. I really am in awe at your knowledge and understanding of the codebase. I knew something wasn't right but I don't think I would ever have figured this out on my own. I've made your suggested changes and also added a test case for the liveness bug too.

@vqvu
Copy link
Collaborator

vqvu commented May 19, 2015

I don't think your test actually tests the liveness bug. Does it fail if you don't apply my patch? The bug is about when s3 is consumed, not about the order of the resulting stream (which was always correct).

You probably want to set a flag in the generator for s3, then advance time just enough so that s1 completes, then check the flag. The old code should not have the flag set.

@svozza svozza force-pushed the issue-234-buffer-not-flushed-by-parallel branch from 7de1562 to 51d6939 Compare May 19, 2015 22:15
@svozza
Copy link
Collaborator Author

svozza commented May 19, 2015

Ah yes, I get what you mean.

});
};

exports['parallel - behaviour of parallel with fork() - invariant violation - issue #234'] = function (test) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't "behavior of parallel with fork()". It's more "parallel consumption liveness". Also, reference this issue (302) instead of 234.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

fix liveness bug and add tests

fix liveness test

rename liveness test

remove doto logging function
@svozza svozza force-pushed the issue-234-buffer-not-flushed-by-parallel branch from a9482c8 to 13c4d87 Compare May 20, 2015 09:26
@svozza
Copy link
Collaborator Author

svozza commented May 20, 2015

What is going on with Travis? These builds are taking hours.

@svozza
Copy link
Collaborator Author

svozza commented May 20, 2015

Looks like there's a problem but it's been resolved:

https://www.traviscistatus.com/incidents/gtd208xrt4r2

@vqvu
Copy link
Collaborator

vqvu commented May 21, 2015

Cool.

vqvu added a commit that referenced this pull request May 21, 2015
…arallel

Fix buffering Issue With parallel(). Fixes #234.
@vqvu vqvu merged commit 4dc90d9 into caolan:master May 21, 2015
@vqvu vqvu added this to the v2.6.0 milestone Jun 29, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Behaviour Of _.parallel With fork()
2 participants