-
Notifications
You must be signed in to change notification settings - Fork 0
/
dispatcher.js
93 lines (84 loc) · 2.42 KB
/
dispatcher.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
'use strict;'
var config = require('./config').config;
var util = require('util');
var schema = require('raintank-core/schema');
var queue = require('raintank-queue');
var consumer = new queue.Consumer({
mgmtUrl: config.queue.mgmtUrl,
consumerSocketAddr: config.queue.consumerSocketAddr
});
var producer = queue.Publisher;
producer.init({
publisherSocketAddr: config.queue.publisherSocketAddr,
partitions: config.queue.partitions,
});
function dispatch() {
var now = new Date().getTime();
// schedule next run.
next_run(now);
// get position from current timestamp.
var second = (now/1000) % 3600;
second = Math.floor(second);
var filter = {
position: second
}
// find all taskSchedules that should be run now.
schema.taskSchedules.model.find(filter).lean().exec(function(err, taskSchedules) {
if (err){
console.log('ERROR getting collectorTaskSchedules');
return;
}
if (taskSchedules.length < 1) {
return;
}
var messages = [];
taskSchedules.forEach(function(ts) {
ts.timestamp = now;
console.log('sending task to queue.');
messages.push(ts);
});
console.log("sending messages to queue.: %j", messages);
producer.send('tasks', messages);
});
}
var isLeader = false;
var timer;
function init() {
console.log('waiting for dispatcher_lock');
consumer.on('connect', function() {
consumer.join('lock', 'dispatcher');
});
consumer.on('ready', function(data) {
//who ever has the '0' partition is the leader.
if (data.partitions.indexOf(0) > -1) {
if (!isLeader) {
console.log('taking over as leader.');
isLeader = true;
next_run(new Date().getTime());
}
} else {
isLeader = false;
clearTimeout(timer);
}
});
consumer.on('disconnect', function() {
isLeader = false;
clearTimeout(timer);
});
}
function next_run(now) {
// schedule next run.
var drift = (now % 1000);
if (drift > 500) {
//TODO: send drift to statsD
console.log('WARNING: drift is ' + drift);
}
if (isLeader) {
timer = setTimeout(dispatch, (1000 - drift));
}
}
init();
process.on( "SIGINT", function() {
console.log('CLOSING [SIGINT]');
process.exit();
});