-
Notifications
You must be signed in to change notification settings - Fork 135
API Reference
This page provides a comprehensive reference for the Python A2A library's API.
The base class for creating A2A-compatible agents.
class A2AServer:
    """Base class for creating A2A-compatible agents.

    In this listing ``handle_task`` and the two streaming coroutines raise
    ``NotImplementedError``; every other method is an empty placeholder
    that returns ``None``.
    """

    def __init__(self):
        """Initialize a new A2A server."""
        pass

    def handle_task(self, task):
        """Process an A2A task and return a result.

        Raises:
            NotImplementedError: Always; subclasses must override this.
        """
        raise NotImplementedError("Subclasses must implement handle_task")

    def create_task(self, task_input):
        """Create a new task with the given input."""
        pass

    def get_task_status(self, task_id):
        """Get the status of a task."""
        pass

    def get_task_output(self, task_id):
        """Get the output of a completed task."""
        pass

    def handle_conversation(self, conversation):
        """Process a conversation with multiple messages."""
        pass

    def create_conversation(self):
        """Create a new conversation."""
        pass

    def add_message(self, conversation_id, message_content, role="user"):
        """Add a message to a conversation (``role`` defaults to "user")."""
        pass

    def get_conversation(self, conversation_id):
        """Get a conversation by ID."""
        pass

    async def stream_response(self, message):
        """Stream a response token by token.

        Raises:
            NotImplementedError: Always, once the coroutine is awaited.
        """
        raise NotImplementedError("Streaming is not implemented")

    async def stream_task_updates(self, task_id):
        """Stream updates for a specific task.

        Raises:
            NotImplementedError: Always, once the coroutine is awaited.
        """
        raise NotImplementedError("Task streaming is not implemented")


# Decorator usage: @agent(name, description, version=None, contact=None, examples=None)
def agent_decorator(cls):
    """Decorator for A2A agents.

    Placeholder: the body is empty, so the call returns ``None`` rather
    than the decorated class.
    """
    pass
# Decorator usage: @skill(name, description, parameters=None, examples=None)
def skill_decorator(func):
    """Decorator for agent skills.

    Placeholder: the body is empty, so the call returns ``None`` rather
    than the decorated function.
    """
    pass


def run_server(agent, host="localhost", port=5000, debug=False):
    """Run an A2A agent as a server.

    Defaults shown in the signature: ``host="localhost"``, ``port=5000``,
    ``debug=False``. Placeholder: the body is empty and returns ``None``.
    """
    pass


# The client for connecting to A2A agents.
class HTTPClient:
    """The client for connecting to A2A agents.

    Every method in this listing is an empty placeholder that returns
    ``None`` (the ``async`` ones return ``None`` once awaited).
    """

    def __init__(self, endpoint, timeout=60, max_retries=0, retry_delay=0.5, headers=None, auth=None):
        """Initialize a new HTTP client for an A2A agent.

        Placeholder: none of the arguments are stored on the instance.
        """
        pass

    def send_message(self, message):
        """Send a message to the agent and get a response."""
        pass

    async def send_message_async(self, message):
        """Send a message asynchronously."""
        pass

    def call_function(self, function_name, arguments):
        """Call a specific function on the agent."""
        pass

    async def stream_response(self, message):
        """Stream a response from the agent."""
        pass

    def create_task(self, task_input):
        """Create a new task on the agent."""
        pass

    def get_task_status(self, task_id):
        """Get the status of a task."""
        pass

    def get_task_output(self, task_id):
        """Get the output of a completed task."""
        pass

    async def stream_task_updates(self, task_id):
        """Stream updates for a specific task."""
        pass

    def create_conversation(self):
        """Create a new conversation."""
        pass

    def add_message(self, conversation_id, message_content, role="user"):
        """Add a message to a conversation (``role`` defaults to "user")."""
        pass

    def get_conversation(self, conversation_id):
        """Get a conversation by ID."""
        pass

    def send_message_to(self, agent_name, message):
        """Send a message to a specific agent in a network."""
        pass


