Skip to content

Advanced Usage

Manoj Desai edited this page May 3, 2025 · 1 revision

Advanced Usage

This section covers advanced features and techniques for building sophisticated agent systems with Python A2A.

Agent Discovery

Agent discovery enables agents to find and connect to each other dynamically:

from python_a2a.discovery import AgentRegistry, enable_discovery
from python_a2a import A2AServer, agent, run_server

import threading

# The registry is an ordinary server object; it is served on port 7000 below.
registry = AgentRegistry(name="A2A Registry Server")

# Serve the registry from a daemon thread so the rest of the script keeps running.
# In a production environment, run this on a separate machine
registry_thread = threading.Thread(
    target=run_server,
    args=(registry,),
    kwargs={"port": 7000},
    daemon=True,
)
registry_thread.start()

# Create agents with discovery enabled
@agent(
    name="Weather Agent",
    description="Provides weather information"
)
class WeatherAgent(A2AServer):
    """Example agent that answers every task with a canned weather message."""

    def handle_task(self, task):
        """Return a dict whose ``"output"`` embeds ``task.input`` in a weather string."""
        reply = f"Weather info for {task.input}"
        return {"output": reply}

@agent(
    name="Travel Agent",
    description="Provides travel recommendations"
)
class TravelAgent(A2AServer):
    """Example agent that answers every task with a canned travel message."""

    def handle_task(self, task):
        """Return a dict whose ``"output"`` embeds ``task.input`` in a travel string."""
        reply = f"Travel recommendations for {task.input}"
        return {"output": reply}

# Both agents are pointed at the same registry URL.
registry_url = "http://localhost:7000"

weather_agent = WeatherAgent()
weather_discovery = enable_discovery(weather_agent, registry_url=registry_url)

travel_agent = TravelAgent()
travel_discovery = enable_discovery(travel_agent, registry_url=registry_url)

# Serve each agent on its own port from a daemon thread (weather first, then travel).
for served_agent, served_port in ((weather_agent, 5001), (travel_agent, 5002)):
    threading.Thread(
        target=run_server,
        args=(served_agent,),
        kwargs={"port": served_port},
        daemon=True,
    ).start()

# Client side: look agents up in the registry, then talk to one of them.
from python_a2a.discovery import DiscoveryClient
from python_a2a import HTTPClient

discovery = DiscoveryClient(registry_url)

# One lookup filters on the agent name, the other on its description.
weather_agents = discovery.find_agents(name_contains="Weather")
travel_agents = discovery.find_agents(description_contains="travel")

weather_names = [found.name for found in weather_agents]
print(f"Found weather agents: {weather_names}")
travel_names = [found.name for found in travel_agents]
print(f"Found travel agents: {travel_names}")

# Only connect when the weather lookup returned something; use the first hit.
if weather_agents:
    client = HTTPClient(weather_agents[0].endpoint)
    response = client.send_message("New York")
    print(f"Weather response: {response.content}")

Smart Routing

Smart routing automatically directs messages to the most appropriate agent:

from python_a2a import AgentNetwork, HTTPClient
from python_a2a.client.router import SmartRouter
from python_a2a.client.llm.openai import OpenAILLMClient
import os

# Agents the router can choose between: network name -> endpoint URL.
agent_endpoints = {
    "weather": "http://localhost:5001",
    "travel": "http://localhost:5002",
    "calculator": "http://localhost:5003",
}

network = AgentNetwork(name="Assistant Network")
for agent_name, agent_url in agent_endpoints.items():
    network.add(agent_name, agent_url)

# The router is given the network plus an LLM client; the API key is read
# from the OPENAI_API_KEY environment variable.
llm_client = OpenAILLMClient(
    api_key=os.environ.get("OPENAI_API_KEY"),
    model="gpt-3.5-turbo",
)
router = SmartRouter(network=network, llm_client=llm_client)

# The client is constructed around the router instead of a fixed endpoint.
client = HTTPClient(router)

weather_query = "What's the weather in Tokyo today?"
travel_query = "Recommend hotels in Paris."
math_query = "What is 56 * 23?"

# Send each query through the same client and print the reply under its label.
for label, query in (
    ("Weather query response:", weather_query),
    ("Travel query response:", travel_query),
    ("Math query response:", math_query),
):
    print(label, client.send_message(query).content)

Advanced Workflows

Build complex workflows that combine conditional branching with parallel execution:

from python_a2a import AgentNetwork, Flow
from python_a2a.workflow import Step, Condition, ParallelSteps

# Register the five agents used by the workflow, one per local port.
network = AgentNetwork(name="Travel Planning Network")
for agent_name, agent_port in (
    ("weather", 5001),
    ("hotels", 5002),
    ("flights", 5003),
    ("activities", 5004),
    ("recommendations", 5005),
):
    network.add(agent_name, f"http://localhost:{agent_port}")

flow = Flow(agent_network=network)

# Step 1: weather forecast. The {destination}/{dates} placeholders are left
# as-is here; their values are supplied to flow.execute() at the end.
flow.ask("weather", "What's the weather forecast for {destination} during {dates}?")

# Step 2: pick an activities prompt depending on what the forecast contains.
flow.if_contains("sunny")
flow.ask("activities", "Recommend outdoor activities in {destination}")
flow.else_if_contains("rainy")
flow.ask("activities", "Recommend indoor activities in {destination}")
flow.else_branch()
flow.ask("activities", "Recommend both indoor and outdoor activities in {destination}")
flow.end_if()

# Step 3: hotels and flights are added to one parallel group.
parallel_requests = (
    ("hotels", "Find hotels in {destination} for {dates}"),
    ("flights", "Find flights to {destination} for {dates}"),
)
with flow.parallel() as parallel:
    for agent_name, prompt in parallel_requests:
        parallel.add(Step(agent_name, prompt))

# Step 4: final itinerary request.
flow.ask("recommendations", "Create an itinerary for {destination} based on the weather, activities, hotels, and flights")

# Run the workflow with concrete values for the placeholders.
trip_variables = {
    "destination": "Barcelona",
    "dates": "June 15-20, 2023",
}
result = flow.execute(variables=trip_variables)

print(result.final_output)
# Detailed itinerary history is available in result.history

Push Notifications

Enable real-time updates from agents to clients:

from python_a2a import A2AServer, agent, run_server, HTTPClient
from python_a2a.models import PushSubscription
import asyncio
import threading

@agent(
    name="Alerting Agent",
    description="Sends alerts and updates"
)
class AlertingAgent(A2AServer):
    """Agent that tracks push subscriptions and periodically notifies each one."""

    def __init__(self):
        super().__init__()
        # Maps subscription id (str) -> PushSubscription.
        self.subscriptions = {}
        # Next numeric id to hand out; ids are issued as strings.
        self.next_id = 1

    def handle_task(self, task):
        """Acknowledge the task; the alerts themselves come from ``send_alerts``."""
        return {"output": "Monitoring activated. You will receive alerts."}

    def register_subscription(self, callback_url):
        """Store a subscription for ``callback_url`` and return its new string id."""
        subscription_id = str(self.next_id)
        self.next_id += 1
        self.subscriptions[subscription_id] = PushSubscription(
            id=subscription_id,
            callback_url=callback_url
        )
        return subscription_id

    def remove_subscription(self, subscription_id):
        """Forget ``subscription_id``; unknown ids are ignored."""
        # pop() with a default is a single operation, so there is no
        # check-then-delete window in which the key can disappear.
        self.subscriptions.pop(subscription_id, None)

    async def send_alerts(self):
        """Push an alert to every current subscription every 10 seconds, forever."""
        # Inside a coroutine the running loop is the one whose clock we want.
        loop = asyncio.get_running_loop()
        while True:
            # In a real app, this would be triggered by events.
            # Iterate over a snapshot: each await below yields control, and a
            # subscription added or removed meanwhile would otherwise raise
            # "dictionary changed size during iteration".
            for subscription in list(self.subscriptions.values()):
                message = f"Alert notification at {loop.time()}"
                try:
                    # push_notification is not defined in this class;
                    # presumably inherited from A2AServer - confirm.
                    await self.push_notification(subscription, message)
                except Exception as exc:
                    # One failing subscriber must not stop alerts for the rest.
                    print(f"Failed to notify subscription {subscription.id}: {exc}")
            await asyncio.sleep(10)  # Send alerts every 10 seconds

# Extending the standard client to handle notifications
class NotificationClient(HTTPClient):
    """HTTP client that can also subscribe to and unsubscribe from push notifications.

    NOTE(review): ``self.client`` and ``self.endpoint`` are used below but not
    set here; presumably they are provided by ``HTTPClient`` - confirm.
    """

    def __init__(self, endpoint, callback_server_url="http://localhost:6000"):
        """Create the client.

        Args:
            endpoint: Passed to ``HTTPClient.__init__``.
            callback_server_url: Where notifications will be sent. Defaults to
                the previously hard-coded ``http://localhost:6000`` so existing
                single-argument callers behave exactly as before.
        """
        super().__init__(endpoint)
        self.callback_server_url = callback_server_url
        # Set by subscribe_to_notifications(); None while not subscribed.
        self.subscription_id = None

    def subscribe_to_notifications(self):
        """POST the callback URL to ``/subscribe`` and remember the returned id."""
        response = self.client.post(
            f"{self.endpoint}/subscribe",
            json={"callback_url": self.callback_server_url}
        )
        self.subscription_id = response.json()["subscription_id"]
        print(f"Subscribed with ID: {self.subscription_id}")

    def unsubscribe(self):
        """POST the stored id to ``/unsubscribe``; do nothing when not subscribed."""
        if not self.subscription_id:
            return
        self.client.post(
            f"{self.endpoint}/unsubscribe",
            json={"subscription_id": self.subscription_id}
        )
        print(f"Unsubscribed ID: {self.subscription_id}")
        # Cleared only after the request went through, so a failed call can be retried.
        self.subscription_id = None

# Run the agent with notification support
alert_agent = AlertingAgent()


async def start_alerts():
    """Run the agent's alert loop (``send_alerts`` loops forever)."""
    await alert_agent.send_alerts()


def run_agent_server():
    """Serve ``alert_agent`` on port 5000; used as a thread target."""
    run_server(alert_agent, port=5000)


# Start both threads
if __name__ == "__main__":
    import time

    agent_thread = threading.Thread(target=run_agent_server, daemon=True)
    agent_thread.start()

    # start_alerts is a coroutine, so it needs an event loop of its own.
    # Without this second thread the alert loop never runs and no
    # notification is ever sent.
    alerts_thread = threading.Thread(
        target=lambda: asyncio.run(start_alerts()),
        daemon=True,
    )
    alerts_thread.start()

    # Wait for server to start
    time.sleep(1)

    # Create a client and subscribe
    client = NotificationClient("http://localhost:5000")
    client.subscribe_to_notifications()

    # Send initial message
    response = client.send_message("Start monitoring")
    print(response.content)

    # Keep the main thread running to receive notifications; both workers are
    # daemon threads and exit together with it.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        client.unsubscribe()
        print("Monitoring stopped")

Task-Based Streaming

Stream incremental progress updates to the client while a long-running task executes:

from python_a2a import A2AServer, agent, run_server, HTTPClient
from python_a2a.models import Task, TaskStatus
import asyncio
import threading
import time

@agent(
    name="Research Agent",
    description="Performs in-depth research with real-time updates"
)
class ResearchAgent(A2AServer):
    """Example agent that streams simulated research progress chunk by chunk."""

    async def handle_task_streaming(self, task):
        """Yield progress chunks with pauses in between, then the final findings.

        Every chunk is a dict with a single ``"content"`` string. The last
        chunk embeds ``task.input`` as the research topic.
        """
        # (message, pause in seconds after the message) in emission order:
        # planning, gathering, analysis, five subtopics, compilation.
        schedule = [
            ("Planning research approach...\n", 1),
            ("Gathering sources...\n", 1),
            ("Analyzing information from multiple sources...\n", 1),
        ]
        schedule.extend(
            (f"Researching subtopic {number}/5...\n", 0.5)
            for number in range(1, 6)
        )
        schedule.append(("Compiling final results...\n", 1))

        for text, pause in schedule:
            yield {"content": text}
            await asyncio.sleep(pause)

        # Final output; no pause follows it.
        yield {"content": f"Research complete on topic: {task.input}\n\nFindings:\n- Key insight 1\n- Key insight 2\n- Key insight 3"}

# Single agent instance served by the background thread below.
research_agent = ResearchAgent()


def run_agent_server():
    """Serve ``research_agent`` on port 5000; used as a thread target."""
    run_server(research_agent, port=5000)


async def stream_research():
    """Request one research topic and print each streamed chunk as it arrives."""
    topic = "Quantum computing advances"
    client = HTTPClient("http://localhost:5000")

    print("Starting research with streaming...")
    # Chunks are printed back to back (no newline added) and flushed at once,
    # so the output grows as the chunks arrive.
    async for chunk in client.stream_response(topic):
        print(chunk.content, end="", flush=True)
    print("\nResearch complete!")


if __name__ == "__main__":
    # Server runs in a daemon thread, so it ends together with the main thread.
    threading.Thread(target=run_agent_server, daemon=True).start()

    # Give the server a moment to come up before connecting.
    time.sleep(1)

    asyncio.run(stream_research())

Custom Agent Implementations

Implementing custom agent logic for specialized use cases:

from python_a2a import A2AServer, agent, run_server
from python_a2a.models import Task, TaskStatus, FunctionCall, Conversation, Message
import json

@agent(
    name="Custom Logic Agent",
    description="Agent with specialized processing logic"
)
class CustomLogicAgent(A2AServer):
    """Example agent answering product questions from an in-memory catalogue."""

    def __init__(self):
        super().__init__()
        # Catalogue: product id -> {"name", "price", "stock"}.
        self.knowledge_base = {
            "product1": {"name": "Widget X", "price": 49.99, "stock": 100},
            "product2": {"name": "Gadget Y", "price": 99.99, "stock": 50},
            "product3": {"name": "Device Z", "price": 149.99, "stock": 25}
        }

    def handle_task(self, task):
        """Answer a task from its function call or from keywords in its text input."""
        # A function call takes precedence over free-text parsing.
        call = task.function_call
        if call:
            return self.handle_function_call(call)

        # Non-string input is treated as empty text.
        text = task.input.lower() if isinstance(task.input, str) else ""

        if "list" in text and "products" in text:
            return {"output": self.list_products()}

        # "price" wins over "stock" when both words appear.
        if "price" in text:
            lookup = self.get_price
        elif "stock" in text:
            lookup = self.get_stock
        else:
            lookup = None

        if lookup is not None:
            # First catalogue id (in catalogue order) that occurs in the text.
            matched_id = next(
                (pid for pid in self.knowledge_base if pid in text), None
            )
            if matched_id is not None:
                return {"output": lookup(matched_id)}

        # Unrecognized input, or a keyword without a known product id.
        return {"output": "I can help with: listing products, checking prices, or checking stock levels."}

    def handle_function_call(self, function_call):
        """Dispatch ``list_products``, ``get_price`` or ``get_stock`` by call name."""
        fallback = {"output": "Unknown function or missing arguments"}
        name = function_call.name

        if name == "list_products":
            return {"output": self.list_products()}

        if name == "get_price":
            lookup = self.get_price
        elif name == "get_stock":
            lookup = self.get_stock
        else:
            return fallback

        # Both lookups need a "product_id" argument.
        args = function_call.arguments
        if "product_id" in args:
            return {"output": lookup(args["product_id"])}
        return fallback

    def list_products(self):
        """Return a header line followed by one line per catalogue entry."""
        entries = [
            f"- {details['name']} (ID: {product_id}): ${details['price']}\n"
            for product_id, details in self.knowledge_base.items()
        ]
        return "Available Products:\n" + "".join(entries)

    def get_price(self, product_id):
        """Return a sentence with the product's price, or a not-found message."""
        product = self.knowledge_base.get(product_id)
        if product is None:
            return f"Product {product_id} not found"
        return f"The price of {product['name']} is ${product['price']}"

    def get_stock(self, product_id):
        """Return a sentence with the product's stock, or a not-found message."""
        product = self.knowledge_base.get(product_id)
        if product is None:
            return f"Product {product_id} not found"
        return f"{product['name']} has {product['stock']} units in stock"

    def handle_conversation(self, conversation):
        """Answer the newest message, using product ids mentioned by the user so far."""
        history = conversation.messages
        if not history:
            return {"content": "How can I help you today?"}

        # Collect, without duplicates and in first-mention order, every
        # catalogue id that appears in any user message.
        mentioned = []
        for message in history:
            if message.role != "user":
                continue
            for product_id in self.knowledge_base:
                if product_id in message.content.lower() and product_id not in mentioned:
                    mentioned.append(product_id)

        context = {"mentioned_products": mentioned}
        reply = self.process_with_context(history[-1].content, context)
        return {"content": reply}

    def process_with_context(self, message, context):
        """Answer ``message``; a price question resolves to the sole mentioned product."""
        lowered = message.lower()

        # Price question and exactly one product mentioned in the conversation:
        # answer for that product even if the message itself does not name it.
        if "price" in lowered:
            mentioned = context["mentioned_products"]
            if mentioned and len(mentioned) == 1:
                return self.get_price(mentioned[0])

        # Otherwise fall back to regular task handling on the lowered text.
        fallback_task = Task(task_id="1", input=lowered)
        outcome = self.handle_task(fallback_task)
        return outcome.get("output", "I'm not sure how to help with that")

# Script entry point: serve a fresh CustomLogicAgent on port 5000.
if __name__ == "__main__":
    run_server(CustomLogicAgent(), port=5000)

Performance Optimizations

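The following example combines several strategies for optimizing agent performance: pre-initialized resources, result caching, and background task processing.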

from python_a2a import A2AServer, agent, run_server
from python_a2a.models import Task
import time
import threading
import asyncio
from functools import lru_cache

@agent(
    name="Optimized Agent",
    description="Agent with performance optimizations"
)
class OptimizedAgent(A2AServer):
    """Example agent with pre-initialized resources, result caching and a background worker.

    The background worker owns a private event loop in a daemon thread.
    ``task_queue`` is an ``asyncio.Queue`` created inside that loop, because
    asyncio queues are not thread-safe; ``enqueue_task`` hands items over to
    that loop instead of touching the queue from the caller's thread.
    """

    # Upper bound for both the computation memo and ``results_cache``.
    MAX_CACHE_ENTRIES = 1000

    def __init__(self):
        super().__init__()
        # Example of a pre-warmed connection pool for external services
        self.connection_pool = {}
        self.initialize_resources()

        # Per-instance memo. Decorating the method itself with lru_cache would
        # key the shared cache on ``self`` and keep every instance alive.
        self._cached_computation = lru_cache(maxsize=self.MAX_CACHE_ENTRIES)(
            self._compute
        )

        # Task queue for batch processing; both attributes are published by
        # the worker once its loop is running (see process_task_queue).
        self.task_queue = None
        self._worker_loop = None
        self._worker_ready = threading.Event()
        # str(task.input) -> result, evicted oldest-first.
        self.results_cache = {}

        # Start the background worker
        self.worker_thread = threading.Thread(target=self.start_worker, daemon=True)
        self.worker_thread.start()
        # Do not return before the queue exists, so enqueue_task is usable
        # as soon as construction finishes.
        if not self._worker_ready.wait(timeout=5):
            raise RuntimeError("Background worker failed to start")

    def initialize_resources(self):
        """Pre-initialize frequently used resources (simulated with a 1 s sleep)."""
        # Example: Pre-load a model or establish connections
        print("Pre-initializing resources...")
        time.sleep(1)  # Simulate initialization
        print("Resources initialized")

    def expensive_computation(self, input_value):
        """Return the (memoized) result for ``input_value``.

        ``input_value`` must be hashable because it is used as the memo key.
        """
        return self._cached_computation(input_value)

    def _compute(self, input_value):
        """Do the uncached work behind ``expensive_computation``."""
        print(f"Performing expensive computation for: {input_value}")
        time.sleep(0.5)  # Simulate work
        return f"Result for {input_value}"

    def handle_task(self, task):
        """Return the result for ``task.input`` plus cache and timing metadata.

        Returns:
            ``{"output": ..., "meta": {"cached": bool, "processing_time_ms": int}}``
        """
        # perf_counter is monotonic; time.time() can jump when the wall clock
        # is adjusted and would then report a bogus duration.
        start_time = time.perf_counter()

        task_key = str(task.input)
        cached = task_key in self.results_cache
        if cached:
            print(f"Cache hit for: {task_key}")
            result = self.results_cache[task_key]
        else:
            result = self.expensive_computation(task_key)
            self.results_cache[task_key] = result

            # Simple strategy: remove oldest entry (in a real app, use LRU).
            # Dicts iterate in insertion order, so the first key is the oldest.
            if len(self.results_cache) > self.MAX_CACHE_ENTRIES:
                oldest_key = next(iter(self.results_cache))
                # pop() with a default tolerates the key vanishing meanwhile.
                self.results_cache.pop(oldest_key, None)

        processing_time = time.perf_counter() - start_time
        return {
            "output": result,
            "meta": {"cached": cached, "processing_time_ms": round(processing_time * 1000)}
        }

    def start_worker(self):
        """Thread target: run the queue consumer on a fresh event loop."""
        asyncio.run(self.process_task_queue())

    async def process_task_queue(self):
        """Consume ``(task, operation, callback)`` items from the queue forever."""
        # Create the queue inside the loop that consumes it, then publish both
        # so enqueue_task can reach them from other threads.
        loop = asyncio.get_running_loop()
        self._worker_loop = loop
        self.task_queue = asyncio.Queue()
        self._worker_ready.set()

        while True:
            try:
                # Get task, operation, callback from queue
                task, operation, callback = await self.task_queue.get()

                try:
                    if operation == "compute":
                        # Run the blocking computation in the default executor
                        # so this loop stays free to accept cross-thread puts.
                        result = await loop.run_in_executor(
                            None, self.expensive_computation, str(task.input)
                        )
                        if callback:
                            # The callback runs on the worker thread.
                            callback(result)
                except Exception as e:
                    print(f"Error processing task: {e}")
                finally:
                    # Mark as done even when processing failed.
                    self.task_queue.task_done()
            except Exception as e:
                print(f"Worker error: {e}")

    async def enqueue_task(self, task, operation, callback=None):
        """Add a task to the background processing queue.

        Safe to await from any event loop or thread.

        Returns:
            ``{"accepted": True, "queue_size": int}``; the size is a snapshot
            and may already be stale when the caller reads it.
        """
        item = (task, operation, callback)
        if asyncio.get_running_loop() is self._worker_loop:
            # Already on the worker's loop: the queue can be used directly.
            await self.task_queue.put(item)
        else:
            # Schedule the put on the worker's loop; wrap_future lets the
            # caller's own loop await the cross-thread result.
            handoff = asyncio.run_coroutine_threadsafe(
                self.task_queue.put(item), self._worker_loop
            )
            await asyncio.wrap_future(handoff)
        return {"accepted": True, "queue_size": self.task_queue.qsize()}

# Script entry point: serve a fresh OptimizedAgent on port 5000.
if __name__ == "__main__":
    run_server(OptimizedAgent(), port=5000)

These advanced examples demonstrate the flexibility and power of the Python A2A library for building sophisticated agent systems. You can combine these techniques to create highly customized, performant, and scalable agent ecosystems.

Clone this wiki locally