diff --git a/agent_runtimes/langchain_agent/README.md b/agent_runtimes/langchain_agent/README.md index 5d512018e..90f50f6d9 100644 --- a/agent_runtimes/langchain_agent/README.md +++ b/agent_runtimes/langchain_agent/README.md @@ -1,3 +1,4 @@ +# MCP Langchain Agent A configurable Langchain agent that supports MCP and integrates with the MCP Gateway via streamable HTTP + Auth. Tools can be specified as a CSV list. @@ -11,3 +12,154 @@ Endpoints for: /list_tools etc. are provided. + +## Features update (2025-Aug-17) +- **Dynamic Tool Discovery**: Automatically discovers tools from MCP Gateway (`GET /tools`) +- **OpenAI-Compatible API**: Standard `/v1/chat/completions` endpoint with streaming support +- **A2A Communication**: JSON-RPC `/a2a` endpoint for gateway-to-gateway communication +- **Langchain Integration**: Full Langchain agent with function calling support +- **Health Monitoring**: `/health`, `/ready`, `/list_tools` endpoints + +## User Stories Implemented +[x] **Dynamic Tool Discovery** - Auto-discovers from gateway or uses allowlist +[x] **Dual Endpoint Exposure** - OpenAI + A2A JSON-RPC endpoints +[x] **Parameterised Tool Allow-List** - `TOOLS=` environment variable +[x] **Tool Schema Introspection** - JSON schema parsing and validation + +Structure: +``` +agent_runtimes/langchain_agent/ +├── app.py # FastAPI application incl. /v1/chat/completions and /a2a +├── agent_langchain.py # Core Langchain agent +├── mcp_client.py # MCP Gateway client +├── models.py # Pydantic models +├── config.py # Configuration management +├── start_agent.py # Startup script +├── requirements.txt # Dependencies +└── README.md # This file +``` + +### Configuration (env vars) +- OPENAI_API_KEY – required +- MCPGATEWAY_BEARER_TOKEN – JWT for the gateway + + +### Installation +Install dependencies: +```bash +cd agent_runtimes/langchain_agent +pip install -r requirements.txt +``` + + +### Quick Start +1) Start the MCP Gateway (from project root): +```bash +make serve +``` + +2) Start the Langchain Agent (in another terminal): +```bash +# Set environment variables +export OPENAI_API_KEY=your-openai-api-key +export GATEWAY_BEARER_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key) + +# Optional: Tool allowlist (if unset, all tools discovered) +export TOOLS="tool1,tool2,tool3" # Replace with actual tool names + +python -m agent_runtimes.langchain_agent.start_agent +``` + +3) Test the agent (in new terminal): + +Basic Health Checks +```bash +# Health check +curl http://localhost:8000/health + +# Readiness check +curl http://localhost:8000/ready + +# List available tools +curl http://localhost:8000/list_tools +``` + +Basic Health Checks +```bash +# Basic chat completion +curl -X POST http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "What tools do you have available?"} + ] + }' + +# Streaming chat completion +curl -X POST http://localhost:8000/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "Hello!"} + ], + "stream": true + }' +``` + +Test A2A communication with a working tool (books-search) +- Step 1: Find a tool to test +```bash +# Get your available tools and pick one +curl http://localhost:8000/list_tools | jq '.tools[0]' +``` + +- Step 2: Test A2A with your tool +```bash +# Replace "YOUR_TOOL_NAME" with an actual tool from step 1 +curl -X POST http://localhost:8000/a2a \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": "1", + "method": "invoke", + "params": { + "tool": "YOUR_TOOL_NAME", + "args": {} + } + }' + +# For tools that need parameters, check the tool schema: +curl -X POST http://localhost:8000/a2a \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": "1", + "method": "invoke", + "params": { + "tool": "YOUR_TOOL_NAME", + "args": { + "param1": "value1", + "param2": "value2" + } + } + }' +``` +Expected Success Response: +```json +{ + "jsonrpc": "2.0", + "id": "1", + "result": { + "success": true, + "result": { + "tool_id": "your-tool-id", + "executed_via": "direct_rest_fallback", + "status_code": 200, + "result": "...actual tool output..." + } + } +} +``` +![alt text](image.png) \ No newline at end of file diff --git a/agent_runtimes/langchain_agent/__init__.py b/agent_runtimes/langchain_agent/__init__.py new file mode 100644 index 000000000..11867ffc3 --- /dev/null +++ b/agent_runtimes/langchain_agent/__init__.py @@ -0,0 +1,12 @@ +""" +MCP Langchain Agent Package + +A configurable Langchain agent that supports MCP and integrates with the MCP Gateway +via streamable HTTP + Auth. Exposes an OpenAI compatible API. +""" +from .app import app +from .agent_langchain import LangchainMCPAgent +from .mcp_client import MCPClient +from .config import get_settings + +__all__ = [] \ No newline at end of file diff --git a/agent_runtimes/langchain_agent/agent_langchain.py b/agent_runtimes/langchain_agent/agent_langchain.py new file mode 100644 index 000000000..7541b6072 --- /dev/null +++ b/agent_runtimes/langchain_agent/agent_langchain.py @@ -0,0 +1,279 @@ +import asyncio +import json +import logging +from typing import List, Dict, Any, Optional, AsyncGenerator + +from langchain.agents import AgentExecutor, create_openai_functions_agent +from langchain.tools import Tool +from langchain_core.messages import HumanMessage, AIMessage, SystemMessage +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder +from langchain_openai import ChatOpenAI +from langchain_core.tools import BaseTool +from pydantic import BaseModel, Field + +from .mcp_client import MCPClient, ToolDef +from .models import AgentConfig + +logger = logging.getLogger(__name__) + +class MCPTool(BaseTool): + """Langchain tool wrapper for MCP tools""" + + name: str = Field(..., description="Tool name") + description: str = Field(..., description="Tool description") + mcp_client: MCPClient = Field(..., description="MCP client instance") + tool_id: str = Field(..., description="MCP tool ID") + + class Config: + arbitrary_types_allowed = True + + def _run(self, **kwargs) -> str: + """Synchronous tool execution""" + try: + result = self.mcp_client.invoke_tool(self.tool_id, kwargs) + return json.dumps(result, indent=2) + except Exception as e: + logger.error(f"Tool {self.tool_id} execution failed: {e}") + return f"Error executing tool: {str(e)}" + + async def _arun(self, **kwargs) -> str: + """Asynchronous tool execution""" + # Run in thread pool since MCP client might not be async + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self._run, **kwargs) + +class LangchainMCPAgent: + """Langchain agent that integrates with MCP Gateway""" + + def __init__(self, config: AgentConfig): + self.config = config + self.mcp_client = MCPClient.from_env(config.mcp_gateway_url) + self.mcp_client.debug = config.debug_mode + + self.llm = ChatOpenAI( + model=config.default_model, + temperature=config.temperature, + streaming=config.streaming_enabled + ) + + self.tools: List[MCPTool] = [] + self.agent_executor: Optional[AgentExecutor] = None + self._initialized = False + + @classmethod + def from_config(cls, config: AgentConfig) -> "LangchainMCPAgent": + """Create agent from configuration""" + return cls(config) + + async def initialize(self): + """Initialize the agent and load tools""" + try: + # Check if tools are restricted via environment variable (ticket requirement) + if self.config.tools_allowlist: + logger.info(f"Using tool allowlist from TOOLS env var: {self.config.tools_allowlist}") + logger.info("Skipping gateway autodiscovery as per ticket requirement") + await self._load_allowlisted_tools() + else: + # Auto-discover from gateway + logger.info("Auto-discovering tools from MCP Gateway") + await self._load_mcp_tools() + + # Create the agent + await self._create_agent() + + self._initialized = True + logger.info(f"Agent initialized with {len(self.tools)} tools") + + except Exception as e: + logger.error(f"Failed to initialize agent: {e}") + raise + + async def _load_allowlisted_tools(self): + """Load only tools specified in the allowlist (no autodiscovery)""" + try: + # Clean the allowlist + allowlist = [tool.strip() for tool in self.config.tools_allowlist if tool.strip()] + logger.info(f"Loading allowlisted tools: {allowlist}") + + self.tools = [] + for tool_id in allowlist: + # Create a basic tool definition for allowlisted tools + # In a production setup, you might want to fetch schema from gateway + mcp_tool = MCPTool( + name=tool_id.replace(".", "-").replace("_", "-"), + description=f"Allowlisted tool: {tool_id}", + mcp_client=self.mcp_client, + tool_id=tool_id + ) + self.tools.append(mcp_tool) + logger.info(f"Added allowlisted tool: {tool_id}") + + except Exception as e: + logger.error(f"Failed to load allowlisted tools: {e}") + raise + + async def _load_mcp_tools(self): + """Load tools from MCP Gateway""" + try: + # Add debug info about the connection + logger.info(f"Connecting to MCP Gateway at: {self.mcp_client.base_url}") + logger.info(f"Using token: {'Yes' if self.mcp_client.token else 'No'}") + + tool_defs = self.mcp_client.list_tools() + logger.info(f"Found {len(tool_defs)} tools from MCP Gateway") + + if len(tool_defs) == 0: + logger.warning("No tools found from MCP Gateway. Check if:") + logger.warning(" 1. Gateway is running on the expected URL") + logger.warning(" 2. Authentication token is valid") + logger.warning(" 3. Gateway has tools configured") + + self.tools = [] + for tool_def in tool_defs: + mcp_tool = MCPTool( + name=tool_def.name or tool_def.id, + description=tool_def.description or f"MCP tool: {tool_def.id}", + mcp_client=self.mcp_client, + tool_id=tool_def.id + ) + self.tools.append(mcp_tool) + logger.info(f"Loaded tool: {tool_def.id} ({tool_def.name})") + + except Exception as e: + logger.error(f"Failed to load MCP tools: {e}") + raise + + async def _create_agent(self): + """Create the Langchain agent executor""" + try: + # Define the system prompt + system_prompt = """You are a helpful AI assistant with access to various tools through the MCP (Model Context Protocol) Gateway. + +Use the available tools to help answer questions and complete tasks. When using tools: +1. Read tool descriptions carefully to understand their purpose +2. Provide the correct arguments as specified in the tool schema +3. Interpret tool results and provide helpful responses to the user +4. If a tool fails, try alternative approaches or explain the limitation + +Available tools: {tool_names} + +Always strive to be helpful, accurate, and honest in your responses.""" + + # Create prompt template + prompt = ChatPromptTemplate.from_messages([ + ("system", system_prompt), + MessagesPlaceholder(variable_name="chat_history"), + ("human", "{input}"), + MessagesPlaceholder(variable_name="agent_scratchpad"), + ]) + + # Create the agent + agent = create_openai_functions_agent( + llm=self.llm, + tools=self.tools, + prompt=prompt + ) + + # Create agent executor + self.agent_executor = AgentExecutor( + agent=agent, + tools=self.tools, + max_iterations=self.config.max_iterations, + verbose=self.config.debug_mode, + return_intermediate_steps=True + ) + + logger.info("Langchain agent created successfully") + + except Exception as e: + logger.error(f"Failed to create agent: {e}") + raise + + def is_initialized(self) -> bool: + """Check if agent is initialized""" + return self._initialized + + async def check_readiness(self) -> bool: + """Check if agent is ready to handle requests""" + try: + return ( + self._initialized and + self.agent_executor is not None and + len(self.tools) >= 0 and # Allow 0 tools for testing + await self.test_gateway_connection() + ) + except Exception: + return False + + async def test_gateway_connection(self) -> bool: + """Test connection to MCP Gateway""" + try: + # Try to list tools as a connectivity test + tools = self.mcp_client.list_tools() + return True + except Exception as e: + logger.error(f"Gateway connection test failed: {e}") + return False + + def get_available_tools(self) -> List[ToolDef]: + """Get list of available tools""" + try: + return self.mcp_client.list_tools() + except Exception: + return [] + + async def run_async( + self, + messages: List[Dict[str, str]], + model: Optional[str] = None, + max_tokens: Optional[int] = None, + temperature: Optional[float] = None, + tools_enabled: bool = True + ) -> str: + """Run the agent asynchronously""" + if not self._initialized: + raise RuntimeError("Agent not initialized. Call initialize() first.") + + try: + # Convert messages to input format + if messages: + latest_message = messages[-1] + input_text = latest_message.get("content", "") + else: + input_text = "" + + # Prepare chat history (all messages except the last one) + chat_history = [] + for msg in messages[:-1]: + if msg["role"] == "user": + chat_history.append(HumanMessage(content=msg["content"])) + elif msg["role"] == "assistant": + chat_history.append(AIMessage(content=msg["content"])) + elif msg["role"] == "system": + chat_history.append(SystemMessage(content=msg["content"])) + + # Run the agent + result = await self.agent_executor.ainvoke({ + "input": input_text, + "chat_history": chat_history, + "tool_names": [tool.name for tool in self.tools] + }) + + return result["output"] + + except Exception as e: + logger.error(f"Agent execution failed: {e}") + return f"I encountered an error while processing your request: {str(e)}" + + async def stream_async( + self, + messages: List[Dict[str, str]], + model: Optional[str] = None, + max_tokens: Optional[int] = None, + temperature: Optional[float] = None, + tools_enabled: bool = True + ) -> AsyncGenerator[str, None]: + """Stream agent response asynchronously""" + if not self._initialized: + raise RuntimeError("Agent not initialized. Call initialize() first.") + import asyncio diff --git a/agent_runtimes/langchain_agent/app.py b/agent_runtimes/langchain_agent/app.py new file mode 100644 index 000000000..5309b6709 --- /dev/null +++ b/agent_runtimes/langchain_agent/app.py @@ -0,0 +1,296 @@ +from fastapi import FastAPI, HTTPException, BackgroundTasks +from fastapi.responses import StreamingResponse +from fastapi.middleware.cors import CORSMiddleware +import json +import time +import uuid +from typing import List, Dict, Any, Optional, AsyncGenerator +from datetime import datetime +import asyncio +import logging + +from .models import ( + ChatCompletionRequest, + ChatCompletionResponse, + ChatCompletionChoice, + ChatMessage, + Usage, + HealthResponse, + ReadyResponse, + ToolListResponse +) +from .agent_langchain import LangchainMCPAgent +from .config import get_settings + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize FastAPI app +app = FastAPI( + title="MCP Langchain Agent", + description="A Langchain agent with OpenAI-compatible API that integrates with MCP Gateway", + version="1.0.0" +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Initialize settings and agent +settings = get_settings() +agent = LangchainMCPAgent.from_config(settings) + +@app.on_event("startup") +async def startup_event(): + """Initialize the agent and load tools on startup""" + try: + await agent.initialize() + logger.info("Agent initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize agent: {e}") + raise + +@app.get("/health", response_model=HealthResponse) +async def health_check(): + """Health check endpoint""" + try: + # Basic health check - ensure agent is responsive + tools_count = len(agent.get_available_tools()) + return HealthResponse( + status="healthy", + timestamp=datetime.utcnow().isoformat(), + details={ + "agent_initialized": agent.is_initialized(), + "tools_loaded": tools_count, + "gateway_url": settings.mcp_gateway_url + } + ) + except Exception as e: + logger.error(f"Health check failed: {e}") + raise HTTPException(status_code=503, detail=f"Service unhealthy: {str(e)}") + +@app.get("/ready", response_model=ReadyResponse) +async def readiness_check(): + """Readiness check endpoint""" + try: + # More thorough readiness check + is_ready = await agent.check_readiness() + if not is_ready: + raise HTTPException(status_code=503, detail="Service not ready") + + return ReadyResponse( + ready=True, + timestamp=datetime.utcnow().isoformat(), + details={ + "gateway_connection": await agent.test_gateway_connection(), + "tools_available": (len(agent.tools) > 0) or (len(agent.get_available_tools()) > 0), + } + ) + except Exception as e: + logger.error(f"Readiness check failed: {e}") + raise HTTPException(status_code=503, detail=f"Service not ready: {str(e)}") + +@app.get("/list_tools", response_model=ToolListResponse) +async def list_tools(): + """List all available tools""" + try: + tools = agent.get_available_tools() + return ToolListResponse( + tools=[ + { + "id": tool.id, + "name": tool.name or tool.id, + "description": tool.description or "", + "schema": tool.schema or {}, + "url": tool.url, + "method": tool.method, + "integration_type": tool.integration_type + } + for tool in tools + ], + count=len(tools) + ) + except Exception as e: + logger.error(f"Failed to list tools: {e}") + raise HTTPException(status_code=500, detail=f"Failed to list tools: {str(e)}") + +@app.post("/v1/chat/completions", response_model=ChatCompletionResponse) +async def chat_completions(request: ChatCompletionRequest): + """OpenAI-compatible chat completions endpoint""" + try: + if request.stream: + return StreamingResponse( + _stream_chat_completion(request), + media_type="text/plain" + ) + else: + return await _complete_chat(request) + except Exception as e: + logger.error(f"Chat completion failed: {e}") + raise HTTPException(status_code=500, detail=f"Chat completion failed: {str(e)}") + +async def _complete_chat(request: ChatCompletionRequest) -> ChatCompletionResponse: + """Handle non-streaming chat completion""" + start_time = time.time() + + # Convert messages to langchain format + messages = [msg.dict() for msg in request.messages] + + # Run the agent + response = await agent.run_async( + messages=messages, + model=request.model, + max_tokens=request.max_tokens, + temperature=request.temperature, + tools_enabled=True + ) + + # Calculate token usage (approximate) + prompt_tokens = sum(len(msg.content.split()) for msg in request.messages if msg.content) + completion_tokens = len(response.split()) if isinstance(response, str) else 0 + total_tokens = prompt_tokens + completion_tokens + + # Create response + return ChatCompletionResponse( + id=f"chatcmpl-{uuid.uuid4().hex[:12]}", + object="chat.completion", + created=int(start_time), + model=request.model, + choices=[ + ChatCompletionChoice( + index=0, + message=ChatMessage( + role="assistant", + content=response + ), + finish_reason="stop" + ) + ], + usage=Usage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens + ) + ) + +async def _stream_chat_completion(request: ChatCompletionRequest) -> AsyncGenerator[str, None]: + """Handle streaming chat completion""" + start_time = time.time() + completion_id = f"chatcmpl-{uuid.uuid4().hex[:12]}" + + # Convert messages to langchain format + messages = [msg.dict() for msg in request.messages] + + # Stream the agent response + async for chunk in agent.stream_async( + messages=messages, + model=request.model, + max_tokens=request.max_tokens, + temperature=request.temperature, + tools_enabled=True + ): + # Format as OpenAI streaming response + stream_chunk = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": int(start_time), + "model": request.model, + "choices": [ + { + "index": 0, + "delta": {"content": chunk}, + "finish_reason": None + } + ] + } + + yield f"data: {json.dumps(stream_chunk)}\n\n" + + # Send final chunk + final_chunk = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": int(start_time), + "model": request.model, + "choices": [ + { + "index": 0, + "delta": {}, + "finish_reason": "stop" + } + ] + } + + yield f"data: {json.dumps(final_chunk)}\n\n" + yield "data: [DONE]\n\n" + +@app.get("/v1/models") +async def list_models(): + """OpenAI-compatible models endpoint""" + return { + "object": "list", + "data": [ + { + "id": settings.default_model, + "object": "model", + "created": int(time.time()), + "owned_by": "mcp-langchain-agent" + } + ] + } + +@app.post("/v1/tools/invoke") +async def invoke_tool(request: Dict[str, Any]): + """Direct tool invocation endpoint""" + try: + tool_id = request.get("tool_id") + args = request.get("args", {}) + + if not tool_id: + raise HTTPException(status_code=400, detail="tool_id is required") + + result = await agent.invoke_tool(tool_id, args) + return {"result": result} + except Exception as e: + logger.error(f"Tool invocation failed: {e}") + raise HTTPException(status_code=500, detail=f"Tool invocation failed: {str(e)}") + +# A2A endpoint for agent-to-agent communication +@app.post("/a2a") +async def agent_to_agent(request: Dict[str, Any]): + """Agent-to-agent communication endpoint (JSON-RPC style)""" + try: + if request.get("method") == "invoke": + params = request.get("params", {}) + tool_id = params.get("tool") + args = params.get("args", {}) + + result = await agent.invoke_tool(tool_id, args) + + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "result": result + } + else: + raise HTTPException(status_code=400, detail="Unsupported method") + except Exception as e: + logger.error(f"A2A communication failed: {e}") + return { + "jsonrpc": "2.0", + "id": request.get("id"), + "error": { + "code": -32603, + "message": str(e) + } + } + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/agent_runtimes/langchain_agent/config.py b/agent_runtimes/langchain_agent/config.py new file mode 100644 index 000000000..8adfebe51 --- /dev/null +++ b/agent_runtimes/langchain_agent/config.py @@ -0,0 +1,87 @@ +import os +from functools import lru_cache +from typing import Optional, List + +from .models import AgentConfig + +def _parse_tools_list(tools_str: str) -> Optional[List[str]]: + """Parse comma-separated tools string into list""" + if not tools_str or not tools_str.strip(): + return None + return [tool.strip() for tool in tools_str.split(",") if tool.strip()] + +@lru_cache() +def get_settings() -> AgentConfig: + """Get application settings from environment variables""" + return AgentConfig( + mcp_gateway_url=os.getenv( + "MCP_GATEWAY_URL", + "http://localhost:4444" + ), + gateway_bearer_token=os.getenv("GATEWAY_BEARER_TOKEN"), + tools_allowlist=_parse_tools_list(os.getenv("TOOLS", "")), + default_model=os.getenv("DEFAULT_MODEL", "gpt-4o-mini"), + max_iterations=int(os.getenv("MAX_ITERATIONS", "10")), + temperature=float(os.getenv("TEMPERATURE", "0.7")), + streaming_enabled=os.getenv("STREAMING_ENABLED", "true").lower() == "true", + debug_mode=os.getenv("DEBUG_MODE", "false").lower() == "true" + ) + +def validate_environment() -> dict: + """Validate environment configuration and return status""" + issues = [] + warnings = [] + + # Check required environment variables + if not os.getenv("GATEWAY_BEARER_TOKEN"): + warnings.append("GATEWAY_BEARER_TOKEN not set - authentication may fail") + + # Check optional but recommended settings + if not os.getenv("OPENAI_API_KEY"): + issues.append("OPENAI_API_KEY not set - Langchain LLM will fail") + + # Validate numeric settings + try: + max_iter = int(os.getenv("MAX_ITERATIONS", "10")) + if max_iter < 1: + warnings.append("MAX_ITERATIONS should be >= 1") + except ValueError: + warnings.append("MAX_ITERATIONS is not a valid integer") + + try: + temp = float(os.getenv("TEMPERATURE", "0.7")) + if not 0.0 <= temp <= 2.0: + warnings.append("TEMPERATURE should be between 0.0 and 2.0") + except ValueError: + warnings.append("TEMPERATURE is not a valid float") + + return { + "valid": len(issues) == 0, + "issues": issues, + "warnings": warnings + } + +def get_example_env() -> str: + """Get example environment configuration""" + return """# MCP Langchain Agent Configuration + +# Gateway Configuration +MCP_GATEWAY_URL=http://localhost:4444 +GATEWAY_BEARER_TOKEN=your-jwt-token-here + +# OpenAI Configuration (required for Langchain) +OPENAI_API_KEY=your-openai-api-key + +# Tool Configuration (optional - for production filtering) +TOOLS=list-users,books-search + +# Agent Configuration +DEFAULT_MODEL=gpt-4o-mini +MAX_ITERATIONS=10 +TEMPERATURE=0.7 +STREAMING_ENABLED=true +DEBUG_MODE=false + +# Generate GATEWAY_BEARER_TOKEN with: +# export GATEWAY_BEARER_TOKEN=$(python3 -m mcpgateway.utils.create_jwt_token -u admin --secret my-test-key) +""" \ No newline at end of file diff --git a/agent_runtimes/langchain_agent/image-1.png b/agent_runtimes/langchain_agent/image-1.png new file mode 100644 index 000000000..184e0dcae Binary files /dev/null and b/agent_runtimes/langchain_agent/image-1.png differ diff --git a/agent_runtimes/langchain_agent/image.png b/agent_runtimes/langchain_agent/image.png new file mode 100644 index 000000000..fbc774396 Binary files /dev/null and b/agent_runtimes/langchain_agent/image.png differ diff --git a/agent_runtimes/langchain_agent/mcp_client.py b/agent_runtimes/langchain_agent/mcp_client.py new file mode 100644 index 000000000..378e8f69d --- /dev/null +++ b/agent_runtimes/langchain_agent/mcp_client.py @@ -0,0 +1,252 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional +import os + +import httpx + + +@dataclass +class ToolDef: + id: str + name: Optional[str] = None + description: Optional[str] = None + schema: Optional[Dict[str, Any]] = None + # extra fields from /tools to enable direct REST execution + url: Optional[str] = None + method: Optional[str] = None # maps requestType + headers: Optional[Dict[str, Any]] = None + integration_type: Optional[str] = None # e.g. "REST" + jsonpath_filter: Optional[str] = None # not applied in MVP + + +class MCPClient: + def __init__(self, base_url: str, token: str | None = None): + self.base_url = base_url + self.token = token + self._client = httpx.Client() + + @classmethod + def from_env(cls, base_url: str | None = None) -> "MCPClient": + url = base_url or os.getenv("MCP_GATEWAY_URL", "http://localhost:4444") + token = os.getenv("GATEWAY_BEARER_TOKEN") + return cls(url, token) + + def _headers(self) -> Dict[str, str]: + h = {"Content-Type": "application/json"} + if self.token: + h["Authorization"] = f"Bearer {self.token}" + return h + + def list_tools(self) -> List[ToolDef]: + """ + Lists all available MCP tools from this server. + + Returns: + List of ToolDef objects, each representing a callable tool. + Returns empty list if server unreachable or no tools. + """ + try: + for path in ("/tools", "/admin/tools"): + url = f"{self.base_url}{path}" + resp = self._client.get(url, headers=self._headers()) + if getattr(self, "debug", False): + print(f"[MCPClient] GET {url} -> {resp.status_code}") + if resp.status_code // 100 != 2: + continue + data = resp.json() + raw_tools = data if isinstance(data, list) else data.get("tools", []) + out: List[ToolDef] = [] + for t in raw_tools: + out.append( + ToolDef( + id = t.get("id") or t.get("tool_id") or t.get("name"), + name = t.get("name") or t.get("originalName") or t.get("originalNameSlug"), + description = t.get("description"), + # schemas in either snake_case or camelCase + schema = t.get("input_schema") or t.get("inputSchema") or t.get("schema"), + # fields for direct REST execution + url = t.get("url"), + method = (t.get("requestType") or t.get("method") or "GET"), + headers = (t.get("headers") or {}) if isinstance(t.get("headers"), dict) else {}, + integration_type = t.get("integrationType"), + jsonpath_filter = t.get("jsonpathFilter"), + ) + ) + return out + return [] + except Exception: + return [] + + def invoke_tool(self, tool_id: str, args: Dict[str, Any]) -> Dict[str, Any]: + """ + Try multiple execution surfaces: + 1) JSON-RPC /rpc with method=, params= + 2) RESTful invoke/execute variants under /tools and /admin/tools + 3) Batch invoke endpoints + 4) (fallback) Direct REST call to the tool's URL using metadata from /tools + + Includes schema validation based on tool introspection. + """ + # Best-effort: fetch catalog to find a human name for /rpc and resolve name to ID + name_for_rpc = None + actual_tool_id = tool_id + tool_meta: Optional[ToolDef] = None + try: + tools = self.list_tools() + for t in tools: + # If user provided a name, find the corresponding ID + if t.name == tool_id: + actual_tool_id = t.id + name_for_rpc = t.name + tool_meta = t + break + # If user provided an ID, find the corresponding name + elif t.id == tool_id: + name_for_rpc = t.name or t.id + tool_meta = t + break + except Exception: + pass + + # Validate arguments against tool schema if available + if tool_meta and tool_meta.schema: + validation_result = self._validate_args_against_schema(args, tool_meta.schema, tool_id) + if not validation_result["valid"]: + return { + "tool_id": actual_tool_id, + "error": f"Schema validation failed: {validation_result['error']}", + "schema": tool_meta.schema, + "provided_args": args + } + + candidates = [] + # JSON-RPC first (by name, then id) + if name_for_rpc: + candidates.append(("POST", "/rpc", {"jsonrpc":"2.0","id":"1","method":name_for_rpc,"params":args})) + candidates.append(("POST", "/rpc", {"jsonrpc":"2.0","id":"1","method":actual_tool_id,"params":args})) + + # Tool-specific invoke/execute variants (use actual ID) + for base in ("/tools", "/admin/tools"): + candidates.extend([ + ("POST", f"{base}/{actual_tool_id}/invoke", {"args": args}), + ("POST", f"{base}/{actual_tool_id}/execute", {"args": args}), + ]) + + # Batch invoke with payload carrying the id + for base in ("/tools", "/admin/tools"): + candidates.extend([ + ("POST", f"{base}/invoke", {"id": actual_tool_id, "args": args}), + ("POST", f"{base}/execute", {"id": actual_tool_id, "args": args}), + ]) + + last_err = None + for method, path, body in candidates: + try: + url = f"{self.base_url}{path}" + if getattr(self, "debug", False): + print(f"[MCPClient] {method} {url} body={body}") + r = self._client.request(method, url, headers=self._headers(), json=body) + if getattr(self, "debug", False): + print(f"[MCPClient] -> {r.status_code}, {r.text[:160]}") + + if r.status_code // 100 == 2: + response_data = r.json() + # Check if it's a JSON-RPC error response + if "error" in response_data and "jsonrpc" in response_data: + last_err = f"JSON-RPC error: {response_data['error'].get('message', 'Unknown error')}" + continue # Try next method instead of returning error + return response_data + + if r.status_code in (401, 403): + return {"error": f"Auth failed at {path} (HTTP {r.status_code})."} + last_err = f"HTTP {r.status_code}" + except Exception as e: + last_err = str(e) + + # --- FINAL FALLBACK: direct REST execution using tool metadata --- + if tool_meta and tool_meta.integration_type == "REST" and tool_meta.url: + try: + # Handle different method types + method_type = (tool_meta.method or "GET").upper() + + # SSE is typically GET with streaming, treat as GET for direct calls + if method_type == "SSE": + method_type = "GET" + + headers = tool_meta.headers or {} + # Don't overwrite explicit Content-Type if provided in tool + if "Content-Type" not in {k.title(): v for k, v in headers.items()}: + headers.setdefault("Content-Type", "application/json") + + # Build request + if method_type in ("GET", "HEAD", "DELETE"): + # For GET requests, add args as query parameters + resp = self._client.request(method_type, tool_meta.url, params=args, headers=headers) + else: + # For POST/PUT, send args as JSON body + payload = args.get("body", args) if isinstance(args, dict) else args + resp = self._client.request(method_type, tool_meta.url, json=payload, headers=headers) + + # Parse result + try: + data = resp.json() + except Exception: + data = resp.text + + return { + "tool_id": actual_tool_id, + "executed_via": "direct_rest_fallback", + "request": {"url": tool_meta.url, "method": method_type}, + "status_code": resp.status_code, + "result": data, + "schema_validated": tool_meta.schema is not None + } + except Exception as e: + last_err = f"direct_rest_error: {e}" + + return {"tool_id": actual_tool_id, "args": args, "note": "No invoke path worked", "last_error": last_err} + + def _validate_args_against_schema(self, args: Dict[str, Any], schema: Dict[str, Any], tool_id: str) -> Dict[str, Any]: + """Validate arguments against tool schema""" + try: + # Basic schema validation + if not isinstance(schema, dict): + return {"valid": True, "note": "Schema not a dict, skipping validation"} + + schema_type = schema.get("type") + if schema_type != "object": + return {"valid": True, "note": f"Schema type '{schema_type}' not object, skipping validation"} + + properties = schema.get("properties", {}) + required = schema.get("required", []) + + # Check required fields + missing_required = [] + for req_field in required: + if req_field not in args: + missing_required.append(req_field) + + if missing_required: + return { + "valid": False, + "error": f"Missing required fields: {missing_required}", + "required": required, + "provided": list(args.keys()) + } + + # Check for unexpected fields (warning only) + unexpected_fields = [] + for arg_key in args.keys(): + if arg_key not in properties: + unexpected_fields.append(arg_key) + + result = {"valid": True} + if unexpected_fields: + result["warnings"] = f"Unexpected fields (not in schema): {unexpected_fields}" + + return result + + except Exception as e: + return {"valid": True, "note": f"Schema validation error: {e}"} \ No newline at end of file diff --git a/agent_runtimes/langchain_agent/models.py b/agent_runtimes/langchain_agent/models.py new file mode 100644 index 000000000..5ede51ba0 --- /dev/null +++ b/agent_runtimes/langchain_agent/models.py @@ -0,0 +1,119 @@ +from pydantic import BaseModel, Field +from typing import List, Dict, Any, Optional, Union +from datetime import datetime + +# OpenAI Chat API Models +class ChatMessage(BaseModel): + role: str = Field(..., description="Role of the message sender") + content: str = Field(..., description="Content of the message") + name: Optional[str] = Field(None, description="Name of the sender") + +class ChatCompletionRequest(BaseModel): + model: str = Field(..., description="Model to use for completion") + messages: List[ChatMessage] = Field(..., description="List of messages") + max_tokens: Optional[int] = Field(None, description="Maximum tokens to generate") + temperature: Optional[float] = Field(0.7, description="Sampling temperature") + top_p: Optional[float] = Field(1.0, description="Nucleus sampling parameter") + n: Optional[int] = Field(1, description="Number of completions to generate") + stream: Optional[bool] = Field(False, description="Whether to stream responses") + stop: Optional[Union[str, List[str]]] = Field(None, description="Stop sequences") + presence_penalty: Optional[float] = Field(0.0, description="Presence penalty") + frequency_penalty: Optional[float] = Field(0.0, description="Frequency penalty") + logit_bias: Optional[Dict[str, float]] = Field(None, description="Logit bias") + user: Optional[str] = Field(None, description="User identifier") + +class Usage(BaseModel): + prompt_tokens: int = Field(..., description="Tokens in the prompt") + completion_tokens: int = Field(..., description="Tokens in the completion") + total_tokens: int = Field(..., description="Total tokens used") + +class ChatCompletionChoice(BaseModel): + index: int = Field(..., description="Choice index") + message: ChatMessage = Field(..., description="Generated message") + finish_reason: str = Field(..., description="Reason for finishing") + +class ChatCompletionResponse(BaseModel): + id: str = Field(..., description="Unique identifier for the completion") + object: str = Field("chat.completion", description="Object type") + created: int = Field(..., description="Unix timestamp of creation") + model: str = Field(..., description="Model used for completion") + choices: List[ChatCompletionChoice] = Field(..., description="List of completion choices") + usage: Usage = Field(..., description="Token usage information") + +# Health and Status Models +class HealthResponse(BaseModel): + status: str = Field(..., description="Health status") + timestamp: str = Field(..., description="Timestamp of health check") + details: Optional[Dict[str, Any]] = Field(None, description="Additional health details") + +class ReadyResponse(BaseModel): + ready: bool = Field(..., description="Readiness status") + timestamp: str = Field(..., description="Timestamp of readiness check") + details: Optional[Dict[str, Any]] = Field(None, description="Additional readiness details") + +# Tool Models +class ToolDefinition(BaseModel): + id: str = Field(..., description="Tool identifier") + name: str = Field(..., description="Tool name") + description: str = Field(..., description="Tool description") + input_schema: Dict[str, Any] = Field(..., description="Tool input schema", alias="schema") + url: Optional[str] = Field(None, description="Tool URL (for REST tools)") + method: Optional[str] = Field(None, description="HTTP method") + integration_type: Optional[str] = Field(None, description="Integration type") + + class Config: + populate_by_name = True # Allow both 'schema' and 'input_schema' + +class ToolListResponse(BaseModel): + tools: List[ToolDefinition] = Field(..., description="List of available tools") + count: int = Field(..., description="Number of tools") + +# Agent Configuration Models +class AgentConfig(BaseModel): + mcp_gateway_url: str = Field(..., description="MCP Gateway URL") + gateway_bearer_token: Optional[str] = Field(None, description="Gateway authentication token") + tools_allowlist: Optional[List[str]] = Field(None, description="List of allowed tool IDs") + default_model: str = Field("gpt-4o-mini", description="Default model to use") + max_iterations: int = Field(10, description="Maximum agent iterations") + temperature: float = Field(0.7, description="Default temperature") + streaming_enabled: bool = Field(True, description="Enable streaming responses") + debug_mode: bool = Field(False, description="Enable debug logging") + +# Tool Invocation Models +class ToolInvocationRequest(BaseModel): + tool_id: str = Field(..., description="Tool to invoke") + args: Dict[str, Any] = Field(default_factory=dict, description="Tool arguments") + +class ToolInvocationResponse(BaseModel): + tool_id: str = Field(..., description="Tool that was invoked") + result: Any = Field(..., description="Tool execution result") + execution_time: Optional[float] = Field(None, description="Execution time in seconds") + success: bool = Field(..., description="Whether execution was successful") + error: Optional[str] = Field(None, description="Error message if any") + +# Streaming Models +class StreamChunk(BaseModel): + id: str = Field(..., description="Stream identifier") + object: str = Field("chat.completion.chunk", description="Object type") + created: int = Field(..., description="Unix timestamp") + model: str = Field(..., description="Model used") + choices: List[Dict[str, Any]] = Field(..., description="Stream choices") + +# Error Models +class ErrorResponse(BaseModel): + error: str = Field(..., description="Error message") + code: Optional[str] = Field(None, description="Error code") + details: Optional[Dict[str, Any]] = Field(None, description="Additional error details") + +# JSON-RPC Models for A2A communication +class JSONRPCRequest(BaseModel): + jsonrpc: str = Field("2.0", description="JSON-RPC version") + method: str = Field(..., description="Method to call") + params: Optional[Dict[str, Any]] = Field(None, description="Method parameters") + id: Optional[Union[str, int]] = Field(None, description="Request identifier") + +class JSONRPCResponse(BaseModel): + jsonrpc: str = Field("2.0", description="JSON-RPC version") + result: Optional[Any] = Field(None, description="Method result") + error: Optional[Dict[str, Any]] = Field(None, description="Error object") + id: Optional[Union[str, int]] = Field(None, description="Request identifier") \ No newline at end of file diff --git a/agent_runtimes/langchain_agent/requirements.txt b/agent_runtimes/langchain_agent/requirements.txt new file mode 100644 index 000000000..08cfc3427 --- /dev/null +++ b/agent_runtimes/langchain_agent/requirements.txt @@ -0,0 +1,17 @@ +# Core FastAPI and Web Framework +fastapi>=0.104.0 +uvicorn[standard]>=0.24.0 +pydantic>=2.5.0 + +# Langchain and AI (install these first) +langchain>=0.1.0 +langchain-openai>=0.0.2 +langchain-core>=0.1.0 +openai>=1.0.0 + +# HTTP Client +httpx>=0.25.0 + +# Utilities +python-multipart>=0.0.6 +python-dotenv>=1.0.0 diff --git a/agent_runtimes/langchain_agent/start_agent.py b/agent_runtimes/langchain_agent/start_agent.py new file mode 100644 index 000000000..5c9676fa6 --- /dev/null +++ b/agent_runtimes/langchain_agent/start_agent.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +""" +Startup script for the MCP Langchain Agent +""" + +import asyncio +import logging +import sys +from pathlib import Path + +import uvicorn +from dotenv import load_dotenv + +from .config import get_settings, validate_environment, get_example_env + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +def setup_environment(): + """Setup environment and validate configuration""" + # Load .env file if it exists + env_file = Path(".env") + if env_file.exists(): + load_dotenv(env_file) + logger.info(f"Loaded environment from {env_file}") + else: + logger.info("No .env file found, using system environment") + + # Validate environment + validation = validate_environment() + + if validation["warnings"]: + logger.warning("Configuration warnings:") + for warning in validation["warnings"]: + logger.warning(f" - {warning}") + + if not validation["valid"]: + logger.error("Configuration errors:") + for issue in validation["issues"]: + logger.error(f" - {issue}") + + logger.info("Example .env file:") + print(get_example_env()) + sys.exit(1) + + return get_settings() + +async def test_agent_initialization(): + """Test that the agent can be initialized""" + try: + from .agent_langchain import LangchainMCPAgent + + settings = get_settings() + agent = LangchainMCPAgent.from_config(settings) + + logger.info("Testing agent initialization...") + await agent.initialize() + + tools = agent.get_available_tools() + logger.info(f"Agent initialized successfully with {len(tools)} tools") + + # Test gateway connection + if await agent.test_gateway_connection(): + logger.info("Gateway connection test: SUCCESS") + else: + logger.warning("Gateway connection test: FAILED") + + return True + + except Exception as e: + logger.error(f"Agent initialization failed: {e}") + return False + +def main(): + """Main startup function""" + logger.info("Starting MCP Langchain Agent") + + # Setup environment + try: + settings = setup_environment() + logger.info(f"Configuration loaded: Gateway URL = {settings.mcp_gateway_url}") + if settings.tools_allowlist: + logger.info(f"Tool allowlist: {settings.tools_allowlist}") + except Exception as e: + logger.error(f"Environment setup failed: {e}") + sys.exit(1) + + # Test agent initialization + if not asyncio.run(test_agent_initialization()): + logger.error("Agent initialization test failed") + response = input("Continue anyway? (y/N): ") + if response.lower() != 'y': + sys.exit(1) + + # Start the FastAPI server + logger.info("Starting FastAPI server...") + + try: + uvicorn.run( + "agent_runtimes.langchain_agent.app:app", + host="0.0.0.0", + port=8000, + reload=settings.debug_mode, + log_level="info" if not settings.debug_mode else "debug", + access_log=True + ) + except KeyboardInterrupt: + logger.info("Server stopped by user") + except Exception as e: + logger.error(f"Server failed to start: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/cookies.txt b/cookies.txt new file mode 100644 index 000000000..c31d9899c --- /dev/null +++ b/cookies.txt @@ -0,0 +1,4 @@ +# Netscape HTTP Cookie File +# https://curl.se/docs/http-cookies.html +# This file was generated by libcurl! Edit at your own risk. + diff --git a/headers.txt b/headers.txt new file mode 100644 index 000000000..40278db94 --- /dev/null +++ b/headers.txt @@ -0,0 +1,6 @@ +HTTP/1.1 307 Temporary Redirect +date: Wed, 13 Aug 2025 05:39:40 GMT +server: uvicorn +content-length: 0 +location: http://localhost:4444/admin/ + diff --git a/mcpgateway/templates/admin.html b/mcpgateway/templates/admin.html index be530d5da..a9b5a05b6 100644 --- a/mcpgateway/templates/admin.html +++ b/mcpgateway/templates/admin.html @@ -5314,4 +5314,4 @@

Available Log Files

}); - + \ No newline at end of file diff --git a/tools.json b/tools.json new file mode 100644 index 000000000..bb2218a96 --- /dev/null +++ b/tools.json @@ -0,0 +1,19 @@ +[ + { + "name": "list_users", + "url": "https://api.example.com/users", + "integration_type": "REST", + "request_type": "GET" + }, + { + "name": "create_user", + "url": "https://api.example.com/users", + "integration_type": "REST", + "request_type": "POST", + "input_schema": { + "type": "object", + "properties": { "body": { "type": "object" } }, + "required": ["body"] + } + } +]