Merged
87 changes: 79 additions & 8 deletions vllm/entrypoints/context.py
@@ -1,5 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import contextlib
import json
import logging
from abc import ABC, abstractmethod
@@ -57,9 +59,14 @@ def render_for_completion(self) -> list[int]:

@abstractmethod
async def init_tool_sessions(self, tool_server: Optional[ToolServer],
exit_stack: AsyncExitStack) -> None:
exit_stack: AsyncExitStack,
request_id: str) -> None:
pass

@abstractmethod
async def cleanup_session(self) -> None:
raise NotImplementedError("Should not be called.")


class SimpleContext(ConversationContext):

@@ -89,9 +96,13 @@ def render_for_completion(self) -> list[int]:
raise NotImplementedError("Should not be called.")

async def init_tool_sessions(self, tool_server: Optional[ToolServer],
exit_stack: AsyncExitStack) -> None:
exit_stack: AsyncExitStack,
request_id: str) -> None:
pass

async def cleanup_session(self) -> None:
raise NotImplementedError("Should not be called.")


class HarmonyContext(ConversationContext):

@@ -103,6 +114,7 @@ def __init__(
self._messages = messages
self.available_tools = available_tools
self._tool_sessions: dict[str, Union[ClientSession, Tool]] = {}
self.called_tools: set[str] = set()

self.parser = get_streamable_parser_for_assistant()
self.num_init_messages = len(messages)
@@ -234,7 +246,8 @@ def need_builtin_tool_call(self) -> bool:
last_msg = self.messages[-1]
recipient = last_msg.recipient
return recipient is not None and (recipient.startswith("browser.")
or recipient.startswith("python"))
or recipient.startswith("python") or
recipient.startswith("container."))
Collaborator: what is container tool?

async def call_tool(self) -> list[Message]:
if not self.messages:
@@ -248,6 +261,9 @@ async def call_tool(self) -> list[Message]:
elif recipient.startswith("python"):
return await self.call_python_tool(
self._tool_sessions["python"], last_msg)
elif recipient.startswith("container."):
return await self.call_container_tool(
self._tool_sessions["container"], last_msg)
raise ValueError("No tool call found")

def render_for_completion(self) -> list[int]:
@@ -256,6 +272,7 @@
async def call_search_tool(self, tool_session: Union["ClientSession",
Tool],
last_msg: Message) -> list[Message]:
self.called_tools.add("browser")
if isinstance(tool_session, Tool):
return await tool_session.get_result(self)
tool_name = last_msg.recipient.split(".")[1]
@@ -265,12 +282,16 @@ async def call_search_tool(self, tool_session: Union["ClientSession",
content = TextContent(text=result_str)
author = Author(role=Role.TOOL, name=last_msg.recipient)
return [
Message(author=author, content=[content], recipient=Role.ASSISTANT)
Message(author=author,
content=[content],
recipient=Role.ASSISTANT,
channel=last_msg.channel)
]

async def call_python_tool(self, tool_session: Union["ClientSession",
Tool],
last_msg: Message) -> list[Message]:
self.called_tools.add("python")
if isinstance(tool_session, Tool):
return await tool_session.get_result(self)
param = {
@@ -290,13 +311,63 @@ async def call_python_tool(self, tool_session: Union["ClientSession",
]

async def init_tool_sessions(self, tool_server: Optional[ToolServer],
exit_stack: AsyncExitStack) -> None:
exit_stack: AsyncExitStack,
request_id: str) -> None:
if tool_server:
for tool_name in self.available_tools:
if tool_name not in self._tool_sessions:
self._tool_sessions[
tool_name] = await exit_stack.enter_async_context(
tool_server.new_session(tool_name))
tool_session = await exit_stack.enter_async_context(
tool_server.new_session(tool_name, request_id))
self._tool_sessions[tool_name] = tool_session
exit_stack.push_async_exit(self.cleanup_session)
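
To illustrate the lifecycle wired up here, a minimal sketch (run_request and its parameters are hypothetical; init_tool_sessions and cleanup_session are the methods from this PR):

from contextlib import AsyncExitStack

async def run_request(context, tool_server, request_id: str) -> None:
    async with AsyncExitStack() as exit_stack:
        # Enters one session per available tool, then registers
        # context.cleanup_session on the same stack.
        await context.init_tool_sessions(tool_server, exit_stack, request_id)
        ...  # run generation, dispatching tool calls as needed
    # The stack unwinds LIFO on exit: cleanup_session runs first (while the
    # sessions are still open, so it can call their "cleanup_session" tool),
    # then each session's own __aexit__ closes the connection.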

async def call_container_tool(self, tool_session: Union["ClientSession",
Tool],
last_msg: Message) -> list[Message]:
"""
Call container tool. Expect this to be run in a stateful docker
with command line terminal.
The official container tool would at least
expect the following format:
- for tool name: exec
- args:
{
"cmd":List[str] "command to execute",
"workdir":optional[str] "current working directory",
"env":optional[object/dict] "environment variables",
"session_name":optional[str] "session name",
"timeout":optional[int] "timeout in seconds",
"user":optional[str] "user name",
}
"""
self.called_tools.add("container")
if isinstance(tool_session, Tool):
return await tool_session.get_result(self)
tool_name = last_msg.recipient.split(".")[1].split(" ")[0]
args = json.loads(last_msg.content[0].text)
result = await tool_session.call_tool(tool_name, args)
result_str = result.content[0].text
content = TextContent(text=result_str)
author = Author(role=Role.TOOL, name=last_msg.recipient)
return [
Message(author=author,
content=[content],
recipient=Role.ASSISTANT,
channel=last_msg.channel)
]
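
For concreteness, a hedged sketch of an args payload matching the docstring schema above; every value is illustrative, not taken from this PR:

import json

# Hypothetical "container.exec" payload following the docstring schema.
args = {
    "cmd": ["bash", "-lc", "ls /workspace"],  # command to execute
    "workdir": "/workspace",                  # optional working directory
    "env": {"PYTHONUNBUFFERED": "1"},         # optional environment variables
    "session_name": "default",                # optional session name
    "timeout": 30,                            # optional timeout in seconds
    "user": "root",                           # optional user name
}
# call_container_tool recovers this via json.loads(last_msg.content[0].text).
payload = json.dumps(args)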

async def cleanup_session(self, *args, **kwargs) -> None:
"""Usable as an __aexit__-style coroutine callback; *args/**kwargs
absorb the (exc_type, exc, tb) that push_async_exit passes on unwind."""

async def cleanup_tool_session(tool_session):
if not isinstance(tool_session, Tool):
logger.info("Cleaning up tool session for %s",
tool_session._client_info)
with contextlib.suppress(Exception):
await tool_session.call_tool("cleanup_session", {})

await asyncio.gather(*(cleanup_tool_session(self._tool_sessions[tool])
for tool in self.called_tools))


class StreamingHarmonyContext(HarmonyContext):
50 changes: 44 additions & 6 deletions vllm/entrypoints/harmony_utils.py
@@ -16,11 +16,13 @@
from openai.types.responses.response_reasoning_item import (
Content as ResponseReasoningTextContent)
from openai.types.responses.tool import Tool
from openai_harmony import (Author, Conversation, DeveloperContent,
HarmonyEncodingName, Message, ReasoningEffort,
Role, StreamableParser, SystemContent, TextContent,
ToolDescription, load_harmony_encoding)
from openai_harmony import (Author, ChannelConfig, Conversation,
DeveloperContent, HarmonyEncodingName, Message,
ReasoningEffort, Role, StreamableParser,
SystemContent, TextContent, ToolDescription,
load_harmony_encoding)

from vllm import envs
from vllm.entrypoints.openai.protocol import (ChatCompletionToolsParam,
ResponseInputOutputItem)
from vllm.utils import random_uuid
@@ -33,6 +35,20 @@

_harmony_encoding = None

# Builtin tools that should be included in the system message when
# they are available and requested by the user.
# Tool args are provided by MCP tool descriptions. Tool
# outputs are stringified.
BUILTIN_TOOLS = {
    "web_search_preview",
    "code_interpreter",
    "container",
}

Collaborator: add some comments to explain the tools? What are the input and output expectations?


def has_custom_tools(tool_types: list[str]) -> bool:
return not set(tool_types).issubset(BUILTIN_TOOLS)
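
A quick sketch of the intended behavior, assuming BUILTIN_TOOLS as defined above:

# Only builtin tool types present -> no custom tools.
assert not has_custom_tools(["web_search_preview", "container"])
# A "function" tool (or any other non-builtin type) counts as custom.
assert has_custom_tools(["code_interpreter", "function"])
# The empty list is a subset of BUILTIN_TOOLS, so no custom tools.
assert not has_custom_tools([])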


def get_encoding():
global _harmony_encoding
@@ -48,10 +64,19 @@ def get_system_message(
start_date: Optional[str] = None,
browser_description: Optional[str] = None,
python_description: Optional[str] = None,
container_description: Optional[str] = None,
instructions: Optional[str] = None,
with_custom_tools: bool = False,
Collaborator: note for self: update system message for chat completions once #22386 is in

) -> Message:
sys_msg_content = SystemContent.new()
if model_identity is not None:
sys_msg_content = sys_msg_content.with_model_identity(model_identity)
if (instructions is not None
and envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS):
current_identity = sys_msg_content.model_identity
new_identity = (f'{current_identity}\n{instructions}'
if current_identity else instructions)
sys_msg_content = sys_msg_content.with_model_identity(new_identity)
if reasoning_effort is not None:
sys_msg_content = sys_msg_content.with_reasoning_effort(
REASONING_EFFORT[reasoning_effort])
@@ -63,6 +88,14 @@
sys_msg_content = sys_msg_content.with_tools(browser_description)
if python_description is not None:
sys_msg_content = sys_msg_content.with_tools(python_description)
if container_description is not None:
sys_msg_content = sys_msg_content.with_tools(container_description)
if not with_custom_tools:
Contributor @Ithanil (Aug 23, 2025): If I'm not mistaken, this basically covers #23167, so if the present PR gets merged, mine is obsolete.

channel_config = sys_msg_content.channel_config
invalid_channel = "commentary"
new_config = ChannelConfig.require_channels(
[c for c in channel_config.valid_channels if c != invalid_channel])
sys_msg_content = sys_msg_content.with_channel_config(new_config)
sys_msg = Message.from_role_and_content(Role.SYSTEM, sys_msg_content)
return sys_msg

@@ -86,14 +119,17 @@ def get_developer_message(
tools: Optional[list[Union[Tool, ChatCompletionToolsParam]]] = None,
) -> Message:
dev_msg_content = DeveloperContent.new()
if instructions is not None:
if (instructions is not None
and not envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS):
dev_msg_content = dev_msg_content.with_instructions(instructions)
if tools is not None:
function_tools: list[Union[Tool, ChatCompletionToolsParam]] = []
for tool in tools:
if tool.type in ("web_search_preview", "code_interpreter"):
if tool.type in ("web_search_preview", "code_interpreter",
"container"):
# These are built-in tools that are added to the system message.
pass

elif tool.type == "function":
function_tools.append(tool)
else:
@@ -136,6 +172,8 @@ def parse_response_input(
TextContent(text=text_prefix + c["text"]) for c in content
]
msg = Message.from_role_and_contents(role, contents)
if role == "assistant":
msg = msg.with_channel("final")
elif response_msg["type"] == "function_call_output":
call_id = response_msg["call_id"]
call_response: Optional[ResponseFunctionToolCall] = None
33 changes: 26 additions & 7 deletions vllm/entrypoints/openai/serving_responses.py
@@ -44,8 +44,9 @@
SimpleContext, StreamingHarmonyContext)
from vllm.entrypoints.harmony_utils import (
get_developer_message, get_stop_tokens_for_assistant_actions,
get_system_message, get_user_message, parse_output_message,
parse_remaining_state, parse_response_input, render_for_completion)
get_system_message, get_user_message, has_custom_tools,
parse_output_message, parse_remaining_state, parse_response_input,
render_for_completion)
from vllm.entrypoints.logger import RequestLogger
# yapf conflicts with isort for this block
# yapf: disable
@@ -266,6 +267,8 @@ async def create_responses(
builtin_tool_list.append("browser")
if self.tool_server.has_tool("python"):
builtin_tool_list.append("python")
if self.tool_server.has_tool("container"):
builtin_tool_list.append("container")

if self.tool_server is not None:
available_tools = builtin_tool_list
@@ -448,7 +451,8 @@ async def responses_full_generator(

async with AsyncExitStack() as exit_stack:
try:
await context.init_tool_sessions(self.tool_server, exit_stack)
await context.init_tool_sessions(self.tool_server, exit_stack,
request.request_id)
async for _ in result_generator:
pass
except asyncio.CancelledError:
@@ -710,13 +714,21 @@ def _construct_input_messages_with_harmony(
# New conversation.
reasoning_effort = (request.reasoning.effort
if request.reasoning else None)
# Temporary: OpenAI types don't have a container tool,
# so we use MCP to cover that; subject to change.
tool_types = [tool.type for tool in request.tools]
if envs.VLLM_GPT_OSS_USE_CONTAINER_TOOL:
tool_types.append("container")
enable_browser = ("web_search_preview" in tool_types
and self.tool_server is not None
and self.tool_server.has_tool("browser"))
enable_code_interpreter = ("code_interpreter" in tool_types
and self.tool_server is not None
and self.tool_server.has_tool("python"))
enable_container = ("container" in tool_types
and self.tool_server is not None
and self.tool_server.has_tool("container"))
with_custom_tools = has_custom_tools(tool_types)
sys_msg = get_system_message(
reasoning_effort=reasoning_effort,
browser_description=self.tool_server.get_tool_description(
@@ -725,11 +737,17 @@
python_description=self.tool_server.get_tool_description(
"python") if enable_code_interpreter
and self.tool_server is not None else None,
container_description=self.tool_server.get_tool_description(
"container")
if enable_container and self.tool_server is not None else None,
instructions=request.instructions,
with_custom_tools=with_custom_tools,
)
messages.append(sys_msg)
dev_msg = get_developer_message(request.instructions,
request.tools)
messages.append(dev_msg)
if with_custom_tools:
dev_msg = get_developer_message(
instructions=request.instructions, tools=request.tools)
messages.append(dev_msg)
else:
# Continue the previous conversation.
# FIXME(woosuk): Currently, request params like reasoning and
@@ -1613,7 +1631,8 @@ def _send_event(event: BaseModel):
async with AsyncExitStack() as exit_stack:
processer = None
if self.use_harmony:
await context.init_tool_sessions(self.tool_server, exit_stack)
await context.init_tool_sessions(self.tool_server, exit_stack,
request.request_id)
processer = self._process_harmony_streaming_events
else:
processer = self._process_simple_streaming_events
16 changes: 10 additions & 6 deletions vllm/entrypoints/tool_server.py
@@ -86,7 +86,8 @@ def get_tool_description(self,
pass

@abstractmethod
def new_session(self, tool_name: str) -> AbstractAsyncContextManager[Any]:
def new_session(self, tool_name: str,
session_id: str) -> AbstractAsyncContextManager[Any]:
"""
Create a session for the tool.
"""
@@ -124,7 +125,8 @@ async def add_tool_server(self, server_url: str):
description=tool.description,
parameters=tool.inputSchema)
for tool in list_tools_response.tools
])
],
)
self.harmony_tool_descriptions[tool_from_mcp.name] = tool_from_mcp
if tool_from_mcp.name not in self.urls:
self.urls[tool_from_mcp.name] = url
@@ -142,14 +144,16 @@
return self.harmony_tool_descriptions.get(tool_name)

@asynccontextmanager
async def new_session(self, tool_name: str):
async def new_session(self, tool_name: str, session_id: str):
from mcp import ClientSession
from mcp.client.sse import sse_client
url = self.urls.get(tool_name)
headers = {"x-session-id": session_id}
if not url:
raise KeyError(f"Tool '{tool_name}' is not supported")
async with sse_client(url=url) as streams, ClientSession(
*streams) as session:
async with sse_client(url=url,
headers=headers) as streams, ClientSession(
*streams) as session:
await session.initialize()
yield session
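
A hedged usage sketch of the new per-request signature; the tool name "browser" and the "search" call are assumptions for illustration, not part of this PR:

# The x-session-id header lets the MCP tool server key its state by request.
async def demo(tool_server, request_id: str) -> None:
    async with tool_server.new_session("browser", request_id) as session:
        result = await session.call_tool("search", {"query": "vLLM"})
        print(result.content[0].text)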
Collaborator: For the session cleanup logic, maybe just add it here?

Collaborator (author): Right now tool sessions are not managed in the tool server lifecycle but in the context lifecycle, so the cleanup has to be attached there unless we do a refactor.

Collaborator: But regardless of whether we use exit_stack.push_async_exit(context.cleanup_session) or implement the exit logic here, won't the cleanup be called when the code leaves the async with AsyncExitStack() as exit_stack: block either way? And I thought different requests should have different sessions, so tool sessions should not be managed in the tool server lifecycle. Is this understanding correct?

Collaborator (author): This is in the tool server; everything should be managed in the context, since it does the actual management of sessions and is per-request.

@@ -182,7 +186,7 @@ def get_tool_description(self,
raise ValueError(f"Unknown tool {tool_name}")

@asynccontextmanager
async def new_session(self, tool_name: str):
async def new_session(self, tool_name: str, session_id: str):
if tool_name not in self.tools:
raise KeyError(f"Tool '{tool_name}' is not supported")
yield self.tools[tool_name]