# Summary (descriptive text only; it sits before the first "diff --git" line, which
# patch tools are expected to skip as leading text).
#
# * src/agents/run.py (_run_single_turn_streamed): the RawResponsesStreamEvent is now
#   queued ahead of the event-type checks instead of after them; a
#   "reasoning_item_created" RunItemStreamEvent is queued for each ResponseReasoningItem
#   whose id has not been emitted yet; ReasoningItems with those ids are later removed
#   from items_to_filter.
# * tests/fake_model.py (stream_response): yields created / in_progress / per-output-item
#   events with an incrementing sequence_number before the final ResponseCompletedEvent.
# * tests/fastapi/test_streaming_context.py, tests/test_agent_runner_streamed.py:
#   expected event lists and counts updated.
# * tests/test_stream_events.py: adds get_reasoning_item() and
#   test_complete_streaming_events.
#
# NOTE(review): line breaks and blank lines below were restored so that every hunk
# matches its "@@ -a,b +c,d @@" counts. Leading indentation inside hunks was inferred
# from Python syntax -- verify against the repository before applying.

diff --git a/src/agents/run.py b/src/agents/run.py
index 52d395a13..ed9e082ee 100644
--- a/src/agents/run.py
+++ b/src/agents/run.py
@@ -13,6 +13,7 @@
 from openai.types.responses.response_prompt_param import (
     ResponsePromptParam,
 )
