diff --git a/src/a2a/client/transports/jsonrpc.py b/src/a2a/client/transports/jsonrpc.py index a565e640..a58a7cab 100644 --- a/src/a2a/client/transports/jsonrpc.py +++ b/src/a2a/client/transports/jsonrpc.py @@ -176,6 +176,8 @@ async def send_message_streaming( try: event_source.response.raise_for_status() async for sse in event_source.aiter_sse(): + if not sse.data: + continue response = SendStreamingMessageResponse.model_validate( json.loads(sse.data) ) diff --git a/src/a2a/client/transports/rest.py b/src/a2a/client/transports/rest.py index afc9dd08..96df1e02 100644 --- a/src/a2a/client/transports/rest.py +++ b/src/a2a/client/transports/rest.py @@ -154,6 +154,8 @@ async def send_message_streaming( try: event_source.response.raise_for_status() async for sse in event_source.aiter_sse(): + if not sse.data: + continue event = a2a_pb2.StreamResponse() Parse(sse.data, event) yield proto_utils.FromProto.stream_response(event) diff --git a/tests/client/transports/test_jsonrpc_client.py b/tests/client/transports/test_jsonrpc_client.py index edbcd6c7..0f6bba5b 100644 --- a/tests/client/transports/test_jsonrpc_client.py +++ b/tests/client/transports/test_jsonrpc_client.py @@ -6,6 +6,7 @@ import httpx import pytest +import respx from httpx_sse import EventSource, SSEError, ServerSentEvent @@ -466,6 +467,63 @@ async def test_send_message_streaming_success( == mock_stream_response_2.result.model_dump() ) + # Repro of https://github.com/a2aproject/a2a-python/issues/540 + @pytest.mark.asyncio + @respx.mock + async def test_send_message_streaming_comment_success( + self, + mock_agent_card: MagicMock, + ): + async with httpx.AsyncClient() as client: + transport = JsonRpcTransport( + httpx_client=client, agent_card=mock_agent_card + ) + params = MessageSendParams( + message=create_text_message_object(content='Hello stream') + ) + mock_stream_response_1 = SendMessageSuccessResponse( + id='stream_id_123', + jsonrpc='2.0', + result=create_text_message_object( + content='First part', role=Role.agent + ), + ) + mock_stream_response_2 = SendMessageSuccessResponse( + id='stream_id_123', + jsonrpc='2.0', + result=create_text_message_object( + content='Second part', role=Role.agent + ), + ) + + sse_content = ( + 'id: stream_id_1\n' + f'data: {mock_stream_response_1.model_dump_json()}\n\n' + ': keep-alive\n\n' + 'id: stream_id_2\n' + f'data: {mock_stream_response_2.model_dump_json()}\n\n' + ': keep-alive\n\n' + ) + + respx.post(mock_agent_card.url).mock( + return_value=httpx.Response( + 200, + headers={'Content-Type': 'text/event-stream'}, + content=sse_content, + ) + ) + + results = [ + item + async for item in transport.send_message_streaming( + request=params + ) + ] + + assert len(results) == 2 + assert results[0] == mock_stream_response_1.result + assert results[1] == mock_stream_response_2.result + @pytest.mark.asyncio async def test_send_request_http_status_error( self, mock_httpx_client: AsyncMock, mock_agent_card: MagicMock diff --git a/tests/client/transports/test_rest_client.py b/tests/client/transports/test_rest_client.py index cd68b443..c889ebaf 100644 --- a/tests/client/transports/test_rest_client.py +++ b/tests/client/transports/test_rest_client.py @@ -3,18 +3,23 @@ import httpx import pytest +import respx +from google.protobuf.json_format import MessageToJson from httpx_sse import EventSource, ServerSentEvent from a2a.client import create_text_message_object from a2a.client.errors import A2AClientHTTPError from a2a.client.transports.rest import RestTransport from a2a.extensions.common import HTTP_EXTENSION_HEADER +from a2a.grpc import a2a_pb2 from a2a.types import ( AgentCapabilities, AgentCard, MessageSendParams, + Role, ) +from a2a.utils import proto_utils @pytest.fixture @@ -88,6 +93,64 @@ async def test_send_message_with_default_extensions( }, ) + # Repro of https://github.com/a2aproject/a2a-python/issues/540 + @pytest.mark.asyncio + @respx.mock + async def test_send_message_streaming_comment_success( + self, + mock_agent_card: MagicMock, + ): + """Test that SSE comments are ignored.""" + async with httpx.AsyncClient() as client: + transport = RestTransport( + httpx_client=client, agent_card=mock_agent_card + ) + params = MessageSendParams( + message=create_text_message_object(content='Hello stream') + ) + + mock_stream_response_1 = a2a_pb2.StreamResponse( + msg=proto_utils.ToProto.message( + create_text_message_object( + content='First part', role=Role.agent + ) + ) + ) + mock_stream_response_2 = a2a_pb2.StreamResponse( + msg=proto_utils.ToProto.message( + create_text_message_object( + content='Second part', role=Role.agent + ) + ) + ) + + sse_content = ( + 'id: stream_id_1\n' + f'data: {MessageToJson(mock_stream_response_1, indent=None)}\n\n' + ': keep-alive\n\n' + 'id: stream_id_2\n' + f'data: {MessageToJson(mock_stream_response_2, indent=None)}\n\n' + ': keep-alive\n\n' + ) + + respx.post( + f'{mock_agent_card.url.rstrip("/")}/v1/message:stream' + ).mock( + return_value=httpx.Response( + 200, + headers={'Content-Type': 'text/event-stream'}, + content=sse_content, + ) + ) + + results = [] + async for item in transport.send_message_streaming(request=params): + results.append(item) + + assert len(results) == 2 + assert results[0].parts[0].root.text == 'First part' + assert results[1].parts[0].root.text == 'Second part' + @pytest.mark.asyncio @patch('a2a.client.transports.rest.aconnect_sse') async def test_send_message_streaming_with_new_extensions(