-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmn_service.py
342 lines (293 loc) · 12.8 KB
/
mn_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
"""
Module contains RequestHandler classes for Desktop / Mobile App interactions
"""
__author__ = 'morozov'
from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, HTTPError
from tornado.log import access_log, app_log, gen_log
import tornado.httputil
from tornado.options import define, options
from tornado.iostream import StreamClosedError
import sys
import os
import mn_httpserver as http
import mynotes as mn
import mn_instance as instance
class MN_Handler(RequestHandler):
    """Base class for Desktop/Mobile App handlers.

    Reads the product and request identifiers from the request headers and
    provides the shared helpers used by the agent (Desktop) and client
    (Mobile App) handlers: pairing through ``mn.Interaction``, canned
    "no client / no agent / no reply" responses, and error handling that
    also notifies the other side of an interaction.
    """

    def __init__(self, application, request, **kwargs):
        """Initialise the handler and cache per-request identifiers.

        The base initialiser is called explicitly (not via ``super``) so the
        behaviour stays the same for subclasses using multiple inheritance.
        """
        RequestHandler.__init__(self, application, request, **kwargs)
        _headers = self.request.headers
        self.ProductID = _headers.get(mn.MN_PRODUCT_ID, '')
        self.RequestID = _headers.get(mn.MN_REQUEST_ID, '')
        # Handle returned by IOLoop.add_timeout while waiting for a partner.
        self._timeout = None
        self._instance = application._instance

    def finish(self, chunk=None):
        """Finish this response, ending the HTTP request.

        The flow (ETag / 304 handling, Content-Length, close-callback
        cleanup, flush) appears to be a copy of tornado's own
        ``RequestHandler.finish`` -- TODO confirm against the tornado
        version in use.  The one local change: an ``AssertionError`` raised
        while flushing or finishing the underlying request (per the original
        note, when the request is already closed) is tolerated instead of
        propagating.

        Raises:
            RuntimeError: if the response has already been finished.
        """
        if self._finished:
            raise RuntimeError("finish() called twice.  May be caused "
                               "by using async operations without the "
                               "@asynchronous decorator.")

        if chunk is not None:
            self.write(chunk)

        # Automatically support ETags and add the Content-Length header if
        # we have not flushed any content yet.
        if not self._headers_written:
            if (self._status_code == 200 and
                    self.request.method in ("GET", "HEAD") and
                    "Etag" not in self._headers):
                etag = self.compute_etag()
                if etag is not None:
                    self.set_header("Etag", etag)
                    inm = self.request.headers.get("If-None-Match")
                    if inm and inm.find(etag) != -1:
                        self._write_buffer = []
                        self.set_status(304)
            if self._status_code == 304:
                assert not self._write_buffer, "Cannot send body with 304"
                self._clear_headers_for_304()
            elif "Content-Length" not in self._headers:
                content_length = sum(len(part) for part in self._write_buffer)
                self.set_header("Content-Length", content_length)

        if hasattr(self.request, "connection"):
            # Now that the request is finished, clear the callback we
            # set on the IOStream (which would otherwise prevent the
            # garbage collection of the RequestHandler when there
            # are keepalive connections)
            self.request.connection.stream.set_close_callback(None)

        if not self.application._wsgi:
            # Local enhancement: finishing stays best-effort when the
            # request is already closed.
            try:
                self.flush(include_footers=True)
                self.request.finish()
            except AssertionError:
                # Deliberately not re-raised; leave a trace for debugging.
                gen_log.debug("AssertionError ignored while finishing %s",
                              self._request_summary(), exc_info=True)
            # NOTE(review): indentation was lost in the source; _log() is
            # assumed to run after the try block, as in stock tornado.
            self._log()
        self._finished = True
        self.on_finish()

    def _request_summary(self):
        """Return the class-specific one-line summary used in log records.

        Format: ``(<response code>) <method> <uri> [<ProductID>(<RequestID>)]
        (<remote ip>)`` followed directly by the error-message response
        header, if one was set.
        """
        _h = self.request.headers
        # Prefer proxy-supplied addresses over the socket peer address.
        remote_ip = _h.get('X-Real-IP',
                           _h.get('X-Forwarded-For', self.request.remote_ip))
        res = "(%s) %s %s [%s(%s)] (%s)" % (
            _h.get(mn.MN_RESPONSE_CODE, '--'),
            self.request.method,
            self.request.uri,
            # getattr: the IDs are only assigned after the base initialiser
            # has run, so they may not exist yet.
            getattr(self, 'ProductID', '--') or '--',
            getattr(self, 'RequestID', '--') or '--',
            remote_ip)
        return res + self._headers.get(mn.MN_ERROR_MESSAGE, '')

    def _handle_request_exception(self, e):
        """Turn an exception raised while handling the request into an error.

        If something goes wrong at one side of an interaction, the other
        side (desktop / app) is closed as well.  This replaces the base
        class implementation rather than delegating to it.
        """
        _partner = None
        _interaction = mn.get_Interaction(self.RequestID)
        if _interaction and _interaction._source:
            _interaction._completed = False
            if _interaction._source == self:
                _partner = _interaction._destination
            else:
                _partner = _interaction._source

        if isinstance(e, HTTPError):
            if e.log_message:
                log_format = "%d %s: " + e.log_message
                args = [e.status_code, self._request_summary()] + list(e.args)
                gen_log.warning(log_format, *args)
            if e.status_code not in tornado.httputil.responses and not e.reason:
                gen_log.error("Bad HTTP status code: %d", e.status_code)
                self.send_error(500, exc_info=sys.exc_info())
            else:
                self.send_error(e.status_code, exc_info=sys.exc_info())
        elif isinstance(e, StreamClosedError):
            self._send_stream_closed_error(_partner)
        elif isinstance(e, http.StreamClosedWarning):
            app_log.warning('%s write to closed stream', self._request_summary())
            self._send_stream_closed_error(_partner)
        else:
            app_log.error("Uncaught exception %s\n%r", self._request_summary(),
                          self.request, exc_info=True)
            self.send_error(500, exc_info=sys.exc_info())

    def _send_stream_closed_error(self, partner):
        """Report a closed stream: 503 to both sides if paired, else 500.

        Must be called while the exception is being handled, because the
        current ``sys.exc_info()`` is forwarded to ``send_error``.
        """
        exc_info = sys.exc_info()
        if partner:
            # The partner is notified from the IOLoop, not re-entrantly.
            IOLoop.instance().add_callback(partner.send_error, 503,
                                           exc_info=exc_info)
            self.send_error(503, exc_info=exc_info)
        else:
            self.send_error(500, exc_info=exc_info)

    def get_error_html(self, status_code, **kwargs):
        """Add an error-message header instead of rendering an error page.

        Returns an empty body; the text of ``kwargs['exception']`` (empty
        when absent) is sent in the ``mn.MN_ERROR_MESSAGE`` header.
        """
        reason = ''
        if 'exception' in kwargs:
            reason = str(kwargs['exception'])
        self.add_header(mn.MN_ERROR_MESSAGE, reason)
        return ''

    def process_agent(self):
        """Start an interaction with a cached app, or wait for one.

        (Desktop .... <-> Mobile App)  When no client is cached the handler
        is registered as waiting and ``_response_no_client`` is scheduled
        with ``mn.MN_AGENT_TIMEOUT``.
        """
        _client = mn.get_Client(self.ProductID)
        if _client:
            mn.Interaction(_client, self)(_client, self)
        else:
            self._add_wait()
            self._timeout = IOLoop.instance().add_timeout(
                mn.MN_AGENT_TIMEOUT, self._response_no_client)

    def process_reply(self):
        """Forward a Desktop reply to the Mobile App (Mobile App <- Desktop).

        Sends 501 when the request is already being replied to and 502 when
        there is no (open) client to reply to; in both cases keep-alive is
        switched off for the connection.
        """
        _interaction = mn.get_Interaction(self.RequestID,
                                          validateID=self.ProductID)
        if (_interaction and not _interaction.client._closed()
                and not _interaction.agent):
            _interaction._set_agent(self)
            _interaction(self, _interaction.client, source_finish=True)
        elif _interaction and _interaction.agent:
            self.request.connection.no_keep_alive = True
            self.send_error(501, exc_info=(
                HTTPError, HTTPError(501, 'RequestID is being replied')))
        else:
            self.request.connection.no_keep_alive = True
            self.send_error(502, exc_info=(
                HTTPError, HTTPError(502, 'No client to reply')))

    def _response_no_client(self):
        """Stop waiting and answer with the "no client" response type."""
        self._remove_wait()
        self.add_header(mn.MN_RESPONSE_TYPE, mn.MN_NO_CLIENT)
        self.finish()

    def _response_no_agent(self):
        """Unregister this client and answer with the "no agent" type."""
        self._remove_client()
        self.add_header(mn.MN_RESPONSE_TYPE, mn.MN_NO_AGENT)
        self.finish()

    def _response_no_reply(self):
        """Answer 504 because no reply arrived (or the request was closed)."""
        if self._closed():
            reason = 'Request closed'
        else:
            reason = 'No reply from agent'
        # NOTE(review): indentation was lost in the source; the header is
        # assumed to be added on both paths.  Stock tornado send_error()
        # resets headers before writing the error -- confirm this header
        # actually reaches the peer.
        self.add_header(mn.MN_RECYCLED, mn.MN_NO_REPLY)
        self.send_error(504, exc_info=(HTTPError, HTTPError(504, reason)))

    def _clear_transaction(self):
        """Clear the transaction of the interaction for this request, if any."""
        _interaction = mn.get_Interaction(self.RequestID)
        if _interaction:
            _interaction._clear_transaction()

    # Thin wrappers around the registries kept by the ``mn`` module.

    def _add_wait(self):
        mn.add_wait(self)

    def _remove_wait(self):
        mn.remove_wait(self)

    def _add_client(self):
        mn.add_client(self)

    def _remove_client(self):
        mn.remove_client(self)

    def _add_cache(self):
        mn.add_cache(self.ProductID)

    def _closed(self):
        """Return whether the underlying connection stream is closed."""
        return self.request.connection.stream.closed()

    def _check_closed(self):
        """Delegate to the stream's own ``_check_closed``."""
        return self.request.connection.stream._check_closed()
