diff --git a/lib/priority-queue.js b/lib/priority-queue.js index c4790a69d..72a410281 100644 --- a/lib/priority-queue.js +++ b/lib/priority-queue.js @@ -26,29 +26,22 @@ var PriorityQueue = module.exports = function(name, redisPort, redisHost, redisO this.queues[PriorityQueue.priorities[key]] = queue; } - Promise.map(this.queues, function(queue) { - return new Promise(function(resolve, reject) { - queue.once('ready', resolve); - }); - }).then(_this.emit.bind(_this, 'ready')) - - this.queues.forEach(function(queue) { - queue.on('error', _this.emit.bind(_this, 'error')); - }) - - this.queues.forEach(function(queue) { - queue.on('progress', _this.emit.bind(_this, 'progress')); - }) - - this.queues.forEach(function(queue) { - queue.on('completed', _this.emit.bind(_this, 'completed')); + var groupEvents = ['ready', 'paused', 'resumed'] + groupEvents.forEach(function(event) { + Promise.map(_this.queues, function(queue) { + return new Promise(function(resolve, reject) { + queue.once(event, resolve); + }); + }).then(_this.emit.bind(_this, event)) }) - this.queues.forEach(function(queue) { - queue.on('failed', _this.emit.bind(_this, 'failed')); + var singleEvents = ['error', 'active', 'stalled', 'progress', 'completed', 'failed', 'cleaned'] + singleEvents.forEach(function(event) { + _this.queues.forEach(function(queue) { + queue.on(event, _this.emit.bind(_this, event)) + }) }) - this.strategy = Strategy.exponential; } @@ -164,21 +157,21 @@ PriorityQueue.prototype.empty = function() { }); } -PriorityQueue.prototype.pause = function() { +PriorityQueue.prototype.pause = function(localOnly) { var _this = this; - + _this.paused = Promise.map(this.queues, function(queue) { - return queue.pause(); + return queue.pause(localOnly || false); }).then(_this.emit.bind(_this, 'paused')); return _this.paused; } -PriorityQueue.prototype.resume = function() { +PriorityQueue.prototype.resume = function(localOnly) { var _this = this; _this.paused = false; return Promise.map(this.queues, function(queue) { - return queue.resume(); + return queue.resume(localOnly || false); }).then(_this.emit.bind(_this, 'resumed')).then(function() { if (_this.handler) { _this.run();