1 change: 1 addition & 0 deletions python/samples/core_streaming_handoffs_fastapi/.gitignore
@@ -0,0 +1 @@
model_config.yaml
144 changes: 144 additions & 0 deletions python/samples/core_streaming_handoffs_fastapi/README.md
@@ -0,0 +1,144 @@
# AutoGen-Core Streaming Chat with Multi-Agent Handoffs via FastAPI

This sample demonstrates how to build a streaming chat API featuring multi-agent handoffs and persistent conversation history using `autogen-core` and FastAPI. For more details on the handoff pattern, see the [AutoGen documentation](https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/design-patterns/handoffs.html).

Inspired by `@ToryPan`'s example of streaming with the Core API.

## Key Features

1. **Streaming Response**: Implements real-time streaming of agent responses using FastAPI's `StreamingResponse`, `autogen-core`'s asynchronous features, and an `asyncio.Queue` to manage the data stream (see the sketch after this list).
2. **Multi-Agent Handoffs**: Showcases a system where different agents (Triage, Sales, Issues & Repairs) handle specific parts of a conversation, using tools (`delegate_tools`) to transfer the conversation between agents based on the context.
3. **Persistent Multi-Turn Conversation**: Agents receive and process conversation history, enabling context-aware interactions. History is saved per conversation ID in JSON files within the `chat_history` directory, allowing conversations to resume across sessions.
4. **Simple Web UI**: Includes a basic web interface (served via FastAPI's static files) for easy interaction with the chat system directly from a browser.
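
The streaming mechanics behind feature 1 boil down to the pattern sketched below. This is a minimal, self-contained illustration rather than the sample's actual `app.py`; the `ChatRequest` model and the `run_agents` stub are placeholders for the real agent runtime, which publishes the user's message to the triage agent and lets agents push chunks onto the queue.

```python
# Minimal sketch of the queue-based streaming pattern (assumed names; not the sample's app.py).
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()
STREAM_DONE = object()  # Sentinel that marks the end of one response stream.

class ChatRequest(BaseModel):  # Hypothetical request model for illustration.
    message: str
    conversation_id: str

@app.post("/chat/completions")
async def chat_completions(request: ChatRequest) -> StreamingResponse:
    queue: asyncio.Queue = asyncio.Queue()

    async def run_agents() -> None:
        # In the real sample this publishes the message to the agent runtime;
        # agents then put {'type': ..., 'message': ...} dicts onto the queue as they stream.
        await queue.put({"type": "string", "message": f"Echo: {request.message}"})
        await queue.put(STREAM_DONE)

    async def event_stream():
        task = asyncio.create_task(run_agents())
        while True:
            item = await queue.get()
            if item is STREAM_DONE:
                break
            # Each queue item becomes one newline-delimited JSON object.
            yield json.dumps({"content": item}) + "\n"
        await task

    return StreamingResponse(event_stream(), media_type="application/json")
```

The client examples further down read this stream line by line and inspect `content.type` to distinguish plain text from function-call notifications.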

## File Structure

* `app.py`: Main FastAPI application code, including API endpoints, agent definitions, runtime setup, handoff logic, and streaming.
* `agent_user.py`: Defines the `UserAgent` responsible for interacting with the human user and saving chat history.
* `agent_base.py`: Defines the base `AIAgent` class used by specialized agents.
* `models.py`: Contains data models used for communication (e.g., `UserTask`, `AgentResponse`).
* `topics.py`: Defines topic types used for routing messages between agents.
* `tools.py`: Defines tools that agents can execute (e.g., `execute_order_tool`).
* `tools_delegate.py`: Defines tools specifically for delegating/transferring the conversation to other agents (a sketch of one such tool follows this list).
* `README.md`: (This document) Project introduction and usage instructions.
* `static/`: Contains static files for the web UI (e.g., `index.html`).
* `model_config_template.yaml`: Template for the model configuration file.
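
As a rough sketch of what a delegate tool looks like (assumed names; the sample's real definitions live in `tools_delegate.py` and `topics.py`), a handoff tool is simply a function that returns the topic type of the target agent; `AIAgent` then publishes the conversation to that topic when the model calls the tool:

```python
from autogen_core.tools import FunctionTool

# Hypothetical topic type; the sample's actual values are defined in topics.py.
sales_agent_topic_type = "SalesAgent"

def transfer_to_sales_agent() -> str:
    """Return the topic type of the agent that should take over the conversation."""
    return sales_agent_topic_type

transfer_to_sales_agent_tool = FunctionTool(
    transfer_to_sales_agent,
    description="Use this when the customer wants to buy something.",
)
```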

## Installation

First, ensure you have Python installed (3.10 or higher is required by `autogen-core`). Then, install the necessary libraries:

```bash
pip install "fastapi" "uvicorn[standard]" "autogen-core" "autogen-ext[openai]" "PyYAML"
```

## Configuration

Create a new file named `model_config.yaml` in the same directory as this README file to configure your language model settings (e.g., Azure OpenAI details). Use `model_config_template.yaml` as a starting point.

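For reference, a typical `model_config.yaml` for an OpenAI model looks roughly like the snippet below (the exact fields depend on your provider; treat `model_config_template.yaml` as the authoritative template):

```yaml
# Illustrative only; copy model_config_template.yaml and adjust for your provider.
provider: autogen_ext.models.openai.OpenAIChatCompletionClient
config:
  model: gpt-4o
  # api_key: prefer setting OPENAI_API_KEY in the environment instead of writing it here.
```
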
**Note**: For production, manage API keys securely using environment variables or other secrets management tools instead of hardcoding them in the configuration file.

## Running the Application

In the directory containing `app.py`, run the following command to start the FastAPI application:

```bash
uvicorn app:app --host 0.0.0.0 --port 8501 --reload
```

The application includes a simple web interface. After starting the server, navigate to `http://localhost:8501` in your browser.

The API endpoint for chat completions will be available at `http://localhost:8501/chat/completions`.

## Using the API

You can interact with the agent system by sending a POST request to the `/chat/completions` endpoint. The request body must be in JSON format and contain a `message` field (the user's input) and a `conversation_id` field to track the chat session.

**Request Body Format**:

```json
{
"message": "I need refund for a product.",
"conversation_id": "user123_session456"
}
```

**Example (using curl)**:

```bash
curl -N -X POST http://localhost:8501/chat/completions \
-H "Content-Type: application/json" \
-d '{
"message": "Hi, I bought a rocket-powered unicycle and it exploded.",
"conversation_id": "wile_e_coyote_1"
}'
```

**Example (using Python requests)**:

```python
import requests
import json
import uuid

url = "http://localhost:8501/chat/completions"
conversation_id = f"conv-{uuid.uuid4()}"  # Unique conversation ID; reuse the same ID to resume a session.

def send_message(message_text):
    data = {
        'message': message_text,
        'conversation_id': conversation_id
    }
    headers = {'Content-Type': 'application/json'}
    try:
        print(f"\n>>> User: {message_text}")
        print("<<< Assistant: ", end="", flush=True)
        response = requests.post(url, json=data, headers=headers, stream=True)
        response.raise_for_status()
        full_response = ""
        for chunk in response.iter_content(chunk_size=None):
            if not chunk:
                continue
            chunk_str = chunk.decode('utf-8')
            # A single chunk may contain several newline-delimited JSON objects.
            for line in chunk_str.strip().split('\n'):
                if not line:
                    continue
                try:
                    payload = json.loads(line)
                    if 'content' in payload and isinstance(payload['content'], dict) and 'message' in payload['content']:
                        message_content = payload['content']['message']
                        message_type = payload['content'].get('type', 'string')  # Default to string if type is missing.
                        if message_type == 'function':
                            # Print function-call notifications on their own line for clarity.
                            print(f"[{message_type.upper()}] {message_content}", end='\n', flush=True)
                            print("<<< Assistant: ", end="", flush=True)  # Reprint prefix for the next string part.
                        else:
                            print(message_content, end='', flush=True)
                        full_response += message_content  # Append only the message part.
                    else:
                        print(f"\nUnexpected chunk format: {line}")
                except json.JSONDecodeError:
                    print(f"\nError decoding line: '{line}'")

        print("\n--- End of Response ---")
        return full_response

    except requests.exceptions.RequestException as e:
        print(f"\nError: {e}")
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")

# Start the conversation.
send_message("I want a refund")
# Continue the conversation (example):
# send_message("I want the rocket my friend Amith bought.")
# send_message("They are the SpaceX 3000s")
# send_message("That sounds great, I'll take it!")
# send_message("Yes, I agree to the price and the caveat.")

```
134 changes: 134 additions & 0 deletions python/samples/core_streaming_handoffs_fastapi/agent_base.py
@@ -0,0 +1,134 @@
import asyncio
import json
from typing import List, Tuple

from autogen_core import (
    FunctionCall,
    MessageContext,
    RoutedAgent,
    TopicId,
    message_handler,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    SystemMessage,
)
from autogen_core.tools import Tool

from models import AgentResponse, UserTask


class AIAgent(RoutedAgent):
    def __init__(
        self,
        description: str,
        system_message: SystemMessage,
        model_client: ChatCompletionClient,
        tools: List[Tool],
        delegate_tools: List[Tool],
        agent_topic_type: str,
        user_topic_type: str,
        response_queue: asyncio.Queue[str | object],
    ) -> None:
        super().__init__(description)
        self._system_message = system_message
        self._model_client = model_client
        self._tools = dict([(tool.name, tool) for tool in tools])
        self._tool_schema = [tool.schema for tool in tools]
        self._delegate_tools = dict([(tool.name, tool) for tool in delegate_tools])
        self._delegate_tool_schema = [tool.schema for tool in delegate_tools]
        self._agent_topic_type = agent_topic_type
        self._user_topic_type = user_topic_type
        self._response_queue = response_queue

    @message_handler
    async def handle_task(self, message: UserTask, ctx: MessageContext) -> None:
        # Start streaming LLM responses.
        llm_stream = self._model_client.create_stream(
            messages=[self._system_message] + message.context,
            tools=self._tool_schema + self._delegate_tool_schema,
            cancellation_token=ctx.cancellation_token,
        )
        final_response = None
        async for chunk in llm_stream:
            if isinstance(chunk, str):
                await self._response_queue.put({"type": "string", "message": chunk})
            else:
                final_response = chunk
        assert final_response is not None, "No response from model"
        print(f"{'-'*80}\n{self.id.type}:\n{final_response.content}", flush=True)
        # Process the LLM result.
        while isinstance(final_response.content, list) and all(isinstance(m, FunctionCall) for m in final_response.content):
            tool_call_results: List[FunctionExecutionResult] = []
            delegate_targets: List[Tuple[str, UserTask]] = []
            # Process each function call.
            for call in final_response.content:
                arguments = json.loads(call.arguments)
                await self._response_queue.put({"type": "function", "message": f"Executing {call.name}"})
                if call.name in self._tools:
                    # Execute the tool directly.
                    result = await self._tools[call.name].run_json(arguments, ctx.cancellation_token)
                    result_as_str = self._tools[call.name].return_value_as_string(result)
                    tool_call_results.append(
                        FunctionExecutionResult(call_id=call.id, content=result_as_str, is_error=False, name=call.name)
                    )
                elif call.name in self._delegate_tools:
                    # Execute the tool to get the delegate agent's topic type.
                    result = await self._delegate_tools[call.name].run_json(arguments, ctx.cancellation_token)
                    topic_type = self._delegate_tools[call.name].return_value_as_string(result)
                    # Create the context for the delegate agent, including the function call and the result.
                    delegate_messages = list(message.context) + [
                        AssistantMessage(content=[call], source=self.id.type),
                        FunctionExecutionResultMessage(
                            content=[
                                FunctionExecutionResult(
                                    call_id=call.id,
                                    content=f"Transferred to {topic_type}. Adopt persona immediately.",
                                    is_error=False,
                                    name=call.name,
                                )
                            ]
                        ),
                    ]
                    delegate_targets.append((topic_type, UserTask(context=delegate_messages)))
                else:
                    raise ValueError(f"Unknown tool: {call.name}")
            if len(delegate_targets) > 0:
                # Delegate the task to other agents by publishing messages to the corresponding topics.
                for topic_type, task in delegate_targets:
                    print(f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}", flush=True)
                    await self._response_queue.put({"type": "function", "message": f"You are now talking to {topic_type}"})
                    await self.publish_message(task, topic_id=TopicId(topic_type, source=self.id.key))
            if len(tool_call_results) > 0:
                print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True)
                # Make another LLM call with the results.
                message.context.extend([
                    AssistantMessage(content=final_response.content, source=self.id.type),
                    FunctionExecutionResultMessage(content=tool_call_results),
                ])
                llm_stream = self._model_client.create_stream(
                    messages=[self._system_message] + message.context,
                    tools=self._tool_schema + self._delegate_tool_schema,
                    cancellation_token=ctx.cancellation_token,
                )
                final_response = None
                async for chunk in llm_stream:
                    if isinstance(chunk, str):
                        await self._response_queue.put({"type": "string", "message": chunk})
                    else:
                        final_response = chunk
                assert final_response is not None, "No response from model"
                print(f"{'-'*80}\n{self.id.type}:\n{final_response.content}", flush=True)
            else:
                # The task has been delegated, so we are done.
                return
        # The task has been completed, publish the final result.
        assert isinstance(final_response.content, str)
        message.context.append(AssistantMessage(content=final_response.content, source=self.id.type))
        await self.publish_message(
            AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type),
            topic_id=TopicId(self._user_topic_type, source=self.id.key),
        )
44 changes: 44 additions & 0 deletions python/samples/core_streaming_handoffs_fastapi/agent_user.py
@@ -0,0 +1,44 @@
import asyncio
import json
import os

from autogen_core import (
    MessageContext,
    RoutedAgent,
    message_handler,
)
from autogen_core.model_context import BufferedChatCompletionContext

from models import AgentResponse


class UserAgent(RoutedAgent):
    def __init__(
        self,
        description: str,
        user_topic_type: str,
        agent_topic_type: str,
        response_queue: asyncio.Queue[str | object],
        stream_done: object,
    ) -> None:
        super().__init__(description)
        self._user_topic_type = user_topic_type
        self._agent_topic_type = agent_topic_type
        self._response_queue = response_queue
        self._STREAM_DONE = stream_done

    @message_handler
    async def handle_task_result(self, message: AgentResponse, ctx: MessageContext) -> None:
        # Save chat history: keep the most recent messages of the conversation context.
        context = BufferedChatCompletionContext(buffer_size=10, initial_messages=message.context)
        save_context = await context.save_state()
        # Save the context to a JSON file named after the conversation ID.
        chat_history_dir = "chat_history"
        os.makedirs(chat_history_dir, exist_ok=True)  # Ensure the directory exists before writing.
        if ctx.topic_id is None:
            raise ValueError("MessageContext.topic_id is None, cannot save chat history")
        file_path = os.path.join(chat_history_dir, f"history-{ctx.topic_id.source}.json")
        with open(file_path, "w") as f:
            json.dump(save_context, f, indent=4)

        # End the stream by placing the sentinel on the response queue.
        await self._response_queue.put(self._STREAM_DONE)
