Skip to content

Commit

Permalink
Improve websocket reconnection (#473)
Browse files Browse the repository at this point in the history
* clean up workspace; fix register_codec; improve reconnection

* upgrade imjoy-rpc

* set DISCONNECT_DELAY
  • Loading branch information
oeway authored Jul 10, 2023
1 parent 1cd89b0 commit c0ab9cb
Show file tree
Hide file tree
Showing 18 changed files with 68 additions and 36 deletions.
2 changes: 1 addition & 1 deletion hypha/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "0.15.27"
"version": "0.15.28"
}
2 changes: 1 addition & 1 deletion hypha/built-in/echo-service.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<title>ImJoy Plugin Template</title>
<meta name="description" content="Template for ImJoy plugin">
<meta name="author" content="ImJoy-Team">
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.31/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.32/dist/hypha-rpc-websocket.min.js"></script>
</head>

<body>
Expand Down
2 changes: 1 addition & 1 deletion hypha/built-in/imjoy-plugin-parser.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<title>ImJoy Plugin Template</title>
<meta name="description" content="Template for ImJoy plugin">
<meta name="author" content="ImJoy-Team">
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.31/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.32/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/imjoy-core@0.13.82/src/pluginParser.js"></script>
</head>

Expand Down
2 changes: 1 addition & 1 deletion hypha/built-in/playground.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<meta name="description" content="A playground for developing ImJoy plugins">
<meta name="author" content="ImJoy-Team">
<link rel="stylesheet" data-name="vs/editor/editor.main" href="https://cdnjs.cloudflare.com/ajax/libs/monaco-editor/0.20.0/min/vs/editor/editor.main.min.css">
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.31/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.32/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/imjoy-core@0.13.82/src/pluginParser.js"></script>
<script>
window.default_plugin = `
Expand Down
2 changes: 1 addition & 1 deletion hypha/built-in/test.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<title>ImJoy Plugin Template</title>
<meta name="description" content="Template for ImJoy plugin">
<meta name="author" content="ImJoy-Team">
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.31/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.32/dist/hypha-rpc-websocket.min.js"></script>
</head>

<body>
Expand Down
2 changes: 1 addition & 1 deletion hypha/core/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def generate_presigned_token(


def generate_reconnection_token(
user_info: UserInfo, client_id: str, workspace: str, expires_in: int = 10800
user_info: UserInfo, client_id: str, workspace: str, expires_in: int = 60
):
"""Generate a token for reconnection."""
current_time = time.time()
Expand Down
3 changes: 2 additions & 1 deletion hypha/core/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ def __init__(
public_base_url=None,
local_base_url=None,
redis_uri=None,
disconnect_delay=30,
):
"""Initialize the redis store."""
self._all_users: Dict[str, UserInfo] = {} # uid:user_info
self._all_workspaces: Dict[str, WorkspaceInfo] = {} # wid:workspace_info
self._workspace_loader = None
self._app = app
self.disconnect_delay = 1
self.disconnect_delay = disconnect_delay
self._codecs = {}
self._disconnected_plugins = []
self.public_base_url = public_base_url
Expand Down
8 changes: 7 additions & 1 deletion hypha/core/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ async def get_connection_info(self, context=None):
client_id,
user_info.id,
)
expires_in = 60 * 60 * 3 # 3 hours
expires_in = 60 # 1 minute
token = generate_reconnection_token(
user_info, client_id, ws, expires_in=expires_in
)
Expand Down Expand Up @@ -565,6 +565,8 @@ async def delete_client(self, client_id: str, workspace: str = None):
len(remain_clients),
)
self._event_bus.emit("client_deleted", client_info.dict())
# The call below is commented out because we want to allow reconnection
# self.delete_if_empty()

def _create_rpc(
self, client_id: str, default_context=None, user_info: UserInfo = None
Expand Down Expand Up @@ -885,6 +887,9 @@ async def get_workspace(self, workspace: str = None, context=None):
wm = await rpc.get_remote_service(
workspace + "/workspace-manager:default", timeout=10
)
wm.rpc = rpc
wm.disconnect = rpc.disconnect
wm.register_codec = rpc.register_codec
return wm
except asyncio.TimeoutError:
logger.info(
Expand Down Expand Up @@ -951,6 +956,7 @@ async def delete(self, force: bool = False):
await self.remove_clients(self._workspace)
await self._redis.hdel("workspaces", self._workspace)
self._event_bus.emit("workspace_removed", winfo.dict())
logger.info("Workspace %s deleted.", self._workspace)
else:
client_keys = await self._redis.hkeys(f"{self._workspace}:clients")
if b"workspace-manager" in client_keys:
Expand Down
1 change: 1 addition & 0 deletions hypha/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def create_application(args):
public_base_url=public_base_url,
local_base_url=local_base_url,
redis_uri=args.redis_uri,
disconnect_delay=float(env.get("DISCONNECT_DELAY", "30")),
)

start_builtin_services(application, store, args)
Expand Down
2 changes: 1 addition & 1 deletion hypha/templates/web-python-plugin.html
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
async def run():
try:
await micropip.install(["imjoy-rpc==0.5.31", {% for req in requirements %}"{{req}}", {% endfor %}])
await micropip.install(["imjoy-rpc==0.5.32", {% for req in requirements %}"{{req}}", {% endfor %}])
js.__resolve()
except Exception as e:
js.__reject(traceback.format_exc())
Expand Down
2 changes: 1 addition & 1 deletion hypha/templates/web-worker-plugin.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
self.onmessage = function(e) {
const config = e.data

importScripts("https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.31/dist/hypha-rpc-websocket.min.js")
importScripts("https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.32/dist/hypha-rpc-websocket.min.js")

hyphaWebsocketClient.connectToServer(config).then(async (api)=>{
await hyphaWebsocketClient.loadRequirements([{% for req in requirements %}"{{req}}", {% endfor %}])
Expand Down
2 changes: 1 addition & 1 deletion hypha/templates/window-plugin.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<title>ImJoy Plugin (window)</title>
<meta name="description" content="Template for ImJoy plugin">
<meta name="author" content="ImJoy-Team">
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.31/dist/hypha-rpc-websocket.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/imjoy-rpc@0.5.32/dist/hypha-rpc-websocket.min.js"></script>
</head>

<body>
Expand Down
3 changes: 3 additions & 0 deletions hypha/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,9 @@ async def ready_function(p):

async def _example_hypha_startup(server):
"""An example hypha startup module."""
assert server.register_codec
assert server.rpc
assert server.disconnect
await server.register_service(
{
"id": "example-startup-service",
Expand Down
65 changes: 42 additions & 23 deletions hypha/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
logger = logging.getLogger("websocket-server")
logger.setLevel(logging.INFO)

DISCONNECT_DELAY = 180


class WebsocketServer:
"""Represent a Websocket server."""
Expand Down Expand Up @@ -53,12 +51,20 @@ async def disconnect(code):
return

parent_client = None
workspace_manager = None
if reconnection_token:
logger.info(
f"Reconnecting client via token: {reconnection_token[:5]}..."
)
user_info, ws, cid = parse_reconnection_token(reconnection_token)
if await store.get_workspace(ws) is None:
if client_id not in disconnected_clients:
logger.warning(
"Client %s was not in the disconnected client list", client_id
)
await disconnect(code=status.WS_1003_UNSUPPORTED_DATA)
return
_workspace = await store.get_workspace(ws)
if _workspace is None:
logger.error(
"Failed to recover the connection (client: %s),"
" workspace has been removed: %s",
Expand All @@ -67,15 +73,17 @@ async def disconnect(code):
)
await disconnect(code=status.WS_1003_UNSUPPORTED_DATA)
return

if client_id != cid:
logger.error(
"Client ID mismatch in the reconnection token %s", client_id
)
await disconnect(code=status.WS_1003_UNSUPPORTED_DATA)
return
logger.info("Client successfully reconnected: %s", cid)
disconnected_clients[client_id].cancel()
del disconnected_clients[client_id]
if client_id in disconnected_clients:
disconnected_clients[client_id].cancel()
del disconnected_clients[client_id]
else:
if token:
try:
Expand Down Expand Up @@ -129,9 +137,10 @@ async def disconnect(code):
await disconnect(code=status.WS_1003_UNSUPPORTED_DATA)
return
try:
workspace_manager = await store.get_workspace_manager(
workspace, setup=True
)
if not workspace_manager:
workspace_manager = await store.get_workspace_manager(
workspace, setup=True
)
except Exception as exp:
logger.error(
"Failed to get workspace manager %s, error: %s", workspace, exp
Expand All @@ -150,7 +159,9 @@ async def disconnect(code):
await disconnect(code=status.WS_1003_UNSUPPORTED_DATA)
return

if await workspace_manager.check_client_exists(client_id):
if not reconnection_token and await workspace_manager.check_client_exists(
client_id
):
logger.error(
"Another client with the same id %s"
" already connected to workspace: %s",
Expand All @@ -171,29 +182,32 @@ async def disconnect(code):
)
conn.on_message(websocket.send_bytes)

await workspace_manager.register_client(
ClientInfo(
id=client_id,
parent=parent_client,
workspace=workspace_manager._workspace,
user_info=user_info,
if not reconnection_token:
await workspace_manager.register_client(
ClientInfo(
id=client_id,
parent=parent_client,
workspace=workspace_manager._workspace,
user_info=user_info,
)
)
)

try:
while True:
data = await websocket.receive_bytes()
await conn.emit_message(data)
except WebSocketDisconnect as exp:
logger.info("Client disconnected: %s", client_id)
try:
await workspace_manager.delete_client(client_id)
except KeyError:
logger.info("Client already deleted: %s", client_id)

if exp.code in [
status.WS_1000_NORMAL_CLOSURE,
status.WS_1001_GOING_AWAY,
]:
logger.info("Client disconnected normally: %s", client_id)
try:
await workspace_manager.delete_client(client_id)
except KeyError:
logger.info("Client already deleted: %s", client_id)
try:
# Clean up if the client is disconnected normally
await workspace_manager.delete()
Expand All @@ -205,13 +219,18 @@ async def disconnect(code):
" unexpectedly: %s (will be removed in %s seconds)",
client_id,
exp,
DISCONNECT_DELAY,
store.disconnect_delay,
)

async def delayed_remove(client_id, workspace_manager):
try:
await asyncio.sleep(DISCONNECT_DELAY)
del disconnected_clients[client_id]
await asyncio.sleep(store.disconnect_delay)
try:
await workspace_manager.delete_client(client_id)
except KeyError:
logger.info("Client already deleted: %s", client_id)
if client_id in disconnected_clients:
del disconnected_clients[client_id]
await workspace_manager.delete()
except asyncio.CancelledError:
pass
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ aioboto3==11.0.1
aiofiles==23.1.0
base58==2.1.1
fastapi==0.98.0
imjoy-rpc==0.5.31
imjoy-rpc==0.5.32
jinja2==3.1.2
lxml==4.9.2
msgpack==1.0.5
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
REQUIREMENTS = [
"aiofiles",
"fastapi>=0.70.0",
"imjoy-rpc>=0.5.31",
"imjoy-rpc>=0.5.32",
"msgpack>=1.0.2",
"numpy",
"pydantic[email]>=1.8.2,<2.0.0",
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

JWT_SECRET = str(uuid.uuid4())
os.environ["JWT_SECRET"] = JWT_SECRET
os.environ["DISCONNECT_DELAY"] = "0.1"
test_env = os.environ.copy()


Expand Down
1 change: 1 addition & 0 deletions tests/test_startup_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async def test_launch_external_services(fastapi_server):
assert external_service.id == "external-test-service"
assert await external_service.test(1) == 100
await proc.kill()
await asyncio.sleep(0.1)
with pytest.raises(
Exception, match=r".*IndexError: Service not found: external-test-service.*"
):
Expand Down

0 comments on commit c0ab9cb

Please sign in to comment.