class Agent_ready(MN_Handler):
    """Desktop is ready to listen for Mobile App requests."""

    @tornado.web.asynchronous
    def post(self):
        """Pair with a cached client or start waiting for one."""
        self.process_agent()

    def on_finish(self):
        """Clear the transaction of this request's interaction, if any."""
        # Same lookup-and-clear sequence as the inherited helper.
        self._clear_transaction()
class Agent_reply(MN_Handler):
    """Desktop replies to a Mobile App request."""

    @tornado.web.asynchronous
    def post(self):
        """Hand the reply over to the interaction for this RequestID."""
        self.process_reply()
class Client(MN_Handler, instance.MN_Instance_Handler):
    """Mobile App connects to its Desktop (if any) to start a transaction."""

    def __init__(self, application, request, **kwargs):
        """Initialise via MN_Handler and set the Desktop-search state."""
        MN_Handler.__init__(self, application, request, **kwargs)
        self.mn_server = ''
        self.mn_port = ''
        self._search_count = 2
        # Invoked when no Desktop can be located.
        self._not_found_callback = self._response_no_agent

    def process_client(self, repeat=True):
        """Pair with a Desktop, wait for one, or go looking for it.

        - starts a transaction with the cached Desktop (if any);
        - waits if the Desktop is expected to appear soon;
        - refers to a new Desktop location if found.

        ``repeat`` is False only for the initial call from ``post``; the
        timeout callback re-enters with the default ``True``, in which case
        the agent lookup and client registration are skipped.
        """
        product_id = self.ProductID
        agent = None if repeat else mn.get_Agent(product_id)

        if agent:
            mn.Interaction(self, agent)(self, agent)
        elif (not self._closed()
              and self._instance._isHere(product_id)
              and mn.get_cache(product_id)):
            if not repeat:
                self._add_client()
            # NOTE(review): indentation was lost in the source; the timeout
            # is assumed to be re-armed on every pass (only the registration
            # is guarded by ``repeat``) -- confirm against the original.
            self._timeout = IOLoop.instance().add_timeout(
                mn.MN_CLIENT_TIMEOUT, self.process_client)
        elif self._closed() or self._instance._isHere(product_id):
            self._response_no_agent()
        else:
            self._find_desktop()

    @tornado.web.asynchronous
    def post(self):
        """Entry point: first (non-repeat) attempt to reach the Desktop."""
        self.process_client(repeat=False)

    def on_finish(self):
        """Remove this request's interaction and clear it.

        The request duration is passed on in milliseconds.
        """
        finished = mn.get_Interaction(self.RequestID, remove=True)
        if finished:
            finished._clear(self.request.request_time() * 1000)
