Does mongoose support Mongo Aggregate Cursor (mongo 2.6) #2187

Closed
mattbryson opened this issue Jul 11, 2014 · 12 comments

Comments

@mattbryson

Hi,

We are hitting the 16MB document limit when using the aggregation pipeline, so we have upgraded MongoDB to 2.6, which supports aggregation cursors.

I can run the aggregation in the mongo shell, and it iterates the cursor, but I can't see how to do this via the mongoose aggregation API.

Any help would be much appreciated.

m

@vkarpov15 vkarpov15 added this to the 3.8.14 milestone Jul 11, 2014
@vkarpov15
Collaborator

Right now, unfortunately, the MongoDB node driver supports this via an option, but mongoose doesn't :(

For now, you can access a raw mongodb driver collection object using Model.collection property, for example MyModel.collection.aggregate([...], { cursor: {batchSize: 1000} }). This fix should be in 3.8.14.
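
A minimal sketch of how the cursor returned by this workaround might be consumed one document at a time. It assumes the driver's aggregation cursor behaves like a Node.js readable stream (emitting 'data', 'end' and 'error' events); that should be checked against the driver version in use. The helper name forEachDoc and the names in the usage comment (pipeline, handleDoc, onDone) are hypothetical.

```javascript
// Sketch: process documents from a stream-like aggregation cursor.
// ASSUMPTION: `cursor` emits 'data', 'end' and 'error' events like a
// Node.js Readable stream; verify this for your driver version.
function forEachDoc(cursor, onDoc, done) {
  var finished = false;

  function finish(err) {
    if (finished) return; // make sure `done` is only called once
    finished = true;
    done(err || null);
  }

  cursor.on('data', function (doc) { onDoc(doc); });
  cursor.on('error', finish);
  cursor.on('end', function () { finish(); });
}

// Hypothetical usage with the workaround described above:
// var cursor = MyModel.collection.aggregate(pipeline, { cursor: { batchSize: 1000 } });
// forEachDoc(cursor, handleDoc, onDone);
```

Handling documents as they arrive, rather than collecting them into one array, is what avoids building a single oversized result.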

@vkarpov15
Collaborator

@mattbryson 3.8.14 will have a .cursor() helper on the aggregate object that will let you easily tell aggregation to return a cursor.

@mattbryson
Author

Hi, having trouble getting this working. (mongoose 3.8.12, mongo 2.6.3)

Firstly, how do you set it if you call aggregate on the model with a callback?

MyModel.aggregate([...], { cursor: {batchSize: 1000} }, callback)

The above throws an error saying it's not a valid aggregation stage.

And when trying via the aggregation object that is returned:

MyModel.aggregate([...]).cursor( {batchSize: 1000}).exec(callback)

The callback is never executed. If I omit the cursor() option, the callback is executed.

@vkarpov15 vkarpov15 modified the milestones: 3.8.14, 3.8.15 Aug 4, 2014
@vkarpov15
Collaborator

Hmm, weird. I'll investigate this.

@vkarpov15 vkarpov15 reopened this Aug 4, 2014
@mattbryson
Author

Did you get any time to look into this?

@vkarpov15
Collaborator

Not quite yet. I should be able to knock this out and release 3.8.15 by the end of the week, but time is scarce :(

@mattbryson
Author

:) yeah, tell me about it! If I can help test in any way, let us know.

@wescleveland

I'm also available to test, hitting the same issue here as well.

Strangely, if I do the following:
cursor = model
.collection
.aggregate pipeline, { cursor: {batchSize: 1000} }

I see in the mongod log for the aggregate query that the cursor batchSize is 1, which seems to kill performance. Any idea what I can do in the interim to force a higher batchSize until I can just do model.aggregate.cursor?

edit: I am currently on v3.8.14, let me know if I should try another branch, tag, etc for testing.

@vkarpov15
Collaborator

Actually, @mattbryson, are you still using mongoose 3.8.12? If you are, the .cursor() option doesn't exist there; it was added in 3.8.14.

@vkarpov15 vkarpov15 modified the milestones: 3.8.15, 3.8.16 Aug 17, 2014
@vkarpov15 vkarpov15 modified the milestones: 3.8.17, 3.8.16 Sep 8, 2014
@vkarpov15 vkarpov15 removed this from the 3.8.17 milestone Sep 29, 2014
@mattbryson
Author

Hi, was there any progress on this?

We are using 3.8.14 - was this added in a later version?

cheers

m

@AzadMemon

Has there been any progress on this? I'm using version 3.8.26 and I have the same problem.

This is my implementation:

var aggregateCursor = Conversation
            .aggregate(query)
            .cursor({batchSize: 1000})
            .exec();

I think it has something to do with the code in lib/aggregate.js, in the exec function. The exec function calls 'this._model.collection.aggregate' (passing in the promise's resolve function as the callback), and then returns the promise. But the collection.aggregate function doesn't use that callback in any way if a cursor is requested. It returns an aggregation cursor instead.

The result is you end up with an unresolved promise, and obviously no cursor.

lib/model.js

Model.aggregate = function aggregate () {
  var args = [].slice.call(arguments)
    , aggregate
    , callback;

  if ('function' === typeof args[args.length - 1]) {
    callback = args.pop();
  }

  if (1 === args.length && util.isArray(args[0])) {
    aggregate = new Aggregate(args[0]);
  } else {
    aggregate = new Aggregate(args);
  }

  aggregate.bind(this);

  if ('undefined' === typeof callback) {
    return aggregate;
  }

  return aggregate.exec(callback);
}
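
The argument handling above also suggests why passing an options object as a second argument to Model.aggregate was rejected earlier in the thread. The sketch below only mirrors the quoted argument parsing; stagesFromArguments is a hypothetical stand-in, and how Aggregate validates its stages is not shown in the quoted code.

```javascript
// Sketch of the argument handling in the Model.aggregate listing above.
// Returns what would be handed to `new Aggregate(...)` as pipeline stages.
function stagesFromArguments() {
  var args = [].slice.call(arguments);

  if (typeof args[args.length - 1] === 'function') {
    args.pop(); // a trailing callback is removed first
  }

  if (args.length === 1 && Array.isArray(args[0])) {
    return args[0]; // a single array is used as the pipeline
  }

  return args; // otherwise every remaining argument is treated as a stage
}

var stages = stagesFromArguments(
  [{ $match: { active: true } }],   // illustrative pipeline
  { cursor: { batchSize: 1000 } },  // intended as options
  function () {}
);
// `stages` holds two entries: the pipeline array and the options object.
// Neither is a pipeline stage, which is consistent with the reported error.
```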

lib/aggregate.js

Aggregate.prototype.exec = function (callback) {
  var promise = new Promise();

  if (callback) {
    promise.addBack(callback);
  }

  if (!this._pipeline.length) {
    promise.error(new Error("Aggregate has empty pipeline"));
    return promise;
  }

  if (!this._model) {
    promise.error(new Error("Aggregate not bound to any Model"));
    return promise;
  }

  this._model
    .collection
    .aggregate(this._pipeline, this.options || {}, promise.resolve.bind(promise));

  return promise;
}

mongodb driver: lib/collection/aggregation.js

var aggregate = function(pipeline, options, callback) {
  var args = Array.prototype.slice.call(arguments, 0);
  callback = args.pop();
  var self = this;

  // If we have any of the supported options in the options object
  var opts = args[args.length - 1] || {};
  options = opts.readPreference 
    || opts.explain 
    || opts.cursor 
    || opts.out
    || opts.maxTimeMS
    || opts.allowDiskUse ? args.pop() : {}

  // If the callback is the option (as for cursor override it)
  if(typeof callback == 'object' && callback != null) options = callback;

  // Convert operations to an array
  if(!Array.isArray(args[0])) {
    pipeline = [];
    // Push all the operations to the pipeline
    for(var i = 0; i < args.length; i++) pipeline.push(args[i]);
  }

  // Is the user requesting a cursor
  if(options.cursor != null && options.out == null) {
    if(typeof options.cursor != 'object') throw utils.toError('cursor options must be an object');
    // Set the aggregation cursor options
    var agg_cursor_options = options.cursor;
    agg_cursor_options.pipe = pipeline;
    agg_cursor_options.allowDiskUse = options.allowDiskUse == null ? false : options.allowDiskUse;
    // Set the maxTimeMS if passed in
    if(typeof options.maxTimeMS == 'number') agg_cursor_options.maxTimeMS = options.maxTimeMS;
    // Return the aggregation cursor
    return new AggregationCursor(this, this.serverCapabilities, agg_cursor_options);
  }

.......... etc ...........

As you can see the AggregationCursor isn't even concerned with the promise anymore, so it never gets resolved. I may be wrong, but this is my best guess. Let me know if my understanding is based on a wrong assumption.
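
The behaviour described here can be reproduced without a database. This is a sketch under stated assumptions: fakeDriverAggregate is a hypothetical stand-in reduced to the cursor branch quoted above, and execWithCursorCheck is one possible way to avoid the hang, not mongoose's actual implementation.

```javascript
// Stand-in for the driver's collection.aggregate, reduced to the quoted branch:
// when `options.cursor` is set it returns a cursor and ignores the callback.
function fakeDriverAggregate(pipeline, options, callback) {
  if (options.cursor != null) {
    return { pipeline: pipeline, batchSize: options.cursor.batchSize };
  }
  callback(null, [{ total: 1 }]); // non-cursor path reports via the callback
}

// Mirrors the quoted exec(): it always waits on the callback.
function execAsQuoted(pipeline, options, callback) {
  fakeDriverAggregate(pipeline, options || {}, callback);
}

// Hypothetical alternative: hand the cursor back when one was requested.
function execWithCursorCheck(pipeline, options, callback) {
  if (options && options.cursor) {
    return fakeDriverAggregate(pipeline, options);
  }
  fakeDriverAggregate(pipeline, options || {}, callback);
}

var callbackFired = false;
execAsQuoted([{ $match: {} }], { cursor: { batchSize: 1000 } }, function () {
  callbackFired = true;
});
// callbackFired is still false here: nothing ever invokes the callback.

var cursor = execWithCursorCheck([{ $match: {} }], { cursor: { batchSize: 1000 } });
// `cursor` is the object returned by the cursor branch.
```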

@vkarpov15 vkarpov15 added this to the 4.0.3 milestone Apr 22, 2015
@vkarpov15
Collaborator

See #2306, there's rudimentary support for aggregation cursor in 4.0.2.
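
A hedged sketch of how that cursor support might be used. It assumes that in mongoose 4.x `.cursor(opts).exec()` returns the driver cursor directly, with no callback, and that `cursor.each` signals the end of the results by passing a null document; both should be verified against the documentation for the installed version. The helper name eachAggregateDoc is hypothetical.

```javascript
// Sketch: iterate aggregation results through a cursor, one document at a time.
// ASSUMPTIONS: `.cursor(opts).exec()` returns a cursor synchronously, and
// `cursor.each` passes a null document once the results are exhausted.
function eachAggregateDoc(Model, pipeline, onDoc, done) {
  var cursor = Model.aggregate(pipeline).cursor({ batchSize: 1000 }).exec();

  cursor.each(function (err, doc) {
    if (err) return done(err);
    if (doc === null) return done(null); // no more documents
    onDoc(doc);
  });
}
```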

@vkarpov15 vkarpov15 removed this from the 4.0.3 milestone Apr 27, 2015