Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 79 additions & 10 deletions livekit-agents/livekit/agents/llm/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@
) from e


from .tool_context import RawFunctionTool, ToolError, function_tool
from .tool_context import (
RawFunctionTool,
ToolError,
function_tool,
get_function_info,
get_raw_function_info,
is_function_tool,
is_raw_function_tool,
)

MCPTool = RawFunctionTool

Expand Down Expand Up @@ -149,11 +157,22 @@ def client_streams(

class MCPServerHTTP(MCPServer):
"""
HTTP-based MCP server to detect transport type based on URL path.

- URLs ending with 'sse' use Server-Sent Events (SSE) transport
- URLs ending with 'mcp' use streamable HTTP transport
- For other URLs, defaults to SSE transport for backward compatibility
HTTP-based MCP server with configurable transport type and tool filtering.

Args:
url: The URL of the MCP server
transport_type: Explicit transport type - "sse" or "streamable_http".
If None, transport type is auto-detected from URL path:
- URLs ending with 'sse' use Server-Sent Events (SSE) transport
- URLs ending with 'mcp' use streamable HTTP transport
- For other URLs, defaults to SSE transport for backward compatibility
allowed_tools: Optional list of tool names to filter. If provided, only
tools whose names are in this list will be available. If None, all
tools from the server will be available.
headers: Optional HTTP headers to include in requests
timeout: Connection timeout in seconds (default: 5)
sse_read_timeout: SSE read timeout in seconds (default: 300)
client_session_timeout_seconds: Client session timeout in seconds (default: 5)

Note: SSE transport is being deprecated in favor of streamable HTTP transport.
See: https://github.com/modelcontextprotocol/modelcontextprotocol/pull/206
Expand All @@ -162,6 +181,8 @@ class MCPServerHTTP(MCPServer):
def __init__(
self,
url: str,
transport_type: str | None = None,
allowed_tools: list[str] | None = None,
headers: dict[str, Any] | None = None,
timeout: float = 5,
sse_read_timeout: float = 60 * 5,
Expand All @@ -172,18 +193,29 @@ def __init__(
self.headers = headers
self._timeout = timeout
self._sse_read_timeout = sse_read_timeout
self._use_streamable_http = self._should_use_streamable_http(url)
self._allowed_tools = set(allowed_tools) if allowed_tools else None

# Determine transport type: explicit > URL-based detection
if transport_type is not None:
if transport_type not in ("sse", "streamable_http"):
raise ValueError(
f"transport_type must be 'sse' or 'streamable_http', got '{transport_type}'"
)
self._use_streamable_http = transport_type == "streamable_http"
else:
# Fall back to URL-based detection for backward compatibility
self._use_streamable_http = self._should_use_streamable_http(url)

def _should_use_streamable_http(self, url: str) -> bool:
"""
Determine transport type based on URL path.
Determine transport type based on URL path (for backward compatibility).

Returns True for streamable HTTP if URL ends with 'mcp',
False for SSE if URL ends with 'sse' or for backward compatibility.
"""
parsed_url = urlparse(url)
path_lower = parsed_url.path.lower().rstrip("/")
return path_lower.endswith("mcp")
return path_lower.endswith("/mcp")

def client_streams(
self,
Expand Down Expand Up @@ -213,9 +245,46 @@ def client_streams(
sse_read_timeout=self._sse_read_timeout,
)

async def list_tools(self) -> list[MCPTool]:
"""
List tools from the MCP server, filtered by allowed_tools if specified.
"""
all_tools = await super().list_tools()

# If no filter is set, return all tools
if self._allowed_tools is None:
return all_tools

# Filter tools by name
return self._filter_tools(all_tools)

def _filter_tools(self, tools: list[MCPTool]) -> list[MCPTool]:
"""
Filter tools by allowed_tools if specified.
"""
if self._allowed_tools is None:
return tools

filtered_tools = []
for tool in tools:
# Get tool name based on tool type
if is_function_tool(tool):
tool_name = get_function_info(tool).name
elif is_raw_function_tool(tool):
tool_name = get_raw_function_info(tool).name
else:
# Fallback: skip tools we can't identify
continue

if tool_name in self._allowed_tools:
filtered_tools.append(tool)

return filtered_tools

def __repr__(self) -> str:
transport_type = "streamable_http" if self._use_streamable_http else "sse"
return f"MCPServerHTTP(url={self.url}, transport={transport_type})"
allowed_str = f", allowed_tools={list(self._allowed_tools)}" if self._allowed_tools else ""
return f"MCPServerHTTP(url={self.url}, transport={transport_type}{allowed_str})"


class MCPServerStdio(MCPServer):
Expand Down
Loading