Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions tests/entrypoints/openai/test_response_api_with_harmony.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ async def test_streaming(client: OpenAI, model_name: str, background: bool):
background=background,
)

current_item_id = ""
current_content_index = -1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this test case capture multiple subsequent streaming items?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not sure I understand your question. I haven't enabled tool calling for streaming yet, so currently we're only testing reasoningOutput -> finalOutput items.


events = []
current_event_mode = None
resp_id = None
Expand All @@ -329,6 +332,26 @@ async def test_streaming(client: OpenAI, model_name: str, background: bool):
current_event_mode = event.type
print(f"\n[{event.type}] ", end="", flush=True)

# verify current_item_id is correct
if event.type == "response.output_item.added":
assert event.item.id != current_item_id
current_item_id = event.item.id
elif event.type in [
"response.output_text.delta",
"response.reasoning_text.delta"
]:
assert event.item_id == current_item_id

# verify content_index_id is correct
if event.type == "response.content_part.added":
assert event.content_index != current_content_index
current_content_index = event.content_index
elif event.type in [
"response.output_text.delta",
"response.reasoning_text.delta"
]:
assert event.content_index == current_content_index

if "text.delta" in event.type:
print(event.delta, end="", flush=True)
elif "reasoning_text.delta" in event.type:
Expand Down
10 changes: 8 additions & 2 deletions vllm/entrypoints/openai/serving_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,9 +1260,9 @@ async def _process_harmony_streaming_events(
_increment_sequence_number_and_return: Callable[[BaseModel],
BaseModel],
) -> AsyncGenerator[BaseModel, None]:
current_content_index = 0 # FIXME: this number is never changed
current_content_index = -1
current_output_index = 0
current_item_id = "" # FIXME: this number is never changed
current_item_id: str = ""
sent_output_item_added = False

async for ctx in result_generator:
Expand Down Expand Up @@ -1353,6 +1353,7 @@ async def _process_harmony_streaming_events(
and ctx.parser.current_recipient is None):
if not sent_output_item_added:
sent_output_item_added = True
current_item_id = f"msg_{random_uuid()}"
yield _increment_sequence_number_and_return(
openai_responses_types.
ResponseOutputItemAddedEvent(
Expand All @@ -1368,6 +1369,7 @@ async def _process_harmony_streaming_events(
status="in_progress",
),
))
current_content_index += 1
yield _increment_sequence_number_and_return(
openai_responses_types.
ResponseContentPartAddedEvent(
Expand Down Expand Up @@ -1398,6 +1400,7 @@ async def _process_harmony_streaming_events(
and ctx.parser.current_recipient is None):
if not sent_output_item_added:
sent_output_item_added = True
current_item_id = f"msg_{random_uuid()}"
yield _increment_sequence_number_and_return(
openai_responses_types.
ResponseOutputItemAddedEvent(
Expand All @@ -1412,6 +1415,7 @@ async def _process_harmony_streaming_events(
status="in_progress",
),
))
current_content_index += 1
yield _increment_sequence_number_and_return(
openai_responses_types.
ResponseContentPartAddedEvent(
Expand Down Expand Up @@ -1444,6 +1448,7 @@ async def _process_harmony_streaming_events(
) and ctx.parser.current_recipient == "python":
if not sent_output_item_added:
sent_output_item_added = True
current_item_id = f"tool_{random_uuid()}"
yield _increment_sequence_number_and_return(
openai_responses_types.
ResponseOutputItemAddedEvent(
Expand Down Expand Up @@ -1516,6 +1521,7 @@ async def _process_harmony_streaming_events(
raise ValueError(
f"Unknown function name: {function_name}")

current_item_id = f"tool_{random_uuid()}"
yield _increment_sequence_number_and_return(
openai_responses_types.ResponseOutputItemAddedEvent(
type="response.output_item.added",
Expand Down