diff --git a/replit_river/client_session.py b/replit_river/client_session.py index b668035..b76252d 100644 --- a/replit_river/client_session.py +++ b/replit_river/client_session.py @@ -84,7 +84,7 @@ async def send_upload( """ stream_id = nanoid.generate() - output: Channel[Any] = Channel(1024) + output: Channel[Any] = Channel(1) self._streams[stream_id] = output first_message = True try: @@ -98,7 +98,8 @@ async def send_upload( payload=init_serializer(init), ) first_message = False - + # If this request is not closed and the session is killed, we should + # throws exception here async for item in request: control_flags = 0 if first_message: diff --git a/replit_river/session.py b/replit_river/session.py index 92fc206..37578aa 100644 --- a/replit_river/session.py +++ b/replit_river/session.py @@ -514,6 +514,7 @@ async def close(self, is_unexpected_close: bool) -> None: await self._task_manager.cancel_all_tasks() # TODO: unexpected_close should close stream differently here to # throw exception correctly. + logging.error(f"##### closing all streams here : {self._streams.values()}") for stream in self._streams.values(): stream.close() async with self._stream_lock: