// qdis.js
var redis = require("redis");
var restify = require('restify');
var RestAPI = require('./lib/restapi.js');
//globals so that closures only need to be created once
//names of the publish queues (Redis lists / channels) loaded from the
//'_qdis_mappings' hash by on_sub_client_connect
var pub_queues = [];
//publish queue name -> array of subscriber queue names that each message
//on that publish queue is fanned out to
var map = {};
//channel of the most recent on_message call; read by on_lindex_response
var global_channel;
//sub_client is used for subscribe() and 'message' events, pub_client for
//every other command in this file (hgetall, llen, lindex, multi)
//NOTE(review): presumably two connections are needed because a subscribed
//connection cannot issue regular commands -- confirm against the client docs
var sub_client = redis.createClient();
var pub_client = redis.createClient();
//the REST API shares the live mapping object and both clients; what it does
//with them is defined in ./lib/restapi.js
var rest_api = new RestAPI( map, pub_client, sub_client );
//report errors emitted by either Redis connection on stdout, tagged with
//the connection they came from
var on_sub_client_error = function (err) {
    console.log("sub error " + err);
};
var on_pub_client_error = function (err) {
    console.log("pub error " + err);
};
sub_client.on("error", on_sub_client_error);
pub_client.on("error", on_pub_client_error);
//callback for multi.exec(): logs the error, if there was one
//err     - error reported for the transaction, falsy on success
//replies - per-command replies (unused)
var print_error = function (err, replies) {
    if ( !err ) {
        return;
    }
    console.log( "multi error!: " + err );
};
//handles the reply to LINDEX <pub queue> -1 (the item at the tail of the
//pub queue) and fans that item out to every mapped subscriber queue
//
//err     - error from LINDEX, falsy on success
//reply   - the tail item; null when the list is empty
//channel - optional; name of the pub queue the LINDEX was issued for.
//          Falls back to global_channel when omitted so that existing
//          two-argument callers keep working.
//
//NOTE(review): LINDEX and the later RPOP are separate round trips. If
//several LINDEX calls for the same queue are in flight before any MULTI
//runs, they can all see the same tail item. Consider an atomic move instead.
var on_lindex_response = function( err, reply, channel ) {
    var pub_queue = ( channel === undefined ) ? global_channel : channel; //queue and channel are named the same

    if ( err ) {
        console.log( "lindex error on " + pub_queue + ": " + err );
        return;
    }

    //if the queue ( redis list ) is empty, it might be a race with the
    //undelivered message checker; there is nothing to deliver
    if ( reply === null || reply === undefined ) {
        return;
    }

    var sub_queues = map[pub_queue];
    if ( !sub_queues ) {
        //no mapping for this queue: leave the item where it is rather than
        //popping a message that has nowhere to go
        console.log( "no subscriber mapping for " + pub_queue );
        return;
    }

    // multi's only ensure ordering and all the commands will get executed
    // (or none) not rollback on error of one of the
    // commands: http://redis.io/topics/transactions
    var multi = pub_client.multi();

    //pop from the pub_queue, but only if transaction succeeds
    multi.rpop( pub_queue );

    for ( var i = 0; i < sub_queues.length; i++ ) {
        var sub_queue = sub_queues[i];
        multi.lpush( sub_queue, reply );
        // regardless of the result above, this publish happens
        // TODO we should wait for the multi result before publishing?
        multi.publish( sub_queue, "1" );
    }

    // drains multi queue and runs atomically
    multi.exec( print_error );
};

var channel_message_queue = [];

