-
Notifications
You must be signed in to change notification settings - Fork 694
/
Copy pathwebsockets_chat_asyncio.py
146 lines (120 loc) · 4.31 KB
/
websockets_chat_asyncio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!./uwsgi --http-socket :9090 --asyncio 100 --module tests.websockets_chat_asyncio --greenlet
import uwsgi
import asyncio
import asyncio_redis
import time
import greenlet
class GreenFuture(asyncio.Future):
    """An asyncio future bound to the greenlet that created it.

    A done-callback switches back into the creating greenlet, and
    ``result()`` keeps yielding to that greenlet's parent until the future
    is done, so it can be called from plain (non-coroutine) code.
    """

    def __init__(self):
        super().__init__()
        # Remember who created the future: that is the greenlet to wake up.
        self.greenlet = greenlet.getcurrent()

        def wake_owner(done_future):
            done_future.greenlet.switch()

        self.add_done_callback(wake_owner)

    def result(self):
        """Return the future's result, switching to the parent greenlet while pending."""
        # The loop tolerates wake-ups that happen before the future is done.
        while not self.done():
            self.greenlet.parent.switch()
        return super().result()
async def redis_open(f):
    """Open a Redis connection and hand it to the waiting greenlet through *f*.

    Written as a native coroutine: the ``asyncio.coroutine`` decorator no
    longer exists on Python 3.11+.

    Args:
        f: a future-like object exposing ``set_result``, ``set_exception``
            and a ``greenlet`` attribute (a ``GreenFuture`` in this file).
    """
    try:
        connection = await asyncio_redis.Connection.create(host='localhost', port=6379)
    except Exception as exc:
        # Forward the failure; otherwise the future would never complete and
        # the greenlet blocked in f.result() would never be resumed.
        f.set_exception(exc)
    else:
        f.set_result(connection)
    # Resume the waiting greenlet right away, on success or failure.
    f.greenlet.switch()
async def redis_subscribe(f):
    """Connect to Redis, subscribe to the ``foobar`` channel and publish the subscriber via *f*.

    Written as a native coroutine: the ``asyncio.coroutine`` decorator no
    longer exists on Python 3.11+.

    Args:
        f: a future-like object exposing ``set_result``, ``set_exception``
            and a ``greenlet`` attribute (a ``GreenFuture`` in this file).
    """
    try:
        connection = await asyncio_redis.Connection.create(host='localhost', port=6379)
        subscriber = await connection.start_subscribe()
        await subscriber.subscribe(['foobar'])
    except Exception as exc:
        # Forward the failure; otherwise the future would never complete and
        # the greenlet blocked in f.result() would never be resumed.
        f.set_exception(exc)
    else:
        f.set_result(subscriber)
    # Resume the waiting greenlet right away, on success or failure.
    f.greenlet.switch()
def ws_recv_msg(g):
    """Event-loop callback: mark greenlet *g* as having websocket work and resume it.

    Args:
        g: the greenlet to wake; it must accept a ``has_ws_msg`` attribute.
    """
    # Raise the flag before switching so the resumed greenlet sees it.
    g.has_ws_msg = True
    g.switch()
async def redis_wait(subscriber, f):
    """Wait for the next published message and deliver its value through *f*.

    Written as a native coroutine: the ``asyncio.coroutine`` decorator no
    longer exists on Python 3.11+.

    Args:
        subscriber: object whose ``next_published()`` awaitable yields a
            reply with a ``value`` attribute.
        f: a future-like object exposing ``set_result``, ``set_exception``
            and a ``greenlet`` attribute (a ``GreenFuture`` in this file).
    """
    try:
        reply = await subscriber.next_published()
    except Exception as exc:
        # Forward the failure so the consumer sees it when it calls
        # f.result(), instead of the future silently staying pending.
        f.set_exception(exc)
    else:
        f.set_result(reply.value)
    # Resume the waiting greenlet right away, on success or failure.
    f.greenlet.switch()
