From f72c65e441d3a0b409aef8f1523f7939cdb17c12 Mon Sep 17 00:00:00 2001 From: igalshilman Date: Mon, 5 May 2025 08:24:16 +0000 Subject: [PATCH] Add an explict http input closed event --- python/restate/server_context.py | 9 +-------- python/restate/server_types.py | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/python/restate/server_context.py b/python/restate/server_context.py index 542e91f..daadb49 100644 --- a/python/restate/server_context.py +++ b/python/restate/server_context.py @@ -318,14 +318,7 @@ async def leave(self): # {'type': 'http.request', 'body': b'', 'more_body': True} # {'type': 'http.request', 'body': b'', 'more_body': False} # {'type': 'http.disconnect'} - while True: - event = await self.receive() - if event is None: - break - if event.get('type') == 'http.disconnect': - break - if event.get('type') == 'http.request' and event.get('more_body', False) is False: - break + await self.receive.block_until_http_input_closed() # finally, we close our side # it is important to do it, after the other side has closed his side, # because some asgi servers (like hypercorn) will remove the stream diff --git a/python/restate/server_types.py b/python/restate/server_types.py index f316c44..b375b75 100644 --- a/python/restate/server_types.py +++ b/python/restate/server_types.py @@ -98,14 +98,19 @@ class ReceiveChannel: def __init__(self, receive: Receive) -> None: self._queue = asyncio.Queue[Union[ASGIReceiveEvent, RestateEvent]]() + self._http_input_closed = asyncio.Event() + self._disconnected = asyncio.Event() async def loop(): """Receive loop.""" - while True: + while not self._disconnected.is_set(): event = await receive() + if event.get('type') == 'http.request' and not event.get('more_body', False): + self._http_input_closed.set() + elif event.get('type') == 'http.disconnect': + self._http_input_closed.set() + self._disconnected.set() await self._queue.put(event) - if event.get('type') == 'http.disconnect': - break self._task = asyncio.create_task(loop()) @@ -115,12 +120,18 @@ async def __call__(self) -> ASGIReceiveEvent | RestateEvent: self._queue.task_done() return what + async def block_until_http_input_closed(self) -> None: + """Wait until the HTTP input is closed""" + await self._http_input_closed.wait() + async def enqueue_restate_event(self, what: RestateEvent): """Add a message.""" await self._queue.put(what) async def close(self): """Close the channel.""" + self._http_input_closed.set() + self._disconnected.set() if self._task.done(): return self._task.cancel()