Skip to content

Commit 163ae13

Browse files
committed
watch stuck active jobs
1 parent daab829 commit 163ae13

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

lib/q.js

+41-1
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,45 @@ q.series = function (actions, callback) {
4949
async.series(tasks, callback);
5050
};
5151

52-
module.exports = q;
52+
/*
53+
* Watch for active stuck jobs
54+
*
55+
* @param {Number} watch stuck active jobs every sec seconds
56+
*/
57+
58+
q.watchStuckActiveJobs = function (sec) {
59+
60+
sec = sec || 5 * 60;
61+
setTimeout(watch, sec * 1000);
62+
63+
function watch () {
64+
q.client.zrange('q:jobs:active', 1, -1, function (error, ids) {
65+
if (error) return console.log(error)
66+
67+
var tasks = ids.map(function (id) {
68+
return function (next) {
69+
kue.Job.get(id, function (error, job) {
70+
if (error) return next(error);
71+
72+
// re-enqueue stuck active jobs
73+
if ( moment(+job.updated_at).isBefore(moment().subtract(5, 'minutes')) ) {
74+
return job.state('inactive', next);
75+
}
76+
77+
next(null);
78+
});
79+
}
80+
});
5381

82+
var done = function (error) {
83+
if (error) console.log(error);
84+
q.watchStuckActiveJobs(sec);
85+
}
86+
87+
async.parallelLimit(tasks, 10, done)
88+
});
89+
}
90+
91+
};
92+
93+
module.exports = q;

lib/worker.js

+6
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,9 @@ q.process('build-pdf:upload-to-s3', config.limit, function (job, done) {
4646
});
4747

4848
});
49+
50+
/*
51+
* Watch stuck active jobs
52+
*/
53+
54+
q.watchStuckActiveJobs();

0 commit comments

Comments
 (0)