-
Notifications
You must be signed in to change notification settings - Fork 135
Client Usage
This guide covers how to use the Python A2A client to interact with A2A agents. You'll learn about different client types, authentication, error handling, and advanced features.
The simplest way to interact with an A2A agent is using the HTTPClient:
from python_a2a import HTTPClient
# Create a client pointing to an A2A agent
client = HTTPClient("http://localhost:5000")
# Send a simple message
response = client.send_message("Hello, agent!")
print(response.content)
# Send a structured message
structured_input = {"query": "What's the weather?", "location": "New York"}
response = client.send_message(structured_input)
print(response.content)

A2A tasks represent discrete units of work:
from python_a2a import HTTPClient
client = HTTPClient("http://localhost:5000")
# Create a new task
task = client.create_task({"query": "Research quantum computing"})
print(f"Created task with ID: {task.task_id}")
# Check the task status
status = client.get_task_status(task.task_id)
print(f"Task status: {status}")
# Get task output when complete
if status == "completed":
    output = client.get_task_output(task.task_id)
    print(f"Task output: {output}")

Call specific agent functions directly:
from python_a2a import HTTPClient
client = HTTPClient("http://localhost:5000")
# Call a function with parameters
result = client.call_function(
"calculate_sum",
{"a": 10, "b": 20}
)
print(f"Result: {result}") # Result: 30
# Call a function with complex parameters
search_result = client.call_function(
"search_database",
{
"query": "Python A2A",
"filters": {
"category": "programming",
"max_results": 3
}
}
)
print(search_result)

Receive real-time streaming responses:
import asyncio
from python_a2a import HTTPClient
async def stream_example():
    """Print an agent's streamed reply to stdout as the chunks arrive.

    Builds an ``HTTPClient`` for ``http://localhost:5000``, asynchronously
    iterates ``client.stream_response(...)`` and writes each chunk's
    ``content`` attribute without a trailing newline, so the pieces read as
    one continuous text.
    """
    client = HTTPClient("http://localhost:5000")
    print("Streaming response:")
    async for chunk in client.stream_response("Tell me a story about AI"):
        # flush=True pushes each fragment out immediately instead of leaving
        # it in the stdout buffer until a newline is written.
        print(chunk.content, end="", flush=True)
    # The leading "\n" terminates the line the chunks were written on.
    print("\nStreaming complete!")
if __name__ == "__main__":
    asyncio.run(stream_example())

Maintain context across multiple messages:
from python_a2a import HTTPClient
client = HTTPClient("http://localhost:5000")
# Create a new conversation
conversation = client.create_conversation()
print(f"Created conversation with ID: {conversation.id}")
# Add messages to the conversation
response1 = client.add_message(conversation.id, "Hello, how are you?")
print(f"Agent: {response1.content}")
response2 = client.add_message(conversation.id, "Tell me more about yourself.")
print(f"Agent: {response2.content}")
response3 = client.add_message(conversation.id, "What can you help me with?")
print(f"Agent: {response3.content}")
# Get the full conversation history
history = client.get_conversation(conversation.id)
print("\nFull conversation:")
for message in history.messages:
    print(f"{message.role}: {message.content}")

Connect to authenticated agents:
from python_a2a import HTTPClient
# Basic authentication
client = HTTPClient(
"http://localhost:5000",
auth=("username", "password")
)
# API key authentication
client = HTTPClient(
"http://localhost:5000",
headers={"X-API-Key": "your-api-key"}
)
# OAuth token authentication
client = HTTPClient(
"http://localhost:5000",
headers={"Authorization": "Bearer your-oauth-token"}
)
# Custom authentication scheme
client = HTTPClient(
"http://localhost:5000",
headers={"X-Custom-Auth": "custom-auth-value"}
)
# Send a message with the authenticated client
response = client.send_message("Hello, secure agent!")
print(response.content)