//called for every message on a subscribed channel, and by the undelivered
//message checker with fake === true
//
//channel - channel name, which is also the name of the pub queue list
//message - payload of the notification (unused; the item is read from the list)
//fake    - true when triggered by the undelivered message checker (unused)
var on_message = function (channel, message, fake ) {
    //still recorded for anything that reads the global
    global_channel = channel;

    //bind the channel to this particular request: several LINDEX calls can
    //be outstanding at once, so a single shared global would be overwritten
    //before the earlier replies arrive
    pub_client.lindex( channel, -1, function( err, reply ) {
        on_lindex_response( err, reply, channel );
    });
};
//pub queues can end up holding messages that were never fanned out (the
//original author's example: qdis was down while Redis stayed up)
//this asks Redis for the length of every known pub queue; the LLEN callback
//then replays the backlog through on_message so that delivery always goes
//through the same code path
var check_for_undelivered_messages = function() {
    pub_queues.forEach( function( queue_name ) {
        pub_client.llen( queue_name, on_llen_complete_generator( queue_name ) );
    });
};
//builds the LLEN callback for one pub queue
//
//pub_queue - name of the pub queue the LLEN was issued for
//returns   - function( err, length ) that triggers one fake on_message call
//            per item currently in the queue
var on_llen_complete_generator = function( pub_queue ) {
    return function on_llen_complete( err, length ) {
        if ( err ) {
            //length is not meaningful on error; report it instead of
            //silently doing nothing
            console.log( "llen error on " + pub_queue + ": " + err );
            return;
        }

        //for now we just trigger N fake on_message calls (none when the
        //queue is empty)
        //TODO eventually the message should probably be the length of the list instead
        for ( var i = 0; i < length; i++ ) {
            on_message( pub_queue, 1, true );
        }
    };
};
//every time we reconnect we need to re-subscribe
//id of the periodic undelivered-message check, undefined while none is armed
var undelivered_check_internval_id;

//'connect' handler for sub_client: (re)loads the queue mappings from the
//'_qdis_mappings' hash, subscribes to every known pub queue, delivers any
//backlog and arms the periodic backlog check
var on_sub_client_connect = function() {
    //get our maps, and set out our automatic publications
    pub_client.hgetall( '_qdis_mappings', function( err, redis_map ) {
        if ( err ) {
            //carry on with whatever mappings we already know about so that
            //a reconnect still re-subscribes
            console.log( "hgetall _qdis_mappings error " + err );
        }

        for ( var pub_queue in redis_map ) {
            if ( !Object.prototype.hasOwnProperty.call( redis_map, pub_queue ) ) {
                continue;
            }

            var sub_queues;
            try {
                sub_queues = JSON.parse( redis_map[pub_queue] );
            } catch ( parse_err ) {
                //one malformed entry must not take the other mappings down
                console.log( "bad mapping for " + pub_queue + ": " + parse_err );
                continue;
            }

            //this handler runs on every (re)connect; only record new queues
            if ( pub_queues.indexOf( pub_queue ) === -1 ) {
                pub_queues.push( pub_queue );
            }
            map[pub_queue] = sub_queues;
        }

        //setup our subscriptions now that we're ready to handle them
        for ( var i = 0; i < pub_queues.length; i++ ) {
            sub_client.subscribe( pub_queues[i] );
        }

        check_for_undelivered_messages();

        //once every second to check again, just in case; drop any timer
        //that is still armed so reconnects never stack several of them
        if ( undelivered_check_internval_id !== undefined ) {
            clearInterval( undelivered_check_internval_id );
        }
        undelivered_check_internval_id = setInterval( check_for_undelivered_messages, 1000 );
    });
};
//'end' handler for sub_client: cancels the pending undelivered-message
//check and forgets its id
var on_connection_end = function() {
    var timer_id = undelivered_check_internval_id;
    undelivered_check_internval_id = undefined;
    clearInterval( timer_id );
};
//these only need to be set once!
//listeners are registered on the client object once at startup, so they
//are not re-registered from inside the 'connect' handler
//'message': fan the tail item of the named pub queue out to its subscribers
sub_client.on( 'message', on_message );
//'connect': load the mappings, subscribe and start the undelivered check
sub_client.on( 'connect', on_sub_client_connect );
//'end': stop the undelivered check
sub_client.on( 'end', on_connection_end );
//start the REST API (presumably 6380 is the TCP port -- see ./lib/restapi.js)
rest_api.listen(6380);