Skip to content

[Bug]: message/stream does not stream properly #111

@matino

Description

@matino

What happened?

I was trying to get streaming through SSE working, and it didn't work properly.

I wrote a simple ADK agent executor:

class ADKAgentExecutor(AgentExecutor):
    def __init__(self, runner: Runner) -> None:
        self.runner = runner
        self.run_config = RunConfig(streaming_mode=StreamingMode.SSE)
        self.user_id = "self"

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        task_updater.start_work()

        session = await self._get_session(context.context_id)
        message = types.UserContent(
            parts=a2a_parts_to_genai(context.message.parts),
        )

        async for event in self.runner.run_async(
            session_id=session.id,
            user_id=self.user_id,
            new_message=message,
            run_config=self.run_config,
        ):
            parts = genai_parts_to_a2a(event.content.parts)
            if event.is_final_response():
                task_updater.add_artifact(parts)
                task_updater.complete()
                break
            if not event.get_function_calls():
                task_updater.update_status(
                    TaskState.working,
                    message=task_updater.new_agent_message(parts),
                )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise ServerError(error=UnsupportedOperationError())

    async def _get_session(self, session_id: str) -> Session:
        return await self.runner.session_service.get_session(
            app_name=self.runner.app_name, user_id=self.user_id, session_id=session_id
        ) or await self.runner.session_service.create_session(
            app_name=self.runner.app_name, user_id=self.user_id, session_id=session_id
        )

When testing the app with curl:

curl -N --http2 -H "Accept:text/event-stream" -X POST http://0.0.0.0:8000/ -d '{"method": "message/stream", "params": {"message": {"role": "user", "parts": [{"kind": "text", "text": "Some random quesiton"}], "messageId": "1"}}}'

I can see the very first chunk of response appear immediately:

data: {"jsonrpc":"2.0","result":{"contextId":"6661ac28-40fc-4102-ae12-fb09b0c2dc24","final":false,"kind":"status-update","status":{"state":"working"},"taskId":"88edd064-7d95-40c3-88f4-e41cb4009b19"}}

Then, the response is buffered and all the chunks are returned at the same time after a few seconds.

The solution, is to sleep small amount of time in the async for event in self.runner.run_async loop:

        async for event in self.runner.run_async(
            session_id=session.id,
            user_id=self.user_id,
            new_message=message,
            run_config=self.run_config,
        ):
            parts = genai_parts_to_a2a(event.content.parts)
            if event.is_final_response():
                task_updater.add_artifact(parts)
                task_updater.complete()
                break
            if not event.get_function_calls():
                task_updater.update_status(
                    TaskState.working,
                    message=task_updater.new_agent_message(parts),
                )

                # Hack for SSE
                await asyncio.sleep(0)

Only then, the response chunks started to show up one after another.
I don't think that's the right solution though :)

Related discussion:
#79

Code of Conduct

  • I agree to follow this project's Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions