Merged
56 changes: 55 additions & 1 deletion tests/entrypoints/openai/test_response_api_with_harmony.py
@@ -287,6 +287,57 @@ async def test_stateful_multi_turn(client: OpenAI, model_name: str):
    assert response3.status == "completed"


@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
async def test_streaming_types(client: OpenAI, model_name: str):
    prompts = [
        "tell me a story about a cat in 20 words",
    ]

    # Maps each "done" event type to its corresponding "start" type: every
    # "done" event must match the most recently opened "start" event, and
    # every open block must be closed by the end of the stream.
    pairs_of_event_types = {
        "response.completed": "response.created",
        "response.output_item.done": "response.output_item.added",
        "response.content_part.done": "response.content_part.added",
        "response.output_text.done": "response.output_text.delta",
        "response.web_search_call.done": "response.web_search_call.added",
        "response.reasoning_text.done": "response.reasoning_text.delta",
        "response.reasoning_part.done": "response.reasoning_part.added",
    }

    for prompt in prompts:
        response = await client.responses.create(
            model=model_name,
            input=prompt,
            reasoning={"effort": "low"},
            tools=[],
            stream=True,
            background=False,
        )

        stack_of_event_types = []
        async for event in response:
            if event.type == "response.created":
                stack_of_event_types.append(event.type)
            elif event.type == "response.completed":
                assert stack_of_event_types[-1] == pairs_of_event_types[
                    event.type]
                stack_of_event_types.pop()
            if event.type.endswith("added"):
                stack_of_event_types.append(event.type)
            elif event.type.endswith("delta"):
                if stack_of_event_types[-1] == event.type:
                    continue
                stack_of_event_types.append(event.type)
            elif event.type.endswith("done"):
                assert stack_of_event_types[-1] == pairs_of_event_types[
                    event.type]
                stack_of_event_types.pop()
        assert len(stack_of_event_types) == 0


@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
@pytest.mark.parametrize("background", [True, False])
@@ -343,7 +394,10 @@ async def test_streaming(client: OpenAI, model_name: str, background: bool):
        assert event.item_id == current_item_id

        # verify content_index_id is correct
-       if event.type == "response.content_part.added":
        if event.type in [
                "response.content_part.added",
                "response.reasoning_part.added"
        ]:
            assert event.content_index != current_content_index
            current_content_index = event.content_index
        elif event.type in [
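
The new test is essentially a bracket-matching check over the event stream: "start"-style events push onto a stack, each "done"-style event must pop its matching opener, and the stack must be empty when the stream ends. A minimal, self-contained sketch of the same invariant, using plain event-type strings and a hypothetical sample stream (the names PAIRS and check_stream are illustrative, not from the PR):

# Minimal sketch of the pairing invariant checked by test_streaming_types,
# applied to plain event-type strings rather than OpenAI event objects.
PAIRS = {
    "response.completed": "response.created",
    "response.output_item.done": "response.output_item.added",
    "response.output_text.done": "response.output_text.delta",
}


def check_stream(event_types: list[str]) -> None:
    stack: list[str] = []
    for etype in event_types:
        if etype == "response.created" or etype.endswith("added"):
            stack.append(etype)
        elif etype.endswith("delta"):
            if stack[-1] != etype:  # collapse runs of identical deltas
                stack.append(etype)
        else:  # a "done"/"completed" event must close the innermost block
            assert stack.pop() == PAIRS[etype]
    assert not stack  # every open block was closed by the end of the stream


check_stream([
    "response.created",
    "response.output_item.added",
    "response.output_text.delta",
    "response.output_text.delta",
    "response.output_text.done",
    "response.output_item.done",
    "response.completed",
])

Collapsing repeated deltas mirrors the `continue` branch in the test: only one delta marker per block sits on the stack, no matter how many delta events arrive.
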
88 changes: 68 additions & 20 deletions vllm/entrypoints/openai/protocol.py
@@ -31,6 +31,8 @@
    ResponseReasoningTextDeltaEvent, ResponseReasoningTextDoneEvent,
    ResponseStatus, ResponseWebSearchCallCompletedEvent,
    ResponseWebSearchCallInProgressEvent, ResponseWebSearchCallSearchingEvent)
from openai.types.responses.response_reasoning_item import (
    Content as ResponseReasoningTextContent)

# Backward compatibility for OpenAI client versions
try: # For older openai versions (< 1.100.0)
@@ -260,26 +262,6 @@ def get_logits_processors(processors: Optional[LogitsProcessors],
ResponseReasoningItem,
ResponseFunctionToolCall]

-StreamingResponsesResponse: TypeAlias = Union[
-    ResponseCreatedEvent,
-    ResponseInProgressEvent,
-    ResponseCompletedEvent,
-    ResponseOutputItemAddedEvent,
-    ResponseOutputItemDoneEvent,
-    ResponseContentPartAddedEvent,
-    ResponseContentPartDoneEvent,
-    ResponseReasoningTextDeltaEvent,
-    ResponseReasoningTextDoneEvent,
-    ResponseCodeInterpreterCallInProgressEvent,
-    ResponseCodeInterpreterCallCodeDeltaEvent,
-    ResponseWebSearchCallInProgressEvent,
-    ResponseWebSearchCallSearchingEvent,
-    ResponseWebSearchCallCompletedEvent,
-    ResponseCodeInterpreterCallCodeDoneEvent,
-    ResponseCodeInterpreterCallInterpretingEvent,
-    ResponseCodeInterpreterCallCompletedEvent,
-]


class ResponsesRequest(OpenAIBaseModel):
# Ordered by official OpenAI API documentation
@@ -1978,6 +1960,72 @@ def from_request(
)


# TODO: this code can be removed once
# https://github.com/openai/openai-python/issues/2634 has been resolved
class ResponseReasoningPartDoneEvent(OpenAIBaseModel):
    content_index: int
    """The index of the content part that is done."""

    item_id: str
    """The ID of the output item that the content part was added to."""

    output_index: int
    """The index of the output item that the content part was added to."""

    part: ResponseReasoningTextContent
    """The content part that is done."""

    sequence_number: int
    """The sequence number of this event."""

    type: Literal["response.reasoning_part.done"]
    """The type of the event. Always `response.reasoning_part.done`."""


# TODO: this code can be removed once
# https://github.com/openai/openai-python/issues/2634 has been resolved
class ResponseReasoningPartAddedEvent(OpenAIBaseModel):
    content_index: int
    """The index of the content part that was added."""

    item_id: str
    """The ID of the output item that the content part was added to."""

    output_index: int
    """The index of the output item that the content part was added to."""

    part: ResponseReasoningTextContent
    """The content part that was added."""

    sequence_number: int
    """The sequence number of this event."""

    type: Literal["response.reasoning_part.added"]
    """The type of the event. Always `response.reasoning_part.added`."""


StreamingResponsesResponse: TypeAlias = Union[
    ResponseCreatedEvent,
    ResponseInProgressEvent,
    ResponseCompletedEvent,
    ResponseOutputItemAddedEvent,
    ResponseOutputItemDoneEvent,
    ResponseContentPartAddedEvent,
    ResponseContentPartDoneEvent,
    ResponseReasoningTextDeltaEvent,
    ResponseReasoningTextDoneEvent,
    ResponseReasoningPartAddedEvent,
    ResponseReasoningPartDoneEvent,
    ResponseCodeInterpreterCallInProgressEvent,
    ResponseCodeInterpreterCallCodeDeltaEvent,
    ResponseWebSearchCallInProgressEvent,
    ResponseWebSearchCallSearchingEvent,
    ResponseWebSearchCallCompletedEvent,
    ResponseCodeInterpreterCallCodeDoneEvent,
    ResponseCodeInterpreterCallInterpretingEvent,
    ResponseCodeInterpreterCallCompletedEvent,
]

BatchRequestInputBody = Union[ChatCompletionRequest, EmbeddingRequest,
                              ScoreRequest, RerankRequest]

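
A quick way to sanity-check the new event models is to construct one and dump it. A sketch with illustrative field values; it assumes the classes are importable from vllm.entrypoints.openai.protocol as added above, and reuses the same Content model the PR imports:

# Illustrative only: build one of the new events and inspect its JSON.
from openai.types.responses.response_reasoning_item import Content

from vllm.entrypoints.openai.protocol import ResponseReasoningPartAddedEvent

event = ResponseReasoningPartAddedEvent(
    type="response.reasoning_part.added",
    sequence_number=0,
    item_id="rs_123",  # hypothetical item id
    output_index=0,
    content_index=0,
    part=Content(text="", type="reasoning_text"),
)
print(event.model_dump_json())

Since OpenAIBaseModel is a pydantic model, the event serializes like any other streaming event in the union.
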
32 changes: 20 additions & 12 deletions vllm/entrypoints/openai/serving_responses.py
@@ -58,6 +58,8 @@
    InputTokensDetails,
    OutputTokensDetails,
    RequestResponseMetadata,
    ResponseReasoningPartAddedEvent,
    ResponseReasoningPartDoneEvent,
    ResponsesRequest,
    ResponsesResponse, ResponseUsage,
    StreamingResponsesResponse)
@@ -1280,14 +1282,13 @@ async def _process_harmony_streaming_events(
            # Deal with tool call here
            pass
        elif previous_item.channel == "analysis":
            content = ResponseReasoningTextContent(
                text=previous_item.content[0].text,
                type="reasoning_text",
            )
            reasoning_item = ResponseReasoningItem(
                type="reasoning",
-               content=[
-                   ResponseReasoningTextContent(
-                       text=previous_item.content[0].text,
-                       type="reasoning_text",
-                   ),
-               ],
                content=[content],
                status="completed",
                id=current_item_id,
                summary=[],
@@ -1301,6 +1302,15 @@
                    content_index=current_content_index,
                    text=previous_item.content[0].text,
                ))
            yield _increment_sequence_number_and_return(
                ResponseReasoningPartDoneEvent(
                    type="response.reasoning_part.done",
                    sequence_number=-1,
                    item_id=current_item_id,
                    output_index=current_output_index,
                    content_index=current_content_index,
                    part=content,
                ))
            yield _increment_sequence_number_and_return(
                ResponseOutputItemDoneEvent(
                    type="response.output_item.done",
@@ -1412,17 +1422,15 @@
                ))
            current_content_index += 1
            yield _increment_sequence_number_and_return(
-               ResponseContentPartAddedEvent(
-                   type="response.content_part.added",
                ResponseReasoningPartAddedEvent(
                    type="response.reasoning_part.added",
                    sequence_number=-1,
                    output_index=current_output_index,
                    item_id=current_item_id,
                    content_index=current_content_index,
-                   part=ResponseOutputText(
-                       type="output_text",
                    part=ResponseReasoningTextContent(
                        text="",
-                       annotations=[],
-                       logprobs=[],
                        type="reasoning_text",
                    ),
                ))
            yield _increment_sequence_number_and_return(
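
The helper `_increment_sequence_number_and_return` is referenced but not defined in this diff. The pattern it implies — events are constructed with `sequence_number=-1` and stamped with a monotonically increasing counter just before being yielded — might look like the following sketch (an assumption for illustration, not the actual vLLM implementation):

from dataclasses import dataclass


@dataclass
class _Event:  # stand-in for any of the streaming event models
    type: str
    sequence_number: int = -1


def make_sequence_stamper():
    """Return a closure that stamps events with an increasing sequence number."""
    seq = 0

    def stamp(event):
        nonlocal seq
        event.sequence_number = seq
        seq += 1
        return event

    return stamp


stamp = make_sequence_stamper()
first = stamp(_Event(type="response.created"))
second = stamp(_Event(type="response.in_progress"))
assert (first.sequence_number, second.sequence_number) == (0, 1)

Centralizing the stamping in one helper keeps per-event construction free of ordering concerns, which is why every yield in the streaming path above wraps its event in this call.
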