-
Notifications
You must be signed in to change notification settings - Fork 124
/
Copy pathqueue.js
117 lines (95 loc) · 3 KB
/
queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
const EventEmitter = require('events').EventEmitter;
const errors = require('../errors');
class Queue extends EventEmitter {
constructor() {
super();
this.queue = [];
this.users = {};
this.running = false;
}
size() {
return this.queue.length;
}
process(concurrency, controller, handler) {
this.handler = handler;
this.concurrency = concurrency;
this.processing = 0;
this.start();
// Monkey patch to ensure queue processing size is roughly equal to amount of bots ready
setInterval(() => {
// Update concurrency level, possible bots went offline or otherwise
const oldConcurrency = this.concurrency;
this.concurrency = controller.getReadyAmount();
if (this.concurrency > oldConcurrency) {
for (let i = 0; i < this.concurrency - oldConcurrency; i++) {
this.checkQueue();
}
}
}, 50);
}
addJob(job, max_attempts) {
if (!(job.ip in this.users)) {
this.users[job.ip] = 0;
}
for (const link of job.getRemainingLinks()) {
this.queue.push({
data: link,
max_attempts: max_attempts,
attempts: 0,
ip: job.ip,
});
this.users[job.ip]++;
this.checkQueue();
}
}
checkQueue() {
if (!this.running) return;
if (this.queue.length > 0 && this.processing < this.concurrency) {
// there is a free bot, process the job
let job = this.queue.shift();
this.processing += 1;
this.handler(job).then((delay) => {
if (!delay) delay = 0;
// Allow users to request again before the promise resolve delay
this.users[job.ip]--;
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve();
}, delay);
});
}).catch((err) => {
if (err !== errors.NoBotsAvailable) {
job.attempts++;
}
if (job.attempts === job.max_attempts) {
// job failed
this.emit('job failed', job, err);
this.users[job.ip]--;
}
else {
// try again
this.queue.unshift(job);
}
}).then(() => {
this.processing -= 1;
this.checkQueue();
});
}
}
start() {
if (!this.running) {
this.running = true;
this.checkQueue();
}
}
pause() {
if (this.running) this.running = false;
}
/**
* Returns number of requests the ip currently has queued
*/
getUserQueuedAmt(ip) {
return this.users[ip] || 0;
}
}
module.exports = Queue;