Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 44 additions & 18 deletions mcpgateway/plugins/framework/external/mcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,29 +135,54 @@ async def __connect_to_stdio_server(self, server_script_path: str) -> None:
raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name))

async def __connect_to_http_server(self, uri: str) -> None:
"""Connect to an MCP plugin server via streamable http.
"""Connect to an MCP plugin server via streamable http with retry logic.

Args:
uri: the URI of the mcp plugin server.

Raises:
PluginError: if there is an external connection error.
PluginError: if there is an external connection error after all retries.
"""

try:
http_transport = await self._exit_stack.enter_async_context(streamablehttp_client(uri))
self._http, self._write, _ = http_transport
self._session = await self._exit_stack.enter_async_context(ClientSession(self._http, self._write))

await self._session.initialize()

# List available tools
response = await self._session.list_tools()
tools = response.tools
logger.info("\nConnected to plugin MCP (http) server with tools: %s", " ".join([tool.name for tool in tools]))
except Exception as e:
logger.exception(e)
raise PluginError(error=convert_exception_to_error(e, plugin_name=self.name))
max_retries = 3
base_delay = 1.0

for attempt in range(max_retries):
logger.info(f"Connecting to external plugin server: {uri} (attempt {attempt + 1}/{max_retries})")

try:
# Create a fresh exit stack for each attempt
async with AsyncExitStack() as temp_stack:
http_transport = await temp_stack.enter_async_context(streamablehttp_client(uri))
http_client, write_func, _ = http_transport
session = await temp_stack.enter_async_context(ClientSession(http_client, write_func))

await session.initialize()

# List available tools
response = await session.list_tools()
tools = response.tools
logger.info("Successfully connected to plugin MCP server with tools: %s", " ".join([tool.name for tool in tools]))

# Success! Now move to the main exit stack
self._http = await self._exit_stack.enter_async_context(streamablehttp_client(uri))
self._http, self._write, _ = self._http
self._session = await self._exit_stack.enter_async_context(ClientSession(self._http, self._write))
await self._session.initialize()
return

except Exception as e:
logger.warning(f"Connection attempt {attempt + 1}/{max_retries} failed: {e}")

if attempt == max_retries - 1:
# Final attempt failed
error_msg = f"External plugin '{self.name}' connection failed after {max_retries} attempts: {uri} is not reachable. Please ensure the MCP server is running."
logger.error(error_msg)
raise PluginError(error=PluginErrorModel(message=error_msg, plugin_name=self.name))
await self.shutdown()
# Wait before retry
delay = base_delay * (2**attempt)
logger.info(f"Retrying in {delay}s...")
await asyncio.sleep(delay)

async def __invoke_hook(self, payload_result_model: Type[P], hook_type: HookType, payload: BaseModel, context: PluginContext) -> P:
"""Invoke an external plugin hook using the MCP protocol.
Expand Down Expand Up @@ -296,4 +321,5 @@ async def __get_plugin_config(self) -> PluginConfig | None:

async def shutdown(self) -> None:
"""Plugin cleanup code."""
await self._exit_stack.aclose()
if self._exit_stack:
await self._exit_stack.aclose()
Loading