-
Notifications
You must be signed in to change notification settings - Fork 331
Description
What happened?
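`DefaultRequestHandler.on_message_send` hangs when an error occurs in the consumer. `on_message_stream` does not hang, because it moves `_cleanup_producer` into a background task, but that task is left in a bad state because the producer is still awaiting on `updater.x`. As the log below shows, the producer ends up blocked in `EventQueue.close()` waiting on `queue.join()`, which never returns once the consumer has stopped processing events, so `_cleanup_producer` (which awaits the producer task) never finishes either.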
Below is a minimal reproducible example that uses a failing `TaskStore`.
import asyncio
import uuid
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.context import ServerCallContext
from a2a.server.events import EventQueue
from a2a.server.request_handlers.default_request_handler import (
DefaultRequestHandler,
)
from a2a.server.tasks import TaskStore
from a2a.server.tasks.task_updater import TaskUpdater
from a2a.types import (
Message,
MessageSendParams,
Part,
Role,
Task,
TaskState,
TextPart,
)
class FailingTaskStore(TaskStore):
    """Task store whose ``save`` always raises.

    Simulates a poisoned / misconfigured store: reads find nothing,
    deletes are ignored, and every write fails with ``RuntimeError``.
    """

    async def get(
        self, task_id: str, context: ServerCallContext | None = None
    ) -> Task | None:
        """Pretend no task exists; always returns ``None``."""
        return None

    async def save(
        self, task: Task, context: ServerCallContext | None = None
    ) -> None:
        """Reject every save by raising ``RuntimeError``."""
        failure_text = 'This is an Error!'
        raise RuntimeError(failure_text)

    async def delete(
        self, task_id: str, context: ServerCallContext | None = None
    ) -> None:
        """Ignore deletions; they are irrelevant to this reproduction."""
        return None
class HelloWorldAgent:
    """Minimal agent that always answers with the same greeting."""

    async def invoke(self) -> str:
        """Produce the fixed greeting text."""
        greeting = 'Hello World'
        return greeting
class HelloWorldAgentExecutor(AgentExecutor):
    """Executor that runs ``HelloWorldAgent`` through a full task lifecycle.

    It publishes, via a ``TaskUpdater``: an optional "submitted" event, a
    "working" status, one text artifact, and a final "completed" event.
    """

    def __init__(self):
        self.agent = HelloWorldAgent()

    async def execute(
        self,
        context: RequestContext,
        event_queue: EventQueue,
    ) -> None:
        """Emit the lifecycle events for a single request onto ``event_queue``."""
        # Fall back to freshly generated ids when the request carries none.
        task_id = context.task_id or str(uuid.uuid4())
        context_id = context.context_id or str(uuid.uuid4())
        updater = TaskUpdater(event_queue, task_id=task_id, context_id=context_id)

        # To reproduce with an executor-side failure instead, raise here, e.g.:
        #     raise ValueError("Simulated error during task execution")

        # Only a brand-new task (no incoming task_id) gets a "submitted" event.
        is_new_task = not context.task_id
        if is_new_task:
            await updater.submit()

        await updater.update_status(TaskState.working)
        reply_text = await self.agent.invoke()
        await updater.add_artifact([Part(root=TextPart(text=reply_text))])
        await updater.complete()

    async def cancel(
        self, context: RequestContext, event_queue: EventQueue
    ) -> None:
        """Cancellation is not supported by this example executor."""
        raise NotImplementedError('cancel not supported')
async def test_hanging_on_task_save_error() -> None:
    """Reproduce the ``on_message_send`` hang caused by a failing task store.

    The handler call is bounded by ``asyncio.wait_for`` with a 10 second
    timeout, so a hang surfaces as a ``TimeoutError`` instead of blocking
    forever.
    """
    handler = DefaultRequestHandler(
        agent_executor=HelloWorldAgentExecutor(),
        task_store=FailingTaskStore(),
    )

    user_message = Message(
        role=Role.user,
        parts=[TextPart(text='Test message')],
        message_id=str(uuid.uuid4()),
    )
    send_params = MessageSendParams(message=user_message)

    await asyncio.wait_for(handler.on_message_send(send_params), timeout=10.0)
if __name__ == '__main__':
    # Run the reproduction directly. While the hang is unfixed, this ends
    # with a TimeoutError once the 10 second wait_for bound expires.
    asyncio.run(test_hanging_on_task_save_error())

# Log Output
Agent execution failed
Traceback (most recent call last):
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 325, in on_message_send
) = await result_aggregator.consume_and_break_on_interrupt(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<3 lines>...
)
^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/result_aggregator.py", line 132, in consume_and_break_on_interrupt
await self.task_manager.process(event)
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 206, in process
await self.save_task_event(event)
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 137, in save_task_event
task: Task = await self.ensure_task(event)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 187, in ensure_task
await self._save_task(task)
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 240, in _save_task
await self.task_store.save(task, self._call_context)
File "/path/to/root/a2a-python/tests/server/test_task_save_error_hanging.py", line 36, in save
raise RuntimeError(
'This is an Error!'
)
RuntimeError: This is an Error!
Traceback (most recent call last):
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 325, in on_message_send
) = await result_aggregator.consume_and_break_on_interrupt(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<3 lines>...
)
^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/result_aggregator.py", line 132, in consume_and_break_on_interrupt
await self.task_manager.process(event)
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 206, in process
await self.save_task_event(event)
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 137, in save_task_event
task: Task = await self.ensure_task(event)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 187, in ensure_task
await self._save_task(task)
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/tasks/task_manager.py", line 240, in _save_task
await self.task_store.save(task, self._call_context)
File "/path/to/root/a2a-python/tests/server/test_task_save_error_hanging.py", line 36, in save
raise RuntimeError(
'This is an Error!'
)
RuntimeError: This is an Error!
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/xxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
return await fut
^^^^^^^^^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 196, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 342, in on_message_send
await self._cleanup_producer(producer_task, task_id)
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 196, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 438, in _cleanup_producer
await producer_task
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 196, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 197, in _run_event_stream
await queue.close()
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 196, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/path/to/root/.venv/lib/python3.13/site-packages/a2a/server/events/event_queue.py", line 175, in close
await asyncio.gather(
self.queue.join(), *(child.close() for child in self._children)
)
File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/queues.py", line 248, in join
await self._finished.wait()
File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/locks.py", line 213, in wait
await fut
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/path/to/root/a2a-python/tests/server/test_task_save_error_hanging.py", line 104, in <module>
asyncio.run(test_hanging_on_task_save_error())
~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/runners.py", line 195, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/base_events.py", line 725, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "/path/to/root/a2a-python/tests/server/test_task_save_error_hanging.py", line 99, in test_hanging_on_task_save_error
await asyncio.wait_for(
handler.on_message_send(params), timeout=10.0
)
File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/tasks.py", line 506, in wait_for
async with timeouts.timeout(timeout):
~~~~~~~~~~~~~~~~^^^^^^^^^
File "/xxxxxxx/.pyenv/versions/3.13.9/lib/python3.13/asyncio/timeouts.py", line 116, in __aexit__
raise TimeoutError from exc_val
TimeoutError
Code of Conduct
- I agree to follow this project's Code of Conduct