-
Notifications
You must be signed in to change notification settings - Fork 135
Agent Creation
This guide explains how to create custom A2A agents using Python A2A. You'll learn the different agent creation patterns, how to define skills, and advanced agent configurations.
The simplest way to create an agent is by extending the A2AServer class:
from python_a2a import A2AServer, agent, run_server
@agent(
    name="My Custom Agent",
    description="A custom A2A agent",
    version="1.0.0"
)
class MyAgent(A2AServer):
    """Minimal custom agent: echoes each task's input back to the caller."""

    def handle_task(self, task):
        """Process an A2A task and return its input wrapped in an output dict."""
        return {"output": f"Processed task: {task.input}"}


# Create and run the agent
if __name__ == "__main__":
    my_agent = MyAgent()
    run_server(my_agent, port=5000)

Skills are capabilities that your agent can expose to users:
from python_a2a import A2AServer, agent, skill, run_server
@agent(
    name="Multi-Skill Agent",
    description="Agent with multiple specialized skills"
)
class MultiSkillAgent(A2AServer):
    """Agent exposing several independent skills plus a plain-text fallback."""

    @skill(
        name="Calculate Sum",
        description="Add two numbers together",
        parameters={
            "a": {"type": "number", "description": "First number"},
            "b": {"type": "number", "description": "Second number"}
        }
    )
    def calculate_sum(self, a, b):
        """Return the sum of a and b."""
        return a + b

    @skill(
        name="Calculate Product",
        description="Multiply two numbers",
        parameters={
            "a": {"type": "number", "description": "First number"},
            "b": {"type": "number", "description": "Second number"}
        }
    )
    def calculate_product(self, a, b):
        """Return the product of a and b."""
        return a * b

    @skill(
        name="Greet User",
        description="Generate a greeting for the user",
        parameters={
            "name": {"type": "string", "description": "User's name"}
        }
    )
    def greet_user(self, name):
        """Return a personalized greeting string."""
        return f"Hello, {name}! Welcome to the A2A ecosystem."

    def handle_task(self, task):
        """Process tasks based on function calls or natural language."""
        if task.function_call:
            call_name = task.function_call.name
            args = task.function_call.arguments
            # Route structured function calls to the matching skill method.
            if call_name == "calculate_sum":
                return {"output": str(self.calculate_sum(args.get("a", 0), args.get("b", 0)))}
            if call_name == "calculate_product":
                return {"output": str(self.calculate_product(args.get("a", 0), args.get("b", 0)))}
            if call_name == "greet_user":
                return {"output": self.greet_user(args.get("name", "Guest"))}
        # Default response for text input
        return {"output": "I can help with calculations and greetings. Try calling a specific function."}


# Create and run the agent
if __name__ == "__main__":
    multi_skill_agent = MultiSkillAgent()
    run_server(multi_skill_agent, port=5000)

Function calling enables structured interaction with your agent:
from python_a2a import A2AServer, agent, skill, run_server
import json
@agent(
    name="Function-Enabled Agent",
    description="Agent that supports function calling"
)
class FunctionEnabledAgent(A2AServer):
    """Agent demonstrating a structured function-call path and a text fallback."""

    @skill(
        name="Search Database",
        description="Search a database for information",
        parameters={
            "query": {"type": "string", "description": "Search query"},
            "filters": {
                "type": "object",
                "description": "Optional filters",
                "properties": {
                    "category": {"type": "string"},
                    "max_results": {"type": "integer"}
                }
            }
        }
    )
    def search_database(self, query, filters=None):
        """Simulated database search honoring optional category/max_results filters."""
        opts = filters or {}
        category = opts.get("category", "all")
        limit = opts.get("max_results", 5)
        hits = [
            {"id": i, "title": f"Result {i} for {query} in {category}"}
            for i in range(1, 6)
        ]
        return hits[:limit]

    def handle_task(self, task):
        """Route function calls to search_database; otherwise infer intent from text."""
        # Structured function-call path.
        if task.function_call and task.function_call.name == "search_database":
            args = task.function_call.arguments
            results = self.search_database(args.get("query", ""), args.get("filters", {}))
            return {"output": json.dumps(results, indent=2)}
        # Text path: look for search-like keywords and strip them to form a query.
        if isinstance(task.input, str):
            text = task.input.lower()
            if any(kw in text for kw in ("search", "find", "query")):
                stripped = text
                for kw in ("search", "find", "query"):
                    stripped = stripped.replace(kw, "")
                stripped = stripped.strip()
                results = self.search_database(stripped)
                listing = "\n".join(f"- {r['title']}" for r in results)
                return {"output": f"Search results for '{stripped}':\n" + listing}
        return {"output": "I can search a database for you. Please provide a search query."}


