diff --git a/.gitignore b/.gitignore index 66685ed5..36148b20 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ node_modules *.sock testing lib/http/public/stylesheets/main.css + +.project +.settings/ \ No newline at end of file diff --git a/Readme.md b/Readme.md index c71a49a0..c2512dc9 100644 --- a/Readme.md +++ b/Readme.md @@ -9,6 +9,7 @@ ## Features - delayed jobs + - auto-restart of stuck/crashed jobs - job event and progress pubsub - rich integrated UI - infinite scrolling @@ -90,6 +91,23 @@ job.log('$%d sent to %s', amount, user.name); job.progress(frames, totalFrames); ``` +### Job Heartbeat + + Jobs, especially long-running ones, can get stuck or be terminated without completing. The heartbeat ensures that such jobs are restarted. To set a heartbeat interval when a job is created, invoke `job.heartbeat(ms)`: + +```js +job.heartbeat(60000).restarts(2); // the heart must beat at least every 60s; the job will be restarted at most twice (the default is one restart) +``` + + During job execution, invoke `job.heartbeat()` at least as often as the heartbeat interval (calling it more often is fine; waiting longer will trigger a restart). + + When using heartbeats, the watchdog that checks every job's heartbeat must be started via `Queue#watchdog(ms)`. It checks for dead jobs every `ms` milliseconds, defaulting to a check every 5 seconds. + +```js +jobs.watchdog(); +``` +When a dead job is restarted, the `restarted` event is fired and the job is moved back to the `inactive` state. If too many restarts have occurred, the job is moved to the `failed` state and the `failed` event is triggered. + ### Job Events Job-specific events are fired on the `Job` instances via Redis pubsub. The following events are currently supported: @@ -97,6 +115,7 @@ job.progress(frames, totalFrames); - `failed` the job has failed - `complete` the job has completed - `promotion` the job (when delayed) is now queued + - `restarted` the job has been restarted - `progress` the job's progress ranging from 0-100 For example this may look something like the following: @@ -155,6 +174,24 @@ When using delayed jobs, we must also check the delayed jobs with a timer, promo jobs.promote(); ``` +### Job serialization + + In some cases, two related jobs must not be processed at the same time, regardless of which worker handles them. To support this, jobs can be put in a named group via the `.serialize(name)` method: only one job in the group executes at a time across all workers, even when the workers are distributed or handling different queues. Jobs held back because another job in their group is executing are in the `staged` state. + +```js +var rjob1 = jobs.create('sellstocks', { + stocks: [ 'ORCL', 'MSFT' ] +}).serialize('js@steeleinc.com').save(); // ensure this user can't buy and sell stocks at the same time + +var rjob2 = jobs.create('buystocks', { + stocks: [ 'FB', 'LNKD' ] +}).serialize('js@steeleinc.com').save(); +``` + +### Job states + + Jobs can combine `delay`, `after`, and `serialize`. When combined, the delay (if any) is handled first and the job stays in the `delayed` state. Once the delay has elapsed, the job is put in the `staged` state along with the other jobs in the same serialization group. Once the other jobs in that group have finished, the job moves to the `inactive` state to wait for a worker. When a worker picks it up, the job is moved to the `active` state. + ## Processing Jobs Processing jobs is simple with Kue.
First create a `Queue` instance much like we do for creating jobs, providing us access to redis etc, then invoke `jobs.process()` with the associated type. @@ -177,6 +214,9 @@ jobs.process('email', function(job, done){ ```js jobs.process('email', 20, function(job, done){ // ... +}, +function(level, msg, data){ + // Print out worker error message here }); ``` diff --git a/examples/QoS.js b/examples/QoS.js new file mode 100644 index 00000000..a5f9c09e --- /dev/null +++ b/examples/QoS.js @@ -0,0 +1,106 @@ + +var kue = require('../') + , express = require('express'); + +// create our job queue + +var jobs = kue.createQueue(); +jobs.watchdog(); + +// start redis with $ redis-server + +// create some jobs at random, +// usually you would create these +// in your http processes upon +// user input etc. + +var count = 1000; + +function create() { + if(count-- <= 0 ) return; + + var name = ['tobi', 'loki', 'jane', 'manny'][Math.random() * 4 | 0]; + console.log('- creating job for %s', name); + + var stage1 = null, stage2 = null; + + stage1 = jobs.create('video conversion', { + title: 'converting ' + name + '\'s to avi' + , user: 1 + , frames: 200 + }).heartbeat(10000).save(function(err){ + stage2 = jobs.create('video conversion', { + title: 'converting ' + name + '\'s to mpeg' + , user: 1 + , frames: 200 + }).heartbeat(10000).save(function(err){ + jobs.create('video analysis', { + title: 'analyzing ' + name + '\'s avi' + , user: 1 + , frames: 200 + }).heartbeat(10000).serialize('analysis').save(); + + jobs.create('video analysis', { + title: 'analyzing ' + name + '\'s avi and mpeg' + , user: 1 + , frames: 200 + }).heartbeat(10000).serialize('analysis').save(); + }); + }); + + setTimeout(create, Math.random() * 3000 | 0); +} + +if(process.argv.length > 2) { + count = Number(process.argv[2]); + create(); +} +else + console.log('usage: node QoS.js []'); + +// process video analysis jobs, 6 at a time. + +jobs.process('video analysis', 6, function(job, done){ + var frames = job.data.frames; + console.log("job process %d", job.id); + function next(i) { + // pretend we are doing some work + convertFrame(i, function(err){ + if (err) return done(err); + // report progress, i/frames complete + job.progress(i, frames); + if (i == frames) done(); + else next(i + 1); + }); + } + next(0); +}); + +//process video conversion jobs, 4 at a time. 
+ +jobs.process('video conversion', 4, function(job, done){ + var frames = job.data.frames; + console.log("job process %d", job.id); + function next(i) { + // pretend we are doing some work + convertFrame(i, function(err){ + if (err) return done(err); + // report progress, i/frames complete + job.progress(i, frames); + if (i == frames) done(); + else next(i + 1); + }); + } + next(0); +}); + +function convertFrame(i, fn) { + setTimeout(fn, Math.random() * 100); +} + +// start the UI +var app = express.createServer(); +app.use(express.basicAuth('foo', 'bar')); +app.use(kue.app); +app.listen(3000); +console.log('UI started on port 3000'); \ No newline at end of file diff --git a/examples/video.js b/examples/video.js index 0fdb5586..8141cace 100644 --- a/examples/video.js +++ b/examples/video.js @@ -37,7 +37,7 @@ jobs.process('video conversion', 3, function(job, done){ if (err) return done(err); // report progress, i/frames complete job.progress(i, frames); - if (i == frames) done() + if (i == frames) done(); else next(i + 1); }); } diff --git a/lib/http/index.js b/lib/http/index.js index d4d38f59..5271f232 100644 --- a/lib/http/index.js +++ b/lib/http/index.js @@ -65,8 +65,9 @@ app.del('/job/:id', provides('json'), json.remove); // routes app.get('/', routes.jobs('active')); -app.get('/active', routes.jobs('active')); +app.get('/delayed', routes.jobs('delayed')); +app.get('/staged', routes.jobs('staged')); app.get('/inactive', routes.jobs('inactive')); +app.get('/active', routes.jobs('active')); app.get('/failed', routes.jobs('failed')); app.get('/complete', routes.jobs('complete')); -app.get('/delayed', routes.jobs('delayed')); diff --git a/lib/http/middleware/provides.js b/lib/http/middleware/provides.js index 412c91db..f499188b 100644 --- a/lib/http/middleware/provides.js +++ b/lib/http/middleware/provides.js @@ -11,5 +11,5 @@ module.exports = function(type){ return function(req, res, next){ if (req.accepts(type)) return next(); next('route'); - } + }; }; \ No newline at end of file diff --git a/lib/http/public/javascripts/caustic.js b/lib/http/public/javascripts/caustic.js index f2709677..f4f357a3 100644 --- a/lib/http/public/javascripts/caustic.js +++ b/lib/http/public/javascripts/caustic.js @@ -230,7 +230,7 @@ View.prototype.visitDIV = function(el, name){ var self = this; this[name] = function(val){ if (0 == arguments.length) return el; - el.empty().append(val.el || val); + el.empty().append((val && val.el) ? val.el : val); return this; }; }; diff --git a/lib/http/public/javascripts/job.js b/lib/http/public/javascripts/job.js index 18323fe4..b991d997 100644 --- a/lib/http/public/javascripts/job.js +++ b/lib/http/public/javascripts/job.js @@ -192,6 +192,26 @@ Job.prototype.renderUpdate = function(){ view.attempts().parent().remove(); } + // restarts + if (this.restarts.made) { + view.restarts(this.restarts.made + '/' + this.restarts.max); + } else { + view.restarts().parent().remove(); + } + + // precursors + if (this.precursors) { + view.precursors(this.precursors); + } else { + view.precursors().parent().remove(); + } + + if (this.after) { + view.after(this.after); + } else { + view.after().parent().remove(); + } + // title view.title(this.data.title ? this.data.title diff --git a/lib/http/public/javascripts/main.js b/lib/http/public/javascripts/main.js index 2669c068..67113328 100644 --- a/lib/http/public/javascripts/main.js +++ b/lib/http/public/javascripts/main.js @@ -45,6 +45,8 @@ var sort = 'asc'; var loading; +var pollInterval = 1000; + /** * Initialize UI. 
*/ @@ -57,13 +59,14 @@ function init(state) { loading.ctx = ctx; loading.size(canvas.width); - pollStats(1000); + pollStats(pollInterval); show(state)(); + o('li.delayed a').click(show('delayed')); + o('li.staged a').click(show('staged')); o('li.inactive a').click(show('inactive')); o('li.complete a').click(show('complete')); o('li.active a').click(show('active')); o('li.failed a').click(show('failed')); - o('li.delayed a').click(show('delayed')); o('#filter').change(function(){ filter = $(this).val(); @@ -141,7 +144,7 @@ function show(state) { o('#jobs .job').remove(); o('#menu li a').removeClass('active'); o('#menu li.' + state + ' a').addClass('active'); - pollForJobs(state, 2000); + pollForJobs(state, pollInterval); return false; } } @@ -159,7 +162,7 @@ function pollForJobs(state, ms) { infiniteScroll(); pollForJobs.timer = setTimeout(function(){ pollForJobs(state, ms); - }, 1000); + }, pollInterval); }); }; @@ -236,11 +239,12 @@ function refreshJobs(state, fn) { function pollStats(ms) { request('./stats', function(data){ + o('li.delayed .count').text(data.delayedCount); + o('li.staged .count').text(data.stagedCount); o('li.inactive .count').text(data.inactiveCount); o('li.active .count').text(data.activeCount); o('li.complete .count').text(data.completeCount); o('li.failed .count').text(data.failedCount); - o('li.delayed .count').text(data.delayedCount); setTimeout(function(){ pollStats(ms); }, ms); diff --git a/lib/http/public/javascripts/utils.js b/lib/http/public/javascripts/utils.js index c79c24e3..fc2677d1 100644 --- a/lib/http/public/javascripts/utils.js +++ b/lib/http/public/javascripts/utils.js @@ -36,11 +36,12 @@ function relative(ms) { */ var states = { - active: 'active' + delayed: 'delayed' , inactive: 'inactive' + , staged: 'staged' + , active: 'active' , failed: 'failed' , complete: 'complete' - , delayed: 'delayed' }; /** diff --git a/lib/http/routes/json.js b/lib/http/routes/json.js index 3f8e8768..f59a53ab 100644 --- a/lib/http/routes/json.js +++ b/lib/http/routes/json.js @@ -28,21 +28,24 @@ function getSearch() { /** * Get statistics including: * + * - delayed count + * - waiting count + * - staged count * - inactive count * - active count * - complete count * - failed count - * - delayed count * */ exports.stats = function(req, res){ get(queue) + ('delayedCount') + ('stagedCount') ('inactiveCount') ('completeCount') ('activeCount') ('failedCount') - ('delayedCount') ('workTime') (function(err, obj){ if (err) return res.send({ error: err.message }); @@ -71,6 +74,12 @@ exports.jobRange = function(req, res){ , to = parseInt(req.params.to, 10) , order = req.params.order; + if(order == 'desc'){ + var swap = from; + from = -1-to; + to = -1-swap; + } + Job.range(from, to, order, function(err, jobs){ if (err) return res.send({ error: err.message }); res.send(jobs); @@ -87,6 +96,12 @@ exports.jobStateRange = function(req, res){ , to = parseInt(req.params.to, 10) , order = req.params.order; + if(order == 'desc'){ + var swap = from; + from = -1-to; + to = -1-swap; + } + Job.rangeByState(state, from, to, order, function(err, jobs){ if (err) return res.send({ error: err.message }); res.send(jobs); @@ -104,6 +119,12 @@ exports.jobTypeRange = function(req, res){ , to = parseInt(req.params.to, 10) , order = req.params.order; + if(order == 'desc'){ + var swap = from; + from = -1-to; + to = -1-swap; + } + Job.rangeByType(type, state, from, to, order, function(err, jobs){ if (err) return res.send({ error: err.message }); res.send(jobs); @@ -163,8 +184,7 @@ exports.updateState = 
function(req, res){ Job.get(id, function(err, job){ if (err) return res.send({ error: err.message }); - job.state(state); - job.save(function(err){ + job.state(state, function(err) { if (err) return res.send({ error: err.message }); res.send({ message: 'updated state' }); }); diff --git a/lib/http/views/_job.jade b/lib/http/views/_job.jade index d7ec8a8c..38f6f79e 100644 --- a/lib/http/views/_job.jade +++ b/lib/http/views/_job.jade @@ -27,6 +27,15 @@ tr td Attempts: td.attempts + tr + td Restarts: + td.restarts + tr + td Precursors: + td.precursors + tr + td Remaining: + td.after tr.time td Duration: td.duration diff --git a/lib/http/views/_menu.jade b/lib/http/views/_menu.jade index 66611261..1fcf9643 100644 --- a/lib/http/views/_menu.jade +++ b/lib/http/views/_menu.jade @@ -1,5 +1,13 @@ ul#menu + li.delayed + a(href='./delayed') + .count 0 + | Delayed + li.staged + a(href='./staged') + .count 0 + | Staged li.inactive a(href='./inactive') .count 0 @@ -16,7 +24,3 @@ ul#menu a(href='./complete') .count 0 | Complete - li.delayed - a(href='./delayed') - .count 0 - | Delayed diff --git a/lib/kue.js b/lib/kue.js index 9527d0c0..5233e4de 100644 --- a/lib/kue.js +++ b/lib/kue.js @@ -96,6 +96,14 @@ Queue.prototype.createJob = function(type, data){ return new Job(type, data); }; +Queue.prototype.eventing = function(enabled) { + events.eventing = enabled; +}; + +Queue.prototype.indexing = function(enabled) { + Job.indexing = enabled; +}; + /** * Proxy to auto-subscribe to events. * @@ -145,7 +153,7 @@ Queue.prototype.promote = function(ms){ Job.get(id, function(err, job){ if (err) return; events.emit(id, 'promotion'); - job.inactive(); + job.staged(); }); } @@ -155,6 +163,62 @@ Queue.prototype.promote = function(ms){ }, ms); }; +/** + * Restart jobs that don't update themselves within their heartbeat interval, + * checking every `ms`, defaulting to 5 seconds. + * + * @params {Number} ms + * @api public + */ + +Queue.prototype.watchdog = function(ms){ + var client = this.client + , ms = ms || 5000 + , limit = 20; + + setInterval(function(){ + client.sort('q:jobs:active' + , 'by', 'q:job:*->update_by' + , 'get', '#' + , 'get', 'q:job:*->update_by' + , 'limit', 0, limit, function(err, jobs){ + if (err || !jobs.length) return; + + // iterate jobs with [id, update_by] + while (jobs.length) { + var job = jobs.slice(0, 2) + , id = parseInt(job[0], 10) + , update_by = parseInt(job[1], 10) + , restart = update_by < Date.now(); + + // if it hasn't had a heartbeat, restart the job + + if (restart) { + Job.get(id, function(err, job){ + if (err || !job) return; + + job.restart(function(err, remaining, restarts, max){ + if (err) return; + if(remaining) { + events.emit(id, 'restarted'); + job.inactive().log('job restarted - heartbeat missed'); + } else { + events.emit(job.id, 'failed'); + job.failed().error('too many restarts'); + } + }); + + events.emit(id, 'restart'); + job.inactive(); + }); + } + + jobs = jobs.slice(2); + } + }); + }, ms); +}; + /** * Get setting `name` and invoke `fn(err, res)`. 
* @@ -178,13 +242,17 @@ Queue.prototype.setting = function(name, fn){ * @api public */ -Queue.prototype.process = function(type, n, fn){ +Queue.prototype.process = function(type, n, fn, errfn){ var self = this; - if ('function' == typeof n) fn = n, n = 1; + if ('function' == typeof n) errfn = fn, fn = n, n = 1; + + if(!self.workers) self.workers = []; while (n--) { (function(worker){ + self.workers.push(worker); + worker.on('error', function(err){ self.emit('error', err); }); @@ -192,10 +260,64 @@ Queue.prototype.process = function(type, n, fn){ worker.on('job complete', function(job){ self.client.incrby('q:stats:work-time', job.duration); }); - })(new Worker(this, type).start(fn)); + })(new Worker(this, type).start(fn, errfn)); } }; +/** + * Stop all running workers. As workers need to finish whatever + * they are doing, this could take some time. The queue will emit + * the event 'stopped' once all workers are stopped + * + * @api public + */ + +Queue.prototype.stop = function() { + var self = this; + + if(!self.workers) return; + + // Tell each worker to stop + + for(var i = 0; i < self.workers.length; i++) + self.workers[i].stop(); +}; + +/** + * Called by a worker when it stops + * + * @param {Worker} worker + * @api private + */ + +Queue.prototype.stopped = function(worker) { + var self = this; + + for(var i = 0; i < self.workers.length; i++) + { + if(worker == self.workers[i]) + { + self.workers.splice(i, 1); + break; + } + } + + if(!self.workers.length) + self.emit('stopped', true); +}; + +/** + * Returns the number of running workers + * + * @returns {Number} number of running workers + * @api public + */ + +Queue.prototype.running = function() { + var self = this; + return self.workers.length; +} + /** * Get the job types present and callback `fn(err, types)`. * @@ -324,3 +446,11 @@ Queue.prototype.activeCount = function(fn){ Queue.prototype.delayedCount = function(fn){ return this.card('delayed', fn); }; + +/** + * Staged jobs. + */ + +Queue.prototype.stagedCount = function(fn){ + return this.card('staged', fn); +}; diff --git a/lib/queue/events.js b/lib/queue/events.js index b7ba486b..991c6d1e 100644 --- a/lib/queue/events.js +++ b/lib/queue/events.js @@ -33,6 +33,7 @@ exports.key = 'q:events'; */ exports.add = function(job){ + if(!exports.eventing) return; if (job.id) exports.jobs[job.id] = job; if (!exports.subscribed) exports.subscribe(); }; @@ -44,6 +45,7 @@ exports.add = function(job){ */ exports.subscribe = function(){ + if(!exports.eventing) return; if (exports.subscribed) return; var client = redis.pubsubClient(); client.subscribe(exports.key); @@ -86,6 +88,7 @@ exports.onMessage = function(channel, msg){ */ exports.emit = function(id, event) { + if(!exports.eventing) return; var client = redis.client() , msg = JSON.stringify({ id: id diff --git a/lib/queue/job.js b/lib/queue/job.js index 96c55d73..dc59e71e 100644 --- a/lib/queue/job.js +++ b/lib/queue/job.js @@ -13,7 +13,8 @@ var EventEmitter = require('events').EventEmitter , events = require('./events') , redis = require('../redis') , reds = require('reds') - , noop = function(){}; + , crypto = require('crypto') + , noop = function(err){ if(err) console.log('error: ignoring in noop', err); }; /** * Expose `Job`. @@ -25,6 +26,8 @@ exports = module.exports = Job; * Search instance. 
*/ +exports.indexing = false; // Indexing is off by default (it leaks keys in kue currently) + var search; function getSearch() { if (search) return search; @@ -61,6 +64,30 @@ function map(jobs, ids){ return ret; } +/** + * Fetch the jobs for each of the given ids + * + * @param {Array} ids + * @param {String} order asc or desc + * @param {Function} fn + * @api private + */ + +function jobsForIds(ids, order, fn) { + var pending = ids.length + , jobs = {}; + if (!pending) return fn(null, ids); + ids.forEach(function(id){ + exports.get(id, function(err, job){ + if (err || !job) return fn(err); + jobs[job.id] = job; + --pending || fn(null, 'desc' == order + ? map(jobs, ids).reverse() + : map(jobs, ids)); + }); + }); +} + /** * Return a function that handles fetching * of jobs by the ids fetched. @@ -74,19 +101,8 @@ function map(jobs, ids){ function get(fn, order) { return function(err, ids){ if (err) return fn(err); - var pending = ids.length - , jobs = {}; - if (!pending) return fn(null, ids); - ids.forEach(function(id){ - exports.get(id, function(err, job){ - if (err) return fn(err); - jobs[job.id] = job; - --pending || fn(null, 'desc' == order - ? map(jobs, ids).reverse() - : map(jobs, ids)); - }); - }); - } + jobsForIds(ids, order, fn); + }; } /** @@ -105,7 +121,7 @@ exports.range = function(from, to, order, fn){ }; /** - * Get jobs of `state`, with the range `from`..`to` + * Get job IDs of `state`, with the range `from`..`to` * and invoke callback `fn(err, ids)`. * * @param {String} state @@ -116,10 +132,27 @@ exports.range = function(from, to, order, fn){ * @api public */ +exports.idRangeByState = function(state, from, to, order, fn){ + redis.client().zrange('q:jobs:' + state, from, to, fn); +}; + +/** + * Get jobs of `state`, with the range `from`..`to` + * and invoke callback `fn(err, jobs)`. + * + * @param {String} state + * @param {Number} from + * @param {Number} to + * @param {String} order + * @param {Function} fn + * @api public + */ + exports.rangeByState = function(state, from, to, order, fn){ - redis.client().zrange('q:jobs:' + state, from, to, get(fn, order)); + redis.client().zrange('q:jobs:' + state, from, to, get(fn, order)); }; + /** * Get jobs of `type` and `state`, with the range `from`..`to` * and invoke callback `fn(err, ids)`. 
@@ -156,17 +189,21 @@ exports.get = function(id, fn){ // TODO: really lame, change some methods so // we can just merge these job.type = hash.type; - job._delay = hash.delay; + job._delay = Number(hash.delay); + job._group = hash.group; + job._heartbeat = Number(hash.heartbeat); job.priority(Number(hash.priority)); job._progress = hash.progress; - job._attempts = hash.attempts; - job._max_attempts = hash.max_attempts; + job._attempts = Number(hash.attempts); + job._restarts = Number(hash.restarts); + job._max_attempts = Number(hash.max_attempts); + job._max_restarts = Number(hash.max_restarts); job._state = hash.state; job._error = hash.error; - job.created_at = hash.created_at; - job.updated_at = hash.updated_at; - job.failed_at = hash.failed_at; - job.duration = hash.duration; + job.created_at = Number(hash.created_at); + job.updated_at = Number(hash.updated_at); + job.failed_at = Number(hash.failed_at); + job.duration = Number(hash.duration); try { if (hash.data) job.data = JSON.parse(hash.data); fn(err, job); @@ -247,11 +284,17 @@ Job.prototype.toJSON = function(){ , failed_at: this.failed_at , duration: this.duration , delay: this._delay + , heartbeat: this._heartbeat , attempts: { made: this._attempts , remaining: this._max_attempts - this._attempts , max: this._max_attempts } + , restarts: { + made: this._restarts + , remaining: this._max_restarts - this._restarts + , max: this._max_restarts + } }; }; @@ -338,6 +381,43 @@ Job.prototype.delay = function(ms){ return this; }; +/** + * Set the job heartbeat to be required every `ms`. + * + * If called with no arguments, this will trigger the heartbeat of + * the job so that it won't get restarted. + * + * @param {Number} ms + * @return {Job|Number} + * @api public + */ + +Job.prototype.heartbeat = function(ms){ + if (0 == arguments.length) { + if(this._heartbeat) { + this.set('update_by', Date.now() + this._heartbeat); + } + return this._heartbeat; + } + this._heartbeat = ms; + return this; +}; + +/** + * Sets a job as part of a staged group + * + * @param {String} group + * @return {Job|String} + * @api public + */ + +Job.prototype.serialize = function(group){ + if (0 == arguments.length) return this._group; + this._group = group; + if(this._state != 'delayed') this._state = 'staged'; + return this; +}; + /** * Set or get the priority `level`, which is one * of "low", "normal", "medium", and "high", or @@ -356,6 +436,31 @@ Job.prototype.priority = function(level){ return this; }; +/** + * Increment restarts, invoking callback `fn(err, remaining, restarts, max)`. + * + * @param {Function} fn + * @return {Job} for chaining + * @api public + */ + +Job.prototype.restart = function(fn) { + var self = this + , client = this.client + , id = this.id + , key = 'q:job:' + id; + + client.hsetnx(key, 'max_restarts', 1, function(){ + client.hget(key, 'max_restarts', function(err, max){ + client.hincrby(key, 'restarts', 1, function(err, restarts){ + fn(err, Math.max(0, Number(max) + 1 - restarts), restarts, max); + }); + }); + }); + + return this; +}; + /** * Increment attemps, invoking callback `fn(remaining, attempts, max)`. * @@ -381,6 +486,20 @@ Job.prototype.attempt = function(fn){ return this; }; + +/** + * Set max restarts to `n`. + * + * @param {Number} n + * @return {Job} for chaining + * @api public + */ + +Job.prototype.restarts = function(n){ + this._max_restarts = n; + return this; +}; + + /** * Set max attempts to `n`.
* @@ -403,47 +522,149 @@ Job.prototype.attempts = function(n){ */ Job.prototype.remove = function(fn){ - this.removeState(); - getSearch().remove(this.id); - this.client.del('q:job:' + this.id, fn || noop); + var self = this; + var client = self.client; + + this.state('removed', function(err) + { + if(err) return (fn || noop)(err); + + if(exports.indexing) + getSearch().remove(self.id); + + client.del('q:job:' + self.id, noop); + client.del('q:job:' + self.id + ':state', noop); + client.del('q:job:' + self.id + ':log', fn || noop); + }); + return this; }; /** - * Remove state and callback `fn(err)`. + * Set state to `state`. * - * @param {Function} fn + * @param {String} script * @return {Job} for chaining - * @api public + * @api private */ -Job.prototype.removeState = function(fn){ - var client = this.client - , state = this._state; - client.zrem('q:jobs', this.id); - client.zrem('q:jobs:' + state, this.id); - client.zrem('q:jobs:' + this.type + ':' + state, this.id); - return this; +var scripts = {}; +var scriptCache = {}; + +Job.prototype.cachedEval = function(script) { + var args = Array.prototype.slice.call(arguments, 0); + var fn = args[args.length-1]; + var hash = scriptCache[script]; + var self = this; + var client = self.client; + + if(typeof fn != 'function') { + args.push(noop); + fn = noop; + } + + if(!hash) { + hash = crypto.createHash('sha1').update(scripts[script], 'utf8').digest('hex'); + scriptCache[script] = hash; + } + + args[0] = hash; + args[args.length-1] = function(err) { + if(err && (err.message.indexOf('NOSCRIPT') >= 0)) { + console.log("info: loading script " + script + " into cache as " + scriptCache[script]); + + args[0] = scripts[script]; + args[args.length-1] = fn; + + return client.eval.apply(client, args); + } + else { + return fn.apply(self, arguments); + } + }; + + client.evalsha.apply(client, args); + return this; }; /** * Set state to `state`. * * @param {String} state + * @param {Function} fn * @return {Job} for chaining * @api public */ -Job.prototype.state = function(state){ - var client = this.client; - this.removeState(); - this._state = state; - this.set('state', state); - client.zadd('q:jobs', this._priority, this.id); - client.zadd('q:jobs:' + state, this._priority, this.id); - client.zadd('q:jobs:' + this.type + ':' + state, this._priority, this.id); - // increase available jobs, used by Worker#getJob() - if ('inactive' == state) client.lpush('q:' + this.type + ':jobs', 1); +scripts.stateLUA = + "local acquire = nil\n" + + "local stateChange = nil\n" + + "acquire = function(id, priority, group)\n" + + "if not group or group == '' then return 0 end\n" + + "priority = tonumber(priority)\n" + + "if not priority then priority = 0 end\n" + + "local owner = redis.call('get', 'q:lockowners:' .. group)\n" + + "if owner and owner ~= id then\n" + + "local oprio = tonumber(redis.call('hget', 'q:job:' .. owner, 'priority'))\n" + + "local ostate = redis.call('hget', 'q:job:' .. owner, 'state')\n" + + "local otype = redis.call('hget', 'q:job:' .. owner, 'type')\n" + + "if not oprio then oprio = 0 end\n" + + "if 'inactive' == ostate and otype and oprio > priority then\n" + + "if redis.call('zrem', 'q:jobs:' .. otype .. ':inactive', owner) == 1 then\n" + + "stateChange(tostring(owner), otype, 'staged', oprio, group, nil)\n" + + "return 1\n" + + "end\n" + + "end\n" + + "end\n" + + "if not owner and redis.call('zcard', 'q:staged:' .. group) ~= 0 then\n" + + "local best = redis.call('zrange', 'q:staged:' .. 
group, 0, 0)[1]\n" + + "redis.call('zrem', 'q:staged:' .. group, best)\n" + + "redis.call('set', 'q:lockowners:' .. group, best)\n" + + "local bprio = tonumber(redis.call('hget', 'q:job:' .. best, 'priority'))\n" + + "local btype = redis.call('hget', 'q:job:' .. best, 'type')\n" + + "stateChange(tostring(best), btype, 'inactive', bprio, group, nil)\n" + + "return 1\n" + + "end\n" + + "return 0\n" + + "end\n" + + "stateChange = function(id, type, state, priority, group, update_by)\n" + + "local old = redis.call('getset', 'q:job:' .. id .. ':state', state)\n" + + "if state == old then return state end\n" // state has already changed + + "if 'active' == state and update_by then redis.call('hset', 'q:job:' .. id, 'update_by', update_by) end\n" + + "if 'removed' == state then redis.call('zrem', 'q:jobs', id) end\n" + + "if old then\n" + + "redis.call('zrem', 'q:jobs:' .. old, id)\n" + + "redis.call('zrem', 'q:jobs:' .. type .. ':' .. old, id)\n" + + "end\n" + + "if ('complete' == state or 'removed' == state) and group then redis.call('zrem', 'q:staged:' .. group, id) end\n" + + "if 'removed' ~= state then\n" + + "redis.call('hset', 'q:job:' .. id, 'state', state)\n" + + "redis.call('zadd', 'q:jobs', priority, id)\n" + + "redis.call('zadd', 'q:jobs:' .. state, priority, id)\n" + + "redis.call('zadd', 'q:jobs:' .. type .. ':' .. state, priority, id)\n" + + "if 'inactive' == state then redis.call('lpush', 'q:' .. type .. ':jobs', 1) end\n" + + "if 'staged' == state and group then redis.call('zadd', 'q:staged:' .. group, priority, id) end\n" + + "end\n" + + "if group and ('complete' == state or 'removed' == state or ('staged' == state and 'inactive' == old)) then\n" + + "if redis.call('get', 'q:lockowners:' .. group) == id then redis.call('del', 'q:lockowners:' .. group) end\n" + + "end\n" + + "if acquire(id, priority, group) == 1 then return redis.call('hget', 'q:job:' .. id, 'state') end\n" + + "return state\n" + + "end\n" + + "return stateChange(ARGV[1], ARGV[2], ARGV[3], ARGV[4], ARGV[5], ARGV[6])\n" + ; + +Job.prototype.state = function(state, fn) { + var self = this; + var client = self.client; + var update_by = this._heartbeat ? Date.now() + this._heartbeat : null; + + self.cachedEval('stateLUA', 0, ''+self.id, self.type, state, self._priority, self._group, update_by, function(err, newstate) { + if(err) return (fn || noop)(err); + self._state = newstate; + if(fn) fn(); + }); + return this; }; @@ -457,13 +678,14 @@ Job.prototype.state = function(state){ Job.prototype.error = function(err){ if (0 == arguments.length) return this._error; - + var str, summary; + if ('string' == typeof err) { - var str = err - , summary = ''; + str = err; + summary = ''; } else { - var str = err.stack || err.message - , summary = str.split('\n')[0]; + str = err.stack || err.message || (''+err); + summary = str.split('\n')[0]; } this.set('failed_at', Date.now()); @@ -476,32 +698,45 @@ Job.prototype.error = function(err){ * Set state to "complete", and progress to 100%. */ -Job.prototype.complete = function(){ - return this.set('progress', 100).state('complete'); +Job.prototype.complete = function(fn){ + this.set('updated_at', Date.now()); + return this.set('progress', 100).state('complete', fn); }; /** * Set state to "failed". */ -Job.prototype.failed = function(){ - return this.state('failed'); +Job.prototype.failed = function(fn){ + return this.state('failed', fn); }; /** * Set state to "inactive". 
*/ -Job.prototype.inactive = function(){ - return this.state('inactive'); +Job.prototype.inactive = function(fn){ + return this.state('inactive', fn); }; /** * Set state to "active". */ -Job.prototype.active = function(){ - return this.state('active'); +Job.prototype.active = function(fn){ + return this.state('active', fn); +}; + +/** + * Set state to "staged" if this jobs of this group should be staged. + */ + +Job.prototype.staged = function(fn){ + if(!this._group) + return this.inactive(fn); + else{ + return this.state('staged', fn); + } }; /** @@ -514,10 +749,12 @@ Job.prototype.active = function(){ Job.prototype.save = function(fn){ var client = this.client - , fn = fn || noop , max = this._max_attempts + , maxr = this._max_restarts + , group = this._group , self = this; - + fn = fn || noop; + // update if (this.id) return this.update(fn); @@ -528,6 +765,8 @@ Job.prototype.save = function(fn){ self.id = id; self._state = self._state || 'inactive'; if (max) client.hset(key, 'max_attempts', max); + if (maxr) client.hset(key, 'max_restarts', maxr); + if (group) client.hset(key, 'group', group); client.sadd('q:job:types', self.type); self.set('type', self.type); self.set('created_at', Date.now()); @@ -559,6 +798,12 @@ Job.prototype.update = function(fn){ // delay if (this._delay) this.set('delay', this._delay); + + // Heartbeat + if (this._heartbeat) { + this.set('heartbeat', this._heartbeat); + this.set('update_by', Date.now() + this._heartbeat); + } // updated timestamp this.set('updated_at', Date.now()); @@ -567,11 +812,41 @@ Job.prototype.update = function(fn){ this.set('priority', this._priority); // data - this.set('data', json, fn); + this.set('data', json); // state - this.state(this._state); + this.state(this._state, function(err) + { + if(err) return (fn || noop)(err); + + // search data + if(exports.indexing) + getSearch().index(json, this.id, fn); + else if(fn) + fn(); + }); + + return this; +}; + +/** + * Update the job and callback `fn(err)`. 
+ * + * @param {Function} fn + * @api public + */ + +Job.prototype.saveData = function(fn){ + var json; + + // serialize json data + try { + json = JSON.stringify(this.data); + } catch (err) { + return fn(err); + } - // search data - getSearch().index(json, this.id); + // data + this.set('data', json, fn); + return this; }; diff --git a/lib/queue/worker.js b/lib/queue/worker.js index d9eb0807..ca72fd0b 100644 --- a/lib/queue/worker.js +++ b/lib/queue/worker.js @@ -57,16 +57,77 @@ Worker.prototype.__proto__ = EventEmitter.prototype; * @api private */ -Worker.prototype.start = function(fn){ +Worker.prototype.start = function(delay, fn, errfn){ var self = this; - self.getJob(function(err, job){ - if (err) self.error(err, job); - if (!job || err) return process.nextTick(function(){ self.start(fn); }); - self.process(job, fn); - }); + + if(typeof delay == 'function') errfn = fn, fn = delay, delay = false; + + if(errfn) self.errfn = errfn; + if(!self.errfn) self.errfn = Worker.errorCallback; + + var amount = 0; + + if(delay) + amount = Math.round(Math.random()*30000); + else + amount = Math.round(Math.random()*200); + + if(self.stopped) { + self.queue.stopped(self); + return self; + } + + setTimeout(function() + { + self.getJob(function(err, job, timeout){ + if (err) self.error(err, job); + + if(timeout) + return process.nextTick(function(){ self.start(false, fn); }); + + if (!job || err) + return process.nextTick(function(){ self.start(true, fn); }); + + self.process(job, fn); + }); + }, amount); + return this; }; +/** + * Tell the worker to stop processing jobs. + * + * @api private + */ + +Worker.prototype.stop = function() { + var self = this; + + self.stopped = true; + + // Push a few entries on the queue to trigger blpop to timeout + // Why delay and trigger more than one? Someone else might scoop up + // the event before realizing they are being restarted - so then they + // have a long delay before they end up restarting. + + setTimeout(function() { self.client.lpush('q:' + self.type + ':jobs', -1, function() {}); }, 2000); + setTimeout(function() { self.client.lpush('q:' + self.type + ':jobs', -1, function() {}); }, 2000); +}; + +/** + * Default error handler callback. + * + * @param {String} level error level (debug, info, warning, error) + * @param {String} message error message + * @param {Object} data map of additional fields to include in messages. if job is known should include 'job' + * @api private + */ + +Worker.errorCallback = function(level, message, data) { + console.log(level + ": " + message + " - " + JSON.stringify(data)); +}; + /** * Error handler, currently does nothing. * @@ -76,9 +137,16 @@ Worker.prototype.start = function(fn){ * @api private */ -Worker.prototype.error = function(err, job){ - // TODO: emit non "error" - console.error(err.stack || err.message); +Worker.prototype.error = function(err, job) { + var self = this; + self.errfn("warning", "Worker failure", { + job: job ? job.id : undefined, + type: self.type, + data: job ? job.data : undefined, + error: typeof err == 'object' ? err.message : err, + stack: err.stack + }); + return this; }; @@ -96,14 +164,32 @@ Worker.prototype.error = function(err, job){ Worker.prototype.failed = function(job, err, fn){ var self = this; events.emit(job.id, 'failed'); - job.failed().error(err); - self.error(err, job); - job.attempt(function(error, remaining, attempts, max){ - if (error) return self.error(error, job); - remaining - ? 
job.inactive() - : job.failed(); - self.start(fn); + + job.failed(function(err2) { + if(err2) { + self.errfn("error", "Worker.failed can't move job to failed", { job: job.id, type: self.type }); + } + + job.error(err); + self.error(err, job); + + job.attempt(function(error, remaining, attempts, max){ + if (error) + { + self.error(error, job); + return self.start(true, fn); + } + + var state = remaining ? 'inactive' : 'failed'; + + job[state](function(err3) { + if(err3) { + self.errfn("error", "Worker.failed can't move job to " + state, { job: job.id, type: self.type }); + } + + self.start(fn); + }); + }); }); }; @@ -122,15 +208,29 @@ Worker.prototype.failed = function(job, err, fn){ Worker.prototype.process = function(job, fn){ var self = this , start = new Date; - job.active(); - fn(job, function(err){ - if (err) return self.failed(job, err, fn); - job.complete(); - job.set('duration', job.duration = new Date - start); - self.emit('job complete', job); - events.emit(job.id, 'complete'); - self.start(fn); + + job.active(function(err) { + if(err) { + self.errfn("error", "Worker.process can't move job to active", { job: job.id, type: self.type }); + return self.failed(job, err, fn); + } + + fn(job, function(err){ + if (err) return self.failed(job, err, fn); + + job.complete(function(err) { + if(err) { + self.errfn("error", "Worker.process can't move job to complete", { job: job.id, type: self.type }); + } + + job.set('duration', job.duration = new Date - start); + self.emit('job complete', job); + events.emit(job.id, 'complete'); + self.start(fn); + }); + }); }); + return this; }; @@ -169,11 +269,92 @@ Worker.prototype.getJob = function(fn){ || (clients[self.type] = redis.createClient()); // BLPOP indicates we have a new inactive job to process - client.blpop('q:' + self.type + ':jobs', 0, function(err) { - self.zpop('q:jobs:' + self.type + ':inactive', function(err, id){ - if (err) return fn(err); + + var timeout = 45 + Math.round(Math.random()*15); + + client.blpop('q:' + self.type + ':jobs', timeout, function(err, data) { + + if(err) { + self.errfn("warning", "Worker.getJob waiting for job failed", { error: err.message, type: self.type }); + + // Attempt to revert the blpop + // (after a short pause in case redis had a hiccup) + + setTimeout(function() { + client.lpush('q:' + self.type + ':jobs', 1, function(err2) { + if(err2) self.errfn("error", "Worker.getJob could not retry blpop", { error: err2.message, type: self.type }); + }); + }, 2500); + + return fn(err); + } + + if(!data) + { + // We timed out so that the worker can check if it should stop + return fn(null, null, true); + } + + if((data.length == 2) && (data[1] == -1)) + { + // We received a notification that this is a shutdown + // If we're not really shutting down we'll ignore this quickly + return fn(null, null, true); + } + + self.zpop('q:jobs:' + self.type + ':inactive', function(err, id) { + if (err) { + self.errfn("warning", "Worker.getJob can't pop job", { error: err.message }); + + // Attempt to revert the blpop + // (after a short pause in case redis had a hiccup) + + setTimeout(function() { + client.lpush('q:' + self.type + ':jobs', 1, function(err2) { + if(err2) self.errfn("error", "Worker.getJob could not revert blpop", { error: err2.message, type: self.type }); + }); + }, 2500); + + return fn(err); // return the outer error + } + if (!id) return fn(); - Job.get(id, fn); + + Job.get(id, function(err, job) { + if(err || !job) { + self.errfn("warning", "Worker.getJob can't find next job", { job: id, error: err ? 
err.message : undefined, type: self.type }); + + // Attempt to put the job back on the queue if it still exists + // (after a short pause in case redis had a hiccup) + + setTimeout(function() { + var lua = + "local state = redis.call('hget', KEYS[1], 'state')\n" + + "local jtype = redis.call('hget', KEYS[1], 'type')\n" + + "if not state or not jtype or 'inactive' ~= state then return 0 end\n" + + "local priority = redis.call('hget', KEYS[1], 'priority')\n" + + "if not priority then priority = 0 end\n" + + "redis.call('zadd', KEYS[2], tonumber(priority), ARGV[1])\n" + + "redis.call('lpush', KEYS[3], 1)\n" + + "return 1" + ; + + var jkey = 'q:job:' + id; + var qkey = 'q:jobs:' + self.type + ':inactive'; + var bkey = 'q:' + self.type + ':jobs'; + + self.client.eval(lua, 3, jkey, qkey, bkey, id, function(err3, result) { + if(err3 || !result) { + self.errfn("error", "Worker.getJob can't add job back to kue", { job: id, error: err3 ? err3.message : undefined, result: result, type: self.type }); + } else { + self.errfn("info", "Worker.getJob added job back to kue", { job: id, type: self.type }); + } + }); + }, 2500); + } + + return fn(err, job); + }); }); }); };
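
Taken together, the pieces this patch adds (heartbeats with `restarts`, the `watchdog`, `serialize` groups, the optional per-worker error callback on `process`, and graceful `stop`) might be wired up roughly as in the sketch below. The `report` job type, the `account-42` group name and the timed step loop are invented for illustration; the API calls themselves follow the Readme additions and `examples/QoS.js` above.

```js
var kue = require('kue');

var jobs = kue.createQueue();

// Check active jobs for missed heartbeats (every 5 seconds by default).
jobs.watchdog();

// Promote delayed jobs, as before.
jobs.promote();

// A long-running job: its heart must beat at least every 30s, and the
// watchdog may restart it up to 3 times before marking it failed.
// Jobs sharing a serialize() group run one at a time across all workers.
jobs.create('report', { account: 42 })
  .heartbeat(30000)
  .restarts(3)
  .serialize('account-42')
  .save();

// Process jobs, 2 at a time, with the optional per-worker error callback.
jobs.process('report', 2, function(job, done){
  var steps = 10, i = 0;
  (function next(){
    job.heartbeat();        // beat so the watchdog knows we are still alive
    job.progress(++i, steps);
    if (i === steps) return done();
    setTimeout(next, 1000); // pretend each step takes a second
  })();
}, function(level, msg, data){
  console.log('[worker:' + level + '] ' + msg, data);
});

// Graceful shutdown: stop() lets each worker finish its current job,
// then the queue emits 'stopped'.
process.once('SIGINT', function(){
  jobs.on('stopped', function(){ process.exit(0); });
  jobs.stop();
});
```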