-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
108 lines (93 loc) · 3.11 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
const Redis = require('ioredis');
const Queue = require('bull');
const UUID = require('uuid');
const Job = require('bull/lib/job');
// Registry of pending `{ resolve, reject }` callbacks keyed by job id.
// Entries are written by the patched `Queue.prototype.add` below and removed
// when a queue created by QueueManager#queue receives a 'global:completed' or
// 'global:failed' event for that id.
const JobResponses = {};
/**
 * Replaces bull's `Queue#add` so that the value returned for every added job
 * also carries a `response` promise. That promise is settled through the
 * module-level `JobResponses` registry when a matching completed/failed event
 * is observed for the job id.
 *
 * NOTE(review): this monkey-patches a third-party prototype, so it affects
 * every bull queue in the process, not only the ones created by this module.
 *
 * @param {*} data - Job payload, forwarded unchanged to `Job.create`.
 * @param {Object} [opts] - Job options forwarded to `Job.create`. A non-null
 *   `opts.jobId` is used as the job id; otherwise a v4 UUID is generated.
 * @returns {*} Whatever `Job.create` returns (presumably a promise for the
 *   created job — confirm against the installed bull version), extended with
 *   a `response` promise property.
 */
Queue.prototype.add = function (data, opts) {
  // The registry key must be the id the job is actually created with.
  // Previously a caller-supplied `opts.jobId` overrode the generated id for
  // the job while the callbacks stayed registered under the generated one,
  // so `response` could never settle and the registry entry leaked.
  const hasCustomId = opts != null && opts.jobId != null;
  const jobId = hasCustomId ? opts.jobId : UUID.v4();

  const job = Job.create(this, data, Object.assign({}, opts, { jobId }));

  // The executor runs synchronously, so the callbacks are registered before
  // this function returns.
  job.response = new Promise((resolve, reject) => {
    JobResponses[jobId] = { resolve, reject };
  });

  return job;
};
/**
 * Opens a new ioredis connection described by a manager config object.
 *
 * A string `port` is handed to the Redis constructor together with `opts`
 * only (presumably a connection URL or socket path — confirm against ioredis);
 * any other `port` is passed along with `host` and `opts`.
 *
 * @param {{port: (number|string), host: *, opts: Object}} config
 * @returns {Redis} A freshly constructed client.
 */
const connect = (config) => (
  typeof config.port === 'string'
    ? new Redis(config.port, config.opts)
    : new Redis(config.port, config.host, config.opts)
);
/**
 * Constructs a bull queue, choosing the constructor argument list from the
 * type of `port`.
 *
 * @param {string} name - Queue name.
 * @param {number|string} port - When a string, it is passed as the second
 *   constructor argument and `host` is omitted.
 * @param {*} host - Forwarded only when `port` is not a string.
 * @param {Object} opts - Options forwarded as the last constructor argument.
 * @returns {Queue} The new queue instance.
 */
const queue = (name, port, host, opts) => {
  const portIsString = typeof port === 'string';
  const ctorArgs = portIsString
    ? [name, port, opts]
    : [name, port, host, opts];
  return new Queue(...ctorArgs);
};
module.exports = {
QueueManager: class {
constructor(port = 6379, host = 'localhost', db = 0, opts = {}) {
this.config = {
port,
host,
opts: Object.assign({
db: db
}, opts, host instanceof Object ? host : {})
};
this.queues = {};
}
init() {
this.client = connect(this.config);
this.subscriber = connect(this.config);
this.createClient = function (type) {
switch (type) {
case 'client':
return this.client;
case 'subscriber':
return this.subscriber;
default:
return connect(this.config);
}
}
return this;
}
queue(name, port = 6379, host = 'localhost', db = 0, opts = {}) {
let q = this.queues[name];
if (!q) {
if (Object.keys(arguments).length == 1) {
q = queue(name, this.config.port, this.config.host, Object.assign({
db,
createClient: (type) => this.createClient(type)
}, this.config.opts));
} else {
q = queue(name, port, host, Object.assign({
db
}, opts, host instanceof Object ? host : {}));
}
this.queues[name] = q;
q.on('global:completed', function (job, result) {
if (JobResponses[job.jobId]) {
JobResponses[job.jobId].resolve(result);
}
delete JobResponses[job.jobId];
});
q.on('global:failed', function (job, err) {
if (JobResponses[job.jobId]) {
JobResponses[job.jobId].reject(err);
}
delete JobResponses[job.jobId];
});
}
return q;
}
shutdown() {
for (let q of Object.values(this.queues)) {
q.close();
}
this.client.quit();
this.subscriber.quit();
}
}
}