diff --git a/README.md b/README.md index 0b6933d5..b36caad0 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ RAG Modulo is a production-ready Retrieval-Augmented Generation platform that pr #### 🎨 Frontend Features + - **Modern UI**: React 18 with Tailwind CSS and Carbon Design System principles - **Reusable Component Library**: 8 accessible, type-safe components with consistent design patterns - **Enhanced Search**: Interactive chat interface with Chain of Thought reasoning visualization @@ -167,6 +168,7 @@ make local-dev-all 🔍 What Gets Installed **Backend (Python/Poetry)**: + - FastAPI and dependencies - LLM providers (WatsonX, OpenAI, Anthropic) - Vector DB clients (Milvus, Elasticsearch, etc.) @@ -174,12 +176,14 @@ make local-dev-all - Code quality tools (ruff, mypy, bandit) **Frontend (npm)**: + - React 18 + Vite - Tailwind CSS + Carbon Design - TypeScript dependencies - Testing libraries **Infrastructure (Docker)**: + - PostgreSQL (metadata) - Milvus (vector storage) - MinIO (object storage) @@ -188,18 +192,21 @@ make local-dev-all **Access Points:** -- 🌐 **Frontend**: http://localhost:3000 -- 🔧 **Backend API**: http://localhost:8000/docs (Swagger UI) -- 📊 **MLFlow**: http://localhost:5001 -- 💾 **MinIO Console**: http://localhost:9001 + +- 🌐 **Frontend**: +- 🔧 **Backend API**: (Swagger UI) +- 📊 **MLFlow**: +- 💾 **MinIO Console**: **Benefits:** + - ⚡ **Instant reload** - Python/React changes reflected immediately (no container rebuilds) - 🐛 **Native debugging** - Use PyCharm, VS Code debugger with breakpoints - 📦 **Local caching** - Poetry/npm caches work natively for faster dependency installs - 🔥 **Fastest iteration** - Pre-commit hooks optimized (fast on commit, comprehensive on push) **When to use:** + - ✅ Daily development work - ✅ Feature development and bug fixes - ✅ Rapid iteration and testing @@ -227,6 +234,7 @@ docker compose up -d ``` **When to use:** + - ✅ Testing production configurations - ✅ Validating Docker builds - ✅ Deployment rehearsal @@ -242,6 +250,7 @@ docker compose up -d 4. **Run**: `make venv && make run-infra` **When to use:** + - ✅ No local setup required - ✅ Consistent development environment - ✅ Work from any device @@ -364,9 +373,10 @@ make local-dev-all ``` Done! 
Services running at: -- Frontend: http://localhost:3000 -- Backend: http://localhost:8000 -- MLFlow: http://localhost:5001 + +- Frontend: +- Backend: +- MLFlow: @@ -500,6 +510,7 @@ make prod-start ``` Available images: + - `ghcr.io/manavgup/rag_modulo/backend:latest` - `ghcr.io/manavgup/rag_modulo/frontend:latest` @@ -517,6 +528,8 @@ Available images: - **Multi-LLM Support**: Seamless switching between WatsonX, OpenAI, and Anthropic with provider-specific optimizations - **IBM Docling Integration**: Enhanced document processing for complex formats (PDF, DOCX, XLSX) - **Question Suggestions**: AI-generated relevant questions based on document collection content +- **MCP Context Forge**: Tool enrichment via + [IBM MCP](https://github.com/IBM/mcp-context-forge) with resilience patterns ### 🔍 Search & Retrieval @@ -564,6 +577,7 @@ Available images: - **[🔌 API Reference](docs/api/README.md)** - Complete API documentation - **[🖥️ CLI Documentation](docs/cli/index.md)** - Command-line interface guide - **[🔐 Secret Management](docs/development/secret-management.md)** - Comprehensive guide for safe secret handling +- **[🔗 MCP Integration](docs/features/mcp-integration.md)** - MCP Context Forge gateway setup and usage ### 🛠️ Command-Line Interface (CLI) @@ -617,6 +631,7 @@ make run-ghcr ``` **Available Images:** + - `ghcr.io/manavgup/rag_modulo/backend:latest` - `ghcr.io/manavgup/rag_modulo/frontend:latest` @@ -692,6 +707,7 @@ RAG Modulo uses a comprehensive CI/CD pipeline with multiple stages: **Triggers:** Push to `main`, Pull Requests **Stages:** + 1. **Lint and Unit Tests** (No infrastructure) - Ruff linting (120 char line length) - MyPy type checking @@ -711,6 +727,7 @@ RAG Modulo uses a comprehensive CI/CD pipeline with multiple stages: - End-to-end validation **Status Badges:** + ```markdown [![CI Pipeline](https://github.com/manavgup/rag_modulo/workflows/CI/badge.svg)](https://github.com/manavgup/rag_modulo/actions) ``` @@ -720,11 +737,13 @@ RAG Modulo uses a comprehensive CI/CD pipeline with multiple stages: **Triggers:** Push to `main`, Pull Requests **Secret Detection (3-Layer Defense):** + 1. **Pre-commit hooks**: detect-secrets with baseline (< 1 sec) 2. **Local testing**: Gitleaks via `make pre-commit-run` (~1-2 sec) 3. **CI/CD**: Gitleaks + TruffleHog (~45 sec) **Scans:** + - **Gitleaks**: Pattern-based secret scanning with custom rules (`.gitleaks.toml`) - **TruffleHog**: Entropy-based + verified secret detection - **Trivy**: Container vulnerability scanning @@ -735,6 +754,7 @@ RAG Modulo uses a comprehensive CI/CD pipeline with multiple stages: **⚠️ IMPORTANT:** CI now **fails on ANY secret detection** (no `continue-on-error`). This ensures no secrets make their way to the repository. 
**Supported Secret Types:** + - Cloud: AWS, Azure, GCP keys - LLM: OpenAI, Anthropic, WatsonX, Gemini API keys - Infrastructure: PostgreSQL, MinIO, MLFlow, JWT secrets @@ -748,6 +768,7 @@ RAG Modulo uses a comprehensive CI/CD pipeline with multiple stages: **Triggers:** Push to `main`, Pull Requests to `docs/` **Actions:** + - Build MkDocs site - Deploy to GitHub Pages - API documentation generation @@ -773,30 +794,35 @@ make scan-secrets Optimized for developer velocity: **On Commit** (fast, 5-10 sec): + - Ruff formatting - Trailing whitespace - YAML syntax - File size limits **On Push** (slow, 30-60 sec): + - MyPy type checking - Pylint analysis - Security scans - Strangler pattern checks **In CI** (comprehensive): + - All checks run regardless - Ensures quality gates ### Container Registry **GitHub Container Registry (GHCR)**: + - Automatic image builds on push - Multi-architecture support (amd64, arm64) - Image signing and verification - Retention policies **Image Tags:** + - `latest`: Latest main branch build - `sha-`: Specific commit - ``: Branch-specific builds @@ -861,6 +887,7 @@ We welcome contributions! Please see our [Contributing Guide](docs/development/c ## 📈 Roadmap ### ✅ Phase 1: Foundation (Completed) + - [x] Service-based architecture with 26+ services - [x] Comprehensive test infrastructure (947 tests) - [x] Multi-LLM provider support (WatsonX, OpenAI, Anthropic) @@ -868,6 +895,7 @@ We welcome contributions! Please see our [Contributing Guide](docs/development/c - [x] CI/CD pipeline with security scanning ### ✅ Phase 2: Advanced Features (Completed) + - [x] Chain of Thought (CoT) reasoning system - [x] Automatic pipeline resolution - [x] Token tracking and monitoring @@ -877,6 +905,7 @@ We welcome contributions! Please see our [Contributing Guide](docs/development/c - [x] Containerless local development workflow ### 🔄 Phase 3: Production Enhancement (Current) + - [x] Production deployment with GHCR images - [x] Multi-stage Docker builds - [x] Security hardening (Trivy, Bandit, Gitleaks, Semgrep) @@ -885,6 +914,7 @@ We welcome contributions! Please see our [Contributing Guide](docs/development/c - [ ] Authentication system improvements (OIDC) ### 🚀 Phase 4: Enterprise Features (Next) + - [ ] Multi-tenant support - [ ] Advanced analytics and dashboards - [ ] Batch processing capabilities @@ -892,6 +922,7 @@ We welcome contributions! 
Please see our [Contributing Guide](docs/development/c - [ ] Advanced caching strategies ### 🔮 Phase 5: Innovation (Future) + - [ ] Multi-modal support (image, audio) - [ ] Agentic AI workflows - [ ] Real-time collaborative features diff --git a/backend/core/config.py b/backend/core/config.py index 439e547e..f1bc9cc7 100644 --- a/backend/core/config.py +++ b/backend/core/config.py @@ -284,6 +284,30 @@ class Settings(BaseSettings): log_storage_enabled: Annotated[bool, Field(default=True, alias="LOG_STORAGE_ENABLED")] log_buffer_size_mb: Annotated[int, Field(default=5, alias="LOG_BUFFER_SIZE_MB")] + # MCP Gateway settings + # Enable/disable MCP integration globally + mcp_enabled: Annotated[bool, Field(default=True, alias="MCP_ENABLED")] + # MCP Context Forge gateway URL (port 3001 to avoid frontend conflict on 3000) + mcp_gateway_url: Annotated[str, Field(default="http://localhost:3001", alias="MCP_GATEWAY_URL")] + # Request timeout in seconds (30s default per requirements) + mcp_timeout: Annotated[float, Field(default=30.0, ge=1.0, le=300.0, alias="MCP_TIMEOUT")] + # Health check timeout (5s per requirements) + mcp_health_timeout: Annotated[float, Field(default=5.0, ge=1.0, le=30.0, alias="MCP_HEALTH_TIMEOUT")] + # Maximum retries for MCP calls + mcp_max_retries: Annotated[int, Field(default=3, ge=0, le=10, alias="MCP_MAX_RETRIES")] + # Circuit breaker failure threshold (5 failures per requirements) + mcp_circuit_breaker_threshold: Annotated[int, Field(default=5, ge=1, le=20, alias="MCP_CIRCUIT_BREAKER_THRESHOLD")] + # Circuit breaker recovery timeout in seconds (60s per requirements) + mcp_circuit_breaker_timeout: Annotated[ + float, Field(default=60.0, ge=10.0, le=600.0, alias="MCP_CIRCUIT_BREAKER_TIMEOUT") + ] + # JWT token for MCP gateway authentication + mcp_jwt_token: Annotated[str | None, Field(default=None, alias="MCP_JWT_TOKEN")] + # Enable enrichment of search results with MCP tools + mcp_enrichment_enabled: Annotated[bool, Field(default=True, alias="MCP_ENRICHMENT_ENABLED")] + # Maximum concurrent MCP tool invocations + mcp_max_concurrent: Annotated[int, Field(default=5, ge=1, le=20, alias="MCP_MAX_CONCURRENT")] + # Testing settings testing: Annotated[bool, Field(default=False, alias="TESTING")] skip_auth: Annotated[bool, Field(default=False, alias="SKIP_AUTH")] diff --git a/backend/main.py b/backend/main.py index 819b0e1e..b259eddd 100644 --- a/backend/main.py +++ b/backend/main.py @@ -38,6 +38,7 @@ from rag_solution.router.conversation_router import router as conversation_router from rag_solution.router.dashboard_router import router as dashboard_router from rag_solution.router.health_router import router as health_router +from rag_solution.router.mcp_router import router as mcp_router from rag_solution.router.podcast_router import router as podcast_router from rag_solution.router.runtime_config_router import router as runtime_config_router from rag_solution.router.search_router import router as search_router @@ -248,6 +249,7 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: app.include_router(auth_router) app.include_router(chat_router) app.include_router(conversation_router) +app.include_router(mcp_router) app.include_router(dashboard_router) app.include_router(health_router) app.include_router(collection_router) diff --git a/backend/rag_solution/router/mcp_router.py b/backend/rag_solution/router/mcp_router.py new file mode 100644 index 00000000..123468cd --- /dev/null +++ b/backend/rag_solution/router/mcp_router.py @@ -0,0 +1,275 @@ +"""MCP Gateway router 
for RAG Modulo API. + +This module provides FastAPI router endpoints for MCP (Model Context Protocol) +Gateway integration, enabling tool discovery and invocation capabilities. + +API Endpoints: +- GET /api/v1/mcp/tools - List available MCP tools +- POST /api/v1/mcp/tools/{name}/invoke - Invoke a specific MCP tool +- GET /api/v1/mcp/health - Check MCP gateway health +""" + +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException, status + +from core.config import Settings, get_settings +from core.logging_utils import get_logger +from rag_solution.core.dependencies import get_current_user +from rag_solution.schemas.mcp_schema import ( + MCPHealthStatus, + MCPInvocationInput, + MCPInvocationOutput, + MCPInvocationStatus, + MCPToolsResponse, +) +from rag_solution.services.mcp_gateway_client import ResilientMCPGatewayClient + +logger = get_logger(__name__) + +router = APIRouter(prefix="/api/v1/mcp", tags=["mcp"]) + + +def get_mcp_client( + settings: Annotated[Settings, Depends(get_settings)], +) -> ResilientMCPGatewayClient: + """Dependency to create MCP gateway client. + + Args: + settings: Application settings from dependency injection + + Returns: + ResilientMCPGatewayClient: Initialized MCP client instance + """ + return ResilientMCPGatewayClient(settings) + + +@router.get( + "/health", + response_model=MCPHealthStatus, + summary="Check MCP gateway health", + description="Perform a health check on the MCP Context Forge gateway", + responses={ + 200: {"description": "Health check completed (see healthy field for status)"}, + 503: {"description": "MCP integration is disabled"}, + }, +) +async def mcp_health( + settings: Annotated[Settings, Depends(get_settings)], + mcp_client: Annotated[ResilientMCPGatewayClient, Depends(get_mcp_client)], +) -> MCPHealthStatus: + """Check MCP gateway health status. + + Returns health information including: + - Gateway availability + - Latency + - Circuit breaker state + + Args: + settings: Application settings + mcp_client: MCP gateway client + + Returns: + MCPHealthStatus: Health status information + """ + if not settings.mcp_enabled: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="MCP integration is disabled", + ) + + return await mcp_client.check_health() + + +@router.get( + "/tools", + response_model=MCPToolsResponse, + summary="List available MCP tools", + description="Retrieve a list of all available MCP tools from the gateway", + responses={ + 200: {"description": "List of available MCP tools"}, + 503: {"description": "MCP integration is disabled or gateway unavailable"}, + }, +) +async def list_tools( + current_user: Annotated[dict, Depends(get_current_user)], + settings: Annotated[Settings, Depends(get_settings)], + mcp_client: Annotated[ResilientMCPGatewayClient, Depends(get_mcp_client)], +) -> MCPToolsResponse: + """List all available MCP tools. + + Returns tools available for invocation, including their: + - Name and description + - Input parameters + - Category and version + + SECURITY: Requires authentication. 
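+
+    For illustration, a successful response resembles
+    ``{"tools": [...], "total_count": 3, "gateway_healthy": true}`` (fields defined by
+    ``MCPToolsResponse``); an empty tool list with ``gateway_healthy=False`` indicates
+    the gateway could not be reached and callers should degrade gracefully.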
+ + Args: + current_user: Authenticated user from JWT token + settings: Application settings + mcp_client: MCP gateway client + + Returns: + MCPToolsResponse: List of available tools + + Raises: + HTTPException: If MCP is disabled or gateway unavailable + """ + if not settings.mcp_enabled: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="MCP integration is disabled", + ) + + logger.info( + "Listing MCP tools", + extra={ + "user_id": current_user.get("uuid"), + }, + ) + + response = await mcp_client.list_tools() + + if not response.gateway_healthy: + logger.warning( + "MCP gateway unhealthy when listing tools", + extra={ + "user_id": current_user.get("uuid"), + }, + ) + + return response + + +@router.post( + "/tools/{tool_name}/invoke", + response_model=MCPInvocationOutput, + summary="Invoke an MCP tool", + description="Invoke a specific MCP tool with the provided arguments", + responses={ + 200: {"description": "Tool invocation completed (check status field)"}, + 400: {"description": "Invalid input data"}, + 404: {"description": "Tool not found"}, + 503: {"description": "MCP integration is disabled"}, + }, +) +async def invoke_tool( + tool_name: str, + invocation_input: MCPInvocationInput, + current_user: Annotated[dict, Depends(get_current_user)], + settings: Annotated[Settings, Depends(get_settings)], + mcp_client: Annotated[ResilientMCPGatewayClient, Depends(get_mcp_client)], +) -> MCPInvocationOutput: + """Invoke a specific MCP tool. + + Executes the named tool with provided arguments. Implements graceful + degradation - tool failures are returned in the response status rather + than throwing exceptions (except for validation errors). + + SECURITY: Requires authentication. + + Args: + tool_name: Name of the tool to invoke + invocation_input: Tool arguments and optional timeout + current_user: Authenticated user from JWT token + settings: Application settings + mcp_client: MCP gateway client + + Returns: + MCPInvocationOutput: Tool execution result + + Raises: + HTTPException: If MCP is disabled or input validation fails + """ + if not settings.mcp_enabled: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="MCP integration is disabled", + ) + + if not tool_name or not tool_name.strip(): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Tool name is required", + ) + + user_id = current_user.get("uuid") + + logger.info( + "Invoking MCP tool", + extra={ + "tool_name": tool_name, + "user_id": user_id, + "has_arguments": bool(invocation_input.arguments), + }, + ) + + result = await mcp_client.invoke_tool( + tool_name=tool_name.strip(), + arguments=invocation_input.arguments, + timeout=invocation_input.timeout, + ) + + # Log result status + if result.status == MCPInvocationStatus.SUCCESS: + logger.info( + "MCP tool invocation succeeded", + extra={ + "tool_name": tool_name, + "user_id": user_id, + "execution_time_ms": result.execution_time_ms, + }, + ) + else: + logger.warning( + "MCP tool invocation failed", + extra={ + "tool_name": tool_name, + "user_id": user_id, + "status": result.status.value, + "error": result.error, + }, + ) + + return result + + +@router.get( + "/metrics", + summary="Get MCP client metrics", + description="Retrieve Prometheus-ready metrics from the MCP client", + responses={ + 200: {"description": "Client metrics"}, + 503: {"description": "MCP integration is disabled"}, + }, +) +async def get_metrics( + current_user: Annotated[dict, Depends(get_current_user)], + settings: 
Annotated[Settings, Depends(get_settings)], + mcp_client: Annotated[ResilientMCPGatewayClient, Depends(get_mcp_client)], +) -> dict: + """Get MCP client metrics for monitoring. + + Returns Prometheus-ready metrics including: + - Request counts (total, success, failed) + - Circuit breaker state + - Health check statistics + + SECURITY: Requires authentication. + + Args: + current_user: Authenticated user from JWT token + settings: Application settings + mcp_client: MCP gateway client + + Returns: + dict: Client metrics + """ + if not settings.mcp_enabled: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="MCP integration is disabled", + ) + + return mcp_client.get_metrics() diff --git a/backend/rag_solution/schemas/mcp_schema.py b/backend/rag_solution/schemas/mcp_schema.py new file mode 100644 index 00000000..3a84177c --- /dev/null +++ b/backend/rag_solution/schemas/mcp_schema.py @@ -0,0 +1,194 @@ +"""API schemas for MCP (Model Context Protocol) Gateway integration. + +This module defines the data structures for MCP tool discovery, +invocation, and search result enrichment. +""" + +from datetime import UTC, datetime +from enum import Enum +from typing import Any + +from pydantic import UUID4, BaseModel, ConfigDict, Field + + +class MCPToolParameter(BaseModel): + """Schema for an MCP tool input parameter. + + Attributes: + name: Parameter name + type: Parameter type (string, number, boolean, object, array) + description: Human-readable description of the parameter + required: Whether the parameter is required + default: Default value if not provided + """ + + name: str + type: str + description: str | None = None + required: bool = False + default: Any | None = None + + model_config = ConfigDict(from_attributes=True, extra="forbid") + + +class MCPTool(BaseModel): + """Schema for an MCP tool definition. + + Attributes: + name: Unique tool identifier + description: Human-readable description of what the tool does + parameters: List of input parameters for the tool + category: Optional category for grouping tools + version: Tool version (default: v1) + enabled: Whether the tool is currently enabled + """ + + name: str + description: str + parameters: list[MCPToolParameter] = Field(default_factory=list) + category: str | None = None + version: str = "v1" + enabled: bool = True + + model_config = ConfigDict(from_attributes=True, extra="forbid") + + +class MCPToolsResponse(BaseModel): + """Response schema for listing available MCP tools. + + Attributes: + tools: List of available MCP tools + total_count: Total number of tools available + gateway_healthy: Whether the MCP gateway is healthy + """ + + tools: list[MCPTool] + total_count: int + gateway_healthy: bool = True + + model_config = ConfigDict(from_attributes=True) + + +class MCPInvocationInput(BaseModel): + """Input schema for invoking an MCP tool. + + Attributes: + arguments: Dictionary of argument name to value + timeout: Optional timeout override in seconds + user_id: Optional user ID for audit logging + """ + + arguments: dict[str, Any] = Field(default_factory=dict) + timeout: float | None = Field(default=None, ge=1.0, le=300.0) + user_id: UUID4 | None = None + + model_config = ConfigDict(from_attributes=True, extra="forbid") + + +class MCPInvocationStatus(str, Enum): + """Status of an MCP tool invocation.""" + + SUCCESS = "success" + ERROR = "error" + TIMEOUT = "timeout" + CIRCUIT_OPEN = "circuit_open" + + +class MCPInvocationOutput(BaseModel): + """Output schema for an MCP tool invocation. 
+ + Attributes: + tool_name: Name of the tool that was invoked + status: Invocation status (success, error, timeout, circuit_open) + result: Result data from the tool (if successful) + error: Error message (if failed) + execution_time_ms: Execution time in milliseconds + timestamp: When the invocation occurred + """ + + tool_name: str + status: MCPInvocationStatus + result: Any | None = None + error: str | None = None + execution_time_ms: float | None = None + timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) + + model_config = ConfigDict(from_attributes=True) + + +class MCPHealthStatus(BaseModel): + """Health status of the MCP gateway. + + Attributes: + healthy: Overall health status + gateway_url: URL of the MCP gateway + latency_ms: Health check latency in milliseconds + circuit_breaker_state: Current circuit breaker state + last_check: When the last health check was performed + error: Error message if unhealthy + """ + + healthy: bool + gateway_url: str + latency_ms: float | None = None + circuit_breaker_state: str = "closed" # closed, open, half_open + last_check: datetime = Field(default_factory=lambda: datetime.now(UTC)) + error: str | None = None + + model_config = ConfigDict(from_attributes=True) + + +class MCPEnrichmentConfig(BaseModel): + """Configuration for MCP-based search result enrichment. + + Attributes: + enabled: Whether enrichment is enabled + tools: List of tool names to use for enrichment + timeout: Timeout for enrichment operations + parallel: Whether to run enrichment in parallel + fail_silently: Whether to continue if enrichment fails + """ + + enabled: bool = True + tools: list[str] = Field(default_factory=list) + timeout: float = 30.0 + parallel: bool = True + fail_silently: bool = True + + model_config = ConfigDict(from_attributes=True, extra="forbid") + + +class MCPEnrichmentResult(BaseModel): + """Result of MCP-based enrichment for a single item. + + Attributes: + tool_name: Name of the tool used + success: Whether the enrichment succeeded + data: Enrichment data (if successful) + error: Error message (if failed) + execution_time_ms: Time taken for enrichment + """ + + tool_name: str + success: bool + data: dict[str, Any] | None = None + error: str | None = None + execution_time_ms: float | None = None + + model_config = ConfigDict(from_attributes=True) + + +class MCPEnrichedSearchResult(BaseModel): + """Search result with MCP enrichment data. + + Attributes: + original_score: Original relevance score + enrichments: List of enrichment results from MCP tools + combined_score: Combined score after enrichment (if applicable) + """ + + original_score: float + enrichments: list[MCPEnrichmentResult] = Field(default_factory=list) + combined_score: float | None = None + + model_config = ConfigDict(from_attributes=True) diff --git a/backend/rag_solution/services/mcp_gateway_client.py b/backend/rag_solution/services/mcp_gateway_client.py new file mode 100644 index 00000000..e4fecb66 --- /dev/null +++ b/backend/rag_solution/services/mcp_gateway_client.py @@ -0,0 +1,639 @@ +"""Resilient MCP Gateway Client with circuit breaker pattern. + +This module provides a thin, resilient wrapper around the MCP Context Forge +Gateway, implementing health checks, circuit breaker, timeouts, and structured +logging as per the expert panel recommendations. 
+ +Key features: +- Health monitoring with 5-second timeout +- Circuit breaker (5 failures, 60s recovery) +- Graceful degradation (core RAG works if tools fail) +- API versioning (v1 format) +- Prometheus-ready metrics +- Structured logging +""" + +import asyncio +import time +from datetime import UTC, datetime, timedelta +from enum import Enum +from typing import Any + +import httpx + +from core.config import Settings +from core.logging_utils import get_logger +from rag_solution.schemas.mcp_schema import ( + MCPHealthStatus, + MCPInvocationOutput, + MCPInvocationStatus, + MCPTool, + MCPToolParameter, + MCPToolsResponse, +) + +logger = get_logger(__name__) + + +class CircuitBreakerState(str, Enum): + """Circuit breaker state machine states.""" + + CLOSED = "closed" # Normal operation + OPEN = "open" # Failing, reject requests + HALF_OPEN = "half_open" # Testing if service recovered + + +class CircuitBreaker: + """Simple circuit breaker implementation. + + Tracks failures and opens the circuit when threshold is exceeded. + After recovery timeout, allows a test request through (half-open state). + + Attributes: + failure_threshold: Number of failures before opening circuit + recovery_timeout: Seconds to wait before testing recovery + state: Current circuit state + failure_count: Current number of consecutive failures + last_failure_time: Timestamp of last failure + """ + + def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0) -> None: + """Initialize circuit breaker. + + Args: + failure_threshold: Number of failures to trigger open state (default: 5) + recovery_timeout: Seconds to wait before half-open state (default: 60s) + """ + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self.state = CircuitBreakerState.CLOSED + self.failure_count = 0 + self.last_failure_time: datetime | None = None + self._lock = asyncio.Lock() + + @property + def is_open(self) -> bool: + """Check if circuit is open (rejecting requests).""" + return self.state == CircuitBreakerState.OPEN + + async def check_state(self) -> CircuitBreakerState: + """Check and potentially transition circuit state. + + If circuit is open and recovery timeout has passed, + transition to half-open state to allow a test request. 
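+
+        State transitions: CLOSED -> OPEN after ``failure_threshold`` consecutive
+        failures, OPEN -> HALF_OPEN once ``recovery_timeout`` has elapsed, and
+        HALF_OPEN -> CLOSED on the next recorded success (a further failure reopens
+        the circuit).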
+ + Returns: + Current circuit state after any transitions + """ + async with self._lock: + if self.state == CircuitBreakerState.OPEN and self.last_failure_time: + elapsed = datetime.now(UTC) - self.last_failure_time + if elapsed >= timedelta(seconds=self.recovery_timeout): + logger.info( + "Circuit breaker transitioning to half-open state", + extra={ + "recovery_timeout": self.recovery_timeout, + "elapsed_seconds": elapsed.total_seconds(), + }, + ) + self.state = CircuitBreakerState.HALF_OPEN + return self.state + + async def record_success(self) -> None: + """Record a successful call, resetting failure count.""" + async with self._lock: + previous_state = self.state + self.failure_count = 0 + self.state = CircuitBreakerState.CLOSED + + if previous_state != CircuitBreakerState.CLOSED: + logger.info( + "Circuit breaker closed after successful call", + extra={ + "previous_state": previous_state.value, + "current_state": self.state.value, + }, + ) + + async def record_failure(self) -> None: + """Record a failed call, potentially opening the circuit.""" + async with self._lock: + self.failure_count += 1 + self.last_failure_time = datetime.now(UTC) + + if self.failure_count >= self.failure_threshold: + previous_state = self.state + self.state = CircuitBreakerState.OPEN + logger.warning( + "Circuit breaker opened after threshold exceeded", + extra={ + "failure_count": self.failure_count, + "threshold": self.failure_threshold, + "recovery_timeout": self.recovery_timeout, + "previous_state": previous_state.value, + }, + ) + else: + logger.debug( + "Circuit breaker recorded failure", + extra={ + "failure_count": self.failure_count, + "threshold": self.failure_threshold, + }, + ) + + +class ResilientMCPGatewayClient: + """Resilient client for MCP Context Forge Gateway. + + Implements the expert panel's recommended thin wrapper approach: + - ~100 lines core logic + - Health checks with 5s timeout + - Circuit breaker (5 failures, 60s recovery) + - Graceful degradation + - Structured logging + + Usage: + settings = get_settings() + client = ResilientMCPGatewayClient(settings) + + # Check health + health = await client.check_health() + + # List available tools + tools = await client.list_tools() + + # Invoke a tool + result = await client.invoke_tool("powerpoint_generator", {"topic": "AI"}) + + Attributes: + settings: Application settings + gateway_url: MCP gateway base URL + timeout: Request timeout in seconds + health_timeout: Health check timeout + circuit_breaker: Circuit breaker instance + """ + + def __init__(self, settings: Settings) -> None: + """Initialize the MCP gateway client. 
+ + Args: + settings: Application settings with MCP configuration + """ + self.settings = settings + self.gateway_url = settings.mcp_gateway_url.rstrip("/") + self.timeout = settings.mcp_timeout + self.health_timeout = settings.mcp_health_timeout + self.max_retries = settings.mcp_max_retries + self.jwt_token = settings.mcp_jwt_token + + # Initialize circuit breaker + self.circuit_breaker = CircuitBreaker( + failure_threshold=settings.mcp_circuit_breaker_threshold, + recovery_timeout=settings.mcp_circuit_breaker_timeout, + ) + + # Metrics counters (Prometheus-ready) + self._metrics = { + "requests_total": 0, + "requests_success": 0, + "requests_failed": 0, + "requests_circuit_open": 0, + "health_checks_total": 0, + "health_checks_success": 0, + } + + logger.info( + "MCP Gateway client initialized", + extra={ + "gateway_url": self.gateway_url, + "timeout": self.timeout, + "health_timeout": self.health_timeout, + "circuit_breaker_threshold": settings.mcp_circuit_breaker_threshold, + "circuit_breaker_timeout": settings.mcp_circuit_breaker_timeout, + }, + ) + + def _get_headers(self) -> dict[str, str]: + """Get HTTP headers for requests. + + Returns: + Dictionary of headers including auth if configured + """ + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if self.jwt_token: + headers["Authorization"] = f"Bearer {self.jwt_token}" + return headers + + async def check_health(self) -> MCPHealthStatus: + """Check MCP gateway health. + + Performs a health check with 5-second timeout as per requirements. + Does NOT trigger circuit breaker on health check failures. + + Returns: + MCPHealthStatus with health information + """ + self._metrics["health_checks_total"] += 1 + start_time = time.perf_counter() + + try: + async with httpx.AsyncClient(timeout=self.health_timeout) as client: + response = await client.get( + f"{self.gateway_url}/health", + headers=self._get_headers(), + ) + response.raise_for_status() + + latency_ms = (time.perf_counter() - start_time) * 1000 + self._metrics["health_checks_success"] += 1 + + logger.debug( + "MCP gateway health check succeeded", + extra={ + "latency_ms": latency_ms, + "status_code": response.status_code, + }, + ) + + return MCPHealthStatus( + healthy=True, + gateway_url=self.gateway_url, + latency_ms=latency_ms, + circuit_breaker_state=self.circuit_breaker.state.value, + ) + + except httpx.TimeoutException: + latency_ms = (time.perf_counter() - start_time) * 1000 + logger.warning( + "MCP gateway health check timed out", + extra={ + "timeout": self.health_timeout, + "latency_ms": latency_ms, + }, + ) + return MCPHealthStatus( + healthy=False, + gateway_url=self.gateway_url, + latency_ms=latency_ms, + circuit_breaker_state=self.circuit_breaker.state.value, + error=f"Health check timed out after {self.health_timeout}s", + ) + + except httpx.HTTPStatusError as e: + latency_ms = (time.perf_counter() - start_time) * 1000 + logger.warning( + "MCP gateway health check failed with HTTP error", + extra={ + "status_code": e.response.status_code, + "latency_ms": latency_ms, + }, + ) + return MCPHealthStatus( + healthy=False, + gateway_url=self.gateway_url, + latency_ms=latency_ms, + circuit_breaker_state=self.circuit_breaker.state.value, + error=f"HTTP {e.response.status_code}", + ) + + except Exception as e: + latency_ms = (time.perf_counter() - start_time) * 1000 + logger.warning( + "MCP gateway health check failed", + extra={ + "error": str(e), + "latency_ms": latency_ms, + }, + ) + return MCPHealthStatus( + healthy=False, + 
gateway_url=self.gateway_url, + latency_ms=latency_ms, + circuit_breaker_state=self.circuit_breaker.state.value, + error=str(e), + ) + + async def list_tools(self) -> MCPToolsResponse: + """List available MCP tools from the gateway. + + Respects circuit breaker state. Falls back gracefully if gateway unavailable. + + Returns: + MCPToolsResponse with available tools + + Raises: + ExternalServiceError: If circuit is open or request fails after retries + """ + state = await self.circuit_breaker.check_state() + + if state == CircuitBreakerState.OPEN: + self._metrics["requests_circuit_open"] += 1 + logger.warning( + "Circuit breaker open, rejecting list_tools request", + extra={"circuit_state": state.value}, + ) + return MCPToolsResponse(tools=[], total_count=0, gateway_healthy=False) + + self._metrics["requests_total"] += 1 + start_time = time.perf_counter() + + for attempt in range(self.max_retries + 1): + try: + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.get( + f"{self.gateway_url}/api/v1/tools", + headers=self._get_headers(), + ) + response.raise_for_status() + data = response.json() + + # Parse tools from response + tools = [] + for tool_data in data.get("tools", []): + params = [ + MCPToolParameter( + name=p.get("name", ""), + type=p.get("type", "string"), + description=p.get("description"), + required=p.get("required", False), + default=p.get("default"), + ) + for p in tool_data.get("parameters", []) + ] + tools.append( + MCPTool( + name=tool_data.get("name", ""), + description=tool_data.get("description", ""), + parameters=params, + category=tool_data.get("category"), + version=tool_data.get("version", "v1"), + enabled=tool_data.get("enabled", True), + ) + ) + + await self.circuit_breaker.record_success() + self._metrics["requests_success"] += 1 + + elapsed_ms = (time.perf_counter() - start_time) * 1000 + logger.debug( + "Successfully listed MCP tools", + extra={ + "tool_count": len(tools), + "latency_ms": elapsed_ms, + }, + ) + + return MCPToolsResponse( + tools=tools, + total_count=len(tools), + gateway_healthy=True, + ) + + except (httpx.TimeoutException, httpx.HTTPStatusError, httpx.RequestError) as e: + if attempt < self.max_retries: + delay = 2**attempt # Exponential backoff + logger.warning( + "MCP list_tools failed, retrying", + extra={ + "attempt": attempt + 1, + "max_retries": self.max_retries, + "delay": delay, + "error": str(e), + }, + ) + await asyncio.sleep(delay) + continue + + await self.circuit_breaker.record_failure() + self._metrics["requests_failed"] += 1 + + logger.error( + "MCP list_tools failed after retries", + extra={ + "attempts": attempt + 1, + "error": str(e), + }, + ) + + # Return empty response for graceful degradation + return MCPToolsResponse(tools=[], total_count=0, gateway_healthy=False) + + # Should not reach here, but for type safety + return MCPToolsResponse(tools=[], total_count=0, gateway_healthy=False) + + async def invoke_tool( + self, + tool_name: str, + arguments: dict[str, Any] | None = None, + timeout: float | None = None, + ) -> MCPInvocationOutput: + """Invoke an MCP tool. + + Implements graceful degradation - core RAG functionality is not affected + if tool invocation fails. 
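+
+        Failed attempts are retried with exponential backoff (``2**attempt`` seconds,
+        i.e. 1s, 2s, 4s at the default of 3 retries); timeouts, connection errors, and
+        5xx responses are retried, while other HTTP errors fail immediately and are
+        reported via the returned status rather than raised.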
+ + Args: + tool_name: Name of the tool to invoke + arguments: Tool arguments dictionary + timeout: Optional timeout override + + Returns: + MCPInvocationOutput with result or error information + """ + state = await self.circuit_breaker.check_state() + + if state == CircuitBreakerState.OPEN: + self._metrics["requests_circuit_open"] += 1 + logger.warning( + "Circuit breaker open, rejecting invoke_tool request", + extra={ + "tool_name": tool_name, + "circuit_state": state.value, + }, + ) + return MCPInvocationOutput( + tool_name=tool_name, + status=MCPInvocationStatus.CIRCUIT_OPEN, + error="Circuit breaker is open - MCP gateway temporarily unavailable", + ) + + self._metrics["requests_total"] += 1 + start_time = time.perf_counter() + request_timeout = timeout or self.timeout + + for attempt in range(self.max_retries + 1): + try: + async with httpx.AsyncClient(timeout=request_timeout) as client: + response = await client.post( + f"{self.gateway_url}/api/v1/tools/{tool_name}/invoke", + json={"arguments": arguments or {}}, + headers=self._get_headers(), + ) + response.raise_for_status() + data = response.json() + + await self.circuit_breaker.record_success() + self._metrics["requests_success"] += 1 + + elapsed_ms = (time.perf_counter() - start_time) * 1000 + logger.info( + "MCP tool invocation succeeded", + extra={ + "tool_name": tool_name, + "execution_time_ms": elapsed_ms, + }, + ) + + return MCPInvocationOutput( + tool_name=tool_name, + status=MCPInvocationStatus.SUCCESS, + result=data.get("result"), + execution_time_ms=elapsed_ms, + ) + + except httpx.TimeoutException: + if attempt < self.max_retries: + delay = 2**attempt + logger.warning( + "MCP tool invocation timed out, retrying", + extra={ + "tool_name": tool_name, + "attempt": attempt + 1, + "delay": delay, + "timeout": request_timeout, + }, + ) + await asyncio.sleep(delay) + continue + + await self.circuit_breaker.record_failure() + self._metrics["requests_failed"] += 1 + + elapsed_ms = (time.perf_counter() - start_time) * 1000 + logger.error( + "MCP tool invocation timed out after retries", + extra={ + "tool_name": tool_name, + "timeout": request_timeout, + "execution_time_ms": elapsed_ms, + }, + ) + + return MCPInvocationOutput( + tool_name=tool_name, + status=MCPInvocationStatus.TIMEOUT, + error=f"Tool invocation timed out after {request_timeout}s", + execution_time_ms=elapsed_ms, + ) + + except httpx.HTTPStatusError as e: + if attempt < self.max_retries and e.response.status_code >= 500: + delay = 2**attempt + logger.warning( + "MCP tool invocation failed with server error, retrying", + extra={ + "tool_name": tool_name, + "status_code": e.response.status_code, + "attempt": attempt + 1, + "delay": delay, + }, + ) + await asyncio.sleep(delay) + continue + + await self.circuit_breaker.record_failure() + self._metrics["requests_failed"] += 1 + + elapsed_ms = (time.perf_counter() - start_time) * 1000 + error_detail = e.response.text[:200] if e.response.text else str(e) + + logger.error( + "MCP tool invocation failed with HTTP error", + extra={ + "tool_name": tool_name, + "status_code": e.response.status_code, + "error": error_detail, + "execution_time_ms": elapsed_ms, + }, + ) + + return MCPInvocationOutput( + tool_name=tool_name, + status=MCPInvocationStatus.ERROR, + error=f"HTTP {e.response.status_code}: {error_detail}", + execution_time_ms=elapsed_ms, + ) + + except Exception as e: + if attempt < self.max_retries: + delay = 2**attempt + logger.warning( + "MCP tool invocation failed, retrying", + extra={ + "tool_name": tool_name, + 
"error": str(e), + "attempt": attempt + 1, + "delay": delay, + }, + ) + await asyncio.sleep(delay) + continue + + await self.circuit_breaker.record_failure() + self._metrics["requests_failed"] += 1 + + elapsed_ms = (time.perf_counter() - start_time) * 1000 + logger.error( + "MCP tool invocation failed after retries", + extra={ + "tool_name": tool_name, + "error": str(e), + "execution_time_ms": elapsed_ms, + }, + ) + + return MCPInvocationOutput( + tool_name=tool_name, + status=MCPInvocationStatus.ERROR, + error=str(e), + execution_time_ms=elapsed_ms, + ) + + # Should not reach here, but for type safety + return MCPInvocationOutput( + tool_name=tool_name, + status=MCPInvocationStatus.ERROR, + error="Unknown error after retries", + ) + + def get_metrics(self) -> dict[str, Any]: + """Get client metrics for monitoring. + + Returns: + Dictionary of Prometheus-ready metrics + """ + return { + **self._metrics, + "circuit_breaker_state": self.circuit_breaker.state.value, + "circuit_breaker_failure_count": self.circuit_breaker.failure_count, + } + + async def is_available(self) -> bool: + """Quick availability check. + + Checks if MCP gateway is available without full health check. + Useful for conditional enrichment logic. + + Returns: + True if gateway appears available, False otherwise + """ + state = await self.circuit_breaker.check_state() + if state == CircuitBreakerState.OPEN: + return False + + health = await self.check_health() + return health.healthy diff --git a/backend/rag_solution/services/search_result_enricher.py b/backend/rag_solution/services/search_result_enricher.py new file mode 100644 index 00000000..b83ebf8a --- /dev/null +++ b/backend/rag_solution/services/search_result_enricher.py @@ -0,0 +1,509 @@ +"""Search Result Enricher using MCP tools. + +This module implements the Content Enricher pattern as recommended by +Gregor Hohpe, maintaining clean separation between core search and +optional tool enrichment. + +Key features: +- Parallel execution for efficiency +- Retry logic with exponential backoff +- Error isolation (enrichment failures don't break search) +- Configurable tool selection +- Graceful degradation +""" + +import asyncio +import time +from typing import Any + +from core.config import Settings +from core.logging_utils import get_logger +from rag_solution.schemas.mcp_schema import ( + MCPEnrichedSearchResult, + MCPEnrichmentConfig, + MCPEnrichmentResult, + MCPInvocationStatus, +) +from rag_solution.schemas.search_schema import SearchOutput +from rag_solution.services.mcp_gateway_client import ResilientMCPGatewayClient +from vectordbs.data_types import QueryResult + +logger = get_logger(__name__) + + +class SearchResultEnricher: + """Enriches search results using MCP tools. 
+ + Implements the Content Enricher pattern from Enterprise Integration Patterns: + - Core search results pass through unchanged if enrichment fails + - Enrichment is optional and non-blocking + - Parallel execution for multiple tools + - Error isolation prevents cascading failures + + Usage: + settings = get_settings() + enricher = SearchResultEnricher(settings) + + # Enrich search results + config = MCPEnrichmentConfig( + enabled=True, + tools=["summarizer", "entity_extractor"], + parallel=True + ) + enriched_output = await enricher.enrich(search_output, config) + + Attributes: + settings: Application settings + mcp_client: MCP gateway client + max_concurrent: Maximum concurrent enrichment operations + """ + + def __init__(self, settings: Settings) -> None: + """Initialize the search result enricher. + + Args: + settings: Application settings with MCP configuration + """ + self.settings = settings + self._mcp_client: ResilientMCPGatewayClient | None = None + self.max_concurrent = settings.mcp_max_concurrent + self.default_timeout = settings.mcp_timeout + + logger.info( + "Search result enricher initialized", + extra={ + "mcp_enabled": settings.mcp_enabled, + "enrichment_enabled": settings.mcp_enrichment_enabled, + "max_concurrent": self.max_concurrent, + }, + ) + + @property + def mcp_client(self) -> ResilientMCPGatewayClient: + """Lazy-initialize MCP client.""" + if self._mcp_client is None: + self._mcp_client = ResilientMCPGatewayClient(self.settings) + return self._mcp_client + + async def enrich( + self, + search_output: SearchOutput, + config: MCPEnrichmentConfig | None = None, + ) -> SearchOutput: + """Enrich search output with MCP tool results. + + This is the main entry point for enrichment. It: + 1. Checks if enrichment is enabled + 2. Validates MCP gateway availability + 3. Runs enrichment tools (parallel or sequential) + 4. Merges results into search output metadata + + Core search results are NEVER modified or removed - only metadata + is added. This ensures graceful degradation. 
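+
+        Enrichment output is attached under ``metadata["mcp_enrichment"]`` with
+        per-tool results, success flags, and timing information, leaving the answer,
+        documents, and query results untouched.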
+ + Args: + search_output: Original search output to enrich + config: Optional enrichment configuration + + Returns: + SearchOutput with enrichment data in metadata field + """ + # Use default config if not provided + if config is None: + config = MCPEnrichmentConfig( + enabled=self.settings.mcp_enrichment_enabled, + timeout=self.default_timeout, + ) + + # Skip if enrichment disabled + if not config.enabled or not self.settings.mcp_enabled: + logger.debug("Enrichment disabled, returning original results") + return search_output + + start_time = time.perf_counter() + + try: + # Check gateway availability + if not await self.mcp_client.is_available(): + logger.warning("MCP gateway unavailable, skipping enrichment") + return self._add_enrichment_metadata( + search_output, + success=False, + error="MCP gateway unavailable", + execution_time_ms=0, + ) + + # Get available tools if not specified + tools_to_use = config.tools + if not tools_to_use: + tools_response = await self.mcp_client.list_tools() + tools_to_use = [t.name for t in tools_response.tools if t.enabled] + + if not tools_to_use: + logger.debug("No MCP tools available for enrichment") + return search_output + + # Run enrichment + if config.parallel: + enrichment_results = await self._enrich_parallel(search_output, tools_to_use, config.timeout) + else: + enrichment_results = await self._enrich_sequential(search_output, tools_to_use, config.timeout) + + elapsed_ms = (time.perf_counter() - start_time) * 1000 + + # Filter successful enrichments + successful = [r for r in enrichment_results if r.success] + failed = [r for r in enrichment_results if not r.success] + + logger.info( + "Search result enrichment completed", + extra={ + "tools_used": len(tools_to_use), + "successful": len(successful), + "failed": len(failed), + "execution_time_ms": elapsed_ms, + }, + ) + + return self._merge_enrichments( + search_output, + enrichment_results, + execution_time_ms=elapsed_ms, + ) + + except Exception as e: + elapsed_ms = (time.perf_counter() - start_time) * 1000 + logger.error( + "Enrichment failed with unexpected error", + extra={ + "error": str(e), + "execution_time_ms": elapsed_ms, + }, + exc_info=True, + ) + + if config.fail_silently: + return self._add_enrichment_metadata( + search_output, + success=False, + error=str(e), + execution_time_ms=elapsed_ms, + ) + raise + + async def enrich_query_results( + self, + query_results: list[QueryResult], + tool_name: str, + tool_arguments: dict[str, Any] | None = None, + ) -> list[MCPEnrichedSearchResult]: + """Enrich individual query results with a specific tool. + + Useful for per-result enrichment like summarization or + entity extraction on each chunk. 
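+
+        Invocations are bounded by ``mcp_max_concurrent`` via an asyncio semaphore,
+        and a failure on one chunk is recorded on that result only; the remaining
+        chunks are still enriched.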
+ + Args: + query_results: List of query results to enrich + tool_name: Name of the MCP tool to use + tool_arguments: Additional arguments for the tool + + Returns: + List of enriched search results with tool output + """ + if not self.settings.mcp_enabled: + return [ + MCPEnrichedSearchResult( + original_score=qr.score, + enrichments=[], + ) + for qr in query_results + ] + + enriched_results = [] + semaphore = asyncio.Semaphore(self.max_concurrent) + + async def enrich_single(qr: QueryResult) -> MCPEnrichedSearchResult: + async with semaphore: + start_time = time.perf_counter() + # Get text from chunk if available + chunk_text = qr.chunk.text if qr.chunk and qr.chunk.text else "" + args = { + "text": chunk_text, + **(tool_arguments or {}), + } + + result = await self.mcp_client.invoke_tool(tool_name, args) + elapsed_ms = (time.perf_counter() - start_time) * 1000 + + enrichment = MCPEnrichmentResult( + tool_name=tool_name, + success=result.status == MCPInvocationStatus.SUCCESS, + data={"result": result.result} if result.result else None, + error=result.error, + execution_time_ms=elapsed_ms, + ) + + return MCPEnrichedSearchResult( + original_score=qr.score, + enrichments=[enrichment], + ) + + tasks = [enrich_single(qr) for qr in query_results] + enriched_results = await asyncio.gather(*tasks, return_exceptions=True) + + # Handle any exceptions by returning non-enriched results + final_results = [] + for i, result in enumerate(enriched_results): + if isinstance(result, Exception): + logger.warning( + "Failed to enrich query result", + extra={ + "index": i, + "error": str(result), + }, + ) + final_results.append( + MCPEnrichedSearchResult( + original_score=query_results[i].score, + enrichments=[ + MCPEnrichmentResult( + tool_name=tool_name, + success=False, + error=str(result), + ) + ], + ) + ) + else: + final_results.append(result) + + return final_results + + async def _enrich_parallel( + self, + search_output: SearchOutput, + tools: list[str], + timeout: float, + ) -> list[MCPEnrichmentResult]: + """Run enrichment tools in parallel. + + Uses semaphore to limit concurrency and prevent overwhelming + the MCP gateway. + + Args: + search_output: Search output to enrich + tools: List of tool names to run + timeout: Timeout per tool + + Returns: + List of enrichment results + """ + semaphore = asyncio.Semaphore(self.max_concurrent) + + async def run_tool(tool_name: str) -> MCPEnrichmentResult: + async with semaphore: + return await self._invoke_enrichment_tool(search_output, tool_name, timeout) + + tasks = [run_tool(tool) for tool in tools] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Convert exceptions to error results + enrichment_results = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.warning( + "Parallel enrichment tool failed", + extra={ + "tool": tools[i], + "error": str(result), + }, + ) + enrichment_results.append( + MCPEnrichmentResult( + tool_name=tools[i], + success=False, + error=str(result), + ) + ) + else: + enrichment_results.append(result) + + return enrichment_results + + async def _enrich_sequential( + self, + search_output: SearchOutput, + tools: list[str], + timeout: float, + ) -> list[MCPEnrichmentResult]: + """Run enrichment tools sequentially. + + Useful when tools have dependencies or when order matters. 
+ + Args: + search_output: Search output to enrich + tools: List of tool names to run + timeout: Timeout per tool + + Returns: + List of enrichment results + """ + results = [] + for tool_name in tools: + result = await self._invoke_enrichment_tool(search_output, tool_name, timeout) + results.append(result) + return results + + async def _invoke_enrichment_tool( + self, + search_output: SearchOutput, + tool_name: str, + timeout: float, + ) -> MCPEnrichmentResult: + """Invoke a single enrichment tool. + + Prepares arguments from search output and calls the MCP tool. + + Args: + search_output: Search output providing context + tool_name: Name of the tool to invoke + timeout: Request timeout + + Returns: + MCPEnrichmentResult with tool output + """ + start_time = time.perf_counter() + + # Prepare tool arguments from search context + arguments = { + "query": search_output.rewritten_query or "", + "answer": search_output.answer, + "documents": [ + { + "document_name": doc.document_name, + "title": doc.title, + "content_type": getattr(doc, "content_type", None), + } + for doc in search_output.documents[:5] # Limit to top 5 + ], + "chunks": [ + { + "text": (qr.chunk.text[:500] if qr.chunk and qr.chunk.text else ""), # Limit text length + "score": qr.score, + } + for qr in search_output.query_results[:5] + ], + } + + result = await self.mcp_client.invoke_tool(tool_name, arguments, timeout) + elapsed_ms = (time.perf_counter() - start_time) * 1000 + + return MCPEnrichmentResult( + tool_name=tool_name, + success=result.status == MCPInvocationStatus.SUCCESS, + data={"result": result.result} if result.result else None, + error=result.error, + execution_time_ms=elapsed_ms, + ) + + def _merge_enrichments( + self, + search_output: SearchOutput, + enrichments: list[MCPEnrichmentResult], + execution_time_ms: float, + ) -> SearchOutput: + """Merge enrichment results into search output metadata. + + Does NOT modify original search results - only adds enrichment + data to the metadata field. + + Args: + search_output: Original search output + enrichments: List of enrichment results + execution_time_ms: Total enrichment time + + Returns: + SearchOutput with enrichment metadata + """ + # Prepare enrichment summary + enrichment_data = { + "mcp_enrichment": { + "enabled": True, + "success": any(e.success for e in enrichments), + "execution_time_ms": execution_time_ms, + "tools": [ + { + "name": e.tool_name, + "success": e.success, + "data": e.data, + "error": e.error, + "execution_time_ms": e.execution_time_ms, + } + for e in enrichments + ], + } + } + + # Merge with existing metadata + existing_metadata = search_output.metadata or {} + merged_metadata = {**existing_metadata, **enrichment_data} + + # Create new output with enrichment metadata + return SearchOutput( + answer=search_output.answer, + documents=search_output.documents, + query_results=search_output.query_results, + rewritten_query=search_output.rewritten_query, + evaluation=search_output.evaluation, + execution_time=search_output.execution_time, + cot_output=search_output.cot_output, + metadata=merged_metadata, + token_warning=search_output.token_warning, + structured_answer=search_output.structured_answer, + ) + + def _add_enrichment_metadata( + self, + search_output: SearchOutput, + success: bool, + error: str | None = None, + execution_time_ms: float = 0, + ) -> SearchOutput: + """Add basic enrichment metadata without actual enrichment. + + Used for error cases and when enrichment is skipped. 
+ + Args: + search_output: Original search output + success: Whether enrichment was successful + error: Error message if failed + execution_time_ms: Time spent attempting enrichment + + Returns: + SearchOutput with basic enrichment metadata + """ + enrichment_data = { + "mcp_enrichment": { + "enabled": True, + "success": success, + "execution_time_ms": execution_time_ms, + "error": error, + "tools": [], + } + } + + existing_metadata = search_output.metadata or {} + merged_metadata = {**existing_metadata, **enrichment_data} + + return SearchOutput( + answer=search_output.answer, + documents=search_output.documents, + query_results=search_output.query_results, + rewritten_query=search_output.rewritten_query, + evaluation=search_output.evaluation, + execution_time=search_output.execution_time, + cot_output=search_output.cot_output, + metadata=merged_metadata, + token_warning=search_output.token_warning, + structured_answer=search_output.structured_answer, + ) diff --git a/docker-compose-infra.yml b/docker-compose-infra.yml index 5b81d82d..d30adf3c 100644 --- a/docker-compose-infra.yml +++ b/docker-compose-infra.yml @@ -138,6 +138,95 @@ services: networks: - app-network + # ============================================================================ + # MCP Context Forge Services (Optional - only started with --profile mcp) + # ============================================================================ + # To enable: Set ENABLE_MCP_GATEWAY=true in your .env file + # The Makefile will automatically pass --profile mcp to docker-compose + # ============================================================================ + + # Redis for MCP Context Forge gateway caching and session management + redis: + container_name: redis + image: redis:7-alpine + profiles: + - mcp + ports: + - "6379:6379" + volumes: + - redis_data:/data + command: redis-server --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 3 + start_period: 5s + networks: + - app-network + + # MCP Context Forge Gateway - Model Context Protocol server + # Provides tool discovery and invocation capabilities for RAG enrichment + # Port 3001 by default to avoid conflict with frontend (port 3000) + # + # AUTHENTICATION ARCHITECTURE: + # RAG Modulo uses PROXY AUTHENTICATION - it acts as a trusted backend service + # that passes authenticated user identity via headers. No JWT token exchange needed. + # See: https://ibm.github.io/mcp-context-forge/manage/proxy/ + # + # IMAGE VERSION: Pinned to SHA digest for reproducibility (updated 2025-12-01) + # To update: docker pull ghcr.io/ibm/mcp-context-forge:latest && docker inspect --format='{{index .RepoDigests 0}}' + mcp-context-forge: + container_name: mcp-context-forge + image: ghcr.io/ibm/mcp-context-forge@sha256:654c72fd4d2ed3ce0716214e6adf517fbba3ef105f39864a0deb326f90475797 + profiles: + - mcp + ports: + - "${MCP_PORT:-3001}:${MCP_PORT:-3001}" + environment: + # Server port (MCP Context Forge uses PORT env var) + PORT: ${MCP_PORT:-3001} + HOST: 0.0.0.0 + # Redis configuration for caching + REDIS_URL: redis://redis:6379 + # ======================================== + # PROXY AUTHENTICATION (Recommended) + # ======================================== + # RAG Modulo is a TRUSTED BACKEND SERVICE, not an end-user client. + # With proxy auth, RAG Modulo passes the authenticated user's identity + # via a header, and MCP trusts it without requiring JWT token exchange. 
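+      # For example, a request from RAG Modulo might carry
+      # "X-Authenticated-User: <user-id>"; the header name is configured via
+      # PROXY_USER_HEADER below.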
+ TRUST_PROXY_AUTH: ${MCP_TRUST_PROXY_AUTH:-true} + PROXY_USER_HEADER: ${MCP_PROXY_USER_HEADER:-X-Authenticated-User} + # Disable client-side JWT auth when using proxy auth + MCP_CLIENT_AUTH_ENABLED: ${MCP_CLIENT_AUTH_ENABLED:-false} + # ======================================== + # ADMIN UI AUTHENTICATION + # ======================================== + # AUTH_REQUIRED controls the admin web UI, not API calls + AUTH_REQUIRED: ${MCP_AUTH_REQUIRED:-false} + # Admin credentials for web UI (only needed if AUTH_REQUIRED=true) + PLATFORM_ADMIN_EMAIL: ${MCP_ADMIN_EMAIL:-admin@example.com} + PLATFORM_ADMIN_PASSWORD: ${MCP_ADMIN_PASSWORD:-change-me-in-production} + # JWT secret for admin UI tokens (not used for proxy auth) + JWT_SECRET_KEY: ${MCP_JWT_SECRET:-dev-jwt-secret-change-in-production} + # Logging + LOG_LEVEL: ${MCP_LOG_LEVEL:-info} + volumes: + - mcp_tools:/app/tools + healthcheck: + # Use wget instead of curl (Alpine-based image) + # Note: Health check uses internal PORT env var set above + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:$${PORT}/health"] + interval: 10s + timeout: 5s + retries: 3 + start_period: 10s + depends_on: + redis: + condition: service_healthy + networks: + - app-network + volumes: postgres_data: driver_opts: @@ -159,6 +248,16 @@ volumes: type: none device: ${PWD}/volumes/milvus o: bind + redis_data: + driver_opts: + type: none + device: ${PWD}/volumes/redis + o: bind + mcp_tools: + driver_opts: + type: none + device: ${PWD}/volumes/mcp_tools + o: bind networks: app-network: diff --git a/docs/design/mcp-context-forge-integration.md b/docs/design/mcp-context-forge-integration.md index 533a7a3c..e67768c5 100644 --- a/docs/design/mcp-context-forge-integration.md +++ b/docs/design/mcp-context-forge-integration.md @@ -1,48 +1,42 @@ # MCP Context Forge Integration with RAG Modulo **Date**: November 2025 -**Status**: Integration Design Proposal -**Version**: 1.0 +**Status**: Implemented +**Version**: 2.0 **Parent Design**: [Agent and MCP Support Architecture](./agent-mcp-architecture.md) ## Executive Summary -This document proposes integrating IBM's **MCP Context Forge** as the central gateway for RAG Modulo's agent and MCP ecosystem. Instead of building a custom MCP client from scratch, we leverage Context Forge's production-ready federation, protocol translation, security, and admin UI capabilities. +This document describes the integration of IBM's **MCP Context Forge** as the central gateway for +RAG Modulo's agent and MCP ecosystem. The integration uses **Proxy Authentication** for simple, +secure communication between RAG Modulo and MCP Context Forge. ## Why MCP Context Forge? 
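A key reason is simplicity: the proxy-authentication model described in the summary reduces the client-to-gateway contract to a single trusted header on plain HTTP calls. A minimal sketch (assuming `httpx`; the port, the `/tools` endpoint, and the `X-Authenticated-User` header are the defaults documented later on this page, and the email is a placeholder):

```python
import asyncio

import httpx


async def list_tools_via_proxy_auth() -> None:
    """Illustrative only: call the gateway the way a trusted backend would."""
    headers = {"X-Authenticated-User": "user@example.com"}  # identity asserted by the backend
    async with httpx.AsyncClient(base_url="http://localhost:3001", timeout=5.0) as client:
        response = await client.get("/tools", headers=headers)
        response.raise_for_status()
        print(response.json())  # response shape shown in the sequence diagram below


if __name__ == "__main__":
    asyncio.run(list_tools_via_proxy_auth())
```

The production client shown later in this document wraps the same call pattern in retries and a circuit breaker.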
### Alignment with RAG Modulo's Agent Architecture -The original agent-mcp-architecture.md design proposed: - -- Custom `MCPClient` for communicating with MCP servers -- Custom registry for agent discovery -- Custom protocol handling -- Custom authentication and rate limiting - -**MCP Context Forge provides all of this out-of-the-box**: +MCP Context Forge provides production-ready capabilities: | RAG Modulo Need | Context Forge Solution | |-----------------|------------------------| | MCP Client | Built-in protocol translation (stdio, SSE, WebSocket, HTTP) | | Agent Registry | Unified registry of tools, resources, and prompts | | Multi-protocol support | Virtualizes REST/gRPC as MCP servers | -| Authentication | Bearer token auth with JWT + RBAC | +| Authentication | Proxy authentication (trusted backend) | | Rate limiting | Built-in with Redis backing | -| Observability | OpenTelemetry integration (Phoenix, Jaeger, Zipkin) | +| Observability | OpenTelemetry integration | | Admin UI | HTMX/Alpine.js management interface | | Federation | Redis-backed distributed deployment | ### Benefits -1. **Reduced Development Time**: 2 weeks → 3-4 days (80% reduction) -2. **Production-Ready**: Battle-tested gateway with 400+ tests +1. **Reduced Development Time**: Leverages production-ready gateway +2. **Simple Authentication**: Proxy auth eliminates JWT token management 3. **Extensibility**: Supports non-MCP services (REST, gRPC) as virtual MCP servers -4. **Centralized Management**: Single admin UI for all agents/tools -5. **Enterprise Features**: RBAC, team management, audit logging -6. **Scalability**: Redis federation for distributed deployments +4. **Resilience**: Circuit breaker pattern with graceful degradation +5. **Scalability**: Redis federation for distributed deployments -## Architecture Integration +## Architecture ### High-Level Architecture @@ -58,42 +52,37 @@ The original agent-mcp-architecture.md design proposed: │ (FastAPI Services) │ │ │ │ ┌──────────────────────────────────────────────────────┐ │ -│ │ AgentService (Enhanced) │ │ -│ │ - Agent config CRUD │ │ -│ │ - Collection-agent association │ │ -│ │ - Pipeline execution orchestration │ │ +│ │ SearchService │ │ +│ │ - Query processing │ │ +│ │ - Vector search │ │ +│ │ - Result enrichment via MCP │ │ │ └───────────────────┬──────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────────────────────────┐ │ -│ │ ContextForgeClient (New) │ │ -│ │ - Bearer token auth │ │ -│ │ - Tool invocation via SSE/HTTP │ │ -│ │ - Resource fetching │ │ -│ │ - Gateway management (create virtual servers) │ │ +│ │ ResilientMCPGatewayClient │ │ +│ │ - Circuit breaker (5 failures, 60s recovery) │ │ +│ │ - Exponential backoff retries │ │ +│ │ - Proxy authentication via header │ │ +│ │ - Prometheus-ready metrics │ │ │ └───────────────────┬──────────────────────────────────┘ │ └────────────────────────┼────────────────────────────────────┘ │ - ▼ (HTTP/SSE/WebSocket) + │ X-Authenticated-User header + ▼ ┌─────────────────────────────────────────────────────────────┐ │ MCP Context Forge Gateway │ -│ (IBM OSS Project) │ +│ TRUST_PROXY_AUTH=true │ │ │ │ ┌────────────────┐ ┌────────────────┐ ┌──────────────┐ │ -│ │ Protocol │ │ Authentication │ │ Federation │ │ -│ │ Translation │ │ & RBAC │ │ (Redis) │ │ -│ └────────────────┘ └────────────────┘ └──────────────┘ │ -│ │ -│ ┌────────────────┐ ┌────────────────┐ ┌──────────────┐ │ -│ │ Tool Registry │ │ Rate Limiting │ │ Observability│ │ -│ │ & Discovery │ │ & Retries │ │ (OpenTelemetry)│ +│ 
│ Protocol │ │ Tool │ │ Session │ │ +│ │ Translation │ │ Registry │ │ (Redis) │ │ │ └────────────────┘ └────────────────┘ └──────────────┘ │ │ │ │ ┌────────────────────────────────────────────────────────┐│ │ │ Admin UI (HTMX + Alpine.js) ││ -│ │ - Manage gateways, tools, servers ││ -│ │ - Monitor agent execution ││ -│ │ - Team/RBAC management ││ +│ │ - Manage tools ││ +│ │ - Monitor execution ││ │ └────────────────────────────────────────────────────────┘│ └────────────────────────┬────────────────────────────────────┘ │ @@ -113,1017 +102,431 @@ The original agent-mcp-architecture.md design proposed: └─────────────────────────────────────────────────────────────┘ ``` -### Key Components - -#### 1. ContextForgeClient (New Component) +### Authentication Architecture -Replaces the custom `MCPClient` from the original design: - -```python -# backend/rag_solution/mcp/context_forge_client.py -import httpx -from typing import Any, Dict, List, Optional -from pydantic import BaseModel +RAG Modulo uses **Proxy Authentication** - a simple, secure approach where RAG Modulo acts as a trusted backend service: +``` +┌─────────────────┐ ┌─────────────────────┐ +│ RAG Modulo │ │ MCP Context Forge │ +│ Backend │ │ TRUST_PROXY_AUTH= │ +│ │ │ true │ +└───────┬─────────┘ └──────────┬──────────┘ + │ │ + │ GET /tools │ + │ X-Authenticated-User: user@example.com │ + │─────────────────────────────────────────▶│ + │ │ + │ [{name: "tool1", ...}] │ + │◀─────────────────────────────────────────│ + │ │ + │ POST /mcp │ + │ X-Authenticated-User: user@example.com │ + │ {method: "tools/call", params: {...}} │ + │─────────────────────────────────────────▶│ + │ │ + │ {result: {content: [...]}} │ + │◀─────────────────────────────────────────│ +``` -class ContextForgeConfig(BaseModel): - """Configuration for Context Forge gateway""" - gateway_url: str - api_token: str # JWT bearer token - timeout: int = 30 - max_retries: int = 3 - +**Benefits over JWT Token Authentication:** -class ToolInvocation(BaseModel): - """Request to invoke a tool""" - tool_name: str - arguments: Dict[str, Any] - gateway_id: Optional[str] = None # Virtual gateway to use +- No JWT token management complexity +- No token refresh logic needed +- No credential synchronization between services +- User identity flows through for audit logging +- MCP Context Forge trusts the header from RAG Modulo +See: [MCP Proxy Authentication Guide](https://ibm.github.io/mcp-context-forge/manage/proxy/) -class ToolResponse(BaseModel): - """Response from tool invocation""" - success: bool - result: Dict[str, Any] - error: Optional[str] = None - metadata: Dict[str, Any] = {} +## Key Components +### 1. ResilientMCPGatewayClient -class ContextForgeClient: - """ - Client for IBM MCP Context Forge Gateway +The implemented client (`backend/rag_solution/services/mcp_gateway_client.py`) provides: - Provides unified access to MCP tools, resources, and prompts - through Context Forge's federation layer. +```python +class ResilientMCPGatewayClient: + """Resilient client for MCP Context Forge Gateway. 
+ + Key features: + - ~700 lines implementation + - Health checks with 5s timeout + - Circuit breaker (5 failures, 60s recovery) + - Proxy authentication via X-Authenticated-User header + - Exponential backoff retries + - Prometheus-ready metrics + - Structured logging """ - def __init__(self, config: ContextForgeConfig): - self.config = config - self.client = httpx.AsyncClient( - base_url=config.gateway_url, - headers={"Authorization": f"Bearer {config.api_token}"}, - timeout=config.timeout + def __init__(self, settings: Settings) -> None: + self.gateway_url = settings.mcp_gateway_url.rstrip("/") + self.timeout = settings.mcp_timeout + self._proxy_user_header = settings.mcp_proxy_user_header + self.circuit_breaker = CircuitBreaker( + failure_threshold=settings.mcp_circuit_breaker_threshold, + recovery_timeout=settings.mcp_circuit_breaker_timeout, ) - async def list_gateways(self) -> List[Dict[str, Any]]: - """List available virtual gateways""" - response = await self.client.get("/api/v1/gateways") - response.raise_for_status() - return response.json() - - async def list_tools(self, gateway_id: Optional[str] = None) -> List[Dict[str, Any]]: - """ - List available tools - - Args: - gateway_id: Optional specific virtual gateway to query - - Returns: - List of tool definitions with schemas - """ - params = {"gateway_id": gateway_id} if gateway_id else {} - response = await self.client.get("/api/v1/tools", params=params) - response.raise_for_status() - return response.json()["tools"] - - async def invoke_tool(self, invocation: ToolInvocation) -> ToolResponse: - """ - Invoke an MCP tool through Context Forge - - Args: - invocation: Tool name, arguments, and optional gateway - - Returns: - ToolResponse with result or error - """ - try: - # Context Forge handles protocol translation automatically - response = await self.client.post( - f"/api/v1/tools/{invocation.tool_name}/invoke", - json={ - "arguments": invocation.arguments, - "gateway_id": invocation.gateway_id - } - ) - response.raise_for_status() - - data = response.json() - return ToolResponse( - success=True, - result=data.get("result", {}), - metadata=data.get("metadata", {}) - ) - - except httpx.HTTPStatusError as e: - return ToolResponse( - success=False, - result={}, - error=f"HTTP {e.response.status_code}: {e.response.text}" - ) - - except Exception as e: - return ToolResponse( - success=False, - result={}, - error=str(e) - ) - - async def get_resource( - self, - resource_uri: str, - gateway_id: Optional[str] = None - ) -> Dict[str, Any]: - """ - Fetch an MCP resource - - Args: - resource_uri: URI of the resource to fetch - gateway_id: Optional specific gateway to use - - Returns: - Resource content - """ - params = { - "uri": resource_uri, - "gateway_id": gateway_id - } if gateway_id else {"uri": resource_uri} - - response = await self.client.get("/api/v1/resources", params=params) - response.raise_for_status() - return response.json() - - async def get_prompt( + def _get_headers(self, user_id: str | None = None) -> dict[str, str]: + """Get HTTP headers with proxy authentication.""" + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + } + if user_id: + headers[self._proxy_user_header] = user_id + return headers + + async def check_health(self) -> MCPHealthStatus: + """Check MCP gateway health with 5-second timeout.""" + ... + + async def list_tools(self, user_id: str | None = None) -> MCPToolsResponse: + """List available MCP tools from the gateway.""" + ... 
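    # --- Illustrative sketch only (not part of the actual client) ----------
    # The elided bodies above follow roughly this shape: guard the call with the
    # circuit breaker, attach the proxy-auth header, and record the outcome.
    # Assumptions for this sketch: `httpx` is imported in the real module, and
    # CircuitBreaker exposes helpers named `allow_request`, `record_success`,
    # and `record_failure` (the real names may differ).
    async def _request_json_sketch(self, path: str, user_id: str | None = None) -> Any:
        if not self.circuit_breaker.allow_request():
            raise RuntimeError("MCP gateway circuit breaker is open; failing fast")
        async with httpx.AsyncClient(base_url=self.gateway_url, timeout=self.timeout) as client:
            try:
                response = await client.get(path, headers=self._get_headers(user_id))
                response.raise_for_status()
            except httpx.HTTPError:
                self.circuit_breaker.record_failure()
                raise
            self.circuit_breaker.record_success()
            return response.json()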
+ + async def invoke_tool( self, - prompt_name: str, - gateway_id: Optional[str] = None - ) -> Dict[str, Any]: - """ - Get MCP prompt template - - Args: - prompt_name: Name of the prompt template - gateway_id: Optional specific gateway to use - - Returns: - Prompt template - """ - params = {"gateway_id": gateway_id} if gateway_id else {} - response = await self.client.get( - f"/api/v1/prompts/{prompt_name}", - params=params - ) - response.raise_for_status() - return response.json() - - async def create_virtual_gateway( - self, - name: str, - tool_ids: List[str], - description: Optional[str] = None - ) -> Dict[str, Any]: - """ - Create a virtual gateway bundling specific tools - - This enables creating custom agent bundles for collections. - - Args: - name: Name for the virtual gateway - tool_ids: List of tool IDs to include - description: Optional description - - Returns: - Virtual gateway details - """ - response = await self.client.post( - "/api/v1/gateways", - json={ - "name": name, - "tool_ids": tool_ids, - "description": description - } - ) - response.raise_for_status() - return response.json() - - async def register_external_server( - self, - name: str, - server_type: str, # "mcp", "rest", "grpc" - endpoint: str, - config: Dict[str, Any] - ) -> Dict[str, Any]: - """ - Register an external MCP server or REST/gRPC service - - Context Forge will virtualize non-MCP services as MCP servers. - - Args: - name: Server name - server_type: Type of server (mcp, rest, grpc) - endpoint: Server endpoint URL - config: Server-specific configuration - - Returns: - Registered server details - """ - response = await self.client.post( - "/api/v1/servers", - json={ - "name": name, - "type": server_type, - "endpoint": endpoint, - "config": config - } - ) - response.raise_for_status() - return response.json() - - async def close(self): - """Close the HTTP client""" - await self.client.aclose() + tool_name: str, + arguments: dict[str, Any] | None = None, + timeout: float | None = None, + user_id: str | None = None, + ) -> MCPInvocationOutput: + """Invoke an MCP tool via /mcp JSON-RPC endpoint.""" + ... ``` -#### 2. Enhanced AgentService - -Update the `AgentService` to use `ContextForgeClient` instead of custom `MCPClient`: +### 2. Circuit Breaker Pattern ```python -# backend/rag_solution/services/agent_service.py (updated) -from rag_solution.mcp.context_forge_client import ContextForgeClient, ToolInvocation - - -class AgentService: - """Service for managing and executing agents""" - - def __init__( - self, - db: AsyncSession, - registry: AgentRegistry, - context_forge: ContextForgeClient # NEW - ): - self.db = db - self.registry = registry - self.context_forge = context_forge # Replace custom MCPClient - - async def create_collection_virtual_gateway( - self, - collection_id: UUID, - user_id: UUID - ) -> str: - """ - Create a virtual gateway in Context Forge for a collection - - Bundles all agents associated with the collection into - a single gateway for efficient execution. 
- - Returns: - Gateway ID - """ - # Get collection's agents - agent_configs = await self._get_collection_agents( - collection_id=collection_id, - enabled=True - ) - - # Map agents to Context Forge tool IDs - tool_ids = [] - for config in agent_configs: - # Agents can specify their Context Forge tool mapping - tool_id = config.config.get("context_forge_tool_id") - if tool_id: - tool_ids.append(tool_id) - - # Create virtual gateway - gateway = await self.context_forge.create_virtual_gateway( - name=f"collection_{collection_id}", - tool_ids=tool_ids, - description=f"Virtual gateway for collection {collection_id}" - ) - - return gateway["id"] - - async def execute_agents( - self, - context: AgentContext, - trigger_stage: str - ) -> List[AgentResult]: - """Execute all enabled agents for a collection at given stage""" - - # Get agent configs for collection - agent_configs = await self._get_collection_agents( - collection_id=context.collection_id, - trigger_stage=trigger_stage, - enabled=True - ) - - # Sort by priority - agent_configs.sort(key=lambda x: x.priority) - - results = [] - for config in agent_configs: - try: - # Check if agent is MCP-based - if config.config.get("type") == "mcp": - result = await self._execute_mcp_agent(config, context) - else: - # Execute built-in agent - agent = self.registry.get_agent( - agent_id=config.agent_id, - config=config.config - ) - result = await agent.execute( - context=context, - input_data=self._prepare_input(context, config) - ) - - results.append(result) - - # Update context for next agent - if not context.previous_agent_results: - context.previous_agent_results = [] - context.previous_agent_results.append(result) - - except Exception as e: - results.append(AgentResult( - agent_id=config.agent_id, - success=False, - data={}, - metadata={}, - errors=[str(e)] - )) - - return results - - async def _execute_mcp_agent( - self, - config: AgentConfig, - context: AgentContext - ) -> AgentResult: - """ - Execute an MCP-based agent via Context Forge - - Args: - config: Agent configuration with Context Forge tool mapping - context: Execution context - - Returns: - AgentResult - """ - tool_name = config.config.get("context_forge_tool_id") - gateway_id = config.config.get("gateway_id") - - # Prepare tool arguments from context - arguments = self._map_context_to_tool_args(context, config) - - # Invoke tool via Context Forge - invocation = ToolInvocation( - tool_name=tool_name, - arguments=arguments, - gateway_id=gateway_id - ) - - response = await self.context_forge.invoke_tool(invocation) +class CircuitBreaker: + """Circuit breaker for resilient MCP communication. - return AgentResult( - agent_id=config.agent_id, - success=response.success, - data=response.result, - metadata=response.metadata, - errors=[response.error] if response.error else None - ) + States: + - CLOSED: Normal operation, all requests pass through + - OPEN: Failures exceeded threshold, requests fail fast + - HALF_OPEN: After recovery timeout, allows test request + """ - def _map_context_to_tool_args( - self, - context: AgentContext, - config: AgentConfig - ) -> Dict[str, Any]: - """ - Map AgentContext to MCP tool arguments - - Uses config.config["argument_mapping"] to transform context - into tool-specific arguments. 
- """ - mapping = config.config.get("argument_mapping", {}) - args = {} - - for tool_arg, context_field in mapping.items(): - if context_field == "query": - args[tool_arg] = context.query - elif context_field == "documents": - args[tool_arg] = context.retrieved_documents - elif context_field == "conversation_history": - args[tool_arg] = context.conversation_history - # Add more mappings as needed - - return args + def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0): + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self.state = CircuitBreakerState.CLOSED + self.failure_count = 0 ``` -#### 3. Updated Database Models +### 3. MCP Schema Models -Add Context Forge-specific fields to `AgentConfig`: +Located in `backend/rag_solution/schemas/mcp_schema.py`: ```python -# backend/rag_solution/models/agent.py (updated) -class AgentConfig(Base): - """User-configured agent instance""" - - __tablename__ = "agent_configs" - - id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) - user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) - agent_id = Column(String, nullable=False) - name = Column(String, nullable=False) - description = Column(String) - - # Configuration now includes Context Forge integration - config = Column(JSON, nullable=False) - # Example config structure: - # { - # "type": "mcp", # or "builtin" - # "context_forge_tool_id": "powerpoint_generator", - # "gateway_id": "collection_abc123", - # "argument_mapping": { - # "query": "query", - # "documents": "documents", - # "template": "config.template" - # }, - # "settings": { ... } - # } - - enabled = Column(Boolean, default=True) - trigger_stage = Column(String) - priority = Column(Integer, default=0) - - # Relationships - collections = relationship( - "Collection", - secondary=collection_agents, - back_populates="agents" - ) - user = relationship("User", back_populates="agent_configs") +class MCPHealthStatus(BaseModel): + healthy: bool + gateway_url: str + latency_ms: float | None = None + circuit_breaker_state: str + error: str | None = None + +class MCPTool(BaseModel): + name: str + description: str + parameters: list[MCPToolParameter] + category: str | None = None + version: str = "v1" + enabled: bool = True + +class MCPInvocationOutput(BaseModel): + tool_name: str + status: MCPInvocationStatus + result: Any | None = None + error: str | None = None + execution_time_ms: float | None = None ``` -#### 4. 
API Endpoints for Context Forge Integration +## API Endpoints -Add endpoints for managing Context Forge gateways and servers: +### MCP Context Forge Endpoints -```python -# backend/rag_solution/router/agent_router.py (updated) -from rag_solution.mcp.context_forge_client import ContextForgeClient +| Endpoint | Method | Headers | Description | +|----------|--------|---------|-------------| +| `/health` | GET | None | Health check | +| `/tools` | GET | `X-Authenticated-User` | List all tools | +| `/tools` | POST | `X-Authenticated-User`, `Content-Type` | Create a tool | +| `/tools/{id}` | DELETE | `X-Authenticated-User` | Delete a tool | +| `/mcp` | POST | `X-Authenticated-User`, `Content-Type`, `Accept` | JSON-RPC tool invocation | +### RAG Modulo Backend Endpoints -@router.post("/context-forge/servers", response_model=Dict[str, Any]) -async def register_mcp_server( - server_config: Dict[str, Any], - current_user: User = Depends(get_current_user), - context_forge: ContextForgeClient = Depends(get_context_forge_client) -): - """ - Register an external MCP server or REST/gRPC service with Context Forge - - Example request body: - { - "name": "PowerPoint Generator", - "server_type": "mcp", - "endpoint": "http://ppt-generator:8080", - "config": { - "protocol": "sse", - "auth_token": "..." - } - } - """ - server = await context_forge.register_external_server( - name=server_config["name"], - server_type=server_config["server_type"], - endpoint=server_config["endpoint"], - config=server_config.get("config", {}) - ) - return server - - -@router.get("/context-forge/tools", response_model=List[Dict[str, Any]]) -async def list_context_forge_tools( - gateway_id: Optional[str] = None, - context_forge: ContextForgeClient = Depends(get_context_forge_client) -): - """ - List all tools available in Context Forge - - Optionally filter by virtual gateway ID - """ - tools = await context_forge.list_tools(gateway_id=gateway_id) - return tools - +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/api/v1/mcp/health` | GET | MCP gateway health status | +| `/api/v1/mcp/tools` | GET | List available MCP tools | +| `/api/v1/mcp/tools/{name}/invoke` | POST | Invoke a specific tool | +| `/api/v1/mcp/metrics` | GET | Client metrics | -@router.post("/collections/{collection_id}/gateway") -async def create_collection_gateway( - collection_id: UUID, - current_user: User = Depends(get_current_user), - agent_service: AgentService = Depends(get_agent_service) -): - """ - Create a virtual gateway in Context Forge for this collection +## Configuration - Bundles all collection agents into a single gateway for efficient execution. - """ - gateway_id = await agent_service.create_collection_virtual_gateway( - collection_id=collection_id, - user_id=current_user.id - ) - return {"gateway_id": gateway_id, "collection_id": collection_id} -``` +### Environment Variables -## Deployment Architecture +```bash +# ================================ +# MCP CONTEXT FORGE INTEGRATION +# ================================ +# RAG Modulo uses PROXY AUTHENTICATION - it acts as a trusted backend service +# that passes authenticated user identity via headers. +# See: https://ibm.github.io/mcp-context-forge/manage/proxy/ -### Docker Compose Setup (Development) +# Enable MCP Gateway (starts Redis + MCP Context Forge containers) +ENABLE_MCP_GATEWAY=true -```yaml -# docker-compose.yml (updated) -version: '3.8' - -services: - # Existing RAG Modulo services - backend: - build: - context: . 
- dockerfile: backend/Dockerfile.backend - environment: - - CONTEXT_FORGE_URL=http://mcp-gateway:8000 - - CONTEXT_FORGE_TOKEN=${CONTEXT_FORGE_TOKEN} - depends_on: - - postgres - - milvus - - mcp-gateway - - frontend: - build: - context: frontend - dockerfile: Dockerfile.frontend - depends_on: - - backend - - # MCP Context Forge Gateway - mcp-gateway: - image: ghcr.io/ibm/mcp-context-forge:latest - ports: - - "8001:8000" # Gateway API - - "8002:8001" # Admin UI - environment: - - REDIS_URL=redis://redis:6379 - - DATABASE_URL=postgresql://postgres:${DB_PASSWORD}@postgres:5432/mcp_gateway - - JWT_SECRET=${JWT_SECRET} - - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318 - depends_on: - - redis - - postgres - volumes: - - ./config/mcp-gateway:/app/config - - # Redis for Context Forge federation - redis: - image: redis:7-alpine - ports: - - "6379:6379" - - # OpenTelemetry Collector (optional) - jaeger: - image: jaegertracing/all-in-one:latest - ports: - - "16686:16686" # Jaeger UI - - "4318:4318" # OTLP HTTP - - # Example MCP Server: PowerPoint Generator - ppt-generator-mcp: - build: - context: ./agents/ppt-generator - dockerfile: Dockerfile - environment: - - MCP_SERVER_PORT=8080 - ports: - - "8080:8080" -``` +# Gateway URL +MCP_PORT=3001 +MCP_GATEWAY_URL=http://localhost:3001 -### Kubernetes Deployment (Production) +# Proxy authentication settings +MCP_TRUST_PROXY_AUTH=true +MCP_PROXY_USER_HEADER=X-Authenticated-User -```yaml -# deployment/helm/rag-modulo/templates/mcp-gateway-deployment.yaml -apiVersion: apps/v1 -kind: Deployment -metadata: - name: mcp-gateway - namespace: {{ .Values.namespace }} -spec: - replicas: {{ .Values.mcpGateway.replicas }} - selector: - matchLabels: - app: mcp-gateway - template: - metadata: - labels: - app: mcp-gateway - spec: - containers: - - name: gateway - image: {{ .Values.mcpGateway.image.repository }}:{{ .Values.mcpGateway.image.tag }} - ports: - - containerPort: 8000 - name: api - - containerPort: 8001 - name: admin - env: - - name: REDIS_URL - value: "redis://{{ .Release.Name }}-redis:6379" - - name: DATABASE_URL - valueFrom: - secretKeyRef: - name: mcp-gateway-secrets - key: database-url - - name: JWT_SECRET - valueFrom: - secretKeyRef: - name: mcp-gateway-secrets - key: jwt-secret - - name: OTEL_EXPORTER_OTLP_ENDPOINT - value: "http://jaeger-collector:4318" - resources: - requests: - memory: "256Mi" - cpu: "200m" - limits: - memory: "512Mi" - cpu: "500m" - livenessProbe: - httpGet: - path: /health - port: 8000 - initialDelaySeconds: 30 - periodSeconds: 10 - readinessProbe: - httpGet: - path: /ready - port: 8000 - initialDelaySeconds: 10 - periodSeconds: 5 +# Disable client JWT auth (we use proxy auth instead) +MCP_CLIENT_AUTH_ENABLED=false ---- -apiVersion: v1 -kind: Service -metadata: - name: mcp-gateway - namespace: {{ .Values.namespace }} -spec: - selector: - app: mcp-gateway - ports: - - name: api - port: 8000 - targetPort: 8000 - - name: admin - port: 8001 - targetPort: 8001 - type: ClusterIP +# Admin UI auth (optional) +MCP_AUTH_REQUIRED=false ``` -## Configuration - -### Environment Variables +### Settings Model -Add to `.env.example`: - -```bash -# MCP Context Forge Configuration -CONTEXT_FORGE_URL=http://localhost:8001 -CONTEXT_FORGE_TOKEN=your_jwt_token_here -CONTEXT_FORGE_REDIS_URL=redis://localhost:6379 -CONTEXT_FORGE_DB_URL=postgresql://postgres:password@localhost:5432/mcp_gateway - -# OpenTelemetry (optional) -OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 -OTEL_SERVICE_NAME=rag-modulo-agents -``` - -### Application Configuration +In 
`backend/core/config.py`: ```python -# backend/core/config.py (updated) -from pydantic_settings import BaseSettings - - class Settings(BaseSettings): - # ... existing settings ... + # MCP Gateway Configuration + enable_mcp_gateway: bool = Field(default=False, alias="ENABLE_MCP_GATEWAY") + mcp_port: int = Field(default=3001, alias="MCP_PORT") + mcp_gateway_url: str = Field(default="http://localhost:3001", alias="MCP_GATEWAY_URL") + mcp_proxy_user_header: str = Field(default="X-Authenticated-User", alias="MCP_PROXY_USER_HEADER") + + # Resilience settings + mcp_timeout: float = Field(default=30.0, alias="MCP_TIMEOUT") + mcp_health_timeout: float = Field(default=5.0, alias="MCP_HEALTH_TIMEOUT") + mcp_max_retries: int = Field(default=3, alias="MCP_MAX_RETRIES") + mcp_circuit_breaker_threshold: int = Field(default=5, alias="MCP_CIRCUIT_BREAKER_THRESHOLD") + mcp_circuit_breaker_timeout: float = Field(default=60.0, alias="MCP_CIRCUIT_BREAKER_TIMEOUT") +``` - # MCP Context Forge - context_forge_url: str = "http://localhost:8001" - context_forge_token: str - context_forge_timeout: int = 30 - context_forge_max_retries: int = 3 +## Deployment - class Config: - env_file = ".env" +### Docker Compose Configuration +In `docker-compose-infra.yml`: -settings = Settings() +```yaml +# MCP Context Forge Gateway +mcp-context-forge: + container_name: mcp-context-forge + image: ghcr.io/ibm/mcp-context-forge:latest + profiles: + - mcp + ports: + - "${MCP_PORT:-3001}:${MCP_PORT:-3001}" + environment: + PORT: ${MCP_PORT:-3001} + HOST: 0.0.0.0 + REDIS_URL: redis://redis:6379 + # Proxy authentication settings + TRUST_PROXY_AUTH: ${MCP_TRUST_PROXY_AUTH:-true} + PROXY_USER_HEADER: ${MCP_PROXY_USER_HEADER:-X-Authenticated-User} + MCP_CLIENT_AUTH_ENABLED: ${MCP_CLIENT_AUTH_ENABLED:-false} + AUTH_REQUIRED: ${MCP_AUTH_REQUIRED:-false} + LOG_LEVEL: ${MCP_LOG_LEVEL:-info} + healthcheck: + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:$${PORT}/health"] + interval: 30s + timeout: 5s + retries: 3 + depends_on: + redis: + condition: service_healthy + networks: + - app-network + +# Redis for MCP session management +redis: + container_name: redis + image: redis:7-alpine + profiles: + - mcp + ports: + - "6379:6379" + volumes: + - ./volumes/redis:/data + command: redis-server --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - app-network ``` -## Integration with Existing Agents - -### Example: PowerPoint Generator via Context Forge - -Instead of building a custom Python agent, we deploy a standalone MCP server and register it with Context Forge: +### Port Allocation -#### 1. 
MCP Server (Python) +| Service | Port | Description | +|---------|------|-------------| +| Frontend | 3000 | React development server | +| MCP Context Forge | 3001 | MCP gateway | +| Backend | 8000 | FastAPI server | +| Redis | 6379 | MCP session storage | -```python -# agents/ppt-generator/server.py -from mcp import Server, Tool, ToolParameter -from pptx import Presentation -import base64 -from io import BytesIO - - -server = Server("powerpoint-generator") - - -@server.tool( - name="generate_powerpoint", - description="Generate PowerPoint presentation from documents", - parameters=[ - ToolParameter( - name="title", - type="string", - description="Presentation title" - ), - ToolParameter( - name="documents", - type="array", - description="List of documents to include" - ), - ToolParameter( - name="max_slides", - type="integer", - description="Maximum number of slides", - default=10 - ) - ] -) -async def generate_powerpoint(title: str, documents: list, max_slides: int = 10): - """Generate PowerPoint from documents""" - prs = Presentation() - - # Title slide - title_slide = prs.slides.add_slide(prs.slide_layouts[0]) - title_slide.shapes.title.text = title - - # Content slides - for doc in documents[:max_slides]: - slide = prs.slides.add_slide(prs.slide_layouts[1]) - slide.shapes.title.text = doc.get("title", "") - slide.shapes.placeholders[1].text = doc.get("content", "") - - # Encode as base64 - ppt_buffer = BytesIO() - prs.save(ppt_buffer) - ppt_buffer.seek(0) - ppt_base64 = base64.b64encode(ppt_buffer.read()).decode('utf-8') - - return { - "presentation": ppt_base64, - "format": "pptx", - "filename": f"{title}.pptx", - "slides": len(prs.slides) - } +## Testing +### Manual Testing Commands -if __name__ == "__main__": - server.run() -``` +```bash +# 1. Health check +curl http://localhost:3001/health -#### 2. Register with Context Forge +# 2. List tools with proxy auth +curl -H "X-Authenticated-User: test@example.com" \ + http://localhost:3001/tools | jq . -```bash -# Register PowerPoint Generator MCP server -curl -X POST http://localhost:8001/api/v1/servers \ - -H "Authorization: Bearer $CONTEXT_FORGE_TOKEN" \ +# 3. Create a test tool +curl -X POST http://localhost:3001/tools \ + -H "X-Authenticated-User: admin@example.com" \ -H "Content-Type: application/json" \ -d '{ - "name": "PowerPoint Generator", - "type": "mcp", - "endpoint": "http://ppt-generator-mcp:8080", - "config": { - "protocol": "stdio", - "transport": "sse" + "tool": { + "name": "httpbin-echo", + "url": "https://httpbin.org/post", + "description": "Echo test tool", + "request_type": "POST", + "integration_type": "REST", + "input_schema": { + "type": "object", + "properties": { + "message": {"type": "string"} + } + } } - }' -``` + }' | jq . -#### 3. Create Agent Configuration in RAG Modulo - -```python -# Via RAG Modulo API -POST /api/v1/agents/configs -{ - "agent_id": "ppt_generator", - "name": "PowerPoint Generator", - "config": { - "type": "mcp", - "context_forge_tool_id": "generate_powerpoint", - "argument_mapping": { - "title": "query", - "documents": "documents", - "max_slides": "config.max_slides" - }, - "settings": { - "max_slides": 15 +# 4. 
Invoke a tool (use /mcp endpoint with -L flag) +curl -L -X POST http://localhost:3001/mcp \ + -H "X-Authenticated-User: test@example.com" \ + -H "Content-Type: application/json" \ + -H "Accept: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "httpbin-echo", + "arguments": {"message": "Hello from RAG Modulo!"} } - }, - "trigger_stage": "response", - "priority": 10 -} + }' | jq . ``` -#### 4. Associate with Collection +### Python Integration Test ```python -POST /api/v1/agents/collections/{collection_id}/agents -{ - "collection_id": "abc123...", - "agent_config_id": "xyz789..." -} -``` +import asyncio +from rag_solution.services.mcp_gateway_client import ResilientMCPGatewayClient +from core.config import get_settings + +async def test_mcp_integration(): + client = ResilientMCPGatewayClient(get_settings()) + + # Health check + health = await client.check_health() + print(f"Gateway healthy: {health.healthy}") + print(f"Circuit breaker state: {health.circuit_breaker_state}") + + # List tools + tools = await client.list_tools(user_id="test@example.com") + print(f"Available tools: {tools.total_count}") + + # Invoke a tool + result = await client.invoke_tool( + tool_name="httpbin-echo", + arguments={"message": "Hello!"}, + user_id="test@example.com" + ) + print(f"Invocation status: {result.status.value}") -Now when users search in this collection, the PowerPoint Generator agent will automatically execute during the "response" stage, creating a presentation from the search results. - -## Admin UI Integration - -Context Forge provides an admin UI at `http://localhost:8002` where users can: - -1. **Manage Gateways**: View/create/delete virtual gateways -2. **Monitor Tools**: See all available MCP tools across servers -3. **View Execution Logs**: Real-time monitoring of agent invocations -4. **Team Management**: RBAC for agent access control -5. **Observability**: OpenTelemetry traces for debugging - -### Embed Admin UI in RAG Modulo Frontend - -```typescript -// frontend/src/components/agents/ContextForgeAdmin.tsx -import React from 'react'; - -export const ContextForgeAdmin: React.FC = () => { - const contextForgeUrl = process.env.REACT_APP_CONTEXT_FORGE_ADMIN_URL; - - return ( -
-        Agent Gateway Administration