# Create and run the agent
if __name__ == "__main__":
    function_agent = FunctionEnabledAgent()
    run_server(function_agent, port=5000)

Add streaming capabilities to provide real-time responses:
from python_a2a import A2AServer, agent, run_server
import asyncio
@agent(
    name="Streaming Agent",
    description="Agent with streaming response capabilities"
)
class StreamingAgent(A2AServer):
    """Agent serving both one-shot replies and word-by-word streamed replies."""

    def handle_task(self, task):
        """Regular task handler for non-streaming requests."""
        return {"output": f"Non-streaming response for: {task.input}"}

    async def stream_response(self, message):
        """Stream a response one word at a time as an async generator."""
        for token in f"Streaming response for: {message}".split():
            yield {"content": token + " "}
            await asyncio.sleep(0.2)  # Simulate thinking time


# Create and run the agent
if __name__ == "__main__":
    streaming_agent = StreamingAgent()
    run_server(streaming_agent, port=5000)

Create agents powered by language models:
from python_a2a import A2AServer, agent, run_server
from python_a2a.client.llm.openai import OpenAILLMClient
import os
import json
@agent(
    name="OpenAI-Powered Agent",
    description="Agent backed by OpenAI language models"
)
class OpenAIAgent(A2AServer):
    """Agent that delegates both one-shot and streaming replies to an OpenAI model."""

    def __init__(self):
        super().__init__()
        # API key is read from the environment; model is pinned to gpt-4.
        self.llm_client = OpenAILLMClient(
            api_key=os.environ.get("OPENAI_API_KEY"),
            model="gpt-4"
        )

    def handle_task(self, task):
        """Process a task using OpenAI's language model."""
        # System instructions steer the LLM's tone and grounding.
        system_message = """
You are a helpful assistant that provides concise, accurate information.
Stick to the facts and provide sources when possible.
"""
        user_message = f"User query: {task.input}\nProvide a helpful response."
        # Get the response from the LLM.
        reply = self.llm_client.complete(
            system=system_message,
            user=user_message
        )
        return {"output": reply.content}

    async def stream_response(self, message):
        """Stream a response from the LLM chunk by chunk."""
        system_message = """
You are a helpful assistant that provides concise, accurate information.
Stick to the facts and provide sources when possible.
"""
        user_message = f"User query: {message}\nProvide a helpful response."
        async for chunk in self.llm_client.stream_complete(
            system=system_message,
            user=user_message
        ):
            yield {"content": chunk.content}


# Create and run the agent
if __name__ == "__main__":
    openai_agent = OpenAIAgent()
    run_server(openai_agent, port=5000)

