Skip to content

Commit

Permalink
0.1.8
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenthebuilder committed Apr 24, 2024
1 parent 6a387c5 commit 1e37254
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name="replit-river"
version="0.1.7-beta.9"
version="0.1.8"
description="Replit river toolkit for Python"
authors = ["Replit <eng@replit.com>"]
license = "LICENSE"
Expand Down
1 change: 0 additions & 1 deletion replit_river/client_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
from typing import Optional, Tuple

import nanoid
import websockets
from pydantic import ValidationError
from websockets import (
Expand Down
3 changes: 2 additions & 1 deletion replit_river/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ async def send_message(
try:
await self._buffer.put(msg)
except Exception:
# We should close the session when there are too many messages in buffer
# We should close the session when there are too many messages in
# buffer
await self.close(True)
return
await self._send_transport_message(
Expand Down
59 changes: 37 additions & 22 deletions replit_river/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
from typing import Any, Optional, Set

from aiochannel import ChannelClosed
from replit_river.error_schema import ERROR_CODE_STREAM_CLOSED, RiverException


Expand All @@ -13,6 +12,7 @@ def __init__(self) -> None:
self.background_tasks: Set[asyncio.Task] = set()

async def cancel_all_tasks(self) -> None:
"""Asynchronously cancels all tasks managed by this instance."""
# Convert it to a list to avoid RuntimeError: Set changed size during iteration
for task in list(self.background_tasks):
await self.cancel_task(task, self.background_tasks)
Expand All @@ -22,37 +22,42 @@ async def cancel_task(
task_to_remove: asyncio.Task[Any],
background_tasks: Set[asyncio.Task],
) -> None:
"""Cancels a given task and ensures it is removed from the set of managed tasks.
Args:
task_to_remove: The asyncio.Task instance to cancel.
background_tasks: Set of all tasks being tracked.
"""
task_to_remove.cancel()
try:
await task_to_remove
if task_to_remove in background_tasks:
background_tasks.remove(task_to_remove)
exception = task_to_remove.exception()
except (asyncio.CancelledError, ChannelClosed):
# This is expected when websocket is closed
except asyncio.CancelledError:
# If we cancel the task manager we will get called here as well,
# if we want to handle the cancellation differently we can do it here.
logging.debug("Task was cancelled %r", task_to_remove)
return
except Exception:
logging.error("Error retrieving task exception", exc_info=True)
return
if exception:
if (
isinstance(exception, RiverException)
and exception.code == ERROR_CODE_STREAM_CLOSED
):
except RiverException as e:
if e.code == ERROR_CODE_STREAM_CLOSED:
# Task is cancelled
pass
else:
logging.error(
"Task resulted in an exception",
exc_info=exception,
)
logging.error("Exception on cancelling task: %r", e, exc_info=True)
except Exception as e:
logging.error("Exception on cancelling task: %r", e, exc_info=True)
finally:
# Remove the task from the set regardless of the outcome
background_tasks.discard(task_to_remove)

def _task_done_callback(
self,
task_to_remove: asyncio.Task[Any],
background_tasks: Set[asyncio.Task],
) -> None:
"""Callback to be executed when a task is done. It removes the task from the set
and logs any exceptions.
Args:
task_to_remove: The asyncio.Task that has completed.
background_tasks: Set of all tasks being tracked.
"""
if task_to_remove in background_tasks:
background_tasks.remove(task_to_remove)
try:
Expand All @@ -71,13 +76,23 @@ def _task_done_callback(
pass
else:
logging.error(
"Task resulted in an exception",
exc_info=exception,
"Exception on cancelling task: %r", exception, exc_info=True
)

async def create_task(
self, fn: Any, tg: Optional[asyncio.TaskGroup] = None
) -> asyncio.Task:
"""Creates a task from a callable and adds it to the background tasks set.
Args:
fn: A callable to be executed in the task.
tg: Optional asyncio.TaskGroup for managing the task lifecycle.
TODO: tg is hard to understand when passed all the way here, we should
refactor to make this easier to understand.
Returns:
The created asyncio.Task.
"""
if tg:
task = tg.create_task(fn)
else:
Expand Down
3 changes: 2 additions & 1 deletion replit_river/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
from typing import Dict, Optional, Tuple

import nanoid # type: ignore
from websockets import WebSocketCommonProtocol

from replit_river.messages import FailedSendingMessageException
from replit_river.rpc import (
GenericRpcHandler,
)
from replit_river.session import Session
from replit_river.transport_options import TransportOptions
from websockets import WebSocketCommonProtocol


class Transport:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_rate_limiter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest
import asyncio
from typing import Any

import pytest

from replit_river.rate_limiter import LeakyBucketRateLimit
from replit_river.transport_options import ConnectionRetryOptions
Expand Down

0 comments on commit 1e37254

Please sign in to comment.