-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Added RFC 6570 complaint form style query expansion as optional param… #427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
79bc4b8
33fd246
01025ea
397569a
543e86c
bfcbf6f
b076a13
7af828a
2ed668c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,24 +4,36 @@ | |
|
||
import inspect | ||
import re | ||
import urllib.parse | ||
from collections.abc import Callable | ||
from typing import Any | ||
|
||
from pydantic import BaseModel, Field, TypeAdapter, validate_call | ||
|
||
from mcp.server.fastmcp.resources.types import FunctionResource, Resource | ||
from mcp.server.fastmcp.utilities.func_metadata import ( | ||
use_defaults_on_optional_validation_error, | ||
) | ||
|
||
|
||
class ResourceTemplate(BaseModel): | ||
"""A template for dynamically creating resources.""" | ||
|
||
uri_template: str = Field(description="URI template with parameters (e.g. weather://{city}/current)") | ||
uri_template: str = Field(description="URI template with parameters (e.g. weather://{city}/current{?units,format})") | ||
name: str = Field(description="Name of the resource") | ||
title: str | None = Field(description="Human-readable title of the resource", default=None) | ||
description: str | None = Field(description="Description of what the resource does") | ||
mime_type: str = Field(default="text/plain", description="MIME type of the resource content") | ||
fn: Callable[..., Any] = Field(exclude=True) | ||
parameters: dict[str, Any] = Field(description="JSON schema for function parameters") | ||
required_params: set[str] = Field( | ||
default_factory=set, | ||
description="Set of required parameters from the path component", | ||
) | ||
optional_params: set[str] = Field( | ||
default_factory=set, | ||
description="Set of optional parameters specified in the query component", | ||
) | ||
|
||
@classmethod | ||
def from_function( | ||
|
@@ -34,40 +46,127 @@ def from_function( | |
mime_type: str | None = None, | ||
) -> ResourceTemplate: | ||
"""Create a template from a function.""" | ||
func_name = name or fn.__name__ | ||
original_fn = fn | ||
func_name = name or original_fn.__name__ | ||
if func_name == "<lambda>": | ||
raise ValueError("You must provide a name for lambda functions") | ||
|
||
# Get schema from TypeAdapter - will fail if function isn't properly typed | ||
parameters = TypeAdapter(fn).json_schema() | ||
# Get schema from TypeAdapter using the original function for correct schema | ||
parameters = TypeAdapter(original_fn).json_schema() | ||
|
||
# ensure the arguments are properly cast | ||
fn = validate_call(fn) | ||
# First, apply pydantic's validation and coercion | ||
validated_fn = validate_call(original_fn) | ||
|
||
# Then, apply our decorator to handle default fallback for optional params | ||
final_fn = use_defaults_on_optional_validation_error(validated_fn) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove, we should fail fast and not silently when invalid params are provided by clients. |
||
|
||
# Extract required and optional params from the original function's signature | ||
required_params, optional_params = cls._analyze_function_params(original_fn) | ||
|
||
# Extract path parameters from URI template | ||
path_params: set[str] = set(re.findall(r"{(\w+)}", re.sub(r"{(\?.+?)}", "", uri_template))) | ||
|
||
# Extract query parameters from the URI template if present | ||
query_param_match = re.search(r"{(\?(?:\w+,)*\w+)}", uri_template) | ||
query_params: set[str] = set() | ||
if query_param_match: | ||
# Extract query parameters from {?param1,param2,...} syntax | ||
query_str = query_param_match.group(1) | ||
query_params = set(query_str[1:].split(",")) # Remove the leading '?' and split | ||
|
||
# Validate path parameters match required function parameters | ||
if path_params != required_params: | ||
raise ValueError( | ||
f"Mismatch between URI path parameters {path_params} and required function parameters {required_params}" | ||
) | ||
|
||
# Validate query parameters are a subset of optional function parameters | ||
if not query_params.issubset(optional_params): | ||
invalid_params: set[str] = query_params - optional_params | ||
raise ValueError( | ||
f"Query parameters {invalid_params} do not match optional function parameters {optional_params}" | ||
) | ||
|
||
return cls( | ||
uri_template=uri_template, | ||
name=func_name, | ||
title=title, | ||
description=description or fn.__doc__ or "", | ||
description=description or original_fn.__doc__ or "", | ||
mime_type=mime_type or "text/plain", | ||
fn=fn, | ||
fn=final_fn, | ||
parameters=parameters, | ||
required_params=required_params, | ||
optional_params=optional_params, | ||
) | ||
|
||
@staticmethod | ||
def _analyze_function_params(fn: Callable[..., Any]) -> tuple[set[str], set[str]]: | ||
"""Analyze function signature to extract required and optional parameters. | ||
This should operate on the original, unwrapped function. | ||
""" | ||
# Ensure we are looking at the original function if it was wrapped elsewhere | ||
original_fn_for_analysis = inspect.unwrap(fn) | ||
required_params: set[str] = set() | ||
optional_params: set[str] = set() | ||
|
||
signature = inspect.signature(original_fn_for_analysis) | ||
for name, param in signature.parameters.items(): | ||
# Parameters with default values are optional | ||
if param.default is param.empty: | ||
required_params.add(name) | ||
else: | ||
optional_params.add(name) | ||
|
||
return required_params, optional_params | ||
|
||
def matches(self, uri: str) -> dict[str, Any] | None: | ||
"""Check if URI matches template and extract parameters.""" | ||
# Convert template to regex pattern | ||
pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)") | ||
match = re.match(f"^{pattern}$", uri) | ||
if match: | ||
return match.groupdict() | ||
return None | ||
# Split URI into path and query parts | ||
if "?" in uri: | ||
path, query = uri.split("?", 1) | ||
else: | ||
path, query = uri, "" | ||
|
||
# Remove the query parameter part from the template for matching | ||
path_template = re.sub(r"{(\?.+?)}", "", self.uri_template) | ||
|
||
# Convert template to regex pattern for path part | ||
pattern = path_template.replace("{", "(?P<").replace("}", ">[^/]+)") | ||
match = re.match(f"^{pattern}$", path) | ||
|
||
if not match: | ||
return None | ||
|
||
# Extract path parameters | ||
params = match.groupdict() | ||
|
||
# Parse and add query parameters if present | ||
if query: | ||
query_params = urllib.parse.parse_qs(query) | ||
for key, value in query_params.items(): | ||
if key in self.optional_params: | ||
# Use the first value if multiple are provided | ||
params[key] = value[0] if value else None | ||
|
||
return params | ||
|
||
async def create_resource(self, uri: str, params: dict[str, Any]) -> Resource: | ||
"""Create a resource from the template with the given parameters.""" | ||
try: | ||
# Call function and check if result is a coroutine | ||
result = self.fn(**params) | ||
# Prepare parameters for function call | ||
# For optional parameters not in URL, use their default values | ||
|
||
# First add extracted parameters | ||
fn_params = { | ||
name: value | ||
for name, value in params.items() | ||
if name in self.required_params or name in self.optional_params | ||
} | ||
|
||
# self.fn is now multiply-decorated: | ||
# 1. validate_call for coercion/validation | ||
# 2. our new decorator for default fallback on optional param validation err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't use a fallback, just fail if the optional params are supplied incorrectly |
||
result = self.fn(**fn_params) | ||
if inspect.iscoroutine(result): | ||
result = await result | ||
|
||
|
@@ -80,4 +179,6 @@ async def create_resource(self, uri: str, params: dict[str, Any]) -> Resource: | |
fn=lambda: result, # Capture result in closure | ||
) | ||
except Exception as e: | ||
# This will catch errors from validate_call (e.g., for required params) | ||
# or from our decorator if retry also fails, or any other errors. | ||
raise ValueError(f"Error creating resource from template: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
original_fn
,fn
,final_fn
- this gets quite confusing.I think if we remove the
use_defaults_on_optional_validation_error
we shouldn't need any of these and can just stick tofn