
highland doesn't handle backpressure with a mongo cursor stream #388

Closed
Sebmaster opened this issue Nov 12, 2015 · 5 comments
@Sebmaster

Now I'm not sure if this is a mongodb bug or a highland bug, but I got a repro:

'use strict';
const mongodb = require('mongodb');
const _ = require('highland');
const connString = "mongodb://localhost:27017/";

function getStream(db) {
  return _(db.db("test")
    .collection("coll")
    .find({})
    .sort({ _id: 1 }));
}

let count = 0;
let countAfter = 0;

mongodb.MongoClient.connect(connString)
  .then((db) => {
    const stream = getStream(db)
      .tap((oplog) => {
        if (++count % 1 === 0) {
          console.log("before", count);
        }
      })
      .flatMap((val) => {
        return _([val]);
      })
      .tap(() => {
        if (++countAfter % 1 === 0) {
          console.log("after", countAfter);
        }
      });

    const forked = stream
      .batchWithTimeOrCount(500, 2)
      .flatMap((val) => {
        return _((push) => {
          setTimeout(() => (push(null, 10), push(null, _.nil)), 1000);
        });
      });

    var total = 0;
    forked
      .tap(() => ++total)
      // done not actually called
      .done(()=>console.log('before:', count, 'after: ', countAfter, 'total: ', total));
  })
  .then(null, console.error);

Preconditions:
My collection is filled with 93 elements.

  • first flatMap (it should just be an identity flatMap) is essential
  • batchWithTimeOrCount is essential
  • the repro works with a self-implemented stream I tried, but not with a mongo stream (which makes me believe the mongo cursor could just be implemented wrong...)

Output:

sebastian@sebastian-ubuntu:~/trello/mongo-stream$ node test
before 1
after 1
before 2
after 2
before 3
before 4
before 5
before 6
before 7
before 8
before 9
before 10
before 11
[...]
before 91
before 92
before 93
^C

Observations:

  • done will never be called
  • the whole output happens while the consumer is still processing the first 2 elements

I'll continue to try to get to the bottom of why only the mongo cursor causes this, but I thought you might have an idea why this would happen...

@Sebmaster
Author

Actually, just noticed this seems to work with 3.0 - not sure if it's possible to backport the fix with the whole engine rewrite thing... Is 3.0 already "ready for use"?

@vqvu
Collaborator

vqvu commented Nov 13, 2015

Is a fork required somewhere in your example or does it fail exactly as is? I don't see a consumer anywhere. Also, are you sure that done is never called, or does it just take 46.5 to 93 seconds for it to be called?

Regardless, this is definitely a backpressure bug in highland. Even if the mongo cursor misbehaves and ignores all backpressure, it should still be held back at the source (in the stream returned by getStream).

I'll try to set up a MongoDB instance to see if I can repro this.

3.0 is ready if you're OK with depending on a github branch/commit rather than npm. We just need to add some docs and minor tests. You can consider it to be in a beta/RC stage. See #179 for the list of backwards incompatible changes.
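For reference, depending on the 3.0 engine straight from GitHub might look like the fragment below (the branch name is an assumption; check the repo for the actual 3.0 branch, or pin a specific commit):

```
# Hypothetical: pull highland straight from a GitHub branch
npm install caolan/highland#3.0.0

# or pin a commit SHA in package.json:
#   "highland": "caolan/highland#<commit-sha>"
```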

@Sebmaster
Author

> Is a fork required somewhere in your example or does it fail exactly as is?

It's not; it fails as-is. I originally had one in there, since that's how we use it in production, but it's not required for the bug to occur.

> I don't see a consumer anywhere.

I mean the second flatMap; it does the actual processing in our scenario.

> Also, are you sure that done is never called, or does it just take 46.5 to 93 seconds for it to be called?

Yep, runs indefinitely.

> Even if the mongo cursor misbehaves and ignores all backpressure, it should still be held back at the source (in the stream returned by getStream).

I think the mongo cursor should be able to handle backpressure: it implements a streams v3 interface with _read. However, I tried to replicate its behaviour with my own Readable stream class, both with and without backpressure handling, and couldn't repro the bug that way, so I believe something about mongo's implementation is very weird.

> 3.0 is ready if you're OK with depending on a github branch/commit rather than npm.

Sounds good enough for our internal use, then. We don't see the same bug anymore; we're running it now and we'll see if we hit another one.

@apaleslimghost
Collaborator

(aside: @vqvu, we could actually release a 3.0.0-beta1 to npm to encourage more widespread testing. people just doing npm install highland would still get 2.x)

@vqvu
Collaborator

vqvu commented Nov 13, 2015

Well, I can't say I had a good time doing it, but I think I figured out the root cause.

Basically, much of the code base assumes that various callbacks/handlers will not be re-entered. For example, the _next method in a consume stream assumes that it will not be re-entered while it is executing the consume callback (N.B., it's still OK for the execution of s1._next() to call s2._next() as long as s1 !== s2).

It's not an unreasonable assumption, since it's difficult to make these things re-entrant-safe. Unfortunately, the 2.x engine is bad at guaranteeing this. There's essentially a race condition where re-entrancy is possible under specific circumstances. When that happens, data starts being thrown away in the middle of the pipeline. That's why done was never called: the nil never got to the end.
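To illustrate the hazard in isolation (a made-up sketch, not highland's actual internals; every name here is invented): a handler that assumes it is never re-entered can silently drop a value when the callback it invokes calls back in synchronously, while a busy-flag-plus-queue version survives re-entry.

```javascript
'use strict';

// Made-up sketch of the re-entrancy hazard; NOT highland's code.
// Both consumers deliver values to `onValue`, which may call the
// consumer again *synchronously* (like a driver serving reads
// from a pre-loaded batch).

// Assumes it is never re-entered: a re-entrant call hits the
// `busy` check and the value is silently thrown away.
function makeNaiveConsumer(onValue) {
  let busy = false;
  return function next(value) {
    if (busy) return; // re-entrant value is lost here
    busy = true;
    onValue(value);
    busy = false;
  };
}

// Re-entrant-safe: re-entrant calls just enqueue, and the
// outermost frame drains the queue in order.
function makeSafeConsumer(onValue) {
  let busy = false;
  const queue = [];
  return function next(value) {
    queue.push(value);
    if (busy) return;
    busy = true;
    while (queue.length > 0) {
      onValue(queue.shift());
    }
    busy = false;
  };
}
```

If `onValue` synchronously feeds a second value back in, the naive consumer sees only the first value while the safe one sees both; losing a value this way in the middle of a pipeline is exactly how a nil can fail to reach done.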

The 3.0 engine was written with these issues in mind, so I like to think that it's less prone to these kinds of bugs.

Here's a minimal-ish test case that doesn't involve mongodb. I'll put up a PR this weekend.

Edit: Added comments on what's so special about mongodb that triggers this bug. This is the reason why you can't immediately replicate the behavior in a custom Readable stream.

var stream = _();
var i = 0;

function write() {
    var cont = false;
    while ((cont = stream.write(i++)) && i <= 10) {
    }
    if (cont) {
        i++;
        stream.end();
    }
}

// This stream mimics the behavior of things like database
// drivers.
stream.on('drain', function () {
    if (i === 0) {
        // The initial read is async.
        setTimeout(write, 0);
    } else if (i > 0 && i < 10) {
        // The driver loads data in batches, so subsequent drains
        // are sync to mimic pulling from a pre-loaded buffer.
        write();
    } else if (i === 10) {
        i++;
        setTimeout(() => stream.end(), 0);
    }
});

var done = false;
stream
    // flatMap to disassociate from the source, since sequence
    // returns a new stream not a consume stream.
    .flatMap(function (x) {
        return _([x]);
    })
    // batch(2) to get a transform that sometimes calls next
    // without calling push beforehand.
    .batch(2)
    // Another flatMap to get an async transform.
    .flatMap(function (x) {
        return _(function (push) {
            setTimeout(function () {
                push(null, x);
                push(null, _.nil);
            }, 100);
        });
    })
    .done(function () {
        console.log('done');
    });

@quarterto Good idea. I'll do that this weekend too.