Handle errors gracefully:
from python_a2a import HTTPClient
from python_a2a.exceptions import (
A2AError,
A2AConnectionError,
A2ATimeoutError,
A2AAuthenticationError
)
client = HTTPClient("http://localhost:5000")
try:
    response = client.send_message("Hello, agent!")
    print(response.content)
except A2AConnectionError as e:
    print(f"Connection error: {e}")
except A2ATimeoutError as e:
    print(f"Timeout error: {e}")
except A2AAuthenticationError as e:
    print(f"Authentication error: {e}")
except A2AError as e:
    # Catch-all for other A2A errors
    print(f"A2A error: {e}")
except Exception as e:
    # Catch-all for unexpected errors
    print(f"Unexpected error: {e}")

Configure retry behavior and timeouts:
from python_a2a import HTTPClient
# Configure retries and timeouts
client = HTTPClient(
"http://localhost:5000",
timeout=10.0, # 10 second timeout
max_retries=3, # Retry failed requests up to 3 times
retry_delay=0.5 # Wait 0.5 seconds between retries
)
# Send a message with the configured client
response = client.send_message("Hello, agent!")
print(response.content)

Interact with multiple agents through a network:
from python_a2a import AgentNetwork, HTTPClient
# Create an agent network
network = AgentNetwork(name="Assistant Network")
network.add("weather", "http://localhost:5001")
network.add("travel", "http://localhost:5002")
network.add("calculator", "http://localhost:5003")
# Create a client that uses the network
client = HTTPClient(network)
# Let the network route to the appropriate agent
response = client.send_message("What's the weather in New York?")
print(f"Response: {response.content}")
# Explicitly send to a specific agent
weather_response = client.send_message_to("weather", "What's the forecast for tomorrow?")
print(f"Weather: {weather_response.content}")
travel_response = client.send_message_to("travel", "Find hotels in Paris")
print(f"Travel: {travel_response.content}")
calculator_response = client.send_message_to("calculator", "What is 123 * 456?")
print(f"Calculator: {calculator_response.content}")

Discover and connect to agents dynamically:
from python_a2a.discovery import DiscoveryClient
from python_a2a import HTTPClient
# Create a discovery client
discovery = DiscoveryClient("http://localhost:7000")
# Find agents by name
weather_agents = discovery.find_agents(name_contains="Weather")
print(f"Found weather agents: {[a.name for a in weather_agents]}")
# Find agents by description
travel_agents = discovery.find_agents(description_contains="travel")
print(f"Found travel agents: {[a.name for a in travel_agents]}")
# Find agents by capability
calculator_agents = discovery.find_agents(skill_contains="calculate")
print(f"Found calculator agents: {[a.name for a in calculator_agents]}")
# Connect to a discovered agent
if weather_agents:
    weather_agent = weather_agents[0]
    client = HTTPClient(weather_agent.endpoint)
    response = client.send_message("What's the weather in Tokyo?")
    print(f"Weather response: {response.content}")

Send multiple requests efficiently:
import asyncio
from python_a2a import HTTPClient
async def batch_example():
    """Send a fixed list of questions concurrently and print each answer.

    One ``client.send_message_async(...)`` awaitable is created per question
    and all of them are awaited together with ``asyncio.gather``. Each
    question is then printed with its 1-based number, followed by the
    ``content`` of the matching response.
    """
    client = HTTPClient("http://localhost:5000")

    # Prepare a batch of questions
    questions = [
        "What's the weather today?",
        "What's the capital of France?",
        "How many planets are in the solar system?",
        "What is the square root of 144?",
    ]

    # Send all questions concurrently
    pending = [client.send_message_async(question) for question in questions]
    # gather() returns results in the order the awaitables were passed, so
    # responses line up positionally with `questions`.
    responses = await asyncio.gather(*pending)

    # Process responses
    for number, (question, response) in enumerate(zip(questions, responses), start=1):
        print(f"Question {number}: {question}")
        print(f"Answer: {response.content}\n")
if __name__ == "__main__":
    asyncio.run(batch_example())

