-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsockets_server.py
52 lines (37 loc) · 1.68 KB
/
sockets_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import asyncio
import websockets
from request import IResponder, Request, Response
from websockets.asyncio.server import ServerConnection, serve
from websockets.protocol import State
from models import ModelManager
class SocketResponder(IResponder):
def __init__(self, websocket: ServerConnection):
self.websocket = websocket
async def raw_response(self, response: Response):
if self.websocket.state == State.OPEN:
await self.websocket.send("\n" + response.toJSON())
async def response(self, response: Response):
await self.raw_response(response)
async def intermediate_response(self, response: Response):
await self.raw_response(response)
async def handler(websocket: ServerConnection, model_manager: ModelManager):
responder = SocketResponder(websocket)
try:
async for message in websocket:
try:
request = Request.from_json(message)
await request.handle(model_manager, responder)
except Exception as e:
await Response.new_no_id_error(str(e)).send(responder)
except websockets.exceptions.ConnectionClosedError:
pass
def start(host: str, port: int, model_manager: ModelManager):
import threading
thread = threading.Thread(target=task, args=(host, port, model_manager))
thread.start()
def task(host: str, port: int, model_manager: ModelManager):
return asyncio.run(run(host, port, model_manager))
async def run(host: str, port: int, model_manager: ModelManager):
print(f"Starting sockets server on {host}:{port}")
server = await serve(lambda ws: handler(ws, model_manager), host, port)
await server.serve_forever()