Skip to content

Commit

Permalink
Add auto-reconnect for realtime session
Browse files Browse the repository at this point in the history
Add argument to set auto-reconnect option for RyverWS in Ryver and add
code for the actual reconnecting.
  • Loading branch information
tylertian123 committed Oct 1, 2020
1 parent 2006da9 commit 823a35b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 25 deletions.
5 changes: 3 additions & 2 deletions pyryver/ryver.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ async def create_forum(self, name: str, nickname: str = None, about: str = None,
return await self._create_groupchat(TYPE_FORUM, name, nickname, about, description)

@doc.acontexmanager
def get_live_session(self) -> ryver_ws.RyverWS:
def get_live_session(self, auto_reconnect: bool = False) -> ryver_ws.RyverWS:
"""
Get a live session.
Expand All @@ -549,6 +549,7 @@ def get_live_session(self) -> ryver_ws.RyverWS:
.. warning::
Live sessions **do not work** when using a custom integration token.
:param auto_reconnect: Whether to automatically reconnect on connection loss.
:return: The live websockets session.
"""
return ryver_ws.RyverWS(self)
return ryver_ws.RyverWS(self, auto_reconnect)
74 changes: 51 additions & 23 deletions pyryver/ryver_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ class RyverWS():
.. warning::
This **does not work** when using a custom integration token to sign in.
:param ryver: The :py:class:`Ryver` object this live session came from.
:param auto_reconnect: Whether to automatically reconnect on a connection loss.
"""

_VALID_ID_CHARS = string.ascii_letters + string.digits
Expand Down Expand Up @@ -257,26 +260,30 @@ async def _rx_task(self):
"""
try:
while True:
print("receiving data") # TODO: Remove
raw_msg = await self._ws.receive()
if raw_msg.type != WSMsgType.TEXT:
if raw_msg.type is not None and raw_msg.type < 0x100:
sys.stderr.write(f"Warning: Wrong message type received, expected TEXT (0x1), got {raw_msg.type}. Message ignored.\n")
else:
sys.stderr.write(f"Error: Received unexpected aiohttp specific type for WS message: {raw_msg.type}. Killing connection.\n")
# Connection lost!
for key, future in self._msg_ack_table.items():
future.set_exception(ConnectionLossError(f"Connection lost while performing operation {key[1]}"))
self._msg_ack_table.clear()
if self._on_connection_loss:
asyncio.ensure_future(self._on_connection_loss())

sys.stderr.write(f"Error: Received unexpected aiohttp specific type for WS message: {raw_msg.type}.\n")
if self.is_connected():
# Connection lost!
for key, future in self._msg_ack_table.items():
future.set_exception(ConnectionLossError(f"Connection lost while performing operation {key[1]}"))
self._msg_ack_table.clear()
if self._on_connection_loss:
asyncio.ensure_future(self._on_connection_loss())
if self._auto_reconnect:
await self.close(cancel_rx=False)
while not self.is_connected():
await asyncio.sleep(5.0)
await self.try_reconnect()
if not self._auto_reconnect:
# Has to be done inside a task as terminate() potentially calls close()
# which in turn waits for _rx_task to finish, creating a deadlock
asyncio.ensure_future(self.terminate())
# Block to force a context switch and make sure connection is closed
await asyncio.sleep(0.1)
# Block to force a context switch
await asyncio.sleep(0.1)
else:
try:
msg = json.loads(raw_msg.data)
Expand Down Expand Up @@ -320,12 +327,19 @@ async def _ping_task(self):
"type": "ping"
}, timeout=5.0)
except asyncio.TimeoutError:
# Connection lost!
for key, future in self._msg_ack_table.items():
future.set_exception(ConnectionLossError(f"Connection lost while performing operation {key[1]}"))
self._msg_ack_table.clear()
if self._on_connection_loss:
asyncio.ensure_future(self._on_connection_loss())
if self.is_connected():
# Connection lost!
for key, future in self._msg_ack_table.items():
future.set_exception(ConnectionLossError(f"Connection lost while performing operation {key[1]}"))
self._msg_ack_table.clear()
if self._on_connection_loss is not None:
asyncio.ensure_future(self._on_connection_loss())
# Auto reconnect
if self._auto_reconnect:
await self.close(cancel_ping=False)
while not self.is_connected():
await asyncio.sleep(5.0)
await self.try_reconnect()
await asyncio.sleep(10)
except asyncio.CancelledError:
return
Expand Down Expand Up @@ -614,23 +628,37 @@ async def try_reconnect(self, timeout: float = None) -> bool:
except (aiohttp.ClientConnectionError, aiohttp.ClientResponseError, asyncio.TimeoutError):
return False

async def close(self):
async def close(self, cancel_rx: bool = True, cancel_ping: bool = True):
"""
Close the session.
Any future operation after closing will result in a :py:exc:`ClosedError` being raised.
Any future operation after closing will result in a :py:exc:`ClosedError` being
raised, unless the session is reconnected using :py:meth:`RyverWS.start()` or
:py:meth:`RyverWS.try_reconnect()`.
TODO: Clarify terminate()
:param cancel_rx: Whether to cancel the rx task. For internal use only.
:param cancel_ping: Whether to cancel the ping task. For internal use only.
"""
self._closed = True
# Cancel all tasks
self._ping_task_handle.cancel()
self._rx_task_handle.cancel()
# Cancel tasks
if cancel_rx:
self._rx_task_handle.cancel()
if cancel_ping:
self._ping_task_handle.cancel()
await self._ws.close()
# Terminate any messages waiting for acks with an exception
for future in self._msg_ack_table.values():
future.set_exception(ClosedError("Connection closed"))
self._msg_ack_table.clear()
# Wait until tasks terminate
await asyncio.gather(self._ping_task_handle, self._rx_task_handle)
if cancel_rx and cancel_ping:
await asyncio.gather(self._ping_task_handle, self._rx_task_handle)
elif cancel_rx:
await self._rx_task_handle
elif cancel_ping:
await self._ping_task_handle

async def terminate(self):
"""
Expand Down

0 comments on commit 823a35b

Please sign in to comment.