# Manages collections of connected agents.
class AgentNetwork:
    """Manages collections of connected agents.

    Every method in this listing is an empty placeholder returning ``None``.
    """

    def __init__(self, name=None, description=None):
        """Initialize a new agent network."""
        pass

    def add(self, name, endpoint, agent_type=None, description=None, is_fallback=False):
        """Add an agent to the network."""
        pass

    def remove(self, name):
        """Remove an agent from the network."""
        pass

    def get_agent(self, name):
        """Get an agent by name."""
        pass

    def get_agents_by_type(self, agent_type):
        """Get all agents of a specific type."""
        pass

    def list_agents(self):
        """List all agents in the network."""
        pass


# Represents a discrete unit of work.
class Task:
    """Represents a discrete unit of work."""

    def __init__(self, task_id, input, status=None, function_call=None):
        """Initialize a new task.

        Args:
            task_id: Stored as ``self.task_id``.
            input: Stored as ``self.input``.
            status: Stored as ``self.status``; any falsy value (including
                the default ``None``) is replaced by ``TaskStatus.PENDING``.
            function_call: Stored as ``self.function_call``.
        """
        self.task_id = task_id
        self.input = input
        self.status = status or TaskStatus.PENDING
        self.function_call = function_call


# Enumeration of possible task statuses.
class TaskStatus(str, Enum):
    """Enumeration of possible task statuses.

    Members also subclass ``str``, so each compares equal to its value.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Represents a structured function call.
class FunctionCall:
    """Represents a structured function call."""

    def __init__(self, name, arguments=None):
        """Initialize a new function call.

        Args:
            name: Stored as ``self.name``.
            arguments: Stored as ``self.arguments``; any falsy value
                (including the default ``None``) becomes a new empty dict.
        """
        self.name = name
        self.arguments = arguments or {}


# Represents a message in a conversation.
class Message:
    """Represents a message in a conversation."""

    def __init__(self, content, role="user", id=None, function_call=None):
        """Initialize a new message.

        Args:
            content: Stored as ``self.content``.
            role: Stored as ``self.role``; defaults to "user".
            id: Stored as ``self.id``; a falsy value is replaced by a
                freshly generated ``uuid4`` string.
            function_call: Stored as ``self.function_call``.
        """
        self.id = id or str(uuid.uuid4())
        self.content = content
        self.role = role
        self.function_call = function_call
        # Creation time is captured once, as an ISO-8601 string from a
        # naive local ``datetime.now()``.
        self.created_at = datetime.now().isoformat()


# Represents a multi-turn conversation.
class Conversation:
    """Represents a multi-turn conversation."""

    def __init__(self, id=None, messages=None):
        """Initialize a new conversation.

        Args:
            id: Stored as ``self.id``; a falsy value is replaced by a
                freshly generated ``uuid4`` string.
            messages: Stored as ``self.messages``; a falsy value becomes
                a new empty list.
        """
        self.id = id or str(uuid.uuid4())
        self.messages = messages or []
        # ISO-8601 string taken from a naive local ``datetime.now()``.
        self.created_at = datetime.now().isoformat()


# Metadata about an agent.
class AgentCard:
    """Metadata about an agent."""

    def __init__(self, name, description=None, version=None, skills=None, examples=None, contact=None):
        """Initialize a new agent card.

        Each argument is stored on the attribute of the same name. Falsy
        ``skills`` and ``examples`` values become new empty lists.
        """
        self.name = name
        self.description = description
        self.version = version
        self.skills = skills or []
        self.examples = examples or []
        self.contact = contact


# Metadata about an agent skill.
class SkillCard:
    """Metadata about an agent skill."""

    def __init__(self, name, description=None, parameters=None, examples=None):
        """Initialize a new skill card.

        Each argument is stored on the attribute of the same name. A falsy
        ``parameters`` becomes a new empty dict and a falsy ``examples``
        becomes a new empty list.
        """
        self.name = name
        self.description = description
        self.parameters = parameters or {}
        self.examples = examples or []


# Client for connecting to MCP servers.
class MCPClient:
    """Client for connecting to MCP servers.

    Every method in this listing is an empty placeholder returning ``None``.
    """

    def __init__(self, mcp_server_url, timeout=60):
        """Initialize a new MCP client (arguments are not stored)."""
        pass

    def list_tools(self):
        """List all available tools on the MCP server."""
        pass

    def call_tool(self, tool_name, arguments=None):
        """Call a specific tool on the MCP server."""
        pass

    async def call_tool_async(self, tool_name, arguments=None):
        """Call a tool asynchronously."""
        pass


# Quick implementation of MCP servers.
class FastMCP:
    """Quick implementation of MCP servers.

    Every method in this listing is an empty placeholder returning ``None``.
    """

    def __init__(self, name=None, description=None):
        """Initialize a new FastMCP server."""
        pass

    def tool(self, name=None, description=None):
        """Decorator for registering a tool.

        Placeholder: returns ``None``, so it cannot yet be applied as a
        decorator factory.
        """
        pass

    def add_tool(self, name, function, schema=None):
        """Add a tool to the MCP server."""
        pass

    def remove_tool(self, name):
        """Remove a tool from the MCP server."""
        pass

    def list_tools(self):
        """List all available tools."""
        pass


# A2A agent with MCP capabilities.
class MCPAgent(A2AServer):
    """A2A agent with MCP capabilities."""

    def __init__(self, name=None, description=None, mcp_server_url=None, llm_client=None):
        """Initialize a new MCP-enabled agent.

        Args:
            name: Stored as ``self.name``.
            description: Stored as ``self.description``.
            mcp_server_url: When truthy, an ``MCPClient`` is built from it
                and stored as ``self.mcp_client``; otherwise that
                attribute is ``None``.
            llm_client: Stored as ``self.llm_client``.
        """
        super().__init__()
        self.name = name
        self.description = description
        self.mcp_client = MCPClient(mcp_server_url) if mcp_server_url else None
        self.llm_client = llm_client


# Registry for agent discovery.
class AgentRegistry(A2AServer):
    """Registry for agent discovery."""

    def __init__(self, name=None, description=None):
        """Initialize a new agent registry.

        Args:
            name: Stored as ``self.name``; falsy values fall back to
                "A2A Registry".
            description: Stored as ``self.description``; falsy values
                fall back to "Registry for A2A agents".
        """
        super().__init__()
        self.name = name or "A2A Registry"
        self.description = description or "Registry for A2A agents"
        # Starts empty; nothing in this listing populates it.
        self.agents = {}


# Client for discovering agents.
class DiscoveryClient:
    """Client for discovering agents.

    Apart from ``__init__``, every method in this listing is an empty
    placeholder returning ``None``.
    """

    def __init__(self, registry_url):
        """Initialize a new discovery client.

        Stores ``registry_url`` and builds an ``HTTPClient`` for it,
        kept as ``self.client``.
        """
        self.registry_url = registry_url
        self.client = HTTPClient(registry_url)

    def list_agents(self):
        """List all registered agents."""
        pass

    def find_agents(self, name_contains=None, description_contains=None, skill_contains=None):
        """Find agents matching specific criteria."""
        pass

    def register_agent(self, name, endpoint, description=None, agent_card=None):
        """Register an agent with the registry."""
        pass

    def unregister_agent(self, agent_id):
        """Unregister an agent from the registry."""
        pass


# Orchestrates multi-agent workflows.
class Flow:
    """Orchestrates multi-agent workflows.

    Apart from ``__init__``, every method in this listing is an empty
    placeholder returning ``None``.
    """

    def __init__(self, agent_network=None):
        """Initialize a new workflow.

        Stores ``agent_network`` and creates empty ``steps``,
        ``branch_stack`` and ``condition_stack`` lists.
        """
        self.agent_network = agent_network
        self.steps = []
        # Same list object as ``steps``, not a copy.
        self.current_branch = self.steps
        self.branch_stack = []
        self.condition_stack = []

    def ask(self, agent_name, message_template):
        """Add a step to ask an agent a question."""
        pass

    def if_contains(self, text):
        """Start a conditional branch based on response content."""
        pass

    def else_if_contains(self, text):
        """Add an else-if condition to the current branch."""
        pass

    def else_branch(self):
        """Add an else branch to the current condition."""
        pass

    def end_if(self):
        """End the current conditional branch."""
        pass

    def parallel(self):
        """Create a set of parallel steps."""
        pass

    def execute(self, variables=None):
        """Execute the workflow with the given variables."""
        pass


# Represents a step in a workflow.
class Step:
    """Represents a step in a workflow."""

    def __init__(self, agent_name, message_template):
        """Initialize a new workflow step.

        Stores both arguments and sets ``self.result`` to ``None``.
        """
        self.agent_name = agent_name
        self.message_template = message_template
        self.result = None

    def execute(self, client, variables):
        """Execute this step with the given client and variables.

        Placeholder: the body is empty and returns ``None``.
        """
        pass


# Client for OpenAI language models.
class OpenAILLMClient:
    """Client for OpenAI language models."""

    def __init__(self, api_key=None, model="gpt-3.5-turbo"):
        """Initialize a new OpenAI LLM client.

        Args:
            api_key: Stored as ``self.api_key``; when falsy, the
                ``OPENAI_API_KEY`` environment variable is used instead
                (``None`` if that is unset too).
            model: Stored as ``self.model``; defaults to "gpt-3.5-turbo".
        """
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.model = model

    def complete(self, system=None, user=None, messages=None):
        """Generate a completion from the LLM (placeholder, returns None)."""
        pass

    async def stream_complete(self, system=None, user=None, messages=None):
        """Stream a completion from the LLM (placeholder)."""
        pass


# Client for Anthropic language models.
class AnthropicLLMClient:
    """Client for Anthropic language models."""

    def __init__(self, api_key=None, model="claude-2"):
        """Initialize a new Anthropic LLM client.

        Args:
            api_key: Stored as ``self.api_key``; when falsy, the
                ``ANTHROPIC_API_KEY`` environment variable is used
                instead (``None`` if that is unset too).
            model: Stored as ``self.model``; defaults to "claude-2".
        """
        self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        self.model = model

    def complete(self, system=None, user=None, messages=None):
        """Generate a completion from the LLM (placeholder, returns None)."""
        pass

    async def stream_complete(self, system=None, user=None, messages=None):
        """Stream a completion from the LLM (placeholder)."""
        pass


# Converts an A2A agent to a LangChain agent.
def to_langchain_agent(a2a_agent, callback=None):
    """Convert an A2A agent to a LangChain agent.

    Placeholder: the body is empty and returns ``None``.
    """
    pass


# Converts a LangChain component to an A2A server.
def to_a2a_server(langchain_component, name=None, description=None):
    """Convert a LangChain component to an A2A server.

    Placeholder: the body is empty and returns ``None``.
    """
    pass


# Converts an MCP tool to a LangChain tool.
def to_langchain_tool(mcp_server_url, tool_name):
    """Convert an MCP tool to a LangChain tool.

    Placeholder: the body is empty and returns ``None``.
    """
    pass


# Converts LangChain tools to an MCP server.
def to_mcp_server(tools, name=None, description=None):
    """Convert LangChain tools to an MCP server.

    Placeholder: the body is empty and returns ``None``.
    """
    pass


# Base class for all A2A exceptions.
class A2AError(Exception):
    """Base class for all A2A exceptions."""

    pass


# Error connecting to an A2A agent.
class A2AConnectionError(A2AError):
    """Error connecting to an A2A agent."""

    pass


# Timeout while communicating with an A2A agent.
class A2ATimeoutError(A2AError):
    """Timeout while communicating with an A2A agent."""

    pass


# Authentication error when connecting to an A2A agent.
class A2AAuthenticationError(A2AError):
    """Authentication error when connecting to an A2A agent."""

    pass


# Validation error for A2A data structures.
class A2AValidationError(A2AError):
    """Validation error for A2A data structures."""

    pass


# Run an A2A server.
def run_server(agent, host="localhost", port=5000, debug=False):
    """Run an A2A agent as a server.

    Defaults shown in the signature: ``host="localhost"``, ``port=5000``,
    ``debug=False``. Placeholder: the body is empty and returns ``None``.
    """
    # NOTE(review): a function with this same name and signature also
    # appears earlier in this file; this later definition is the one that
    # would remain bound at import time.
    pass


# Enable agent discovery.
def enable_discovery(agent, registry_url, heartbeat_interval=60):
    """Enable agent discovery for an agent.

    ``heartbeat_interval`` defaults to 60 (unit not stated in this
    listing — presumably seconds; confirm against the implementation).
    Placeholder: the body is empty and returns ``None``.
    """
    pass


# Create a structured function call.
def create_function_call(function_name, **arguments):
    """Create a function call with the given name and arguments.

    Args:
        function_name: Passed to ``FunctionCall`` as ``name``.
        **arguments: Collected into a dict and passed to ``FunctionCall``
            as ``arguments``.

    Returns:
        The newly constructed ``FunctionCall``.
    """
    return FunctionCall(name=function_name, arguments=arguments)


# For complete and up-to-date API documentation, please refer to the official documentation.