Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 15 additions & 85 deletions python/packages/core/agent_framework/_mcp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.

import json
import logging
import re
import sys
Expand All @@ -19,9 +18,9 @@
from mcp.shared.context import RequestContext
from mcp.shared.exceptions import McpError
from mcp.shared.session import RequestResponder
from pydantic import BaseModel, Field, create_model
from pydantic import BaseModel, create_model

from ._tools import AIFunction, HostedMCPSpecificApproval
from ._tools import AIFunction, HostedMCPSpecificApproval, _build_pydantic_model_from_json_schema
from ._types import (
ChatMessage,
Contents,
Expand Down Expand Up @@ -274,95 +273,26 @@ def _get_input_model_from_mcp_prompt(prompt: types.Prompt) -> type[BaseModel]:
if not prompt.arguments:
return create_model(f"{prompt.name}_input")

field_definitions: dict[str, Any] = {}
for prompt_argument in prompt.arguments:
# For prompts, all arguments are typically required and string type
# unless specified otherwise in the prompt argument
python_type = str # Default type for prompt arguments
# Convert prompt arguments to JSON schema format
properties: dict[str, Any] = {}
required: list[str] = []

# Create field definition for create_model
for prompt_argument in prompt.arguments:
# For prompts, all arguments are typically string type unless specified otherwise
properties[prompt_argument.name] = {
"type": "string",
"description": prompt_argument.description if hasattr(prompt_argument, "description") else "",
}
if prompt_argument.required:
field_definitions[prompt_argument.name] = (python_type, ...)
else:
field_definitions[prompt_argument.name] = (python_type, None)
required.append(prompt_argument.name)

return create_model(f"{prompt.name}_input", **field_definitions)
schema = {"properties": properties, "required": required}
return _build_pydantic_model_from_json_schema(prompt.name, schema)


def _get_input_model_from_mcp_tool(tool: types.Tool) -> type[BaseModel]:
"""Creates a Pydantic model from a tools parameters."""
properties = tool.inputSchema.get("properties", None)
required = tool.inputSchema.get("required", [])
definitions = tool.inputSchema.get("$defs", {})

# Check if 'properties' is missing or not a dictionary
if not properties:
return create_model(f"{tool.name}_input")

def resolve_type(prop_details: dict[str, Any]) -> type:
"""Resolve JSON Schema type to Python type, handling $ref."""
# Handle $ref by resolving the reference
if "$ref" in prop_details:
ref = prop_details["$ref"]
# Extract the reference path (e.g., "#/$defs/CustomerIdParam" -> "CustomerIdParam")
if ref.startswith("#/$defs/"):
def_name = ref.split("/")[-1]
if def_name in definitions:
# Resolve the reference and use its type
resolved = definitions[def_name]
return resolve_type(resolved)
# If we can't resolve the ref, default to dict for safety
return dict

# Map JSON Schema types to Python types
json_type = prop_details.get("type", "string")
match json_type:
case "integer":
return int
case "number":
return float
case "boolean":
return bool
case "array":
return list
case "object":
return dict
case _:
return str # default

field_definitions: dict[str, Any] = {}
for prop_name, prop_details in properties.items():
prop_details = json.loads(prop_details) if isinstance(prop_details, str) else prop_details

python_type = resolve_type(prop_details)
description = prop_details.get("description", "")

# Build field kwargs (description, array items schema, etc.)
field_kwargs: dict[str, Any] = {}
if description:
field_kwargs["description"] = description

# Preserve array items schema if present
if prop_details.get("type") == "array" and "items" in prop_details:
items_schema = prop_details["items"]
if items_schema and items_schema != {}:
field_kwargs["json_schema_extra"] = {"items": items_schema}

# Create field definition for create_model
if prop_name in required:
if field_kwargs:
field_definitions[prop_name] = (python_type, Field(**field_kwargs))
else:
field_definitions[prop_name] = (python_type, ...)
else:
default_value = prop_details.get("default", None)
field_kwargs["default"] = default_value
if field_kwargs and any(k != "default" for k in field_kwargs):
field_definitions[prop_name] = (python_type, Field(**field_kwargs))
else:
field_definitions[prop_name] = (python_type, default_value)

return create_model(f"{tool.name}_input", **field_definitions)
return _build_pydantic_model_from_json_schema(tool.name, tool.inputSchema)


def _normalize_mcp_name(name: str) -> str:
Expand Down
169 changes: 146 additions & 23 deletions python/packages/core/agent_framework/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

from opentelemetry.metrics import Histogram
from pydantic import AnyUrl, BaseModel, Field, ValidationError, create_model
from pydantic.fields import FieldInfo

from ._logging import get_logger
from ._serialization import SerializationMixin
Expand Down Expand Up @@ -932,6 +931,151 @@ def _create_input_model_from_func(func: Callable[..., Any], name: str) -> type[B
}


def _build_pydantic_model_from_json_schema(
model_name: str,
schema: Mapping[str, Any],
) -> type[BaseModel]:
"""Creates a Pydantic model from JSON Schema with support for $refs, nested objects, and typed arrays.

Args:
model_name: The name of the model to be created.
schema: The JSON Schema definition (should contain 'properties', 'required', '$defs', etc.).

Returns:
The dynamically created Pydantic model class.
"""
properties = schema.get("properties")
required = schema.get("required", [])
definitions = schema.get("$defs", {})

# Check if 'properties' is missing or not a dictionary
if not properties:
return create_model(f"{model_name}_input")

def _resolve_type(prop_details: dict[str, Any], parent_name: str = "") -> type:
"""Resolve JSON Schema type to Python type, handling $ref, nested objects, and typed arrays.

Args:
prop_details: The JSON Schema property details
parent_name: Name to use for creating nested models (for uniqueness)

Returns:
Python type annotation (could be int, str, list[str], or a nested Pydantic model)
"""
# Handle $ref by resolving the reference
if "$ref" in prop_details:
ref = prop_details["$ref"]
# Extract the reference path (e.g., "#/$defs/CustomerIdParam" -> "CustomerIdParam")
if ref.startswith("#/$defs/"):
def_name = ref.split("/")[-1]
if def_name in definitions:
# Resolve the reference and use its type
resolved = definitions[def_name]
return _resolve_type(resolved, def_name)
# If we can't resolve the ref, default to dict for safety
return dict

# Map JSON Schema types to Python types
json_type = prop_details.get("type", "string")
match json_type:
case "integer":
return int
case "number":
return float
case "boolean":
return bool
case "array":
# Handle typed arrays
items_schema = prop_details.get("items")
if items_schema and isinstance(items_schema, dict):
# Recursively resolve the item type
item_type = _resolve_type(items_schema, f"{parent_name}_item")
# Return list[ItemType] instead of bare list
return list[item_type] # type: ignore
# If no items schema or invalid, return bare list
return list
case "object":
# Handle nested objects by creating a nested Pydantic model
nested_properties = prop_details.get("properties")
nested_required = prop_details.get("required", [])

if nested_properties and isinstance(nested_properties, dict):
# Create the name for the nested model
nested_model_name = f"{parent_name}_nested" if parent_name else "NestedModel"

# Recursively build field definitions for the nested model
nested_field_definitions: dict[str, Any] = {}
for nested_prop_name, nested_prop_details in nested_properties.items():
nested_prop_details = (
json.loads(nested_prop_details)
if isinstance(nested_prop_details, str)
else nested_prop_details
)

nested_python_type = _resolve_type(
nested_prop_details, f"{nested_model_name}_{nested_prop_name}"
)
nested_description = nested_prop_details.get("description", "")

# Build field kwargs for nested property
nested_field_kwargs: dict[str, Any] = {}
if nested_description:
nested_field_kwargs["description"] = nested_description

# Create field definition
if nested_prop_name in nested_required:
nested_field_definitions[nested_prop_name] = (
(
nested_python_type,
Field(**nested_field_kwargs),
)
if nested_field_kwargs
else (nested_python_type, ...)
)
else:
nested_field_kwargs["default"] = nested_prop_details.get("default", None)
nested_field_definitions[nested_prop_name] = (
nested_python_type,
Field(**nested_field_kwargs),
)

# Create and return the nested Pydantic model
return create_model(nested_model_name, **nested_field_definitions) # type: ignore

# If no properties defined, return bare dict
return dict
case _:
return str # default

field_definitions: dict[str, Any] = {}
for prop_name, prop_details in properties.items():
prop_details = json.loads(prop_details) if isinstance(prop_details, str) else prop_details

python_type = _resolve_type(prop_details, f"{model_name}_{prop_name}")
description = prop_details.get("description", "")

# Build field kwargs (description, etc.)
field_kwargs: dict[str, Any] = {}
if description:
field_kwargs["description"] = description

# Create field definition for create_model
if prop_name in required:
if field_kwargs:
field_definitions[prop_name] = (python_type, Field(**field_kwargs))
else:
field_definitions[prop_name] = (python_type, ...)
else:
default_value = prop_details.get("default", None)
field_kwargs["default"] = default_value
if field_kwargs and any(k != "default" for k in field_kwargs):
field_definitions[prop_name] = (python_type, Field(**field_kwargs))
else:
field_definitions[prop_name] = (python_type, default_value)

return create_model(f"{model_name}_input", **field_definitions)


def _create_model_from_json_schema(tool_name: str, schema_json: Mapping[str, Any]) -> type[BaseModel]:
"""Creates a Pydantic model from a given JSON Schema.

Expand All @@ -948,29 +1092,8 @@ def _create_model_from_json_schema(tool_name: str, schema_json: Mapping[str, Any
f"JSON schema for tool '{tool_name}' must contain a 'properties' key of type dict. "
f"Got: {schema_json.get('properties', None)}"
)
# Extract field definitions with type annotations
field_definitions: dict[str, tuple[type, FieldInfo]] = {}
for field_name, field_schema in schema_json["properties"].items():
field_args: dict[str, Any] = {}
if (field_description := field_schema.get("description", None)) is not None:
field_args["description"] = field_description
if (field_default := field_schema.get("default", None)) is not None:
field_args["default"] = field_default
field_type = field_schema.get("type", None)
if field_type is None:
raise ValueError(
f"Missing 'type' for field '{field_name}' in JSON schema. "
f"Got: {field_schema}, Supported types: {list(TYPE_MAPPING.keys())}"
)
python_type = TYPE_MAPPING.get(field_type)
if python_type is None:
raise ValueError(
f"Unsupported type '{field_type}' for field '{field_name}' in JSON schema. "
f"Got: {field_schema}, Supported types: {list(TYPE_MAPPING.keys())}"
)
field_definitions[field_name] = (python_type, Field(**field_args))

return create_model(f"{tool_name}_input", **field_definitions) # type: ignore[call-overload, no-any-return]
return _build_pydantic_model_from_json_schema(tool_name, schema_json)


@overload
Expand Down
2 changes: 1 addition & 1 deletion python/packages/core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies = [
# connectors and functions
"openai>=1.99.0",
"azure-identity>=1,<2",
"mcp[ws]>=1.13",
"mcp[ws]>=1.23",
"packaging>=24.1",
]

Expand Down
Loading
Loading