Skip to content

[Bug]: Error events (A2AError) are not treated as final events in EventConsumer #137

@kingstonwen

Description

@kingstonwen

What happened?

Hi team,
I’m working with the A2A Python SDK and have a question about the handling of error events in the event queue/consumer system.

Context:

  • When I enqueue an A2AError (or JSONRPCError) event in a non-streaming request, the consumer does not treat it as a final event.
  • As a result, after the agent finishes and the queue is closed, the consumer continues waiting for more events and eventually raises a "Queue is closed" error.
  • When I enqueue a Task or Message event, this does not happen.

Example

In my execute() function, I have tried the following code for both blocking and streaming cases.

...
error = A2AError(
    root=InternalError()
)
event_queue.enqueue_event(error)
...

client-side result

{'error': {'code': -32603, 'message': 'Queue is closed.'}, 'id': 'e6a3c967-d0fe-44b2-9f29-75e9b6fff6af', 'jsonrpc': '2.0'}

Question:

Is this the intended design? Should error events be considered terminal/final events in the event stream, or is there a recommended pattern for handling errors differently?
Is there a best practice for signaling the end of the event stream when an error occurs, or am I perhaps not using the event queue as intended?

Relevant log output

Queue is closed. Event will not be dequeued.
Unhandled exception: Queue is closed.
Traceback (most recent call last):
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/server/apps/starlette_app.py", line 186, in _handle_requests
    return await self._process_non_streaming_request(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        request_id, a2a_request, call_context
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/server/apps/starlette_app.py", line 262, in _process_non_streaming_request
    handler_result = await self.handler.on_message_send(
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        request_obj, context
        ^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 158, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/jsonrpc_handler.py", line 81, in on_message_send
    task_or_message = await self.request_handler.on_message_send(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        request.params, context
        ^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 158, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 223, in on_message_send
    ) = await result_aggregator.consume_and_break_on_interrupt(consumer)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/server/tasks/result_aggregator.py", line 116, in consume_and_break_on_interrupt
    async for event in event_stream:
    ...<20 lines>...
            break
  File ".../a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 92, in consume_all
    event = await asyncio.wait_for(
            ^^^^^^^^^^^^^^^^^^^^^^^
        self.queue.dequeue_event(), timeout=self._timeout
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    )
    ^
  File ".../.local/share/uv/python/cpython-3.13.3-macos-aarch64-none/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
    return await fut
           ^^^^^^^^^
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 158, in async_wrapper
    result = await func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../agents/a2a-server-dify/.venv/lib/python3.13/site-packages/a2a/server/events/event_queue.py", line 92, in dequeue_event
    raise asyncio.QueueEmpty('Queue is closed.')
asyncio.queues.QueueEmpty: Queue is closed.
Request Error (ID: e6a3c967-d0fe-44b2-9f29-75e9b6fff6af): Code=-32603, Message='Queue is closed.'

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions