diff --git a/README.md b/README.md index c4c65cf..d14b03b 100644 --- a/README.md +++ b/README.md @@ -488,7 +488,7 @@ server_thread.start() time.sleep(2) # Allow server to start # Convert MCP tool to LangChain -calculator_tool = to_langchain_tool("http://localhost:5000", "calculator") +calculator_tool = asyncio.run(to_langchain_tool("http://localhost:5000", "calculator")) # Use the tool in LangChain result = calculator_tool.run("5 * 9 + 3") diff --git a/README_de.md b/README_de.md index 331e2d1..eff692a 100644 --- a/README_de.md +++ b/README_de.md @@ -473,7 +473,7 @@ server_thread.start() time.sleep(2) # Allow server to start # Convert MCP tool to LangChain -calculator_tool = to_langchain_tool("http://localhost:5000", "calculator") +calculator_tool = asyncio.run(to_langchain_tool("http://localhost:5000", "calculator")) # Use the tool in LangChain result = calculator_tool.run("5 * 9 + 3") diff --git a/README_es.md b/README_es.md index 44a13b4..bf35ef9 100644 --- a/README_es.md +++ b/README_es.md @@ -473,7 +473,7 @@ server_thread.start() time.sleep(2) # Allow server to start # Convert MCP tool to LangChain -calculator_tool = to_langchain_tool("http://localhost:5000", "calculator") +calculator_tool = asyncio.run(to_langchain_tool("http://localhost:5000", "calculator")) # Use the tool in LangChain result = calculator_tool.run("5 * 9 + 3") diff --git a/README_fr.md b/README_fr.md index 2b438c1..732e735 100644 --- a/README_fr.md +++ b/README_fr.md @@ -473,7 +473,7 @@ server_thread.start() time.sleep(2) # Permettre au serveur de démarrer # Convertir l'outil MCP vers LangChain -calculator_tool = to_langchain_tool("http://localhost:5000", "calculator") +calculator_tool = asyncio.run(to_langchain_tool("http://localhost:5000", "calculator")) # Utiliser l'outil dans LangChain result = calculator_tool.run("5 * 9 + 3") diff --git a/README_ja.md b/README_ja.md index 273aff5..da74b48 100644 --- a/README_ja.md +++ b/README_ja.md @@ -474,7 +474,7 @@ server_thread.start() time.sleep(2) # サーバーの起動を待つ # MCP ツールを LangChain に変換 -calculator_tool = to_langchain_tool("http://localhost:5000", "calculator") +calculator_tool = asyncio.run(to_langchain_tool("http://localhost:5000", "calculator")) # LangChain でツールを使用 result = calculator_tool.run("5 * 9 + 3") diff --git a/README_zh.md b/README_zh.md index 89a91f2..6c80da5 100644 --- a/README_zh.md +++ b/README_zh.md @@ -473,7 +473,7 @@ server_thread.start() time.sleep(2) # 允许服务器启动 # 将 MCP 工具转换为 LangChain -calculator_tool = to_langchain_tool("http://localhost:5000", "calculator") +calculator_tool = asyncio.run(to_langchain_tool("http://localhost:5000", "calculator")) # 在 LangChain 中使用工具 result = calculator_tool.run("5 * 9 + 3") diff --git a/examples/langchain/mcp_to_langchain.py b/examples/langchain/mcp_to_langchain.py index 5ca1f75..7d2415a 100644 --- a/examples/langchain/mcp_to_langchain.py +++ b/examples/langchain/mcp_to_langchain.py @@ -10,6 +10,7 @@ import time import threading import socket +import asyncio def find_available_port(start_port=5000, max_tries=10): """Find an available port""" @@ -91,7 +92,7 @@ def calculator(input): # 3. Convert the MCP tool to a LangChain tool print(f"\nConverting MCP tool to LangChain tool from {server_url}") - calculator_tool = to_langchain_tool(server_url, "calculator") + calculator_tool = asyncio.run(to_langchain_tool(server_url, "calculator")) # 4. Use the LangChain tool print("\nUsing the LangChain tool:") diff --git a/pyproject.toml b/pyproject.toml index d9c7c86..1a40d4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ dev = [ "flake8>=3.9.2", "mypy>=0.812", "responses>=0.13.3", + "mcp>=1.11.0" ] server = [ "flask>=2.0.0", diff --git a/python_a2a/langchain/README.md b/python_a2a/langchain/README.md index bd9e522..3541af6 100644 --- a/python_a2a/langchain/README.md +++ b/python_a2a/langchain/README.md @@ -143,10 +143,10 @@ server.run(port=8080) from python_a2a.langchain import to_langchain_tool # Convert all tools from an MCP server -tools = to_langchain_tool("http://localhost:8080") +tools = asyncio.run(to_langchain_tool("http://localhost:8080")) # Convert a specific tool -calculator_tool = to_langchain_tool("http://localhost:8080", "calculator") +calculator_tool = asyncio.run(to_langchain_tool("http://localhost:8080", "calculator")) # Use in LangChain from langchain.agents import initialize_agent @@ -191,7 +191,7 @@ The integration provides detailed error handling with specific exception types: ```python try: - tool = to_langchain_tool("http://localhost:8080", "non_existent_tool") + tool = asyncio.run(to_langchain_tool("http://localhost:8080", "non_existent_tool")) except MCPToolConversionError as e: print(f"MCP tool error: {e}") ``` diff --git a/python_a2a/langchain/exceptions.py b/python_a2a/langchain/exceptions.py index 84bfedc..07266de 100644 --- a/python_a2a/langchain/exceptions.py +++ b/python_a2a/langchain/exceptions.py @@ -15,6 +15,12 @@ def __init__(self, message=None): self.message = message or "LangChain is not installed. Install it with 'pip install langchain langchain_core'" super().__init__(self.message) +class MCPNotInstalledError(Exception): + """Raised when mcp is not installed""" + + def __init__(self, message=None): + self.message = message or "mcp is not installed. Install it with 'pip install mcp'" + class LangChainToolConversionError(LangChainIntegrationError): """Raised when a LangChain tool cannot be converted.""" pass diff --git a/python_a2a/langchain/mcp.py b/python_a2a/langchain/mcp.py index 9e99894..5f3692b 100644 --- a/python_a2a/langchain/mcp.py +++ b/python_a2a/langchain/mcp.py @@ -10,6 +10,7 @@ import json import requests from typing import Any, Dict, List, Optional, Union, Callable, Type, get_type_hints +import traceback logger = logging.getLogger(__name__) @@ -17,14 +18,15 @@ from .exceptions import ( LangChainNotInstalledError, LangChainToolConversionError, - MCPToolConversionError + MCPToolConversionError, + MCPNotInstalledError ) # Check for LangChain availability without failing try: # Try to import LangChain components try: - from langchain_core.tools import BaseTool, ToolException + from langchain_core.tools import BaseTool, ToolException, tool except ImportError: # Fall back to older LangChain structure from langchain.tools import BaseTool, ToolException @@ -40,6 +42,13 @@ class BaseTool: class ToolException(Exception): pass +# Check for MCP availability without failing +try: + from mcp import ClientSession, Tool as MCPTool + from mcp.client.streamable_http import streamablehttp_client + HAS_MCP = True +except ImportError: + HAS_MCP = False # Utility for mapping between Python types and MCP types class TypeMapper: @@ -448,11 +457,61 @@ async def wrapper(**kwargs): except Exception as e: logger.exception("Failed to create MCP server from LangChain tools") raise LangChainToolConversionError(f"Failed to convert LangChain tools: {str(e)}") + + + +async def get_tool(available_tool: MCPTool, mcp_url: str): + """ + Get langchain tool function for a mentioned MCP Tool from MCP Server + """ + tool_name = available_tool.name + if not tool_name: + logger.exception("Tool Name not found in the tool") + raise LangChainToolConversionError("Tool Name not found in the tool") + parameters = available_tool.inputSchema["properties"].values() + args = [i["title"].lower() for i in parameters] + args = [i.replace(" ", "_") for i in args] + arg_names = ", ".join(args) + func_def_str = f""" +@tool +async def {tool_name}({arg_names}): + '''Call MCP tool function''' + args2 = "{arg_names}".split(", ") + body = {{}} + for a in args2: + body[a] = locals()[a] + try: + async with streamablehttp_client("{mcp_url}") as ( + read_stream, write_stream, _, + ): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + result = await session.call_tool("{tool_name}", body) + if "error" in result: + return f"Error: {{result['error']}}" + + # Process content in response + if "content" in result: + content = result.get("content", []) + if content and isinstance(content, list) and "text" in content[0]: + return content[0]["text"] + + # If no structured content, return the raw result + return str(result) + except BaseException as e: + return f"Error while calling tool: {{e}}" + """ + dynamic_scope = {} -def to_langchain_tool(mcp_url, tool_name=None): + exec(func_def_str, globals(), dynamic_scope) + the_tool = dynamic_scope[tool_name] + return the_tool + + +async def to_langchain_tool(mcp_url, tool_name=None): """ - Convert MCP server tool(s) to LangChain tool(s). + Get Convert MCP server tool(s) & Convert them to LangChain tool(s). Args: mcp_url: URL of the MCP server @@ -463,10 +522,13 @@ def to_langchain_tool(mcp_url, tool_name=None): Example: >>> # Convert a specific tool - >>> calculator_tool = to_langchain_tool("http://localhost:8000", "calculator") - >>> + >>> calculator_tool = await to_langchain_tool("http://localhost:8080/mcp", "calculator") + >>> #or + >>> calculator_tool = asyncio.run(to_langchain_tool("http://localhost:8080/mcp", "calculator")) >>> # Convert all tools from a server - >>> tools = to_langchain_tool("http://localhost:8000") + >>> tools = await to_langchain_tool("http://localhost:8080/mcp") + >>> #or + >>> tools = asyncio.run(to_langchain_tool("http://localhost:8080/mcp")) Raises: LangChainNotInstalledError: If LangChain is not installed @@ -475,133 +537,39 @@ def to_langchain_tool(mcp_url, tool_name=None): if not HAS_LANGCHAIN: raise LangChainNotInstalledError() - try: - # Try to import Tool from various possible locations in LangChain - try: - from langchain.tools import Tool - except ImportError: - try: - from langchain.agents import Tool - except ImportError: - raise ImportError("Cannot import Tool class from LangChain") + if not HAS_MCP: + raise MCPNotInstalledError() + try: # Get available tools from MCP server + langchain_tools = [] try: - tools_response = requests.get(f"{mcp_url}/tools") - if tools_response.status_code != 200: - raise MCPToolConversionError(f"Failed to get tools from MCP server: {tools_response.status_code}") - - available_tools = tools_response.json() - logger.info(f"Found {len(available_tools)} tools on MCP server") + async with streamablehttp_client(f"{mcp_url}") as ( + read_stream, write_stream, _, + ): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + available_tools = await session.list_tools() + + if tool_name: + available_tool = [t for t in available_tools.tools if t.name == tool_name][0] + the_tool = await get_tool(available_tool, mcp_url) + return the_tool + + all_available_tools = [t for t in available_tools.tools ] + for a_tool in all_available_tools: + the_tool = await get_tool(a_tool, mcp_url) + langchain_tools.append(the_tool) + return langchain_tools except Exception as e: logger.error(f"Error getting tools from MCP server: {e}") raise MCPToolConversionError(f"Failed to get tools from MCP server: {str(e)}") # Filter tools if a specific tool is requested - if tool_name is not None: - available_tools = [t for t in available_tools if t.get("name") == tool_name] - if not available_tools: - raise MCPToolConversionError(f"Tool '{tool_name}' not found on MCP server at {mcp_url}") - - # Create LangChain tools - langchain_tools = [] - - for tool_info in available_tools: - name = tool_info.get("name", "unnamed_tool") - description = tool_info.get("description", f"MCP Tool: {name}") - parameters = tool_info.get("parameters", []) - - logger.info(f"Creating LangChain tool for MCP tool: {name}") - - # Create function to call the MCP tool - def create_tool_func(tool_name): - # Need this wrapper to properly capture tool_name in closure - def tool_func(*args, **kwargs): - """Call MCP tool function""" - try: - # Handle different input patterns - if len(args) == 1 and not kwargs: - input_value = args[0] - - # If the input looks like a parameter=value string (for multi-param tools) - if '=' in input_value and not input_value.startswith('{'): - # Parse simple param=value¶m2=value2 format - params = {} - for pair in input_value.split('&'): - if '=' in pair: - k, v = pair.split('=', 1) - params[k.strip()] = v.strip() - if params: - kwargs = params - # Try to detect parameter format based on tool parameters - elif parameters and len(parameters) == 1: - # Single parameter case - use the parameter name from tool info - param_name = parameters[0]["name"] - kwargs = {param_name: input_value} - else: - # Default to 'input' parameter - kwargs = {"input": input_value} - - # Call the MCP tool - response = requests.post( - f"{mcp_url}/tools/{tool_name}", - json=kwargs - ) - - if response.status_code != 200: - return f"Error: HTTP {response.status_code} - {response.text}" - - # Parse the response - result = response.json() - - # Process error in response - if "error" in result: - return f"Error: {result['error']}" - - # Process content in response - if "content" in result: - content = result.get("content", []) - if content and isinstance(content, list) and "text" in content[0]: - return content[0]["text"] - - # If no structured content, return the raw result - return str(result) - except Exception as e: - logger.exception(f"Error calling MCP tool {tool_name}") - return f"Error calling tool: {str(e)}" - - return tool_func - - # Create the tool with a function that properly handles tool name in closure - tool_func = create_tool_func(name) - - # Create the LangChain tool - lc_tool = Tool( - name=name, - description=description, - func=tool_func - ) - - # Add metadata if applicable - if hasattr(lc_tool, "metadata"): - lc_tool.metadata = { - "source": "mcp", - "url": mcp_url, - "parameters": parameters - } - - langchain_tools.append(lc_tool) - logger.info(f"Successfully created LangChain tool: {name}") - - # Return single tool if requested, otherwise return list - if tool_name is not None and len(langchain_tools) == 1: - return langchain_tools[0] - - return langchain_tools except MCPToolConversionError: # Re-raise without wrapping raise except Exception as e: logger.exception("Failed to convert MCP tool to LangChain format") - raise MCPToolConversionError(f"Failed to convert MCP tool: {str(e)}") \ No newline at end of file + raise MCPToolConversionError(f"Failed to convert MCP tool: {str(e)}") diff --git a/tests/basic_test.py b/tests/basic_test.py index d8905b6..3892c0c 100644 --- a/tests/basic_test.py +++ b/tests/basic_test.py @@ -11,7 +11,7 @@ from python_a2a import ( Message, TextContent, FunctionCallContent, FunctionResponseContent, - FunctionParameter, MessageRole, MessageType, Conversation + FunctionParameter, MessageRole, Conversation ) from python_a2a.utils import pretty_print_message, create_text_message, format_function_params diff --git a/tests/test_langchain.py b/tests/test_langchain.py index 9d152ec..643ac08 100644 --- a/tests/test_langchain.py +++ b/tests/test_langchain.py @@ -16,160 +16,8 @@ pytestmark = pytest.mark.skipif(not LANGCHAIN_AVAILABLE, reason="LangChain not installed") # Import the components to test -from python_a2a.langchain import ToolServer, LangChainBridge, AgentFlow -class TestToolServer: - """Tests for the ToolServer class.""" - - def test_initialization(self): - """Test initializing a ToolServer.""" - # Create a mock BaseTool - mock_tool = Mock() - mock_tool.name = "test_tool" - mock_tool.description = "A test tool" - mock_tool._run = Mock(return_value="Tool result") - - # Initialize a ToolServer - server = ToolServer(name="Test Server", description="Test Description") - - # Check basic properties - assert server.name == "Test Server" - assert server.description == "Test Description" - - # Register the mock tool - server.register_tool(mock_tool) - - # Check if tool was registered - assert mock_tool.name in server.tools_map - assert server.tools_map[mock_tool.name] == mock_tool - - def test_from_tools(self): - """Test creating a ToolServer from a list of tools.""" - # Create mock tools - mock_tool1 = Mock() - mock_tool1.name = "tool1" - mock_tool1.description = "Tool 1" - mock_tool1._run = Mock(return_value="Tool 1 result") - - mock_tool2 = Mock() - mock_tool2.name = "tool2" - mock_tool2.description = "Tool 2" - mock_tool2._run = Mock(return_value="Tool 2 result") - - # Create ToolServer from tools - server = ToolServer.from_tools( - [mock_tool1, mock_tool2], - name="Tools Server", - description="Server with tools" - ) - - # Check if both tools were registered - assert mock_tool1.name in server.tools_map - assert mock_tool2.name in server.tools_map - assert len(server.tools_map) == 2 - - -class TestLangChainBridge: - """Tests for the LangChainBridge class.""" - - def test_agent_to_a2a(self): - """Test converting a LangChain agent to an A2A agent.""" - # Create a mock AgentExecutor - mock_agent = Mock() - mock_agent.run = Mock(return_value="Agent response") - - # Convert to A2A - with patch('python_a2a.server.A2AServer'): - a2a_agent = LangChainBridge.agent_to_a2a(mock_agent, name="Test Agent") - - # Check basic properties - assert a2a_agent.name == "Test Agent" - assert a2a_agent.agent_executor == mock_agent - - def test_agent_to_tool(self): - """Test converting an A2A agent to a LangChain tool.""" - # Mock the A2AClient - with patch('python_a2a.client.A2AClient') as mock_client_class: - # Configure the mock - mock_client = Mock() - mock_client.get_agent_info = Mock(return_value={ - "name": "Mock Agent", - "description": "A mock agent" - }) - mock_client.ask = Mock(return_value="Agent response") - mock_client_class.return_value = mock_client - - # Convert to LangChain tool - tool = LangChainBridge.agent_to_tool("http://localhost:5000") - - # Check basic properties - assert tool.name == "Mock Agent" - assert tool.description == "A mock agent" - - # Test the tool - result = tool._run("test query") - assert result == "Agent response" - mock_client.ask.assert_called_once_with("test query") - - -class TestAgentFlow: - """Tests for the AgentFlow class.""" - - def test_initialization(self): - """Test initializing an AgentFlow.""" - # Create a mock AgentNetwork - mock_network = Mock() - mock_network.name = "Test Network" - - # Initialize AgentFlow - flow = AgentFlow(agent_network=mock_network, name="Test Flow") - - # Check basic properties - assert flow.name == "Test Flow" - assert flow.agent_network == mock_network - - @pytest.mark.asyncio - async def test_add_langchain_step(self): - """Test adding a LangChain step to a flow.""" - # Create a mock LangChain component - mock_chain = Mock() - mock_chain.run = Mock(return_value="Chain result") - - # Create a mock agent network - mock_network = Mock() - - # Create flow - flow = AgentFlow(agent_network=mock_network, name="Test Flow") - - # Add LangChain step - result_flow = flow.add_langchain_step(mock_chain, "test input {var}") - - # Verify method chaining works - assert result_flow == flow - - # Verify step was added - assert len(flow.steps) == 1 - assert flow.steps[0].type == "FUNCTION" - - @pytest.mark.asyncio - async def test_add_tool_step(self): - """Test adding a tool step to a flow.""" - # Create a mock agent network - mock_network = Mock() - - # Create flow - flow = AgentFlow(agent_network=mock_network, name="Test Flow") - - # Add tool step - result_flow = flow.add_tool_step("server.tool", param1="value1") - - # Verify method chaining works - assert result_flow == flow - - # Verify step was added - assert len(flow.steps) == 1 - assert flow.steps[0].type == "FUNCTION" if __name__ == "__main__":