Skip to content

Commit

Permalink
0.2.10: Catch the handshake exception outside of asyncio.wait_for (#36)
Browse files Browse the repository at this point in the history
Why
===
Sentry doesn't handle exception catching outside of asyncio.wait_for,
this leads to a large amount of sentry errors.

What changed
============
We create a wrapper to catch the exceptions before asyncio.wait_for
  • Loading branch information
zhenthebuilder authored Jul 1, 2024
1 parent e079735 commit 1272dab
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name="replit-river"
version="0.2.9"
version="0.2.10"
description="Replit river toolkit for Python"
authors = ["Replit <eng@replit.com>"]
license = "LICENSE"
Expand Down
44 changes: 33 additions & 11 deletions replit_river/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
from typing import Mapping, Tuple
from typing import Mapping, Optional, Tuple

import websockets
from websockets.exceptions import ConnectionClosed
Expand All @@ -9,6 +9,7 @@
from replit_river.messages import WebsocketClosedException
from replit_river.seq_manager import SessionStateMismatchException
from replit_river.server_transport import ServerTransport
from replit_river.session import Session
from replit_river.transport import TransportOptions

from .rpc import (
Expand Down Expand Up @@ -37,30 +38,51 @@ def add_rpc_handlers(
) -> None:
self._transport._handlers.update(rpc_handlers)

async def serve(self, websocket: WebSocketServerProtocol) -> None:
logging.debug(
"River server started establishing session with ws: %s", websocket.id
)
async def _handshake_to_get_session(
self, websocket: WebSocketServerProtocol
) -> Optional[Session]:
"""This is a wrapper to make sentry happy, sentry doesn't recognize the
exception handling outside of a task or asyncio.wait_for. So we need to catch
the errors specifically here.
https://docs.sentry.io/platforms/python/integrations/asyncio/#behavior
"""
try:
session = await asyncio.wait_for(
self._transport.handshake_to_get_session(websocket),
self._transport_options.session_disconnect_grace_ms / 1000,
)
return await self._transport.handshake_to_get_session(websocket)
except (websockets.exceptions.ConnectionClosed, WebsocketClosedException):
# it is fine if the ws is closed during handshake, we just close the ws
await websocket.close()
return
return None
except SessionStateMismatchException as e:
logging.info(
f"Session state mismatch, closing websocket: {e}", exc_info=True
)
await websocket.close()
return
return None
except Exception as e:
logging.error(
f"Error establishing handshake, closing websocket: {e}", exc_info=True
)
await websocket.close()
return None

async def serve(self, websocket: WebSocketServerProtocol) -> None:
logging.debug(
"River server started establishing session with ws: %s", websocket.id
)
try:
session = await asyncio.wait_for(
self._handshake_to_get_session(websocket),
self._transport_options.session_disconnect_grace_ms / 1000,
)
if not session:
return
except asyncio.TimeoutError:
logging.error("Handshake timeout, closing websocket")
await websocket.close()
return
except asyncio.CancelledError:
logging.error("Handshake cancelled, closing websocket")
await websocket.close()
return
logging.debug("River server session established, start serving messages")

Expand Down

0 comments on commit 1272dab

Please sign in to comment.