Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pydantic_ai_slim/pydantic_ai/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1792,9 +1792,11 @@ async def __aenter__(self) -> Self:
"""
async with self._enter_lock:
if self._entered_count == 0:
self._exit_stack = AsyncExitStack()
toolset = self._get_toolset()
await self._exit_stack.enter_async_context(toolset)
async with AsyncExitStack() as exit_stack:
toolset = self._get_toolset()
await exit_stack.enter_async_context(toolset)

self._exit_stack = exit_stack.pop_all()
self._entered_count += 1
return self

Expand Down
37 changes: 18 additions & 19 deletions pydantic_ai_slim/pydantic_ai/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,24 @@ async def __aenter__(self) -> Self:
"""
async with self._enter_lock:
if self._running_count == 0:
self._exit_stack = AsyncExitStack()

self._read_stream, self._write_stream = await self._exit_stack.enter_async_context(
self.client_streams()
)
client = ClientSession(
read_stream=self._read_stream,
write_stream=self._write_stream,
sampling_callback=self._sampling_callback if self.allow_sampling else None,
logging_callback=self.log_handler,
read_timeout_seconds=timedelta(seconds=self.read_timeout),
)
self._client = await self._exit_stack.enter_async_context(client)

with anyio.fail_after(self.timeout):
await self._client.initialize()

if log_level := self.log_level:
await self._client.set_logging_level(log_level)
async with AsyncExitStack() as exit_stack:
self._read_stream, self._write_stream = await exit_stack.enter_async_context(self.client_streams())
client = ClientSession(
read_stream=self._read_stream,
write_stream=self._write_stream,
sampling_callback=self._sampling_callback if self.allow_sampling else None,
logging_callback=self.log_handler,
read_timeout_seconds=timedelta(seconds=self.read_timeout),
)
self._client = await exit_stack.enter_async_context(client)

with anyio.fail_after(self.timeout):
await self._client.initialize()

if log_level := self.log_level:
await self._client.set_logging_level(log_level)

self._exit_stack = exit_stack.pop_all()
self._running_count += 1
return self

Expand Down
7 changes: 4 additions & 3 deletions pydantic_ai_slim/pydantic_ai/toolsets/combined.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ def __post_init__(self):
async def __aenter__(self) -> Self:
async with self._enter_lock:
if self._entered_count == 0:
self._exit_stack = AsyncExitStack()
for toolset in self.toolsets:
await self._exit_stack.enter_async_context(toolset)
async with AsyncExitStack() as exit_stack:
for toolset in self.toolsets:
await exit_stack.enter_async_context(toolset)
self._exit_stack = exit_stack.pop_all()
self._entered_count += 1
return self

Expand Down
14 changes: 14 additions & 0 deletions tests/test_mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ async def test_reentrant_context_manager():
pass


async def test_context_manager_initialization_error() -> None:
"""Test if streams are closed if client fails to initialize."""
server = MCPServerStdio('python', ['-m', 'tests.mcp_server'])
from mcp.client.session import ClientSession

with patch.object(ClientSession, 'initialize', side_effect=Exception):
with pytest.raises(Exception):
async with server:
pass

assert server._read_stream._closed # pyright: ignore[reportPrivateUsage]
assert server._write_stream._closed # pyright: ignore[reportPrivateUsage]


async def test_stdio_server_with_tool_prefix(run_context: RunContext[int]):
server = MCPServerStdio('python', ['-m', 'tests.mcp_server'], tool_prefix='foo')
async with server:
Expand Down
25 changes: 25 additions & 0 deletions tests/test_toolsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
from dataclasses import dataclass, replace
from typing import TypeVar
from unittest.mock import AsyncMock

import pytest
from inline_snapshot import snapshot
Expand Down Expand Up @@ -469,3 +470,27 @@ async def test_context_manager():
async with toolset:
assert server1.is_running
assert server2.is_running


class InitializationError(Exception):
pass


async def test_context_manager_failed_initialization():
"""Test if MCP servers stop if any MCP server fails to initialize."""
try:
from pydantic_ai.mcp import MCPServerStdio
except ImportError: # pragma: lax no cover
pytest.skip('mcp is not installed')

server1 = MCPServerStdio('python', ['-m', 'tests.mcp_server'])
server2 = AsyncMock()
server2.__aenter__.side_effect = InitializationError

toolset = CombinedToolset([server1, server2])

with pytest.raises(InitializationError):
async with toolset:
pass

assert server1.is_running is False