async def redis_publish(connection, msg):
    """Publish a websocket payload on the ``foobar`` Redis channel.

    Written as a native coroutine: the ``asyncio.coroutine`` decorator no
    longer exists on Python 3.11+.

    Args:
        connection: object with an awaitable ``publish(channel, message)``.
        msg: the payload as UTF-8 encoded bytes; it is decoded to ``str``
            before publishing.
    """
    await connection.publish('foobar', msg.decode('utf-8'))
def application(env, sr):
    """WSGI entry point for the chat demo.

    Routes:
        ``/``         serves the HTML/JavaScript chat page.
        ``/foobar/``  upgrades to a WebSocket and relays messages to and
                      from the ``foobar`` Redis channel until the handler
                      exits.
        anything else (including ``/favicon.ico``) gets an empty 404.

    Args:
        env: the WSGI environ dict.
        sr: the WSGI ``start_response`` callable.

    Returns:
        The encoded page for ``/``, ``b""`` for unknown paths. The WebSocket
        branch loops until an exception ends it.
    """
    path = env['PATH_INFO']

    if path == '/':
        ws_scheme = 'ws'
        if 'HTTPS' in env or env['wsgi.url_scheme'] == 'https':
            ws_scheme = 'wss'
        sr('200 OK', [('Content-Type', 'text/html')])
        # NOTE(review): HTTP_HOST is client-supplied and is interpolated
        # into the page unescaped.
        return ("""
<html>
<head>
<script language="Javascript">
var s = new WebSocket("%s://%s/foobar/");
s.onopen = function() {
alert("connected !!!");
s.send("ciao");
};
s.onmessage = function(e) {
var bb = document.getElementById('blackboard')
var html = bb.innerHTML;
bb.innerHTML = html + '<br/>' + e.data;
};
s.onerror = function(e) {
alert(e);
}
s.onclose = function(e) {
alert("connection closed");
}
function invia() {
var value = document.getElementById('testo').value;
s.send(value);
}
</script>
</head>
<body>
<h1>WebSocket</h1>
<input type="text" id="testo"/>
<input type="button" value="invia" onClick="invia();"/>
<div id="blackboard" style="width:640px;height:480px;background-color:black;color:white;border: solid 2px red;overflow:auto">
</div>
</body>
</html>
""" % (ws_scheme, env['HTTP_HOST'])).encode()

    if path != '/foobar/':
        # Every response must go through start_response; previously
        # /favicon.ico and unknown paths returned without sending a status.
        sr('404 Not Found', [('Content-Type', 'text/plain')])
        return b""

    uwsgi.websocket_handshake()
    print("websockets...")

    # a future for waiting for the redis subscription;
    # result() switches greenlets until the coroutine completes it
    f = GreenFuture()
    asyncio.Task(redis_subscribe(f))
    subscriber = f.result()

    # open another redis connection for publishing messages
    f0 = GreenFuture()
    asyncio.Task(redis_open(f0))
    connection = f0.result()

    myself = greenlet.getcurrent()
    myself.has_ws_msg = False

    loop = asyncio.get_event_loop()
    fd = uwsgi.connection_fd()
    # start monitoring websocket events
    loop.add_reader(fd, ws_recv_msg, myself)
    # add a 4 seconds timer to manage ping/pong (call_later fires once)
    timer = loop.call_later(4, ws_recv_msg, myself)

    try:
        # add a coroutine for redis messages
        f = GreenFuture()
        asyncio.Task(redis_wait(subscriber, f))
        while True:
            # Park this greenlet (presumably handing control back to the
            # greenlet running the event loop) until a callback resumes it.
            # f was created here, so f.greenlet is this greenlet.
            myself.parent.switch()

            # any redis message in the queue ?
            if f.done():
                msg = f.result()
                uwsgi.websocket_send("[%s] %s" % (time.time(), msg))
                # restart coroutine
                f = GreenFuture()
                asyncio.Task(redis_wait(subscriber, f))

            if myself.has_ws_msg:
                myself.has_ws_msg = False
                msg = uwsgi.websocket_recv_nb()
                if msg:
                    asyncio.Task(redis_publish(connection, msg))
    finally:
        # The loop above only ends through an exception. Unregister what was
        # attached to the event loop so it does not keep callbacks that
        # target this greenlet after the handler has exited.
        loop.remove_reader(fd)
        timer.cancel()