Skip to content

Commit 723d5b6

Browse files
claudemanavgup
authored andcommitted
feat(mcp): Implement MCP Gateway integration for extensibility
Implements a simplified MCP (Model Context Protocol) integration approach as recommended by expert panel (Martin Fowler, Sam Newman, Michael Nygard, Gregor Hohpe). This provides foundational capability for tool-based search result enrichment. Key components: - ResilientMCPGatewayClient: Thin wrapper (~200 lines) with circuit breaker pattern, health checks (5s timeout), retry logic, and graceful degradation - SearchResultEnricher: Content Enricher pattern implementation (~200 lines) with parallel execution and error isolation - MCP Router: API endpoints for tool discovery and invocation Features: - Circuit breaker: 5 failure threshold, 60s recovery timeout - Health monitoring with 5-second timeout - API versioning (v1 format) - Prometheus-ready metrics - Graceful degradation (core RAG works if tools fail) Docker infrastructure: - Redis service for MCP gateway caching - MCP Context Forge gateway container Configuration settings added: - MCP_ENABLED, MCP_GATEWAY_URL, MCP_TIMEOUT - MCP_CIRCUIT_BREAKER_THRESHOLD, MCP_CIRCUIT_BREAKER_TIMEOUT - MCP_ENRICHMENT_ENABLED, MCP_MAX_CONCURRENT Closes #653
1 parent 45168fc commit 723d5b6

File tree

10 files changed

+2826
-0
lines changed

10 files changed

+2826
-0
lines changed

backend/core/config.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,28 @@ class Settings(BaseSettings):
284284
log_storage_enabled: Annotated[bool, Field(default=True, alias="LOG_STORAGE_ENABLED")]
285285
log_buffer_size_mb: Annotated[int, Field(default=5, alias="LOG_BUFFER_SIZE_MB")]
286286

287+
# MCP Gateway settings
288+
# Enable/disable MCP integration globally
289+
mcp_enabled: Annotated[bool, Field(default=True, alias="MCP_ENABLED")]
290+
# MCP Context Forge gateway URL
291+
mcp_gateway_url: Annotated[str, Field(default="http://localhost:3000", alias="MCP_GATEWAY_URL")]
292+
# Request timeout in seconds (30s default per requirements)
293+
mcp_timeout: Annotated[float, Field(default=30.0, ge=1.0, le=300.0, alias="MCP_TIMEOUT")]
294+
# Health check timeout (5s per requirements)
295+
mcp_health_timeout: Annotated[float, Field(default=5.0, ge=1.0, le=30.0, alias="MCP_HEALTH_TIMEOUT")]
296+
# Maximum retries for MCP calls
297+
mcp_max_retries: Annotated[int, Field(default=3, ge=0, le=10, alias="MCP_MAX_RETRIES")]
298+
# Circuit breaker failure threshold (5 failures per requirements)
299+
mcp_circuit_breaker_threshold: Annotated[int, Field(default=5, ge=1, le=20, alias="MCP_CIRCUIT_BREAKER_THRESHOLD")]
300+
# Circuit breaker recovery timeout in seconds (60s per requirements)
301+
mcp_circuit_breaker_timeout: Annotated[float, Field(default=60.0, ge=10.0, le=600.0, alias="MCP_CIRCUIT_BREAKER_TIMEOUT")]
302+
# JWT token for MCP gateway authentication
303+
mcp_jwt_token: Annotated[str | None, Field(default=None, alias="MCP_JWT_TOKEN")]
304+
# Enable enrichment of search results with MCP tools
305+
mcp_enrichment_enabled: Annotated[bool, Field(default=True, alias="MCP_ENRICHMENT_ENABLED")]
306+
# Maximum concurrent MCP tool invocations
307+
mcp_max_concurrent: Annotated[int, Field(default=5, ge=1, le=20, alias="MCP_MAX_CONCURRENT")]
308+
287309
# Testing settings
288310
testing: Annotated[bool, Field(default=False, alias="TESTING")]
289311
skip_auth: Annotated[bool, Field(default=False, alias="SKIP_AUTH")]

