-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.js
172 lines (149 loc) · 5.49 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
var celery = Npm.require('celery-shoot');
var Future = Npm.require('fibers/future');
/**
* A dictionary of CeleryClients
* @type {Object}
*/
CeleryClients = {};
/**
* Creates a new `CeleryClient` and stores it in `CeleryClients`
* @param name
* @constructor
*/
CeleryClient = function (name){
this._name = name;
this._connected = false;
CeleryClients[this._name] = this;
};
_.extend(CeleryClient.prototype, {
/**
* Connects synchronously, proxies `conf` through to celery.createClient
*
* @param {Object} conf
* @param {String} [conf.BROKER_URL='amqp://']
* @param {String} [conf.RESULT_BACKEND]
* @param {String} [conf.DEFAULT_QUEUE='celery']
* @param {String} [conf.DEFAULT_EXCHANGE='celery']
* @param {String} [conf.DEFAULT_EXCHANGE_TYPE='direct']
* @param {String} [conf.DEFAULT_ROUTING_KEY='celery']
* @param {String} [conf.RESULT_EXCHANGE='celeryresults']
* @param {String} [conf.EVENT_EXCHANGE='celeryev']
* @param {Boolean} [conf.SEND_TASK_SENT_EVENT=false]
* @param {Number} [conf.TASK_RESULT_EXPIRES=86400000]
* @param {Object} [conf.ROUTES={}]
*
* @throws Error
*
* @returns {Boolean}
*/
connect: function(conf){
var _this = this,
future = new Future();
this._client = celery.createClient(conf);
this._client.on('connect', function CeleryClient_connect(){
debug_client(_this._name, 'connected');
_this._connected = true;
if (!future.isResolved()){
future.return(true);
}
});
this._client.on('error', function CeleryClient_error(err){
debug_client(_this._name, 'connection error', err, err.stack);
if (!future.isResolved()){
future.throw(err);
}
});
this._client.on('end', function CeleryClient_end(){
debug_client(_this._name, 'connection closed');
_this._connected = false;
});
return future.wait()
},
/**
* Calls a Celery Task, returning futures for the result.
*
* @param {String} method
* @param {Array} args
* @param {Boolean} [trackStarted=false] if true, will return two futures, [0] for task started, and [1] for the result.
* @returns {Future|Future[]}
*/
call: function(method, args, trackStarted){
var _this = this,
result = new Future(),
started, celery_result;
// provide a separate `Future` if `trackStarted` is provided
if (!!trackStarted){
started = new Future();
}
// check that the client is currently connected.
if (!this._connected){
// causes both `started` & `result` to throw
started && result.proxy(started);
result.throw(new Error(debug_call_stmt(_this._name, method, null, "Call failed. Celery isn't connected")));
} else {
// call the method on the actual `client`
celery_result = this._client.call(method, args);
debug_call(this._name, method, celery_result.taskid, 'called');
// register event handlers for failure
celery_result.on('failure', CeleryClient_call_onError);
celery_result.on('revoked', CeleryClient_call_onError);
celery_result.on('rejected', CeleryClient_call_onError);
celery_result.on('ignored', CeleryClient_call_onError);
// register event handler for `started`
started && celery_result.on('started', CeleryClient_call_onStarted);
// register event handler for `success`
celery_result.on('success', CeleryClient_call_onSuccess);
}
if (started){
return [started, result];
} else {
return result;
}
function CeleryClient_call_onError(message){
debug_call(_this._name, method, celery_result.taskid, "Call failed [" + message.status + "]", message.traceback);
// only propagate the error to `started` if it hasn't resolved.
if (started && !started.isResolved()){
result.proxy(started);
}
result.throw(new Error(debug_call_stmt(_this._name, method, null, "Call failed [" + message.status + "]")));
}
function CeleryClient_call_onStarted(){
debug_call(_this._name, method, celery_result.taskid, "Started");
if (started.isResolved()) return;
started.return(true);
}
function CeleryClient_call_onSuccess(message){
// if `started` wasn't resolved -
if (started && !started.isResolved()){
debug_call(_this._name, method, celery_result.taskid, "Started (Warning: `started` was event forced)");
started.return(true);
}
debug_call(_this._name, method, celery_result.taskid, "Success", message.result);
result.return(message.result)
}
}
});
// debug code
var debug = function(){};
var debug_client = function(client_name, message){};
var debug_call = function(client_name, method_name, task_id, message){};
var debug_call_stmt = function(client_name, method_name, task_id, message){
return 'CeleryClient[' + client_name + ']#call(' + JSON.stringify(method_name) + ')[task_id:' + JSON.stringify(task_id) +'] - ' + message
};
if (!!process.env['METEOR_CELERY_DEBUG']){
debug = function(){
console.log.apply(console, arguments);
};
debug_client = function(client_name, message){
var log_message = 'CeleryClient[' + client_name + '] - ' + message,
args = [].slice.call(arguments, 2);
args.unshift(log_message);
console.log.apply(console, args);
};
debug_call = function(client_name, method_name, task_id, message){
var log_message = debug_call_stmt(client_name, method_name, task_id, message),
args = [].slice.call(arguments, 4);
args.unshift(log_message);
console.log.apply(console, args);
};
}