From 234ec9d5007aeb43b4ef7dd14af903f6f64e81bc Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 23 Sep 2025 15:27:38 +0200 Subject: [PATCH 01/11] Add MCPTool/MCPToolset warm_up --- .../tools/mcp/mcp_tool.py | 58 ++++++++++++++++++- .../tools/mcp/mcp_toolset.py | 40 +++++++++++-- integrations/mcp/tests/test_mcp_tool.py | 26 +++++++++ 3 files changed, 119 insertions(+), 5 deletions(-) diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py index c43f25a6bc..9db8313c07 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py @@ -814,6 +814,7 @@ def __init__( description: str | None = None, connection_timeout: int = 30, invocation_timeout: int = 30, + eager_connect: bool = True, ): """ Initialize the MCP tool. @@ -823,6 +824,9 @@ def __init__( :param description: Custom description (if None, server description will be used) :param connection_timeout: Timeout in seconds for server connection :param invocation_timeout: Default timeout in seconds for tool invocations + :param eager_connect: If True (default), connect to server during initialization. + If False, defer connection until warm up or first tool use + whichever comes first. :raises MCPConnectionError: If connection to the server fails :raises MCPToolNotFoundError: If no tools are available or the requested tool is not found :raises TimeoutError: If connection times out @@ -832,6 +836,19 @@ def __init__( self._server_info = server_info self._connection_timeout = connection_timeout self._invocation_timeout = invocation_timeout + self._eager_connect = eager_connect + self._client: MCPClient | None = None + self._worker: _MCPClientSessionManager | None = None + self._lock = threading.RLock() + + # don't connect now; initialize permissively + if not eager_connect: + # Permissive placeholder JSON Schema so the Tool is valid + # without discovering the remote schema during validation. + # Replaced with the strict schema on first use. + params = {"type": "object", "properties": {}, "additionalProperties": True} + super().__init__(name=name, description=description or "", parameters=params, function=self._invoke_tool) + return logger.debug(f"TOOL: Initializing MCPTool '{name}'") @@ -906,9 +923,15 @@ def _invoke_tool(self, **kwargs: Any) -> str: """ logger.debug(f"TOOL: Invoking tool '{self.name}' with args: {kwargs}") try: + # Connect on first use if eager_connect is turned off + if not self._eager_connect: + self._ensure_connected() async def invoke(): logger.debug(f"TOOL: Inside invoke coroutine for '{self.name}'") + # This should never happen, and mypy doesn't know that + if self._client is None: + raise MCPConnectionError(message="Not connected to an MCP server", operation="call_tool") result = await asyncio.wait_for( self._client.call_tool(self.name, kwargs), timeout=self._invocation_timeout ) @@ -939,7 +962,10 @@ async def ainvoke(self, **kwargs: Any) -> str: :raises TimeoutError: If the operation times out """ try: - return await asyncio.wait_for(self._client.call_tool(self.name, kwargs), timeout=self._invocation_timeout) + if not self._eager_connect: + self._ensure_connected() + client = cast(MCPClient, self._client) + return await asyncio.wait_for(client.call_tool(self.name, kwargs), timeout=self._invocation_timeout) except asyncio.TimeoutError as e: message = f"Tool invocation timed out after {self._invocation_timeout} seconds" raise TimeoutError(message) from e @@ -949,6 +975,33 @@ async def ainvoke(self, **kwargs: Any) -> str: message = f"Failed to invoke tool '{self.name}' with args: {kwargs} , got error: {e!s}" raise MCPInvocationError(message, self.name, kwargs) from e + def warm_up(self) -> None: + """Connect and fetch the tool schema eager_connect is turned off.""" + if self._eager_connect: + return + self._ensure_connected() + + def _ensure_connected(self) -> None: + """Establish connection if not connected eager_connect is turned off.""" + with self._lock: + if self._client is not None: + return + client = self._server_info.create_client() + worker = _MCPClientSessionManager(client, timeout=self._connection_timeout) + tools = worker.tools() + tool = next((t for t in tools if t.name == self.name), None) + if tool is None: + available = [t.name for t in tools] + msg = f"Tool '{self.name}' not found on server. Available tools: {', '.join(available)}" + raise MCPToolNotFoundError(msg, tool_name=self.name, available_tools=available) + # Publish connection and tighten parameters for better prompting + self._client = client + self._worker = worker + try: + self.parameters = tool.inputSchema + except Exception as e: + logger.debug(f"TOOL: Could not update strict parameters after connect: {e!s}") + def to_dict(self) -> dict[str, Any]: """ Serializes the MCPTool to a dictionary. @@ -966,6 +1019,7 @@ def to_dict(self) -> dict[str, Any]: "server_info": self._server_info.to_dict(), "connection_timeout": self._connection_timeout, "invocation_timeout": self._invocation_timeout, + "eager_connect": self._eager_connect, } return { "type": generate_qualified_class_name(type(self)), @@ -998,6 +1052,7 @@ def from_dict(cls, data: dict[str, Any]) -> "Tool": # Handle backward compatibility for timeout parameters connection_timeout = inner_data.get("connection_timeout", 30) invocation_timeout = inner_data.get("invocation_timeout", 30) + eager_connect = inner_data.get("eager_connect", True) # Create a new MCPTool instance with the deserialized parameters # This will establish a new connection to the MCP server @@ -1007,6 +1062,7 @@ def from_dict(cls, data: dict[str, Any]) -> "Tool": server_info=server_info, connection_timeout=connection_timeout, invocation_timeout=invocation_timeout, + eager_connect=eager_connect, ) def close(self): diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py index b13f87e856..912a25e966 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py @@ -120,6 +120,7 @@ def __init__( tool_names: list[str] | None = None, connection_timeout: float = 30.0, invocation_timeout: float = 30.0, + eager_connect: bool = True, ): """ Initialize the MCP toolset. @@ -129,6 +130,8 @@ def __init__( matching names will be added to the toolset. :param connection_timeout: Timeout in seconds for server connection :param invocation_timeout: Default timeout in seconds for tool invocations + :param eager_connect: If True (default), connect to server and load tools during initialization. + If False, defer connection to warm up. :raises MCPToolNotFoundError: If any of the specified tool names are not found on the server """ # Store configuration @@ -136,8 +139,37 @@ def __init__( self.tool_names = tool_names self.connection_timeout = connection_timeout self.invocation_timeout = invocation_timeout + self.eager_connect = eager_connect + self.warmup_called = False + + if not eager_connect: + # Do not connect during validation; expose a toolset with one fake tool to pass validation + placeholder_tool = Tool( + name=f"mcp_not_connected_placeholder_{id(self)}", + description="Placeholder tool initialised when eager_connect is turned off", + parameters={"type": "object", "properties": {}, "additionalProperties": True}, + function=lambda: None, + ) + super().__init__(tools=[placeholder_tool]) + else: + tools = self._connect_and_load_tools() + super().__init__(tools=tools) + + def warm_up(self) -> None: + """Connect and load tools when running eager_connect is turned off. + + Call this before handing the toolset to components like ``ToolInvoker`` so that + each tool's schema is available without performing a real invocation. + """ + if self.eager_connect or self.warmup_called: + return + + # connect and load tools never adds duplicate tools, set the tools attribute directly + self.tools = self._connect_and_load_tools() + self.warmup_called = True - # Connect and load tools + def _connect_and_load_tools(self) -> list[Tool]: + """Connect and load tools.""" try: # Create the client and spin up a worker so open/close happen in the # same coroutine, avoiding AnyIO cancel-scope issues. @@ -195,9 +227,7 @@ def invoke_tool(**kwargs: Any) -> Any: ) haystack_tools.append(tool) - # Initialize parent class with complete tools list - super().__init__(tools=haystack_tools) - + return haystack_tools except Exception as e: # We need to close because we could connect properly, retrieve tools yet # fail because of an MCPToolNotFoundError @@ -273,6 +303,7 @@ def to_dict(self) -> dict[str, Any]: "tool_names": self.tool_names, "connection_timeout": self.connection_timeout, "invocation_timeout": self.invocation_timeout, + "eager_connect": self.eager_connect, }, } @@ -297,6 +328,7 @@ def from_dict(cls, data: dict[str, Any]) -> "MCPToolset": tool_names=inner_data.get("tool_names"), connection_timeout=inner_data.get("connection_timeout", 30.0), invocation_timeout=inner_data.get("invocation_timeout", 30.0), + eager_connect=inner_data.get("eager_connect", True), ) def close(self): diff --git a/integrations/mcp/tests/test_mcp_tool.py b/integrations/mcp/tests/test_mcp_tool.py index a2e0bf5935..1fe3556888 100644 --- a/integrations/mcp/tests/test_mcp_tool.py +++ b/integrations/mcp/tests/test_mcp_tool.py @@ -124,3 +124,29 @@ def test_mcp_tool_serde(self, mcp_tool_cleanup): } assert isinstance(new_tool._server_info, InMemoryServerInfo) + + def test_mcp_tool_lazy_connect_updates_schema(self, mcp_tool_cleanup): + """ + When eager_connect=False, constructor should not discover schema. + First use should connect and then tighten parameters to strict schema. + """ + server_info = InMemoryServerInfo(server=calculator_mcp._mcp_server) + + tool = MCPTool(name="add", server_info=server_info, eager_connect=False) + mcp_tool_cleanup(tool) + + # Initially permissive schema (no discovery on __init__) + assert tool.parameters == {"type": "object", "properties": {}, "additionalProperties": True} + + # First invocation connects lazily and should succeed + result_json = tool.invoke(a=2, b=5) + result = json.loads(result_json) + assert result["content"][0]["text"] == "7" + + # After first use, parameters should be tightened to the strict schema from the server + assert tool.parameters == { + "properties": {"a": {"title": "A", "type": "integer"}, "b": {"title": "B", "type": "integer"}}, + "required": ["a", "b"], + "title": "addArguments", + "type": "object", + } From d504eb80fb80a4afcd604566a8ad75d4beea97ed Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 20 Oct 2025 14:57:45 +0200 Subject: [PATCH 02/11] Update haystack dependency --- integrations/mcp/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/mcp/pyproject.toml b/integrations/mcp/pyproject.toml index ed7d66f509..edd35f6b32 100644 --- a/integrations/mcp/pyproject.toml +++ b/integrations/mcp/pyproject.toml @@ -29,7 +29,7 @@ classifiers = [ ] dependencies = [ "mcp>=1.8.0", - "haystack-ai>=2.18.0", + "haystack-ai>=2.19.0", "exceptiongroup", # Backport of ExceptionGroup for Python < 3.11 "httpx" # HTTP client library used for SSE connections ] From 6ca0335a338df81f35d28ac75b44f9625aceab16 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 20 Oct 2025 15:12:27 +0200 Subject: [PATCH 03/11] Make eager_connect False by default --- .../src/haystack_integrations/tools/mcp/mcp_tool.py | 6 +++--- .../src/haystack_integrations/tools/mcp/mcp_toolset.py | 6 +++--- integrations/mcp/tests/test_mcp_tool.py | 10 ++++++---- integrations/mcp/tests/test_mcp_toolset.py | 9 +++++++-- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py index 9db8313c07..257f6c565b 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py @@ -814,7 +814,7 @@ def __init__( description: str | None = None, connection_timeout: int = 30, invocation_timeout: int = 30, - eager_connect: bool = True, + eager_connect: bool = False, ): """ Initialize the MCP tool. @@ -824,8 +824,8 @@ def __init__( :param description: Custom description (if None, server description will be used) :param connection_timeout: Timeout in seconds for server connection :param invocation_timeout: Default timeout in seconds for tool invocations - :param eager_connect: If True (default), connect to server during initialization. - If False, defer connection until warm up or first tool use + :param eager_connect: If True, connect to server during initialization. + If False (default), defer connection until warm_up or first tool use, whichever comes first. :raises MCPConnectionError: If connection to the server fails :raises MCPToolNotFoundError: If no tools are available or the requested tool is not found diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py index 912a25e966..cc13d72b1a 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py @@ -120,7 +120,7 @@ def __init__( tool_names: list[str] | None = None, connection_timeout: float = 30.0, invocation_timeout: float = 30.0, - eager_connect: bool = True, + eager_connect: bool = False, ): """ Initialize the MCP toolset. @@ -130,8 +130,8 @@ def __init__( matching names will be added to the toolset. :param connection_timeout: Timeout in seconds for server connection :param invocation_timeout: Default timeout in seconds for tool invocations - :param eager_connect: If True (default), connect to server and load tools during initialization. - If False, defer connection to warm up. + :param eager_connect: If True, connect to server and load tools during initialization. + If False (default), defer connection to warm_up. :raises MCPToolNotFoundError: If any of the specified tool names are not found on the server """ # Store configuration diff --git a/integrations/mcp/tests/test_mcp_tool.py b/integrations/mcp/tests/test_mcp_tool.py index 1fe3556888..e2e04e5d87 100644 --- a/integrations/mcp/tests/test_mcp_tool.py +++ b/integrations/mcp/tests/test_mcp_tool.py @@ -26,21 +26,21 @@ def mcp_add_tool(self, mcp_tool_cleanup): """Provides an MCPTool instance for the 'add' tool using the in-memory calculator server.""" server_info = InMemoryServerInfo(server=calculator_mcp._mcp_server) # The MCPTool constructor will fetch the tool's schema from the in-memory server - tool = MCPTool(name="add", server_info=server_info) + tool = MCPTool(name="add", server_info=server_info, eager_connect=True) return mcp_tool_cleanup(tool) @pytest.fixture def mcp_echo_tool(self, mcp_tool_cleanup): """Provides an MCPTool instance for the 'echo' tool using the in-memory echo server.""" server_info = InMemoryServerInfo(server=echo_mcp._mcp_server) - tool = MCPTool(name="echo", server_info=server_info) + tool = MCPTool(name="echo", server_info=server_info, eager_connect=True) return mcp_tool_cleanup(tool) @pytest.fixture def mcp_error_tool(self, mcp_tool_cleanup): """Provides an MCPTool instance for the 'divide_by_zero' tool for error testing.""" server_info = InMemoryServerInfo(server=calculator_mcp._mcp_server) - tool = MCPTool(name="divide_by_zero", server_info=server_info) + tool = MCPTool(name="divide_by_zero", server_info=server_info, eager_connect=True) return mcp_tool_cleanup(tool) # New tests using in-memory approach will be added below @@ -90,7 +90,9 @@ def test_mcp_tool_serde(self, mcp_tool_cleanup): """Test serialization and deserialization of MCPTool with in-memory server.""" server_info = InMemoryServerInfo(server=calculator_mcp._mcp_server) - tool = MCPTool(name="add", server_info=server_info, description="Addition tool for serde testing") + tool = MCPTool( + name="add", server_info=server_info, description="Addition tool for serde testing", eager_connect=True + ) # Register tool for cleanup mcp_tool_cleanup(tool) diff --git a/integrations/mcp/tests/test_mcp_toolset.py b/integrations/mcp/tests/test_mcp_toolset.py index 68df52d28d..e887356fbf 100644 --- a/integrations/mcp/tests/test_mcp_toolset.py +++ b/integrations/mcp/tests/test_mcp_toolset.py @@ -38,6 +38,7 @@ async def calculator_toolset(mcp_tool_cleanup): server_info=server_info, connection_timeout=45, invocation_timeout=60, + eager_connect=True, ) return mcp_tool_cleanup(toolset) @@ -52,6 +53,7 @@ async def echo_toolset(mcp_tool_cleanup): server_info=server_info, connection_timeout=45, invocation_timeout=60, + eager_connect=True, ) return mcp_tool_cleanup(toolset) @@ -67,6 +69,7 @@ async def calculator_toolset_with_tool_filter(mcp_tool_cleanup): tool_names=["add"], # Only include the 'add' tool connection_timeout=45, invocation_timeout=60, + eager_connect=True, ) return mcp_tool_cleanup(toolset) @@ -211,6 +214,7 @@ async def test_toolset_error_handling(self, mock_create_client): server_info=server_info, connection_timeout=1.0, invocation_timeout=1.0, + eager_connect=True, ) async def test_toolset_tool_not_found(self): @@ -223,6 +227,7 @@ async def test_toolset_tool_not_found(self): tool_names=["non_existent_tool"], connection_timeout=10, invocation_timeout=10, + eager_connect=True, ) @@ -280,7 +285,7 @@ def subtract(a: int, b: int) -> int: # Create the toolset server_info = SSEServerInfo(base_url=f"http://127.0.0.1:{port}") - toolset = MCPToolset(server_info=server_info) + toolset = MCPToolset(server_info=server_info, eager_connect=True) # Verify we got both tools assert len(toolset) == 2 @@ -381,7 +386,7 @@ def subtract(a: int, b: int) -> int: # Create the toolset - note the /mcp endpoint for streamable-http server_info = StreamableHttpServerInfo(url=f"http://127.0.0.1:{port}/mcp") - toolset = MCPToolset(server_info=server_info) + toolset = MCPToolset(server_info=server_info, eager_connect=True) # Verify we got both tools assert len(toolset) == 2 From a3b5dc42af1d6986bffa8195b9b36e2dc74b5cd3 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 20 Oct 2025 15:47:29 +0200 Subject: [PATCH 04/11] Add real integration tests for warm_up --- integrations/mcp/tests/test_mcp_tool.py | 29 ++++++++++++++++++++++ integrations/mcp/tests/test_mcp_toolset.py | 28 ++++++++++++++++++++- 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/integrations/mcp/tests/test_mcp_tool.py b/integrations/mcp/tests/test_mcp_tool.py index e2e04e5d87..7cf839e1c4 100644 --- a/integrations/mcp/tests/test_mcp_tool.py +++ b/integrations/mcp/tests/test_mcp_tool.py @@ -1,11 +1,17 @@ import json +import os import pytest +from haystack.components.agents import Agent +from haystack.components.generators.chat import OpenAIChatGenerator +from haystack.core.pipeline import Pipeline +from haystack.dataclasses import ChatMessage from haystack.tools.errors import ToolInvocationError from haystack.tools.from_function import tool from haystack_integrations.tools.mcp import ( MCPTool, + StdioServerInfo, ) from .mcp_memory_transport import InMemoryServerInfo @@ -152,3 +158,26 @@ def test_mcp_tool_lazy_connect_updates_schema(self, mcp_tool_cleanup): "title": "addArguments", "type": "object", } + + @pytest.mark.skipif("OPENAI_API_KEY" not in os.environ, reason="OPENAI_API_KEY not set") + @pytest.mark.integration + def test_pipeline_warmup_with_mcp_tool(self): + """Test lazy connection with Pipeline.warm_up() - replicates time_pipeline.py.""" + + # Replicate time_pipeline.py using MCPTool instead of MCPToolset + server_info = StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"]) + + # Create tool with lazy connection (default behavior) + tool = MCPTool(name="get_current_time", server_info=server_info) + try: + # Build pipeline with Agent, Pipeline will warm up the tool in the agent automatically + agent = Agent(chat_generator=OpenAIChatGenerator(model="gpt-4.1-mini"), tools=[tool]) + pipeline = Pipeline() + pipeline.add_component("agent", agent) + + user_input_msg = ChatMessage.from_user(text="What is the time in New York?") + result = pipeline.run({"agent": {"messages": [user_input_msg]}}) + assert "New York" in result["agent"]["messages"][3].text + finally: + if tool: + tool.close() diff --git a/integrations/mcp/tests/test_mcp_toolset.py b/integrations/mcp/tests/test_mcp_toolset.py index e887356fbf..625df58a71 100644 --- a/integrations/mcp/tests/test_mcp_toolset.py +++ b/integrations/mcp/tests/test_mcp_toolset.py @@ -11,10 +11,13 @@ import pytest import pytest_asyncio from haystack import logging +from haystack.components.agents import Agent +from haystack.components.generators.chat import OpenAIChatGenerator from haystack.core.pipeline import Pipeline +from haystack.dataclasses import ChatMessage from haystack.tools import Tool -from haystack_integrations.tools.mcp import MCPToolset +from haystack_integrations.tools.mcp import MCPToolset, StdioServerInfo from haystack_integrations.tools.mcp.mcp_tool import ( MCPConnectionError, MCPToolNotFoundError, @@ -230,6 +233,29 @@ async def test_toolset_tool_not_found(self): eager_connect=True, ) + @pytest.mark.skipif("OPENAI_API_KEY" not in os.environ, reason="OPENAI_API_KEY not set") + @pytest.mark.integration + async def test_pipeline_warmup_with_mcp_toolset(self): + """Test lazy connection with Pipeline.warm_up() - replicates time_pipeline.py.""" + + # Replicate time_pipeline.py using calculator instead of time server + server_info = StdioServerInfo(command="uvx", args=["mcp-server-time", "--local-timezone=Europe/Berlin"]) + + # Create toolset with lazy connection (default behavior) + toolset = MCPToolset(server_info=server_info) + try: + # Build pipeline exactly like time_pipeline.py + agent = Agent(chat_generator=OpenAIChatGenerator(model="gpt-4.1-mini"), tools=toolset) + pipeline = Pipeline() + pipeline.add_component("agent", agent) + + user_input_msg = ChatMessage.from_user(text="What is the time in New York?") + result = pipeline.run({"agent": {"messages": [user_input_msg]}}) + assert "New York" in result["agent"]["messages"][3].text + finally: + if toolset: + toolset.close() + @pytest.mark.integration class TestMCPToolsetIntegration: From ca198b3a485e63d95bb1b6e0c3361abb8b19cb59 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 20 Oct 2025 15:58:14 +0200 Subject: [PATCH 05/11] Fix one test --- integrations/mcp/tests/test_mcp_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/mcp/tests/test_mcp_integration.py b/integrations/mcp/tests/test_mcp_integration.py index 04601624fd..484a34109b 100644 --- a/integrations/mcp/tests/test_mcp_integration.py +++ b/integrations/mcp/tests/test_mcp_integration.py @@ -234,7 +234,7 @@ def test_mcp_tool_error_handling_integration(self): # Use a non-existent server address to force a connection error server_info = SSEServerInfo(base_url="http://localhost:9999", timeout=1) # Short timeout with pytest.raises(MCPConnectionError) as exc_info: - MCPTool(name="non_existent_tool", server_info=server_info, connection_timeout=2) + MCPTool(name="non_existent_tool", server_info=server_info, connection_timeout=2, eager_connect=True) # Check for platform-agnostic error message patterns error_message = str(exc_info.value) From 54a607ef8783e1175e53ac04d97b992335ac15b0 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Mon, 20 Oct 2025 16:05:25 +0200 Subject: [PATCH 06/11] Remove test --- integrations/mcp/tests/test_mcp_tool.py | 26 ------------------------- 1 file changed, 26 deletions(-) diff --git a/integrations/mcp/tests/test_mcp_tool.py b/integrations/mcp/tests/test_mcp_tool.py index 7cf839e1c4..111a58c07a 100644 --- a/integrations/mcp/tests/test_mcp_tool.py +++ b/integrations/mcp/tests/test_mcp_tool.py @@ -133,32 +133,6 @@ def test_mcp_tool_serde(self, mcp_tool_cleanup): assert isinstance(new_tool._server_info, InMemoryServerInfo) - def test_mcp_tool_lazy_connect_updates_schema(self, mcp_tool_cleanup): - """ - When eager_connect=False, constructor should not discover schema. - First use should connect and then tighten parameters to strict schema. - """ - server_info = InMemoryServerInfo(server=calculator_mcp._mcp_server) - - tool = MCPTool(name="add", server_info=server_info, eager_connect=False) - mcp_tool_cleanup(tool) - - # Initially permissive schema (no discovery on __init__) - assert tool.parameters == {"type": "object", "properties": {}, "additionalProperties": True} - - # First invocation connects lazily and should succeed - result_json = tool.invoke(a=2, b=5) - result = json.loads(result_json) - assert result["content"][0]["text"] == "7" - - # After first use, parameters should be tightened to the strict schema from the server - assert tool.parameters == { - "properties": {"a": {"title": "A", "type": "integer"}, "b": {"title": "B", "type": "integer"}}, - "required": ["a", "b"], - "title": "addArguments", - "type": "object", - } - @pytest.mark.skipif("OPENAI_API_KEY" not in os.environ, reason="OPENAI_API_KEY not set") @pytest.mark.integration def test_pipeline_warmup_with_mcp_tool(self): From 8b3af278638606fa93ff71dfd9859a4127f60525 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 21 Oct 2025 09:58:41 +0200 Subject: [PATCH 07/11] Update from_dict to use eager_connect False by default --- .../mcp/src/haystack_integrations/tools/mcp/mcp_tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py index 257f6c565b..30ba0437bb 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py @@ -1052,7 +1052,7 @@ def from_dict(cls, data: dict[str, Any]) -> "Tool": # Handle backward compatibility for timeout parameters connection_timeout = inner_data.get("connection_timeout", 30) invocation_timeout = inner_data.get("invocation_timeout", 30) - eager_connect = inner_data.get("eager_connect", True) + eager_connect = inner_data.get("eager_connect", False) # because False is the default # Create a new MCPTool instance with the deserialized parameters # This will establish a new connection to the MCP server From 1c50c9586662e124d1270fa9a9ae7374bcea01e2 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 21 Oct 2025 10:00:41 +0200 Subject: [PATCH 08/11] Comment update --- .../mcp/src/haystack_integrations/tools/mcp/mcp_tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py index 30ba0437bb..17800733e6 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py @@ -845,7 +845,7 @@ def __init__( if not eager_connect: # Permissive placeholder JSON Schema so the Tool is valid # without discovering the remote schema during validation. - # Replaced with the strict schema on first use. + # Tool parameters/schema will be replaced with the correct schema (from the MCP server) on first use. params = {"type": "object", "properties": {}, "additionalProperties": True} super().__init__(name=name, description=description or "", parameters=params, function=self._invoke_tool) return From a01c069daf32b22ed8441e85518b4efa46126c2e Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 21 Oct 2025 10:16:36 +0200 Subject: [PATCH 09/11] Lint --- .../mcp/src/haystack_integrations/tools/mcp/mcp_tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py index 17800733e6..9f4dfd8151 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py @@ -1052,7 +1052,7 @@ def from_dict(cls, data: dict[str, Any]) -> "Tool": # Handle backward compatibility for timeout parameters connection_timeout = inner_data.get("connection_timeout", 30) invocation_timeout = inner_data.get("invocation_timeout", 30) - eager_connect = inner_data.get("eager_connect", False) # because False is the default + eager_connect = inner_data.get("eager_connect", False) # because False is the default # Create a new MCPTool instance with the deserialized parameters # This will establish a new connection to the MCP server From b9ad31606d6ced5fb0546278b539441367f3c96b Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Tue, 21 Oct 2025 13:55:52 +0200 Subject: [PATCH 10/11] Make warm_up single point of entry for ensuring connection --- .../src/haystack_integrations/tools/mcp/mcp_tool.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py index 9f4dfd8151..fd0787c9c0 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py @@ -924,8 +924,7 @@ def _invoke_tool(self, **kwargs: Any) -> str: logger.debug(f"TOOL: Invoking tool '{self.name}' with args: {kwargs}") try: # Connect on first use if eager_connect is turned off - if not self._eager_connect: - self._ensure_connected() + self.warm_up() async def invoke(): logger.debug(f"TOOL: Inside invoke coroutine for '{self.name}'") @@ -962,8 +961,7 @@ async def ainvoke(self, **kwargs: Any) -> str: :raises TimeoutError: If the operation times out """ try: - if not self._eager_connect: - self._ensure_connected() + self.warm_up() client = cast(MCPClient, self._client) return await asyncio.wait_for(client.call_tool(self.name, kwargs), timeout=self._invocation_timeout) except asyncio.TimeoutError as e: @@ -976,13 +974,9 @@ async def ainvoke(self, **kwargs: Any) -> str: raise MCPInvocationError(message, self.name, kwargs) from e def warm_up(self) -> None: - """Connect and fetch the tool schema eager_connect is turned off.""" + """Connect and fetch the tool schema if eager_connect is turned off.""" if self._eager_connect: return - self._ensure_connected() - - def _ensure_connected(self) -> None: - """Establish connection if not connected eager_connect is turned off.""" with self._lock: if self._client is not None: return From e30ae1d20b0b7ecc2c7fde558e3ede57dfe74874 Mon Sep 17 00:00:00 2001 From: Vladimir Blagojevic Date: Wed, 22 Oct 2025 13:19:58 +0200 Subject: [PATCH 11/11] PR feedback --- .../tools/mcp/mcp_tool.py | 88 ++++++++----------- .../tools/mcp/mcp_toolset.py | 14 +-- .../tests/test_mcp_timeout_reconnection.py | 68 +++++++------- 3 files changed, 79 insertions(+), 91 deletions(-) diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py index 41d54bbb81..425346d57f 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_tool.py @@ -853,35 +853,10 @@ def __init__( logger.debug(f"TOOL: Initializing MCPTool '{name}'") try: - # Create client and spin up a long-lived worker that keeps the - # connect/close lifecycle inside one coroutine. - self._client = server_info.create_client() - logger.debug(f"TOOL: Created client for MCPTool '{name}'") - - # The worker starts immediately and blocks here until the connection - # is established (or fails), returning the tool list. - self._worker = _MCPClientSessionManager(self._client, timeout=connection_timeout) - - tools = self._worker.tools() - # Handle no tools case - if not tools: - logger.debug(f"TOOL: No tools found for '{name}'") - message = "No tools available on server" - raise MCPToolNotFoundError(message, tool_name=name) - - # Find the specified tool - tool_dict = {t.name: t for t in tools} - logger.debug(f"TOOL: Available tools: {list(tool_dict.keys())}") - - tool_info: types.Tool | None = tool_dict.get(name) - - if not tool_info: - available = list(tool_dict.keys()) - logger.debug(f"TOOL: Tool '{name}' not found in available tools") - message = f"Tool '{name}' not found on server. Available tools: {', '.join(available)}" - raise MCPToolNotFoundError(message, tool_name=name, available_tools=available) - + logger.debug(f"TOOL: Connecting to MCP server for '{name}'") + tool_info = self._connect_and_initialize(name) logger.debug(f"TOOL: Found tool '{name}', initializing Tool parent class") + # Initialize the parent class super().__init__( name=name, @@ -914,6 +889,36 @@ def __init__( message = f"Failed to initialize MCPTool '{name}': {error_message}" raise MCPConnectionError(message=message, server_info=server_info, operation="initialize") from e + def _connect_and_initialize(self, tool_name: str) -> types.Tool: + """ + Connect to the MCP server and retrieve the tool schema. + + :param tool_name: Name of the tool to look for + :returns: The tool schema for this tool + :raises MCPToolNotFoundError: If the tool is not found on the server + """ + client = self._server_info.create_client() + worker = _MCPClientSessionManager(client, timeout=self._connection_timeout) + tools = worker.tools() + + # Handle no tools case + if not tools: + message = "No tools available on server" + raise MCPToolNotFoundError(message, tool_name=tool_name) + + # Find the specified tool + tool = next((t for t in tools if t.name == tool_name), None) + if tool is None: + available = [t.name for t in tools] + msg = f"Tool '{tool_name}' not found on server. Available tools: {', '.join(available)}" + raise MCPToolNotFoundError(msg, tool_name=tool_name, available_tools=available) + + # Publish connection + self._client = client + self._worker = worker + + return tool + def _invoke_tool(self, **kwargs: Any) -> str: """ Synchronous tool invocation. @@ -928,12 +933,8 @@ def _invoke_tool(self, **kwargs: Any) -> str: async def invoke(): logger.debug(f"TOOL: Inside invoke coroutine for '{self.name}'") - # This should never happen, and mypy doesn't know that - if self._client is None: - raise MCPConnectionError(message="Not connected to an MCP server", operation="call_tool") - result = await asyncio.wait_for( - self._client.call_tool(self.name, kwargs), timeout=self._invocation_timeout - ) + client = cast(MCPClient, self._client) + result = await asyncio.wait_for(client.call_tool(self.name, kwargs), timeout=self._invocation_timeout) logger.debug(f"TOOL: Invoke successful for '{self.name}'") return result @@ -975,26 +976,11 @@ async def ainvoke(self, **kwargs: Any) -> str: def warm_up(self) -> None: """Connect and fetch the tool schema if eager_connect is turned off.""" - if self._eager_connect: - return with self._lock: if self._client is not None: return - client = self._server_info.create_client() - worker = _MCPClientSessionManager(client, timeout=self._connection_timeout) - tools = worker.tools() - tool = next((t for t in tools if t.name == self.name), None) - if tool is None: - available = [t.name for t in tools] - msg = f"Tool '{self.name}' not found on server. Available tools: {', '.join(available)}" - raise MCPToolNotFoundError(msg, tool_name=self.name, available_tools=available) - # Publish connection and tighten parameters for better prompting - self._client = client - self._worker = worker - try: - self.parameters = tool.inputSchema - except Exception as e: - logger.debug(f"TOOL: Could not update strict parameters after connect: {e!s}") + tool = self._connect_and_initialize(self.name) + self.parameters = tool.inputSchema def to_dict(self) -> dict[str, Any]: """ diff --git a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py index cc13d72b1a..529e7e487c 100644 --- a/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py +++ b/integrations/mcp/src/haystack_integrations/tools/mcp/mcp_toolset.py @@ -140,7 +140,7 @@ def __init__( self.connection_timeout = connection_timeout self.invocation_timeout = invocation_timeout self.eager_connect = eager_connect - self.warmup_called = False + self._warmup_called = False if not eager_connect: # Do not connect during validation; expose a toolset with one fake tool to pass validation @@ -154,19 +154,21 @@ def __init__( else: tools = self._connect_and_load_tools() super().__init__(tools=tools) + self._warmup_called = True def warm_up(self) -> None: - """Connect and load tools when running eager_connect is turned off. + """Connect and load tools when eager_connect is turned off. - Call this before handing the toolset to components like ``ToolInvoker`` so that - each tool's schema is available without performing a real invocation. + This method is automatically called by ``ToolInvoker.warm_up()`` and ``Pipeline.warm_up()``. + You can also call it directly before using the toolset to ensure all tool schemas + are available without performing a real invocation. """ - if self.eager_connect or self.warmup_called: + if self._warmup_called: return # connect and load tools never adds duplicate tools, set the tools attribute directly self.tools = self._connect_and_load_tools() - self.warmup_called = True + self._warmup_called = True def _connect_and_load_tools(self) -> list[Tool]: """Connect and load tools.""" diff --git a/integrations/mcp/tests/test_mcp_timeout_reconnection.py b/integrations/mcp/tests/test_mcp_timeout_reconnection.py index f2f64025f7..5ed3fcebd5 100644 --- a/integrations/mcp/tests/test_mcp_timeout_reconnection.py +++ b/integrations/mcp/tests/test_mcp_timeout_reconnection.py @@ -13,6 +13,7 @@ import subprocess import sys import tempfile +import textwrap import time from unittest.mock import AsyncMock, MagicMock @@ -108,40 +109,39 @@ def test_real_sse_reconnection_after_server_restart(self): try: # Create server script with cross-platform signal handling with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_file: - temp_file.write( - f""" -import sys -import signal -from mcp.server.fastmcp import FastMCP - -# Handle shutdown signals gracefully (cross-platform) -def signal_handler(signum, frame): - sys.exit(0) - -# Only set up signal handlers that exist on the platform -if hasattr(signal, 'SIGTERM'): - signal.signal(signal.SIGTERM, signal_handler) -if hasattr(signal, 'SIGINT'): - signal.signal(signal.SIGINT, signal_handler) - -mcp = FastMCP("Reconnection Test Server", host="127.0.0.1", port={port}) - -@mcp.tool() -def test_tool(message: str) -> str: - return f"Server response: {{message}}" - -if __name__ == "__main__": - try: - print(f"Starting server on port {port}", flush=True) - mcp.run(transport="sse") - except (KeyboardInterrupt, SystemExit): - print("Server shutting down gracefully", flush=True) - sys.exit(0) - except Exception as e: - print(f"Server error: {{e}}", file=sys.stderr, flush=True) - sys.exit(1) -""".encode() - ) + script_content = textwrap.dedent(f""" + import sys + import signal + from mcp.server.fastmcp import FastMCP + + # Handle shutdown signals gracefully (cross-platform) + def signal_handler(signum, frame): + sys.exit(0) + + # Only set up signal handlers that exist on the platform + if hasattr(signal, 'SIGTERM'): + signal.signal(signal.SIGTERM, signal_handler) + if hasattr(signal, 'SIGINT'): + signal.signal(signal.SIGINT, signal_handler) + + mcp = FastMCP("Reconnection Test Server", host="127.0.0.1", port={port}) + + @mcp.tool() + def test_tool(message: str) -> str: + return f"Server response: {{message}}" + + if __name__ == "__main__": + try: + print(f"Starting server on port {port}", flush=True) + mcp.run(transport="sse") + except (KeyboardInterrupt, SystemExit): + print("Server shutting down gracefully", flush=True) + sys.exit(0) + except Exception as e: + print(f"Server error: {{e}}", file=sys.stderr, flush=True) + sys.exit(1) + """).strip() + temp_file.write(script_content.encode()) server_script_path = temp_file.name # Start server