Skip to content

Commit

Permalink
refactor(cursor): remove async.queue() from eachAsync() re: #8073 #5502
Browse files Browse the repository at this point in the history
  • Loading branch information
vkarpov15 committed Sep 28, 2019
1 parent e60db1b commit 3647292
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions lib/helpers/cursor/eachAsync.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
* Module dependencies.
*/

const async = require('async');
const utils = require('../../utils');

/**
Expand Down Expand Up @@ -37,18 +36,9 @@ module.exports = function eachAsync(next, fn, options, callback) {

const iterate = function(callback) {
let drained = false;
const nextQueue = async.queue(function(task, cb) {
if (drained) {
return cb();
}
next(function(err, doc) {
if (err) return cb(err);
cb(null, doc);
});
}, 1);

const getAndRun = function(cb) {
nextQueue.push({}, function(err, doc) {
_next(function(err, doc) {
if (err) return cb(err);
if (drained) {
return;
Expand Down Expand Up @@ -81,7 +71,37 @@ module.exports = function eachAsync(next, fn, options, callback) {
}
};

const _nextQueue = [];
return utils.promiseOrCallback(callback, cb => {
iterate(cb);
});

// `next()` can only execute one at a time, so make sure we always execute
// `next()` in series, while still allowing multiple `fn()` instances to run
// in parallel.
function _next(cb) {
if (_nextQueue.length === 0) {
return next(_step(cb));
}
_nextQueue.push(cb);
}

function _step(cb) {
return function(err, doc) {
if (err != null) {
return cb(err);
}
cb(null, doc);

if (doc == null) {
return;
}

setTimeout(() => {
if (_nextQueue.length > 0) {
next(_step(_nextQueue.unshift()));
}
}, 0);
};
}
};

0 comments on commit 3647292

Please sign in to comment.