This repository has been archived by the owner on Jul 13, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.js
94 lines (75 loc) · 2.47 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
var uuid = require('node-uuid');
var ThoonkObject = require('thoonk').ThoonkBaseObject;
/**
 * Job object layered on ThoonkBaseObject.
 *
 * Delegates base initialisation to the ThoonkBaseObject constructor and then
 * caches the connections it needs from the owning thoonk instance.
 *
 * @param {string} name - Name of the job feed; also passed to
 *     thoonk._getBlockingRedis to obtain the blocking connection.
 * @param {Object} thoonk - Owning thoonk instance (the base constructor is
 *     expected to expose it as this.thoonk — confirm in ThoonkBaseObject).
 */
function Job(name, thoonk) {
    ThoonkObject.call(this, name, thoonk);

    // Connection used by get() for the blocking BRPOP.
    this.bredis = this.thoonk._getBlockingRedis(name);

    // Remaining connections are shared with the thoonk instance.
    this.lredis = this.thoonk.lredis;
    this.redis = this.thoonk.redis;

    // Event names this object type declares as subscribable.
    this.subscribables = ['publish', 'retract', 'finish', 'retry', 'stall'];
}
// Wire up inheritance from ThoonkBaseObject and attach the per-type metadata:
// the object type tag and the directory holding this type's scripts.
Job.prototype = (function () {
    var proto = Object.create(ThoonkObject.prototype);
    proto.constructor = Job; // Object.create() does not carry a constructor back-reference
    proto.objtype = 'job';
    proto.scriptdir = __dirname + '/scripts';
    return proto;
}());
(function () {
    /**
     * Publish a job by running the 'publish' script.
     *
     * Script arguments are [jobId, JSON.stringify(item), timestamp] plus the
     * priority when one is given.
     *
     * @param {*} item - Job payload; serialised with JSON.stringify.
     * @param {Object} [opts] - Optional settings. May be omitted, in which
     *     case the second argument may be the callback.
     * @param {string} [opts.id] - Job id; a uuid is generated when falsy.
     * @param {*} [opts.priority] - Appended as a fourth script argument when truthy.
     * @param {Function} [opts.onFinish] - Registered with once() for the
     *     'job.id.finish:<jobId>' event emitted by handleEvent.
     * @param {Function} [cb] - Passed through to runScript.
     */
    this.publish = function (item, opts, cb) {
        // Support publish(item, cb) and publish(item); a missing opts used to
        // throw a TypeError on `opts.id`.
        if (typeof opts === 'function' && cb === undefined) {
            cb = opts;
            opts = null;
        }
        opts = opts || {};

        var jobId = opts.id || uuid();
        var args = [jobId, JSON.stringify(item), '' + Date.now()];
        if (opts.priority) {
            args.push(opts.priority);
        }
        if (opts.onFinish) {
            this.once('job.id.finish:' + jobId, opts.onFinish);
        }
        this.runScript('publish', args, cb);
    };

    /** Alias of publish. */
    this.put = this.publish;

    /**
     * Block on the 'job.ids:<name>' list for the next job id and, when one
     * arrives, run the 'get' script with [jobId, timestamp].
     *
     * @param {number} [timeout] - BRPOP timeout; falsy values become 0.
     *     May be omitted, in which case the first argument may be the callback.
     * @param {Function} cb - Receives the BRPOP error if there is one, the
     *     string 'Timeout' when BRPOP returned no element, or otherwise
     *     whatever runScript('get', ...) passes to it.
     */
    this.get = function (timeout, cb) {
        if (typeof timeout === 'function' && cb === undefined) {
            cb = timeout;
            timeout = 0;
        }
        var self = this;
        this.bredis.brpop('job.ids:' + this.name, timeout || 0, function (err, reply) {
            if (err) {
                // Surface real Redis failures instead of reporting them as a timeout.
                cb(err);
                return;
            }
            // reply is [listKey, jobId] when an element was popped.
            if (reply && reply[1]) {
                self.runScript('get', [reply[1], '' + Date.now()], cb);
            } else {
                cb('Timeout');
            }
        });
    };

    /**
     * Mark a job finished by running the 'finish' script.
     *
     * @param {string} jobId - Id of the job.
     * @param {*} [result] - Optional result, appended as a second script
     *     argument unless it is undefined, null or the empty string. May be
     *     omitted, in which case the second argument may be the callback.
     * @param {Function} [cb] - Passed through to runScript.
     */
    this.finish = function (jobId, result, cb) {
        if (typeof result === 'function' && cb === undefined) {
            cb = result;
            result = undefined;
        }
        var args = [jobId];
        // Explicit emptiness check so results such as 0 or false are still
        // forwarded; a plain truthiness test silently dropped them.
        if (result !== undefined && result !== null && result !== '') {
            args.push(result);
        }
        this.runScript('finish', args, cb);
    };

    /** Run the 'cancel' script for jobId; cb is passed through to runScript. */
    this.cancel = function (jobId, cb) {
        this.runScript('cancel', [jobId], cb);
    };

    /** Run the 'stall' script for jobId; cb is passed through to runScript. */
    this.stall = function (jobId, cb) {
        this.runScript('stall', [jobId], cb);
    };

    /** Run the 'retry' script with [jobId, timestamp]; cb is passed through to runScript. */
    this.retry = function (jobId, cb) {
        this.runScript('retry', [jobId, '' + Date.now()], cb);
    };

    /** Run the 'retract' script for jobId; cb is passed through to runScript. */
    this.retract = function (jobId, cb) {
        this.runScript('retract', [jobId], cb);
    };

    /**
     * Read the jobId field of the 'job.cancelled:<name>' hash.
     * NOTE(review): presumably this is the number of times the job was
     * cancelled — confirm against the cancel script.
     *
     * @param {string} jobId - Id of the job.
     * @param {Function} cb - Passed straight to redis.hget.
     */
    this.getFailureCount = function (jobId, cb) {
        this.redis.hget('job.cancelled:' + this.name, jobId, cb);
    };

    /**
     * Translate a pub/sub message into a per-job event.
     *
     * The event name is the third dot-separated segment of the channel text
     * before the first ':'. Only 'publish' and 'finish' are re-emitted, as
     * 'job.id.<event>:<jobId>' with arguments (null, jobId, payload|null).
     *
     * @param {string} channel - Channel the message arrived on.
     * @param {string} msg - '<jobId>' or '<jobId>\x00<payload>'.
     */
    this.handleEvent = function (channel, msg) {
        var eventName = channel.split(':')[0].split('.')[2];
        if (eventName !== 'publish' && eventName !== 'finish') {
            return;
        }

        // Split on the FIRST NUL only, so a payload that itself contains NUL
        // bytes is delivered intact rather than truncated.
        var sep = msg.indexOf('\x00');
        var jobId = sep === -1 ? msg : msg.slice(0, sep);
        var payload = sep === -1 ? null : (msg.slice(sep + 1) || null);

        this.emit('job.id.' + eventName + ':' + jobId, null, jobId, payload);
    };
}).call(Job.prototype);
module.exports = Job;