From 30aca457fca0b348b2085f92a2128d026e5a72f3 Mon Sep 17 00:00:00 2001 From: Teryl Taylor Date: Tue, 16 Sep 2025 11:11:42 -0600 Subject: [PATCH] fix: fixed retry on client plugin connection. Signed-off-by: Teryl Taylor --- .../plugins/framework/external/mcp/client.py | 62 +++++++++++++------ 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/mcpgateway/plugins/framework/external/mcp/client.py b/mcpgateway/plugins/framework/external/mcp/client.py index 7facb160e..fe68fcd08 100644 --- a/mcpgateway/plugins/framework/external/mcp/client.py +++ b/mcpgateway/plugins/framework/external/mcp/client.py @@ -135,29 +135,54 @@ async def __connect_to_stdio_server(self, server_script_path: str) -> None: raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) async def __connect_to_http_server(self, uri: str) -> None: - """Connect to an MCP plugin server via streamable http. + """Connect to an MCP plugin server via streamable http with retry logic. Args: uri: the URI of the mcp plugin server. Raises: - PluginError: if there is an external connection error. + PluginError: if there is an external connection error after all retries. """ - - try: - http_transport = await self._exit_stack.enter_async_context(streamablehttp_client(uri)) - self._http, self._write, _ = http_transport - self._session = await self._exit_stack.enter_async_context(ClientSession(self._http, self._write)) - - await self._session.initialize() - - # List available tools - response = await self._session.list_tools() - tools = response.tools - logger.info("\nConnected to plugin MCP (http) server with tools: %s", " ".join([tool.name for tool in tools])) - except Exception as e: - logger.exception(e) - raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name)) + max_retries = 3 + base_delay = 1.0 + + for attempt in range(max_retries): + logger.info(f"Connecting to external plugin server: {uri} (attempt {attempt + 1}/{max_retries})") + + try: + # Create a fresh exit stack for each attempt + async with AsyncExitStack() as temp_stack: + http_transport = await temp_stack.enter_async_context(streamablehttp_client(uri)) + http_client, write_func, _ = http_transport + session = await temp_stack.enter_async_context(ClientSession(http_client, write_func)) + + await session.initialize() + + # List available tools + response = await session.list_tools() + tools = response.tools + logger.info("Successfully connected to plugin MCP server with tools: %s", " ".join([tool.name for tool in tools])) + + # Success! Now move to the main exit stack + self._http = await self._exit_stack.enter_async_context(streamablehttp_client(uri)) + self._http, self._write, _ = self._http + self._session = await self._exit_stack.enter_async_context(ClientSession(self._http, self._write)) + await self._session.initialize() + return + + except Exception as e: + logger.warning(f"Connection attempt {attempt + 1}/{max_retries} failed: {e}") + + if attempt == max_retries - 1: + # Final attempt failed + error_msg = f"External plugin '{self.name}' connection failed after {max_retries} attempts: {uri} is not reachable. Please ensure the MCP server is running." + logger.error(error_msg) + raise PluginError(error=PluginErrorModel(message=error_msg, plugin_name=self.name)) + await self.shutdown() + # Wait before retry + delay = base_delay * (2**attempt) + logger.info(f"Retrying in {delay}s...") + await asyncio.sleep(delay) async def __invoke_hook(self, payload_result_model: Type[P], hook_type: HookType, payload: BaseModel, context: PluginContext) -> P: """Invoke an external plugin hook using the MCP protocol. @@ -296,4 +321,5 @@ async def __get_plugin_config(self) -> PluginConfig | None: async def shutdown(self) -> None: """Plugin cleanup code.""" - await self._exit_stack.aclose() + if self._exit_stack: + await self._exit_stack.aclose()