Implement conversation handling for multi-turn interactions:
from python_a2a import A2AServer, agent, run_server
from python_a2a.models import Conversation
@agent(
    name="Conversation Agent",
    description="Agent that maintains conversation context"
)
class ConversationAgent(A2AServer):
    """Agent that tracks per-conversation state: turn count and detected topics."""

    # Topics we attempt to detect in user messages.
    _KNOWN_TOPICS = ("weather", "travel", "food", "sports", "technology")

    def __init__(self):
        super().__init__()
        # Maps conversation id -> {"turn_count", "topics", "last_topic"}.
        self.conversations = {}

    def handle_task(self, task):
        """Process a one-off task."""
        return {"output": f"Task processed: {task.input}"}

    def handle_conversation(self, conversation):
        """Handle a multi-turn conversation and return a contextual reply.

        Fix: the empty-messages guard now runs *before* any state is created
        or the turn counter is incremented. Previously a message-less ping
        consumed turn #1 (and allocated state), so the first real user
        message skipped the first-turn greeting path.
        """
        # Nothing to process yet — greet without touching state.
        if not conversation.messages:
            return {"content": "Hello! How can I help you today?"}

        # Create state lazily and count this turn.
        conv_state = self.conversations.setdefault(conversation.id, {
            "turn_count": 0,
            "topics": set(),
            "last_topic": None,
        })
        conv_state["turn_count"] += 1

        # Extract potential topics from the latest message.
        message_text = conversation.messages[-1].content.lower()
        for topic in self._KNOWN_TOPICS:
            if topic in message_text:
                conv_state["topics"].add(topic)
                conv_state["last_topic"] = topic

        # Generate a contextual response.
        if conv_state["turn_count"] == 1:
            response = f"Hello! I see you're interested in {conv_state['last_topic'] or 'chatting'}. How can I help?"
        elif conv_state["last_topic"]:
            response = f"Let's continue discussing {conv_state['last_topic']}. What would you like to know?"
        else:
            response = f"I see this is your turn #{conv_state['turn_count']}. How can I help you today?"
        return {"content": response}


# Create and run the agent
if __name__ == "__main__":
    conversation_agent = ConversationAgent()
    run_server(conversation_agent, port=5000)

Create agents that compose multiple specialized agents:
from python_a2a import A2AServer, agent, run_server, HTTPClient
import threading
@agent(
    name="Composition Agent",
    description="Agent that composes multiple specialized agents"
)
class CompositionAgent(A2AServer):
    """Facade agent that forwards tasks to specialized downstream agents."""

    def __init__(self):
        super().__init__()
        # Define endpoints for specialized agents
        self.weather_endpoint = "http://localhost:5001"
        self.travel_endpoint = "http://localhost:5002"
        self.calculator_endpoint = "http://localhost:5003"
        # Create clients for each agent
        self.weather_client = HTTPClient(self.weather_endpoint)
        self.travel_client = HTTPClient(self.travel_endpoint)
        self.calculator_client = HTTPClient(self.calculator_endpoint)

    def handle_task(self, task):
        """Process a task by routing to specialized agents."""
        input_text = task.input.lower() if isinstance(task.input, str) else ""
        # Routing table: (trigger keywords, downstream client, display label).
        routes = (
            (("weather", "temperature", "forecast", "rain", "sunny"),
             self.weather_client, "Weather Agent"),
            (("travel", "flight", "hotel", "vacation", "trip"),
             self.travel_client, "Travel Agent"),
            (("calculate", "math", "add", "subtract", "multiply", "divide"),
             self.calculator_client, "Calculator Agent"),
        )
        for keywords, client, label in routes:
            if any(word in input_text for word in keywords):
                response = client.send_message(task.input)
                return {"output": f"{label} says: {response.content}"}
        # Default response if no specialized agent matches
        return {"output": "I can help with weather, travel, and calculations. Please specify which service you need."}


# Create and run the agent
if __name__ == "__main__":
    composition_agent = CompositionAgent()
    run_server(composition_agent, port=5000)

-
Modular Design
- Separate concerns with distinct agent responsibilities
- Use composition over inheritance where possible
- Create reusable skill modules
-
Robust Error Handling
- Validate inputs thoroughly
- Use try/except blocks for external service calls
- Provide helpful error messages
-
Performance Optimization
- Implement caching for expensive operations
- Use async methods for I/O-bound operations
- Batch related operations where possible
-
Security Considerations
- Validate and sanitize inputs
- Implement rate limiting
- Use appropriate authentication
-
Documentation
- Document each skill thoroughly
- Provide usage examples
- Keep the agent description up-to-date
By following these patterns and best practices, you can create powerful, specialized A2A agents that interoperate with the broader agent ecosystem.