class Agent_ping(MN_Handler):
    """Just a 'ping', used to choose the closest server."""

    @tornado.web.asynchronous
    def post(self):
        """Finish immediately with an empty response."""
        self.finish()
# Options registered with tornado.options; values come from the command
# line and the config files parsed in the __main__ section below.

# Listening ports are declared with string defaults; `port` is converted
# with int() before the plain HTTP server starts listening.
define('port', default='80')
define('port_ssl', default='443')
# Address handed to listen() for both servers.
define('host', default='', type=str)
# The HTTPS server is only started when certfile, keyfile and port_ssl
# are all set.
define('certfile', default=None, type=str)
define('keyfile', default=None, type=str)
# The options below are forwarded to instance.mn_instance() at start-up.
define('server')
define('master', default=None, type=tuple)
define('range_file', default='range.ini')
define('range_size', default=1000)
define('master_range', default=None)
define('sites', default=[], type=list)
# Passed as max_clients to AsyncHTTPClient by _set_max_clients().
define('http_max_clients', default=10)
def _set_max_clients():
    """Apply the ``http_max_clients`` option once options are parsed.

    Does nothing when the option is unset or zero.
    """
    max_clients = options.http_max_clients
    if not max_clients:
        return
    # The client object itself is not kept; presumably constructing it is
    # what applies the limit to tornado's shared client -- TODO confirm.
    instance.tornado.httpclient.AsyncHTTPClient(max_clients=max_clients)