Create sequential agent interactions:
from python_a2a import HTTPClient
client = HTTPClient("http://localhost:5000")
# Step 1: Initial query
step1_response = client.send_message("Find information about quantum computing")
print(f"Step 1 - Research: {step1_response.content}")
# Step 2: Summarize the information
step2_response = client.send_message(f"Summarize this: {step1_response.content}")
print(f"Step 2 - Summary: {step2_response.content}")
# Step 3: Ask follow-up questions
step3_response = client.send_message(f"Based on this summary: {step2_response.content}, what are the practical applications?")
print(f"Step 3 - Applications: {step3_response.content}")
# Step 4: Final synthesis
step4_response = client.send_message("Create a structured report based on all the previous information")
print(f"Step 4 - Final Report: {step4_response.content}")

Implement caching for improved performance:
from python_a2a import HTTPClient
import functools
import time
class CachingClient:
    """Wrap an ``HTTPClient`` with a simple in-memory, time-limited cache.

    Responses are stored in ``self.cache`` under ``str(message)``. Each entry
    is a dict with the keys ``"response"`` and ``"timestamp"`` (a
    ``time.time()`` reading taken when the entry was stored). An entry is
    served again only while its age is strictly less than ``cache_ttl``
    seconds.
    """

    def __init__(self, endpoint, cache_ttl=300):  # TTL in seconds
        """Create the wrapped client and an empty cache.

        Args:
            endpoint: Passed unchanged to ``HTTPClient``.
            cache_ttl: Maximum age, in seconds, of a reusable cache entry.
        """
        self.client = HTTPClient(endpoint)
        self.cache = {}
        self.cache_ttl = cache_ttl

    def send_message(self, message):
        """Send a message with caching.

        Args:
            message: Payload forwarded to ``self.client.send_message``; its
                ``str()`` form is the cache key.

        Returns:
            The cached response when a fresh entry exists, otherwise the
            response just obtained from the wrapped client.
        """
        # Use the message content as the cache key
        cache_key = str(message)

        # One lookup instead of a membership test followed by indexing.
        cached_item = self.cache.get(cache_key)
        if (
            cached_item is not None
            and time.time() - cached_item["timestamp"] < self.cache_ttl
        ):
            print("Cache hit!")
            return cached_item["response"]

        # Missing or expired entry: send the request and (re)store the result.
        print("Cache miss!")
        response = self.client.send_message(message)
        self.cache[cache_key] = {
            "response": response,
            "timestamp": time.time(),
        }
        return response

    def clear_cache(self):
        """Clear the cache."""
        self.cache = {}

    def prune_cache(self):
        """Remove expired cache entries."""
        now = time.time()
        # Collect the keys first: deleting while iterating over the dict
        # would raise RuntimeError.
        expired_keys = [
            key
            for key, entry in self.cache.items()
            if now - entry["timestamp"] >= self.cache_ttl
        ]
        for key in expired_keys:
            del self.cache[key]
# Example usage
caching_client = CachingClient("http://localhost:5000")
# First request will be a cache miss
response1 = caching_client.send_message("What is the capital of France?")
print(response1.content)
# Second identical request will be a cache hit
response2 = caching_client.send_message("What is the capital of France?")
print(response2.content)
# Different request will be a cache miss
response3 = caching_client.send_message("What is the capital of Spain?")
print(response3.content)

- Error Handling
  - Always handle exceptions appropriately
  - Implement retries for transient failures
  - Log errors with sufficient context
- Performance Optimization
  - Use streaming for long responses
  - Batch related requests
  - Implement client-side caching when appropriate
- Resource Management
  - Close connections properly
  - Release resources in a finally block
  - Use context managers when available
- Security
  - Store credentials securely
  - Validate server certificates
  - Use HTTPS for production environments
- Monitoring and Logging
  - Track request latency and errors
  - Log request and response payloads for debugging
  - Implement circuit breakers for unreliable agents
By following these practices, you can build robust client applications that effectively interact with the A2A agent ecosystem.