diff --git a/tests/entrypoints/openai/test_response_api_with_harmony.py b/tests/entrypoints/openai/test_response_api_with_harmony.py index eceaff672112..8d974d56b445 100644 --- a/tests/entrypoints/openai/test_response_api_with_harmony.py +++ b/tests/entrypoints/openai/test_response_api_with_harmony.py @@ -287,6 +287,57 @@ async def test_stateful_multi_turn(client: OpenAI, model_name: str): assert response3.status == "completed" +@pytest.mark.asyncio +@pytest.mark.parametrize("model_name", [MODEL_NAME]) +async def test_streaming_types(client: OpenAI, model_name: str): + prompts = [ + "tell me a story about a cat in 20 words", + ] + + # this links the "done" type with the "start" type + # so every "done" type should have a corresponding "start" type + # and every open block should be closed by the end of the stream + pairs_of_event_types = { + "response.completed": "response.created", + "response.output_item.done": "response.output_item.added", + "response.content_part.done": "response.content_part.added", + "response.output_text.done": "response.output_text.delta", + "response.web_search_call.done": "response.web_search_call.added", + "response.reasoning_text.done": "response.reasoning_text.delta", + "response.reasoning_part.done": "response.reasoning_part.added", + } + + for prompt in prompts: + response = await client.responses.create( + model=model_name, + input=prompt, + reasoning={"effort": "low"}, + tools=[], + stream=True, + background=False, + ) + + stack_of_event_types = [] + async for event in response: + if event.type == 'response.created': + stack_of_event_types.append(event.type) + elif event.type == 'response.completed': + assert stack_of_event_types[-1] == pairs_of_event_types[ + event.type] + stack_of_event_types.pop() + if event.type.endswith("added"): + stack_of_event_types.append(event.type) + elif event.type.endswith("delta"): + if stack_of_event_types[-1] == event.type: + continue + stack_of_event_types.append(event.type) + elif event.type.endswith("done"): + assert stack_of_event_types[-1] == pairs_of_event_types[ + event.type] + stack_of_event_types.pop() + assert len(stack_of_event_types) == 0 + + @pytest.mark.asyncio @pytest.mark.parametrize("model_name", [MODEL_NAME]) @pytest.mark.parametrize("background", [True, False]) @@ -343,7 +394,10 @@ async def test_streaming(client: OpenAI, model_name: str, background: bool): assert event.item_id == current_item_id # verify content_index_id is correct - if event.type == "response.content_part.added": + if event.type in [ + "response.content_part.added", + "response.reasoning_part.added" + ]: assert event.content_index != current_content_index current_content_index = event.content_index elif event.type in [ diff --git a/vllm/entrypoints/openai/protocol.py b/vllm/entrypoints/openai/protocol.py index 2505e493625d..4d8d32e87919 100644 --- a/vllm/entrypoints/openai/protocol.py +++ b/vllm/entrypoints/openai/protocol.py @@ -31,6 +31,8 @@ ResponseReasoningTextDeltaEvent, ResponseReasoningTextDoneEvent, ResponseStatus, ResponseWebSearchCallCompletedEvent, ResponseWebSearchCallInProgressEvent, ResponseWebSearchCallSearchingEvent) +from openai.types.responses.response_reasoning_item import ( + Content as ResponseReasoningTextContent) # Backward compatibility for OpenAI client versions try: # For older openai versions (< 1.100.0) @@ -260,26 +262,6 @@ def get_logits_processors(processors: Optional[LogitsProcessors], ResponseReasoningItem, ResponseFunctionToolCall] -StreamingResponsesResponse: TypeAlias = Union[ - ResponseCreatedEvent, - ResponseInProgressEvent, - ResponseCompletedEvent, - ResponseOutputItemAddedEvent, - ResponseOutputItemDoneEvent, - ResponseContentPartAddedEvent, - ResponseContentPartDoneEvent, - ResponseReasoningTextDeltaEvent, - ResponseReasoningTextDoneEvent, - ResponseCodeInterpreterCallInProgressEvent, - ResponseCodeInterpreterCallCodeDeltaEvent, - ResponseWebSearchCallInProgressEvent, - ResponseWebSearchCallSearchingEvent, - ResponseWebSearchCallCompletedEvent, - ResponseCodeInterpreterCallCodeDoneEvent, - ResponseCodeInterpreterCallInterpretingEvent, - ResponseCodeInterpreterCallCompletedEvent, -] - class ResponsesRequest(OpenAIBaseModel): # Ordered by official OpenAI API documentation @@ -1978,6 +1960,72 @@ def from_request( ) +# TODO: this code can be removed once +# https://github.com/openai/openai-python/issues/2634 has been resolved +class ResponseReasoningPartDoneEvent(OpenAIBaseModel): + content_index: int + """The index of the content part that is done.""" + + item_id: str + """The ID of the output item that the content part was added to.""" + + output_index: int + """The index of the output item that the content part was added to.""" + + part: ResponseReasoningTextContent + """The content part that is done.""" + + sequence_number: int + """The sequence number of this event.""" + + type: Literal["response.reasoning_part.done"] + """The type of the event. Always `response.reasoning_part.done`.""" + + +# TODO: this code can be removed once +# https://github.com/openai/openai-python/issues/2634 has been resolved +class ResponseReasoningPartAddedEvent(OpenAIBaseModel): + content_index: int + """The index of the content part that is done.""" + + item_id: str + """The ID of the output item that the content part was added to.""" + + output_index: int + """The index of the output item that the content part was added to.""" + + part: ResponseReasoningTextContent + """The content part that is done.""" + + sequence_number: int + """The sequence number of this event.""" + + type: Literal["response.reasoning_part.added"] + """The type of the event. Always `response.reasoning_part.added`.""" + + +StreamingResponsesResponse: TypeAlias = Union[ + ResponseCreatedEvent, + ResponseInProgressEvent, + ResponseCompletedEvent, + ResponseOutputItemAddedEvent, + ResponseOutputItemDoneEvent, + ResponseContentPartAddedEvent, + ResponseContentPartDoneEvent, + ResponseReasoningTextDeltaEvent, + ResponseReasoningTextDoneEvent, + ResponseReasoningPartAddedEvent, + ResponseReasoningPartDoneEvent, + ResponseCodeInterpreterCallInProgressEvent, + ResponseCodeInterpreterCallCodeDeltaEvent, + ResponseWebSearchCallInProgressEvent, + ResponseWebSearchCallSearchingEvent, + ResponseWebSearchCallCompletedEvent, + ResponseCodeInterpreterCallCodeDoneEvent, + ResponseCodeInterpreterCallInterpretingEvent, + ResponseCodeInterpreterCallCompletedEvent, +] + BatchRequestInputBody = Union[ChatCompletionRequest, EmbeddingRequest, ScoreRequest, RerankRequest] diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py index 469d74272b0e..4894623aeac2 100644 --- a/vllm/entrypoints/openai/serving_responses.py +++ b/vllm/entrypoints/openai/serving_responses.py @@ -58,6 +58,8 @@ InputTokensDetails, OutputTokensDetails, RequestResponseMetadata, + ResponseReasoningPartAddedEvent, + ResponseReasoningPartDoneEvent, ResponsesRequest, ResponsesResponse, ResponseUsage, StreamingResponsesResponse) @@ -1280,14 +1282,13 @@ async def _process_harmony_streaming_events( # Deal with tool call here pass elif previous_item.channel == "analysis": + content = ResponseReasoningTextContent( + text=previous_item.content[0].text, + type="reasoning_text", + ) reasoning_item = ResponseReasoningItem( type="reasoning", - content=[ - ResponseReasoningTextContent( - text=previous_item.content[0].text, - type="reasoning_text", - ), - ], + content=[content], status="completed", id=current_item_id, summary=[], @@ -1301,6 +1302,15 @@ async def _process_harmony_streaming_events( content_index=current_content_index, text=previous_item.content[0].text, )) + yield _increment_sequence_number_and_return( + ResponseReasoningPartDoneEvent( + type="response.reasoning_part.done", + sequence_number=-1, + item_id=current_item_id, + output_index=current_output_index, + content_index=current_content_index, + part=content, + )) yield _increment_sequence_number_and_return( ResponseOutputItemDoneEvent( type="response.output_item.done", @@ -1412,17 +1422,15 @@ async def _process_harmony_streaming_events( )) current_content_index += 1 yield _increment_sequence_number_and_return( - ResponseContentPartAddedEvent( - type="response.content_part.added", + ResponseReasoningPartAddedEvent( + type="response.reasoning_part.added", sequence_number=-1, output_index=current_output_index, item_id=current_item_id, content_index=current_content_index, - part=ResponseOutputText( - type="output_text", + part=ResponseReasoningTextContent( text="", - annotations=[], - logprobs=[], + type="reasoning_text", ), )) yield _increment_sequence_number_and_return(