backend/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
# Routers
3636
from rag_solution.router.chat_router import router as chat_router
37+
from rag_solution.router.mcp_router import router as mcp_router
3738
from rag_solution.router.collection_router import router as collection_router
3839
from rag_solution.router.conversation_router import router as conversation_router
3940
from rag_solution.router.dashboard_router import router as dashboard_router
@@ -248,6 +249,7 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
248249
app.include_router(auth_router)
249250
app.include_router(chat_router)
250251
app.include_router(conversation_router)
252+
app.include_router(mcp_router)
251253
app.include_router(dashboard_router)
252254
app.include_router(health_router)
253255
app.include_router(collection_router)
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
"""MCP Gateway router for RAG Modulo API.
2+
3+
This module provides FastAPI router endpoints for MCP (Model Context Protocol)
4+
Gateway integration, enabling tool discovery and invocation capabilities.
5+
6+
API Endpoints:
7+
- GET /api/v1/mcp/tools - List available MCP tools
8+
- POST /api/v1/mcp/tools/{name}/invoke - Invoke a specific MCP tool
9+
- GET /api/v1/mcp/health - Check MCP gateway health
10+
"""
11+
12+
from typing import Annotated
13+
14+
from fastapi import APIRouter, Depends, HTTPException, status
15+
16+
from core.config import Settings, get_settings
17+
from core.logging_utils import get_logger
18+
from rag_solution.core.dependencies import get_current_user
19+
from rag_solution.schemas.mcp_schema import (
20+
MCPHealthStatus,
21+
MCPInvocationInput,
22+
MCPInvocationOutput,
23+
MCPInvocationStatus,
24+
MCPToolsResponse,
25+
)
26+
from rag_solution.services.mcp_gateway_client import ResilientMCPGatewayClient
27+
28+
logger = get_logger(__name__)
29+
30+
router = APIRouter(prefix="/api/v1/mcp", tags=["mcp"])
31+
32+
33+
def get_mcp_client(
34+
settings: Annotated[Settings, Depends(get_settings)],
35+
) -> ResilientMCPGatewayClient:
36+
"""Dependency to create MCP gateway client.
37+
38+
Args:
39+
settings: Application settings from dependency injection
40+
41+
Returns:
42+
ResilientMCPGatewayClient: Initialized MCP client instance
43+
"""
44+
return ResilientMCPGatewayClient(settings)
45+
46+
47+
@router.get(
48+
"/health",
49+
response_model=MCPHealthStatus,
50+
summary="Check MCP gateway health",
51+
description="Perform a health check on the MCP Context Forge gateway",
52+
responses={
53+
200: {"description": "Health check completed (see healthy field for status)"},
54+
503: {"description": "MCP integration is disabled"},
55+
},
56+
)
57+
async def mcp_health(
58+
settings: Annotated[Settings, Depends(get_settings)],
59+
mcp_client: Annotated[ResilientMCPGatewayClient, Depends(get_mcp_client)],
60+
) -> MCPHealthStatus:
61+
"""Check MCP gateway health status.
62+
63+
Returns health information including:
64+
- Gateway availability
65+
- Latency
66+
- Circuit breaker state
67+
68+
Args:
69+
settings: Application settings
70+
mcp_client: MCP gateway client
71+
72+
Returns:
73+
MCPHealthStatus: Health status information
74+
"""
75+
if not settings.mcp_enabled:
76+
raise HTTPException(
77+
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
78+
detail="MCP integration is disabled",
79+
)
80+
81+
return await mcp_client.check_health()
82+
83+
84+
@router.get(
85+
"/tools",
86+
response_model=MCPToolsResponse,
87+
summary="List available MCP tools",
88+
description="Retrieve a list of all available MCP tools from the gateway",
89+
responses={
90+
200: {"description": "List of available MCP tools"},
91+
503: {"description": "MCP integration is disabled or gateway unavailable"},
92+
},
93+
)
94+
async def list_tools(
95+
current_user: Annotated[dict, Depends(get_current_user)],
96+
settings: Annotated[Settings, Depends(get_settings)],
97+
mcp_client: Annotated[ResilientMCPGatewayClient, Depends(get_mcp_client)],
98+
) -> MCPToolsResponse:
99+
"""List all available MCP tools.
100+
101+
Returns tools available for invocation, including their:
102+
- Name and description
103+
- Input parameters
104+
- Category and version
105+
106+
SECURITY: Requires authentication.
107+
108+
Args:
109+
current_user: Authenticated user from JWT token
110+
settings: Application settings
111+
mcp_client: MCP gateway client
112+
113+
Returns:
114+
MCPToolsResponse: List of available tools
115+
116+
Raises:
117+
HTTPException: If MCP is disabled or gateway unavailable
118+
"""
119+
if not settings.mcp_enabled:
120+
raise HTTPException(
121+
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
122+
detail="MCP integration is disabled",
123+
)
124+
125+
logger.info(
126+
"Listing MCP tools",
127+
extra={
128+
"user_id": current_user.get("uuid"),
129+
},
130+
)
131+
132+
response = await mcp_client.list_tools()
133+
134+
if not response.gateway_healthy:
135+
logger.warning(
136+
"MCP gateway unhealthy when listing tools",
137+
extra={
138+
"user_id": current_user.get("uuid"),
139+
},
140+
)
141+
142+
return response
143+
144+
145+
@router.post(
146+
"/tools/{tool_name}/invoke",
147+
response_model=MCPInvocationOutput,
148+
summary="Invoke an MCP tool",
149+
description="Invoke a specific MCP tool with the provided arguments",
150+
responses={
151+
200: {"description": "Tool invocation completed (check status field)"},
152+
400: {"description": "Invalid input data"},
153+
404: {"description": "Tool not found"},
154+
503: {"description": "MCP integration is disabled"},
155+
},
156+
)
157+
async def invoke_tool(
158+
tool_name: str,
159+
invocation_input: MCPInvocationInput,
160+
current_user: Annotated[dict, Depends(get_current_user)],
161+
settings: Annotated[Settings, Depends(get_settings)],
162+
mcp_client: Annotated[ResilientMCPGatewayClient, Depends(get_mcp_client)],
163+
) -> MCPInvocationOutput:
164+
"""Invoke a specific MCP tool.
165+
166+
Executes the named tool with provided arguments. Implements graceful
167+
degradation - tool failures are returned in the response status rather
168+
than throwing exceptions (except for validation errors).
169+
170+
SECURITY: Requires authentication.
171+
172+
Args:
173+
tool_name: Name of the tool to invoke
174+
invocation_input: Tool arguments and optional timeout
175+
current_user: Authenticated user from JWT token
176+
settings: Application settings
177+
mcp_client: MCP gateway client
178+
179+
Returns:
180+
MCPInvocationOutput: Tool execution result
181+
182+
Raises:
183+
HTTPException: If MCP is disabled or input validation fails
184+
"""
185+
if not settings.mcp_enabled:
186+
raise HTTPException(
187+
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
188+
detail="MCP integration is disabled",
189+
)
190+
191+
if not tool_name or not tool_name.strip():
192+
raise HTTPException(
193+
status_code=status.HTTP_400_BAD_REQUEST,
194+
detail="Tool name is required",
195+
)
196+
197+
user_id = current_user.get("uuid")
198+
199+
logger.info(
200+
"Invoking MCP tool",
201+
extra={
202+
"tool_name": tool_name,
203+
"user_id": user_id,
204+
"has_arguments": bool(invocation_input.arguments),
205+
},
206+
)
207+
208+
result = await mcp_client.invoke_tool(
209+
tool_name=tool_name.strip(),
210+
arguments=invocation_input.arguments,
211+
timeout=invocation_input.timeout,
212+
)
213+
214+
# Log result status
215+
if result.status == MCPInvocationStatus.SUCCESS:
216+
logger.info(
217+
"MCP tool invocation succeeded",
218+
extra={
219+
"tool_name": tool_name,
220+
"user_id": user_id,
221+
"execution_time_ms": result.execution_time_ms,
222+
},
223+
)
224+
else:
225+
logger.warning(
226+
"MCP tool invocation failed",
227+
extra={
228+
"tool_name": tool_name,
229+
"user_id": user_id,
230+
"status": result.status.value,
231+
"error": result.error,
232+
},
233+
)
234+
235+
return result
236+
237+
238+
@router.get(
239+
"/metrics",
240+
summary="Get MCP client metrics",
241+
description="Retrieve Prometheus-ready metrics from the MCP client",
242+
responses={
243+
200: {"description": "Client metrics"},
244+
503: {"description": "MCP integration is disabled"},
245+
},
246+
)
247+
async def get_metrics(
248+
current_user: Annotated[dict, Depends(get_current_user)],
249+
settings: Annotated[Settings, Depends(get_settings)],
250+
mcp_client: Annotated[ResilientMCPGatewayClient, Depends(get_mcp_client)],
251+
) -> dict:
252+
"""Get MCP client metrics for monitoring.
253+
254+
Returns Prometheus-ready metrics including:
255+
- Request counts (total, success, failed)
256+
- Circuit breaker state
257+
- Health check statistics
258+
259+
SECURITY: Requires authentication.
260+
261+
Args:
262+
current_user: Authenticated user from JWT token
263+
settings: Application settings
264+
mcp_client: MCP gateway client
265+
266+
Returns:
267+
dict: Client metrics
268+
"""
269+
if not settings.mcp_enabled:
270+
raise HTTPException(
271+
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
272+
detail="MCP integration is disabled",
273+
)
274+
275+
return mcp_client.get_metrics()

0 commit comments

Comments
 (0)