# Run the hook after option parsing.
tornado.options.add_parse_callback(_set_max_clients)
if __name__ == "__main__":
    # Option layering: command line (non-final, to learn the port) ->
    # mynotes.conf -> optional per-port instance/<port>.conf -> command
    # line again, so explicit flags override both config files.
    instance_conf = None
    tornado.options.parse_command_line(final=False)
    if options.port:
        instance_conf = (os.path.join('instance', str(options.port))
                         + os.path.extsep + 'conf')
    tornado.options.parse_config_file("mynotes.conf", final=not instance_conf)
    if instance_conf and os.access(instance_conf, os.F_OK):
        tornado.options.parse_config_file(instance_conf, final=False)
    # NOTE(review): indentation was lost in the source; the final parse is
    # assumed to run unconditionally so parse callbacks always fire.
    tornado.options.parse_command_line()

    # Route order is significant: first matching pattern wins.
    application = http.Application_mn(
        [
            (r"/ping", Agent_ping),
            (r"/hello", instance.inst_Hello),
            (r"/connected", instance.inst_Connected),
            (r"/range", instance.inst_Range),
            (r"/getuniversalid", instance.agent_getID),
            (r"/connect", instance.agent_Connect),
            (r"/client", instance.app_Client),
            (r"/hello/.*", instance.inst_Hello_port),
            (r"/connected/.*", instance.inst_Connected_port),
            (r"/range/.*", instance.inst_Range_port),
            (r"/find.*", instance.inst_Find),
            (r"/client/.*", Client),
            (r"/agentreply/.*", Agent_reply),
            (r"/agent/.*", Agent_ready),
        ],
        instance=instance.mn_instance(
            options.server, options.port,
            master_instance=options.master,
            range_file=options.range_file,
            range_size=options.range_size,
            master_range=options.master_range,
            known_instances=options.sites
        )
    )

    http_server = http.HTTPServer_mn(application)
    http_server.listen(int(options.port), options.host)

    # HTTPS is optional: it needs a certificate, a key and a port.
    if options.certfile and options.keyfile and options.port_ssl:
        ssl_options = {
            "certfile": options.certfile,
            "keyfile": options.keyfile,
        }
        https_server = http.HTTPServer_mn(application, ssl_options=ssl_options)
        # The option defaults to a string; convert it like the HTTP port.
        https_server.listen(int(options.port_ssl), options.host)

    IOLoop.instance().start()