+from openai.types.responses.response_reasoning_item import ResponseReasoningItem
 from typing_extensions import NotRequired, TypedDict, Unpack
 
 from ._run_impl import (
@@ -48,6 +49,7 @@
     HandoffCallItem,
     ItemHelpers,
     ModelResponse,
+    ReasoningItem,
     RunItem,
     ToolCallItem,
     ToolCallItemTypes,
@@ -1097,6 +1099,7 @@ async def _run_single_turn_streamed(
         server_conversation_tracker: _ServerConversationTracker | None = None,
     ) -> SingleStepResult:
         emitted_tool_call_ids: set[str] = set()
+        emitted_reasoning_item_ids: set[str] = set()
 
         if should_run_agent_start_hooks:
             await asyncio.gather(
@@ -1178,6 +1181,9 @@ async def _run_single_turn_streamed(
             conversation_id=conversation_id,
             prompt=prompt_config,
         ):
+            # Emit the raw event ASAP
+            streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event))
+
             if isinstance(event, ResponseCompletedEvent):
                 usage = (
                     Usage(
@@ -1217,7 +1223,16 @@ async def _run_single_turn_streamed(
                             RunItemStreamEvent(item=tool_item, name="tool_called")
                         )
 
-            streamed_result._event_queue.put_nowait(RawResponsesStreamEvent(data=event))
+                elif isinstance(output_item, ResponseReasoningItem):
+                    reasoning_id: str | None = getattr(output_item, "id", None)
+
+                    if reasoning_id and reasoning_id not in emitted_reasoning_item_ids:
+                        emitted_reasoning_item_ids.add(reasoning_id)
+
+                        reasoning_item = ReasoningItem(raw_item=output_item, agent=agent)
+                        streamed_result._event_queue.put_nowait(
+                            RunItemStreamEvent(item=reasoning_item, name="reasoning_item_created")
+                        )
 
         # Call hook just after the model response is finalized.
         if final_response is not None:
@@ -1271,6 +1286,18 @@ async def _run_single_turn_streamed(
                 )
             ]
 
+        if emitted_reasoning_item_ids:
+            # Filter out reasoning items that were already emitted during streaming
+            items_to_filter = [
+                item
+                for item in items_to_filter
+                if not (
+                    isinstance(item, ReasoningItem)
+                    and (reasoning_id := getattr(item.raw_item, "id", None))
+                    and reasoning_id in emitted_reasoning_item_ids
+                )
+            ]
+
         # Filter out HandoffCallItem to avoid duplicates (already sent earlier)
         items_to_filter = [
             item for item in items_to_filter if not isinstance(item, HandoffCallItem)
diff --git a/tests/fake_model.py b/tests/fake_model.py
index b38b3790a..d86870920 100644
--- a/tests/fake_model.py
+++ b/tests/fake_model.py
@@ -3,7 +3,33 @@
 from collections.abc import AsyncIterator
 from typing import Any
 
-from openai.types.responses import Response, ResponseCompletedEvent, ResponseUsage
+from openai.types.responses import (
+    Response,
+    ResponseCompletedEvent,
+    ResponseContentPartAddedEvent,
+    ResponseContentPartDoneEvent,
+    ResponseCreatedEvent,
+    ResponseFunctionCallArgumentsDeltaEvent,
+    ResponseFunctionCallArgumentsDoneEvent,
+    ResponseFunctionToolCall,
+    ResponseInProgressEvent,
+    ResponseOutputItemAddedEvent,
+    ResponseOutputItemDoneEvent,
+    ResponseOutputMessage,
+    ResponseOutputText,
+    ResponseReasoningSummaryPartAddedEvent,
+    ResponseReasoningSummaryPartDoneEvent,
+    ResponseReasoningSummaryTextDeltaEvent,
+    ResponseReasoningSummaryTextDoneEvent,
+    ResponseTextDeltaEvent,
+    ResponseTextDoneEvent,
+    ResponseUsage,
+)
+from openai.types.responses.response_reasoning_item import ResponseReasoningItem
+from openai.types.responses.response_reasoning_summary_part_added_event import (
+    Part as AddedEventPart,
+)
+from openai.types.responses.response_reasoning_summary_part_done_event import Part as DoneEventPart
 from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails
 
 from agents.agent_output import AgentOutputSchemaBase
@@ -143,10 +169,151 @@ async def stream_response(
                 )
                 raise output
 
+            response = get_response_obj(output, usage=self.hardcoded_usage)
+            sequence_number = 0
+
+            yield ResponseCreatedEvent(
+                type="response.created",
+                response=response,
+                sequence_number=sequence_number,
+            )
+            sequence_number += 1
+
+            yield ResponseInProgressEvent(
+                type="response.in_progress",
+                response=response,
+                sequence_number=sequence_number,
+            )
+            sequence_number += 1
+
+            for output_index, output_item in enumerate(output):
+                yield ResponseOutputItemAddedEvent(
+                    type="response.output_item.added",
+                    item=output_item,
+                    output_index=output_index,
+                    sequence_number=sequence_number,
+                )
+                sequence_number += 1
+
+                if isinstance(output_item, ResponseReasoningItem):
+                    if output_item.summary:
+                        for summary_index, summary in enumerate(output_item.summary):
+                            yield ResponseReasoningSummaryPartAddedEvent(
+                                type="response.reasoning_summary_part.added",
+                                item_id=output_item.id,
+                                output_index=output_index,
+                                summary_index=summary_index,
+                                part=AddedEventPart(text=summary.text, type=summary.type),
+                                sequence_number=sequence_number,
+                            )
+                            sequence_number += 1
+
+                            yield ResponseReasoningSummaryTextDeltaEvent(
+                                type="response.reasoning_summary_text.delta",
+                                item_id=output_item.id,
+                                output_index=output_index,
+                                summary_index=summary_index,
+                                delta=summary.text,
+                                sequence_number=sequence_number,
+                            )
+                            sequence_number += 1
+
+                            yield ResponseReasoningSummaryTextDoneEvent(
+                                type="response.reasoning_summary_text.done",
+                                item_id=output_item.id,
+                                output_index=output_index,
+                                summary_index=summary_index,
+                                text=summary.text,
+                                sequence_number=sequence_number,
+                            )
+                            sequence_number += 1
+
+                            yield ResponseReasoningSummaryPartDoneEvent(
+                                type="response.reasoning_summary_part.done",
+                                item_id=output_item.id,
+                                output_index=output_index,
+                                summary_index=summary_index,
+                                part=DoneEventPart(text=summary.text, type=summary.type),
+                                sequence_number=sequence_number,
+                            )
+                            sequence_number += 1
+
+                elif isinstance(output_item, ResponseFunctionToolCall):
+                    yield ResponseFunctionCallArgumentsDeltaEvent(
+                        type="response.function_call_arguments.delta",
+                        item_id=output_item.call_id,
+                        output_index=output_index,
+                        delta=output_item.arguments,
+                        sequence_number=sequence_number,
+                    )
+                    sequence_number += 1
+
+                    yield ResponseFunctionCallArgumentsDoneEvent(
+                        type="response.function_call_arguments.done",
+                        item_id=output_item.call_id,
+                        output_index=output_index,
+                        arguments=output_item.arguments,
+                        sequence_number=sequence_number,
+                    )
+                    sequence_number += 1
+
+                elif isinstance(output_item, ResponseOutputMessage):
+                    for content_index, content_part in enumerate(output_item.content):
+                        if isinstance(content_part, ResponseOutputText):
+                            yield ResponseContentPartAddedEvent(
+                                type="response.content_part.added",
+                                item_id=output_item.id,
+                                output_index=output_index,
+                                content_index=content_index,
+                                part=content_part,
+                                sequence_number=sequence_number,
+                            )
+                            sequence_number += 1
+
+                            yield ResponseTextDeltaEvent(
+                                type="response.output_text.delta",
+                                item_id=output_item.id,
+                                output_index=output_index,
+                                content_index=content_index,
+                                delta=content_part.text,
+                                logprobs=[],
+                                sequence_number=sequence_number,
+                            )
+                            sequence_number += 1
+
+                            yield ResponseTextDoneEvent(
+                                type="response.output_text.done",
+                                item_id=output_item.id,
+                                output_index=output_index,
+                                content_index=content_index,
+                                text=content_part.text,
+                                logprobs=[],
+                                sequence_number=sequence_number,
+                            )
+                            sequence_number += 1
+
+                            yield ResponseContentPartDoneEvent(
+                                type="response.content_part.done",
+                                item_id=output_item.id,
+                                output_index=output_index,
+                                content_index=content_index,
+                                part=content_part,
+                                sequence_number=sequence_number,
+                            )
+                            sequence_number += 1
+
+                yield ResponseOutputItemDoneEvent(
+                    type="response.output_item.done",
+                    item=output_item,
+                    output_index=output_index,
+                    sequence_number=sequence_number,
+                )
+                sequence_number += 1
+
             yield ResponseCompletedEvent(
                 type="response.completed",
-                response=get_response_obj(output, usage=self.hardcoded_usage),
-                sequence_number=0,
+                response=response,
+                sequence_number=sequence_number,
             )
 
 
diff --git a/tests/fastapi/test_streaming_context.py b/tests/fastapi/test_streaming_context.py
index ee13045e4..f2b890394 100644
--- a/tests/fastapi/test_streaming_context.py
+++ b/tests/fastapi/test_streaming_context.py
@@ -25,5 +25,17 @@ async def test_streaming_context():
             body = (await r.aread()).decode("utf-8")
             lines = [line for line in body.splitlines() if line]
             assert lines == snapshot(
-                ["agent_updated_stream_event", "raw_response_event", "run_item_stream_event"]
+                [
+                    "agent_updated_stream_event",
+                    "raw_response_event",  # ResponseCreatedEvent
+                    "raw_response_event",  # ResponseInProgressEvent
+                    "raw_response_event",  # ResponseOutputItemAddedEvent
+                    "raw_response_event",  # ResponseContentPartAddedEvent
+                    "raw_response_event",  # ResponseTextDeltaEvent
+                    "raw_response_event",  # ResponseTextDoneEvent
+                    "raw_response_event",  # ResponseContentPartDoneEvent
+                    "raw_response_event",  # ResponseOutputItemDoneEvent
+                    "raw_response_event",  # ResponseCompletedEvent
+                    "run_item_stream_event",  # MessageOutputItem
+                ]
             )
diff --git a/tests/test_agent_runner_streamed.py b/tests/test_agent_runner_streamed.py
index 90071a3d7..25613f92a 100644
--- a/tests/test_agent_runner_streamed.py
+++ b/tests/test_agent_runner_streamed.py
@@ -695,11 +695,16 @@ async def test_streaming_events():
 
     # Now lets check the events
     expected_item_type_map = {
-        "tool_call": 2,
+        # 3 tool_call_item events:
+        #   1. get_function_tool_call("foo", ...)
+        #   2. get_handoff_tool_call(agent_1) because handoffs are implemented via tool calls too
+        #   3. get_function_tool_call("bar", ...)
+        "tool_call": 3,
+        # Only 2 outputs, handoff tool call doesn't have corresponding tool_call_output event
         "tool_call_output": 2,
-        "message": 2,
-        "handoff": 1,
-        "handoff_output": 1,
+        "message": 2,  # get_text_message("a_message") + get_final_output_message(...)
+        "handoff": 1,  # get_handoff_tool_call(agent_1)
+        "handoff_output": 1,  # handoff_output_item
     }
 
     total_expected_item_count = sum(expected_item_type_map.values())
diff --git a/tests/test_stream_events.py b/tests/test_stream_events.py
index a2f0338d6..a2de208b5 100644
--- a/tests/test_stream_events.py
+++ b/tests/test_stream_events.py
@@ -2,15 +2,40 @@
 import time
 
 import pytest
+from openai.types.responses import (
+    ResponseCompletedEvent,
+    ResponseContentPartAddedEvent,
+    ResponseContentPartDoneEvent,
+    ResponseCreatedEvent,
+    ResponseFunctionCallArgumentsDeltaEvent,
+    ResponseFunctionCallArgumentsDoneEvent,
+    ResponseInProgressEvent,
+    ResponseOutputItemAddedEvent,
+    ResponseOutputItemDoneEvent,
+    ResponseReasoningSummaryPartAddedEvent,
+    ResponseReasoningSummaryPartDoneEvent,
+    ResponseReasoningSummaryTextDeltaEvent,
+    ResponseReasoningSummaryTextDoneEvent,
+    ResponseTextDeltaEvent,
+    ResponseTextDoneEvent,
+)
+from openai.types.responses.response_reasoning_item import ResponseReasoningItem, Summary
 
 from agents import Agent, HandoffCallItem, Runner, function_tool
 from agents.extensions.handoff_filters import remove_all_tools
 from agents.handoffs import handoff
+from agents.items import MessageOutputItem, ReasoningItem, ToolCallItem, ToolCallOutputItem
 
 from .fake_model import FakeModel
 from .test_responses import get_function_tool_call, get_handoff_tool_call, get_text_message
 
 
+def get_reasoning_item() -> ResponseReasoningItem:
+    return ResponseReasoningItem(
+        id="rid", type="reasoning", summary=[Summary(text="thinking", type="summary_text")]
+    )
+
+
 @function_tool
 async def foo() -> str:
     await asyncio.sleep(3)
@@ -108,3 +133,150 @@ async def foo(args: str) -> str:
 
     assert handoff_requested_seen, "handoff_requested event not observed"
     assert agent_switched_to_english, "Agent did not switch to EnglishAgent"
+
+
+@pytest.mark.asyncio
+async def test_complete_streaming_events():
+    """Verify all streaming event types are emitted in correct order.
+
+    Tests the complete event sequence including:
+    - Reasoning items with summary events
+    - Function call with arguments delta/done events
+    - Message output with content_part and text delta/done events
+    """
+    model = FakeModel()
+    agent = Agent(
+        name="TestAgent",
+        model=model,
+        tools=[foo],
+    )
+
+    model.add_multiple_turn_outputs(
+        [
+            [
+                get_reasoning_item(),
+                get_function_tool_call("foo", '{"arg": "value"}'),
+            ],
+            [get_text_message("Final response")],
+        ]
+    )
+
+    result = Runner.run_streamed(agent, input="Hello")
+
+    events = []
+    async for event in result.stream_events():
+        events.append(event)
+
+    assert len(events) == 27, f"Expected 27 events but got {len(events)}"
+
+    # Event 0: agent_updated_stream_event
+    assert events[0].type == "agent_updated_stream_event"
+    assert events[0].new_agent.name == "TestAgent"
+
+    # Event 1: ResponseCreatedEvent (first turn started)
+    assert events[1].type == "raw_response_event"
+    assert isinstance(events[1].data, ResponseCreatedEvent)
+
+    # Event 2: ResponseInProgressEvent
+    assert events[2].type == "raw_response_event"
+    assert isinstance(events[2].data, ResponseInProgressEvent)
+
+    # Event 3: ResponseOutputItemAddedEvent (reasoning item)
+    assert events[3].type == "raw_response_event"
+    assert isinstance(events[3].data, ResponseOutputItemAddedEvent)
+
+    # Event 4: ResponseReasoningSummaryPartAddedEvent
+    assert events[4].type == "raw_response_event"
+    assert isinstance(events[4].data, ResponseReasoningSummaryPartAddedEvent)
+
+    # Event 5: ResponseReasoningSummaryTextDeltaEvent
+    assert events[5].type == "raw_response_event"
+    assert isinstance(events[5].data, ResponseReasoningSummaryTextDeltaEvent)
+
+    # Event 6: ResponseReasoningSummaryTextDoneEvent
+    assert events[6].type == "raw_response_event"
+    assert isinstance(events[6].data, ResponseReasoningSummaryTextDoneEvent)
+
+    # Event 7: ResponseReasoningSummaryPartDoneEvent
+    assert events[7].type == "raw_response_event"
+    assert isinstance(events[7].data, ResponseReasoningSummaryPartDoneEvent)
+
+    # Event 8: ResponseOutputItemDoneEvent (reasoning item)
+    assert events[8].type == "raw_response_event"
+    assert isinstance(events[8].data, ResponseOutputItemDoneEvent)
+
+    # Event 9: ReasoningItem run_item_stream_event
+    assert events[9].type == "run_item_stream_event"
+    assert events[9].name == "reasoning_item_created"
+    assert isinstance(events[9].item, ReasoningItem)
+
+    # Event 10: ResponseOutputItemAddedEvent (function call)
+    assert events[10].type == "raw_response_event"
+    assert isinstance(events[10].data, ResponseOutputItemAddedEvent)
+
+    # Event 11: ResponseFunctionCallArgumentsDeltaEvent
+    assert events[11].type == "raw_response_event"
+    assert isinstance(events[11].data, ResponseFunctionCallArgumentsDeltaEvent)
+
+    # Event 12: ResponseFunctionCallArgumentsDoneEvent
+    assert events[12].type == "raw_response_event"
+    assert isinstance(events[12].data, ResponseFunctionCallArgumentsDoneEvent)
+
+    # Event 13: ResponseOutputItemDoneEvent (function call)
+    assert events[13].type == "raw_response_event"
+    assert isinstance(events[13].data, ResponseOutputItemDoneEvent)
+
+    # Event 14: ToolCallItem run_item_stream_event
+    assert events[14].type == "run_item_stream_event"
+    assert events[14].name == "tool_called"
+    assert isinstance(events[14].item, ToolCallItem)
+
+    # Event 15: ResponseCompletedEvent (first turn ended)
+    assert events[15].type == "raw_response_event"
+    assert isinstance(events[15].data, ResponseCompletedEvent)
+
+    # Event 16: ToolCallOutputItem run_item_stream_event
+    assert events[16].type == "run_item_stream_event"
+    assert events[16].name == "tool_output"
+    assert isinstance(events[16].item, ToolCallOutputItem)
+
+    # Event 17: ResponseCreatedEvent (second turn started)
+    assert events[17].type == "raw_response_event"
+    assert isinstance(events[17].data, ResponseCreatedEvent)
+
+    # Event 18: ResponseInProgressEvent
+    assert events[18].type == "raw_response_event"
+    assert isinstance(events[18].data, ResponseInProgressEvent)
+
+    # Event 19: ResponseOutputItemAddedEvent
+    assert events[19].type == "raw_response_event"
+    assert isinstance(events[19].data, ResponseOutputItemAddedEvent)
+
+    # Event 20: ResponseContentPartAddedEvent
+    assert events[20].type == "raw_response_event"
+    assert isinstance(events[20].data, ResponseContentPartAddedEvent)
+
+    # Event 21: ResponseTextDeltaEvent
+    assert events[21].type == "raw_response_event"
+    assert isinstance(events[21].data, ResponseTextDeltaEvent)
+
+    # Event 22: ResponseTextDoneEvent
+    assert events[22].type == "raw_response_event"
+    assert isinstance(events[22].data, ResponseTextDoneEvent)
+
+    # Event 23: ResponseContentPartDoneEvent
+    assert events[23].type == "raw_response_event"
+    assert isinstance(events[23].data, ResponseContentPartDoneEvent)
+
+    # Event 24: ResponseOutputItemDoneEvent
+    assert events[24].type == "raw_response_event"
+    assert isinstance(events[24].data, ResponseOutputItemDoneEvent)
+
+    # Event 25: ResponseCompletedEvent (second turn ended)
+    assert events[25].type == "raw_response_event"
+    assert isinstance(events[25].data, ResponseCompletedEvent)
+
+    # Event 26: MessageOutputItem run_item_stream_event
+    assert events[26].type == "run_item_stream_event"
+    assert events[26].name == "message_output_created"
+    assert isinstance(events[26].item, MessageOutputItem)