diff --git a/vllm/entrypoints/context.py b/vllm/entrypoints/context.py
index 7723c5d5cbcf..9012639457ca 100644
--- a/vllm/entrypoints/context.py
+++ b/vllm/entrypoints/context.py
@@ -1,5 +1,7 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+import asyncio
+import contextlib
 import json
 import logging
 from abc import ABC, abstractmethod
@@ -57,9 +59,14 @@ def render_for_completion(self) -> list[int]:
 
     @abstractmethod
     async def init_tool_sessions(self, tool_server: Optional[ToolServer],
-                                 exit_stack: AsyncExitStack) -> None:
+                                 exit_stack: AsyncExitStack,
+                                 request_id: str) -> None:
         pass
 
+    @abstractmethod
+    async def cleanup_session(self) -> None:
+        raise NotImplementedError("Should not be called.")
+
 
 class SimpleContext(ConversationContext):
 
@@ -89,9 +96,13 @@ def render_for_completion(self) -> list[int]:
         raise NotImplementedError("Should not be called.")
 
     async def init_tool_sessions(self, tool_server: Optional[ToolServer],
-                                 exit_stack: AsyncExitStack) -> None:
+                                 exit_stack: AsyncExitStack,
+                                 request_id: str) -> None:
         pass
 
+    async def cleanup_session(self) -> None:
+        raise NotImplementedError("Should not be called.")
+
 
 class HarmonyContext(ConversationContext):
 
@@ -103,6 +114,7 @@ def __init__(
         self._messages = messages
         self.available_tools = available_tools
         self._tool_sessions: dict[str, Union[ClientSession, Tool]] = {}
+        self.called_tools: set[str] = set()
 
         self.parser = get_streamable_parser_for_assistant()
         self.num_init_messages = len(messages)
@@ -234,7 +246,8 @@ def need_builtin_tool_call(self) -> bool:
         last_msg = self.messages[-1]
         recipient = last_msg.recipient
         return recipient is not None and (recipient.startswith("browser.")
-                                          or recipient.startswith("python"))
+                                          or recipient.startswith("python") or
+                                          recipient.startswith("container."))
 
     async def call_tool(self) -> list[Message]:
         if not self.messages:
@@ -248,6 +261,9 @@ async def call_tool(self) -> list[Message]:
         elif recipient.startswith("python"):
             return await self.call_python_tool(
                 self._tool_sessions["python"], last_msg)
+        elif recipient.startswith("container."):
+            return await self.call_container_tool(
+                self._tool_sessions["container"], last_msg)
         raise ValueError("No tool call found")
 
     def render_for_completion(self) -> list[int]:
@@ -256,6 +272,7 @@ def render_for_completion(self) -> list[int]:
 
     async def call_search_tool(self, tool_session: Union["ClientSession",
                                                          Tool],
                                last_msg: Message) -> list[Message]:
+        self.called_tools.add("browser")
         if isinstance(tool_session, Tool):
             return await tool_session.get_result(self)
         tool_name = last_msg.recipient.split(".")[1]
@@ -265,12 +282,16 @@ async def call_search_tool(self, tool_session: Union["ClientSession",
         content = TextContent(text=result_str)
         author = Author(role=Role.TOOL, name=last_msg.recipient)
         return [
-            Message(author=author, content=[content], recipient=Role.ASSISTANT)
+            Message(author=author,
+                    content=[content],
+                    recipient=Role.ASSISTANT,
+                    channel=last_msg.channel)
         ]
 
     async def call_python_tool(self, tool_session: Union["ClientSession",
                                                          Tool],
                                last_msg: Message) -> list[Message]:
+        self.called_tools.add("python")
         if isinstance(tool_session, Tool):
             return await tool_session.get_result(self)
         param = {
@@ -290,13 +311,63 @@ async def call_python_tool(self, tool_session: Union["ClientSession",
         ]
 
     async def init_tool_sessions(self, tool_server: Optional[ToolServer],
-                                 exit_stack: AsyncExitStack) -> None:
+                                 exit_stack: AsyncExitStack,
+                                 request_id: str) -> None:
         if tool_server:
             for tool_name in self.available_tools:
                 if tool_name not in self._tool_sessions:
-                    self._tool_sessions[
-                        tool_name] = await exit_stack.enter_async_context(
-                            tool_server.new_session(tool_name))
+                    tool_session = await exit_stack.enter_async_context(
+                        tool_server.new_session(tool_name, request_id))
+                    self._tool_sessions[tool_name] = tool_session
+            exit_stack.push_async_exit(self.cleanup_session)
+
+    async def call_container_tool(self, tool_session: Union["ClientSession",
+                                                            Tool],
+                                  last_msg: Message) -> list[Message]:
+        """
+        Call the container tool. This is expected to run against a stateful
+        Docker container with a command-line terminal.
+        The official container tool expects at least the following format:
+        - for tool name: exec
+        - args:
+            {
+                "cmd": List[str] "command to execute",
+                "workdir": Optional[str] "current working directory",
+                "env": Optional[object/dict] "environment variables",
+                "session_name": Optional[str] "session name",
+                "timeout": Optional[int] "timeout in seconds",
+                "user": Optional[str] "user name",
+            }
+        """
+        self.called_tools.add("container")
+        if isinstance(tool_session, Tool):
+            return await tool_session.get_result(self)
+        tool_name = last_msg.recipient.split(".")[1].split(" ")[0]
+        args = json.loads(last_msg.content[0].text)
+        result = await tool_session.call_tool(tool_name, args)
+        result_str = result.content[0].text
+        content = TextContent(text=result_str)
+        author = Author(role=Role.TOOL, name=last_msg.recipient)
+        return [
+            Message(author=author,
+                    content=[content],
+                    recipient=Role.ASSISTANT,
+                    channel=last_msg.channel)
+        ]
+
+    async def cleanup_session(self, *args, **kwargs) -> None:
+        """Can be used as a coroutine callback in an AsyncExitStack's
+        __aexit__."""
+
+        async def cleanup_tool_session(tool_session):
+            if not isinstance(tool_session, Tool):
+                logger.info("Cleaning up tool session for %s",
+                            tool_session._client_info)
+                with contextlib.suppress(Exception):
+                    await tool_session.call_tool("cleanup_session", {})
+
+        await asyncio.gather(*(cleanup_tool_session(self._tool_sessions[tool])
+                               for tool in self.called_tools))
 
 
 class StreamingHarmonyContext(HarmonyContext):
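For reference, a minimal sketch (not captured from a real run) of the tool-call message shape that `call_container_tool` parses: the recipient encodes the tool name and the first content item carries the JSON args described in the docstring above. The concrete command and paths are illustrative only.

```python
import json

# Illustrative assistant tool call, shaped like the Harmony message
# handled by call_container_tool: recipient "container.<tool>", JSON
# args in the first content item. Values here are made up.
recipient = "container.exec"
content_text = json.dumps({
    "cmd": ["bash", "-lc", "ls /workspace"],  # command to execute
    "workdir": "/workspace",                  # optional working directory
    "timeout": 30,                            # optional timeout in seconds
})

# Mirrors the parsing above: tool name from the recipient, args from the
# JSON body; these would then go to tool_session.call_tool(tool_name, args).
tool_name = recipient.split(".")[1].split(" ")[0]
args = json.loads(content_text)
assert tool_name == "exec"
assert args["workdir"] == "/workspace"
```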
diff --git a/vllm/entrypoints/harmony_utils.py b/vllm/entrypoints/harmony_utils.py
index d1ff06425fcb..524e73678f14 100644
--- a/vllm/entrypoints/harmony_utils.py
+++ b/vllm/entrypoints/harmony_utils.py
@@ -16,11 +16,13 @@
 from openai.types.responses.response_reasoning_item import (
     Content as ResponseReasoningTextContent)
 from openai.types.responses.tool import Tool
-from openai_harmony import (Author, Conversation, DeveloperContent,
-                            HarmonyEncodingName, Message, ReasoningEffort,
-                            Role, StreamableParser, SystemContent, TextContent,
-                            ToolDescription, load_harmony_encoding)
+from openai_harmony import (Author, ChannelConfig, Conversation,
+                            DeveloperContent, HarmonyEncodingName, Message,
+                            ReasoningEffort, Role, StreamableParser,
+                            SystemContent, TextContent, ToolDescription,
+                            load_harmony_encoding)
 
+from vllm import envs
 from vllm.entrypoints.openai.protocol import (ChatCompletionToolsParam,
                                               ResponseInputOutputItem)
 from vllm.utils import random_uuid
@@ -33,6 +35,20 @@
 
 _harmony_encoding = None
 
+# Built-in tools that should be included in the system message when
+# they are available and requested by the user.
+# Tool args are provided by MCP tool descriptions. Outputs
+# of the tools are stringified.
+BUILTIN_TOOLS = {
+    "web_search_preview",
+    "code_interpreter",
+    "container",
+}
+
+
+def has_custom_tools(tool_types: list[str]) -> bool:
+    return not set(tool_types).issubset(BUILTIN_TOOLS)
+
 
 def get_encoding():
     global _harmony_encoding
@@ -48,10 +64,19 @@ def get_system_message(
     start_date: Optional[str] = None,
     browser_description: Optional[str] = None,
     python_description: Optional[str] = None,
+    container_description: Optional[str] = None,
+    instructions: Optional[str] = None,
+    with_custom_tools: bool = False,
 ) -> Message:
     sys_msg_content = SystemContent.new()
     if model_identity is not None:
         sys_msg_content = sys_msg_content.with_model_identity(model_identity)
+    if (instructions is not None
+            and envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS):
+        current_identity = sys_msg_content.model_identity
+        new_identity = (f'{current_identity}\n{instructions}'
+                        if current_identity else instructions)
+        sys_msg_content = sys_msg_content.with_model_identity(new_identity)
     if reasoning_effort is not None:
         sys_msg_content = sys_msg_content.with_reasoning_effort(
             REASONING_EFFORT[reasoning_effort])
@@ -63,6 +88,14 @@ def get_system_message(
         sys_msg_content = sys_msg_content.with_tools(browser_description)
     if python_description is not None:
         sys_msg_content = sys_msg_content.with_tools(python_description)
+    if container_description is not None:
+        sys_msg_content = sys_msg_content.with_tools(container_description)
+    if not with_custom_tools:
+        channel_config = sys_msg_content.channel_config
+        invalid_channel = "commentary"
+        new_config = ChannelConfig.require_channels(
+            [c for c in channel_config.valid_channels if c != invalid_channel])
+        sys_msg_content = sys_msg_content.with_channel_config(new_config)
     sys_msg = Message.from_role_and_content(Role.SYSTEM, sys_msg_content)
     return sys_msg
 
@@ -86,14 +119,17 @@ def get_developer_message(
     tools: Optional[list[Union[Tool, ChatCompletionToolsParam]]] = None,
 ) -> Message:
     dev_msg_content = DeveloperContent.new()
-    if instructions is not None:
+    if (instructions is not None
+            and not envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS):
         dev_msg_content = dev_msg_content.with_instructions(instructions)
     if tools is not None:
         function_tools: list[Union[Tool, ChatCompletionToolsParam]] = []
         for tool in tools:
-            if tool.type in ("web_search_preview", "code_interpreter"):
+            if tool.type in ("web_search_preview", "code_interpreter",
+                             "container"):
                 # These are built-in tools that are added to the
                 # system message.
                 pass
+
             elif tool.type == "function":
                 function_tools.append(tool)
             else:
@@ -136,6 +172,8 @@ def parse_response_input(
             TextContent(text=text_prefix + c["text"]) for c in content
         ]
         msg = Message.from_role_and_contents(role, contents)
+        if role == "assistant":
+            msg = msg.with_channel("final")
     elif response_msg["type"] == "function_call_output":
         call_id = response_msg["call_id"]
         call_response: Optional[ResponseFunctionToolCall] = None
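The `has_custom_tools` check above drives both the developer message and the `commentary` channel pruning. A quick sketch of its semantics:

```python
BUILTIN_TOOLS = {"web_search_preview", "code_interpreter", "container"}

def has_custom_tools(tool_types: list[str]) -> bool:
    # True as soon as any requested tool type is not a built-in.
    return not set(tool_types).issubset(BUILTIN_TOOLS)

# Only built-ins requested: no developer message is emitted, and
# "commentary" is dropped from the system message's valid channels.
assert not has_custom_tools(["web_search_preview", "container"])
# One custom "function" tool is enough to flip the result.
assert has_custom_tools(["code_interpreter", "function"])
# An empty tool list also counts as "no custom tools".
assert not has_custom_tools([])
```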
diff --git a/vllm/entrypoints/openai/serving_responses.py b/vllm/entrypoints/openai/serving_responses.py
index a102d4a4a5e6..c5177bdf5375 100644
--- a/vllm/entrypoints/openai/serving_responses.py
+++ b/vllm/entrypoints/openai/serving_responses.py
@@ -44,8 +44,9 @@
                                        SimpleContext, StreamingHarmonyContext)
 from vllm.entrypoints.harmony_utils import (
     get_developer_message, get_stop_tokens_for_assistant_actions,
-    get_system_message, get_user_message, parse_output_message,
-    parse_remaining_state, parse_response_input, render_for_completion)
+    get_system_message, get_user_message, has_custom_tools,
+    parse_output_message, parse_remaining_state, parse_response_input,
+    render_for_completion)
 from vllm.entrypoints.logger import RequestLogger
 # yapf conflicts with isort for this block
 # yapf: disable
@@ -266,6 +267,8 @@ async def create_responses(
                 builtin_tool_list.append("browser")
             if self.tool_server.has_tool("python"):
                 builtin_tool_list.append("python")
+            if self.tool_server.has_tool("container"):
+                builtin_tool_list.append("container")
 
         if self.tool_server is not None:
             available_tools = builtin_tool_list
@@ -448,7 +451,8 @@ async def responses_full_generator(
 
         async with AsyncExitStack() as exit_stack:
             try:
-                await context.init_tool_sessions(self.tool_server, exit_stack)
+                await context.init_tool_sessions(self.tool_server, exit_stack,
+                                                 request.request_id)
                 async for _ in result_generator:
                     pass
             except asyncio.CancelledError:
@@ -710,13 +714,21 @@ def _construct_input_messages_with_harmony(
             # New conversation.
             reasoning_effort = (request.reasoning.effort
                                 if request.reasoning else None)
+            # Temporary: the OpenAI types don't include a container tool,
+            # so we use MCP to cover that; subject to change.
             tool_types = [tool.type for tool in request.tools]
+            if envs.VLLM_GPT_OSS_USE_CONTAINER_TOOL:
+                tool_types.append("container")
             enable_browser = ("web_search_preview" in tool_types
                               and self.tool_server is not None
                               and self.tool_server.has_tool("browser"))
             enable_code_interpreter = ("code_interpreter" in tool_types
                                        and self.tool_server is not None
                                        and self.tool_server.has_tool("python"))
+            enable_container = ("container" in tool_types
+                                and self.tool_server is not None
+                                and self.tool_server.has_tool("container"))
+            with_custom_tools = has_custom_tools(tool_types)
             sys_msg = get_system_message(
                 reasoning_effort=reasoning_effort,
                 browser_description=self.tool_server.get_tool_description(
                     "browser") if enable_browser
                 and self.tool_server is not None else None,
                 python_description=self.tool_server.get_tool_description(
                     "python") if enable_code_interpreter
                 and self.tool_server is not None else None,
+                container_description=self.tool_server.get_tool_description(
+                    "container")
+                if enable_container and self.tool_server is not None else None,
+                instructions=request.instructions,
+                with_custom_tools=with_custom_tools,
             )
             messages.append(sys_msg)
-            dev_msg = get_developer_message(request.instructions,
-                                            request.tools)
-            messages.append(dev_msg)
+            if with_custom_tools:
+                dev_msg = get_developer_message(
+                    instructions=request.instructions, tools=request.tools)
+                messages.append(dev_msg)
         else:
             # Continue the previous conversation.
             # FIXME(woosuk): Currently, request params like reasoning and
@@ -1613,7 +1631,8 @@ def _send_event(event: BaseModel):
         async with AsyncExitStack() as exit_stack:
             processer = None
             if self.use_harmony:
-                await context.init_tool_sessions(self.tool_server, exit_stack)
+                await context.init_tool_sessions(self.tool_server, exit_stack,
+                                                 request.request_id)
                 processer = self._process_harmony_streaming_events
             else:
                 processer = self._process_simple_streaming_events
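`init_tool_sessions` registers `cleanup_session` on the same `AsyncExitStack` that owns the tool sessions, and the stack unwinds LIFO, so the cleanup RPC fires while the sessions are still open. A toy reproduction of that ordering, with stand-in names and no vLLM imports:

```python
import asyncio
import contextlib

@contextlib.asynccontextmanager
async def session(name: str):
    # Stand-in for tool_server.new_session(tool_name, request_id).
    print(f"open {name}")
    try:
        yield name
    finally:
        print(f"close {name}")

async def cleanup(*exc_info):
    # Stand-in for context.cleanup_session.
    print("cleanup_session")

async def main():
    async with contextlib.AsyncExitStack() as stack:
        await stack.enter_async_context(session("browser"))
        await stack.enter_async_context(session("container"))
        stack.push_async_exit(cleanup)  # pushed last, so it runs first

asyncio.run(main())
# Prints: open browser, open container, cleanup_session, close container,
# close browser -- the cleanup call happens before the sessions close.
```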
diff --git a/vllm/entrypoints/tool_server.py b/vllm/entrypoints/tool_server.py
index 2f28595f27c6..3f413df7108a 100644
--- a/vllm/entrypoints/tool_server.py
+++ b/vllm/entrypoints/tool_server.py
@@ -86,7 +86,8 @@ def get_tool_description(self,
         pass
 
     @abstractmethod
-    def new_session(self, tool_name: str) -> AbstractAsyncContextManager[Any]:
+    def new_session(self, tool_name: str,
+                    session_id: str) -> AbstractAsyncContextManager[Any]:
         """
         Create a session for the tool.
         """
@@ -124,7 +125,8 @@ async def add_tool_server(self, server_url: str):
                     description=tool.description,
                     parameters=tool.inputSchema)
                 for tool in list_tools_response.tools
-            ])
+            ],
+        )
             self.harmony_tool_descriptions[tool_from_mcp.name] = tool_from_mcp
             if tool_from_mcp.name not in self.urls:
                 self.urls[tool_from_mcp.name] = url
@@ -142,14 +144,16 @@ def get_tool_description(self, tool_name: str):
         return self.harmony_tool_descriptions.get(tool_name)
 
     @asynccontextmanager
-    async def new_session(self, tool_name: str):
+    async def new_session(self, tool_name: str, session_id: str):
         from mcp import ClientSession
         from mcp.client.sse import sse_client
         url = self.urls.get(tool_name)
+        headers = {"x-session-id": session_id}
         if not url:
             raise KeyError(f"Tool '{tool_name}' is not supported")
-        async with sse_client(url=url) as streams, ClientSession(
-                *streams) as session:
+        async with sse_client(url=url,
+                              headers=headers) as streams, ClientSession(
+                                  *streams) as session:
             await session.initialize()
             yield session
 
@@ -182,7 +186,7 @@ def get_tool_description(self,
             raise ValueError(f"Unknown tool {tool_name}")
 
     @asynccontextmanager
-    async def new_session(self, tool_name: str):
+    async def new_session(self, tool_name: str, session_id: str):
         if tool_name not in self.tools:
             raise KeyError(f"Tool '{tool_name}' is not supported")
         yield self.tools[tool_name]
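For third-party `ToolServer` implementations, the abstract `new_session` now takes a `session_id` (the request id). A minimal sketch of conforming to the new signature; `DemoToolServer` and its in-memory session store are hypothetical:

```python
import asyncio
import contextlib
from typing import Any

class DemoToolServer:
    """Hypothetical tool server keying per-request state by session_id,
    analogous to how an MCP server can use the x-session-id header."""

    def __init__(self) -> None:
        self.sessions: dict[str, Any] = {}

    @contextlib.asynccontextmanager
    async def new_session(self, tool_name: str, session_id: str):
        key = f"{tool_name}:{session_id}"
        self.sessions[key] = object()  # stand-in session state
        try:
            yield self.sessions[key]
        finally:
            self.sessions.pop(key, None)

async def demo():
    server = DemoToolServer()
    async with server.new_session("container", "req-123"):
        assert "container:req-123" in server.sessions
    assert not server.sessions  # state dropped when the session closes

asyncio.run(demo())
```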
diff --git a/vllm/envs.py b/vllm/envs.py
index 50783eeb95a4..927bea3bf953 100755
--- a/vllm/envs.py
+++ b/vllm/envs.py
@@ -168,6 +168,8 @@
     VLLM_ALLREDUCE_USE_SYMM_MEM: bool = False
     VLLM_TUNED_CONFIG_FOLDER: Optional[str] = None
     VLLM_DISABLE_PAD_FOR_CUDAGRAPH: bool = False
+    VLLM_GPT_OSS_USE_CONTAINER_TOOL: bool = False
+    VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS: bool = False
     VLLM_CUSTOM_SCOPES_FOR_PROFILING: bool = False
 
@@ -1201,6 +1203,15 @@ def get_vllm_port() -> Optional[int]:
     "VLLM_TUNED_CONFIG_FOLDER":
     lambda: os.getenv("VLLM_TUNED_CONFIG_FOLDER", None),
 
+    # Allows vLLM to use the container tool
+    "VLLM_GPT_OSS_USE_CONTAINER_TOOL":
+    lambda: bool(int(os.getenv("VLLM_GPT_OSS_USE_CONTAINER_TOOL", "0"))),
+
+    # Allows harmony instructions to be injected into system messages
+    "VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS":
+    lambda: bool(
+        int(os.getenv("VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS", "0"))),
+
     # Add optional custom scopes for profiling, disable to avoid overheads
     "VLLM_CUSTOM_SCOPES_FOR_PROFILING":
     lambda: bool(int(os.getenv("VLLM_CUSTOM_SCOPES_FOR_PROFILING", "0"))),
""" @@ -124,7 +125,8 @@ async def add_tool_server(self, server_url: str): description=tool.description, parameters=tool.inputSchema) for tool in list_tools_response.tools - ]) + ], + ) self.harmony_tool_descriptions[tool_from_mcp.name] = tool_from_mcp if tool_from_mcp.name not in self.urls: self.urls[tool_from_mcp.name] = url @@ -142,14 +144,16 @@ def get_tool_description(self, tool_name: str): return self.harmony_tool_descriptions.get(tool_name) @asynccontextmanager - async def new_session(self, tool_name: str): + async def new_session(self, tool_name: str, session_id: str): from mcp import ClientSession from mcp.client.sse import sse_client url = self.urls.get(tool_name) + headers = {"x-session-id": session_id} if not url: raise KeyError(f"Tool '{tool_name}' is not supported") - async with sse_client(url=url) as streams, ClientSession( - *streams) as session: + async with sse_client(url=url, + headers=headers) as streams, ClientSession( + *streams) as session: await session.initialize() yield session @@ -182,7 +186,7 @@ def get_tool_description(self, raise ValueError(f"Unknown tool {tool_name}") @asynccontextmanager - async def new_session(self, tool_name: str): + async def new_session(self, tool_name: str, session_id: str): if tool_name not in self.tools: raise KeyError(f"Tool '{tool_name}' is not supported") yield self.tools[tool_name] diff --git a/vllm/envs.py b/vllm/envs.py index 50783eeb95a4..927bea3bf953 100755 --- a/vllm/envs.py +++ b/vllm/envs.py @@ -168,6 +168,8 @@ VLLM_ALLREDUCE_USE_SYMM_MEM: bool = False VLLM_TUNED_CONFIG_FOLDER: Optional[str] = None VLLM_DISABLE_PAD_FOR_CUDAGRAPH: bool = False + VLLM_GPT_OSS_USE_CONTAINER_TOOL: bool = False + VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS: bool = False VLLM_CUSTOM_SCOPES_FOR_PROFILING: bool = False @@ -1201,6 +1203,15 @@ def get_vllm_port() -> Optional[int]: "VLLM_TUNED_CONFIG_FOLDER": lambda: os.getenv("VLLM_TUNED_CONFIG_FOLDER", None), + # Allows vllm use container tool + "VLLM_GPT_OSS_USE_CONTAINER_TOOL": + lambda: bool(int(os.getenv("VLLM_GPT_OSS_USE_CONTAINER_TOOL", "0"))), + + # Allows harmony instructions to be injected on system messages + "VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS": + lambda: bool( + int(os.getenv("VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS", "0"))), + # Add optional custom scopes for profiling, disable to avoid overheads "VLLM_CUSTOM_SCOPES_FOR_PROFILING": lambda: bool(int(os.getenv("VLLM_CUSTOM_SCOPES_FOR_PROFILING", "0"))),