5 changes: 5 additions & 0 deletions python/samples/core_streaming_response_fastapi/.gitignore
@@ -0,0 +1,5 @@
model_config.yaml
agent_state.json
agent_history.json
team_state.json
team_history.json
97 changes: 97 additions & 0 deletions python/samples/core_streaming_response_fastapi/README.md
@@ -0,0 +1,97 @@
# AutoGen-Core Streaming Chat API with FastAPI

This sample demonstrates how to build a streaming chat API with multi-turn conversation history using `autogen-core` and FastAPI.

## Key Features

1. **Streaming Responses**: Streams LLM output to the client in real time using FastAPI's `StreamingResponse`, `autogen-core`'s asynchronous features, and a global queue created with `asyncio.Queue()` that carries chunks from the agent to the HTTP handler, reducing the user-perceived response time (see the sketch below this list).
2. **Multi-Turn Conversation**: The agent (`MyAgent`) receives the full chat history (`UserRequest`) containing multiple turns of interaction, enabling context-aware, continuous conversations.
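
The core of the streaming feature is a small producer/consumer loop around a single `asyncio.Queue`. The following is a minimal, self-contained sketch of that pattern only; the names (`produce_chunks`, `DONE`, `/demo`) are illustrative, and the real implementation lives in `app.py`:

```python
import asyncio
import json
from typing import AsyncGenerator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
queue: asyncio.Queue[str | object] = asyncio.Queue()  # carries chunks from producer to handler
DONE = object()  # sentinel marking the end of a stream


async def produce_chunks() -> None:
    """Stand-in for the agent: push chunks onto the queue, then the sentinel."""
    for word in ("Hello", " ", "world"):
        await queue.put(word)
    await queue.put(DONE)


@app.post("/demo")
async def demo() -> StreamingResponse:
    task = asyncio.create_task(produce_chunks())

    async def stream() -> AsyncGenerator[str, None]:
        while True:
            item = await queue.get()
            if item is DONE:
                break
            yield json.dumps({"content": item}) + "\n"
        await task  # surface producer exceptions, if any

    return StreamingResponse(stream(), media_type="text/plain")
```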

## File Structure

* `app.py`: FastAPI application code, including API endpoints, Agent definitions, runtime settings, and streaming logic.
* `README.md`: (This document) Project introduction and usage instructions.

## Installation

First, make sure you have Python installed (3.10 or higher). Then, in your project directory, install the required libraries with pip:

```bash
pip install "fastapi" "uvicorn[standard]" "autogen-core" "autogen-ext[openai]"
```

## Configuration

Create a new file named `model_config.yaml` in the same directory as this README file to configure your model settings.
See `model_config_template.yaml` for an example.

**Note**: Storing API keys directly in the configuration file is only suitable for local testing. For production environments, it is strongly recommended to manage keys with environment variables or another secure mechanism; one possible approach is sketched below.
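
For example, one option (a sketch, not part of this sample) is to leave the key out of `model_config.yaml` and inject it at startup from an environment variable; the variable name `OPENAI_API_KEY` is an assumption here:

```python
# Sketch: read model_config.yaml, then fill in the API key from the environment
# instead of storing it in the file. OPENAI_API_KEY is an assumed variable name.
import os

import yaml
from autogen_core.models import ChatCompletionClient

with open("model_config.yaml", "r") as f:
    model_config = yaml.safe_load(f)

model_config["config"]["api_key"] = os.environ["OPENAI_API_KEY"]
model_client = ChatCompletionClient.load_component(model_config)
```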

## Running the Application

In the directory containing `app.py`, run the following command to start the FastAPI application:

```bash
uvicorn app:app --host 0.0.0.0 --port 8501 --reload
```

After the service starts, the API endpoint will be available at `http://<your-server-ip>:8501/chat/completions`.

## Using the API

You can interact with the agent by sending a POST request to the `/chat/completions` endpoint. The request body must be JSON and contain a `messages` field whose value is a list, where each element represents one turn of the conversation.

**Request Body Format**:

```json
{
"messages": [
{"source": "user", "content": "Hello!"},
{"source": "assistant", "content": "Hello! How can I help you?"},
{"source": "user", "content": "Introduce yourself."}
]
}
```

**Example (using curl)**:

```bash
curl -N -X POST http://localhost:8501/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [
      {"source": "user", "content": "Hello, I'\''m Tory."},
      {"source": "assistant", "content": "Hello Tory, nice to meet you!"},
      {"source": "user", "content": "Say hello by my name and introduce yourself."}
    ]
  }'
```
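
The `-N` (`--no-buffer`) flag disables curl's output buffering, so each streamed chunk is printed as soon as it arrives.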

**Example (using Python requests)**:

```python
import json

import requests

url = "http://localhost:8501/chat/completions"
data = {
    "stream": True,
    "messages": [
        {"source": "user", "content": "Hello, I'm Tory."},
        {"source": "assistant", "content": "Hello Tory, nice to meet you!"},
        {"source": "user", "content": "Say hello by my name and introduce yourself."},
    ],
}
headers = {"Content-Type": "application/json"}

try:
    response = requests.post(url, json=data, headers=headers, stream=True)
    response.raise_for_status()
    # Each chunk from the server is a newline-terminated JSON object.
    for chunk in response.iter_content(chunk_size=None):
        if chunk:
            print(json.loads(chunk)["content"], end="", flush=True)
except requests.exceptions.RequestException as e:
    print(f"Error: {e}")
except json.JSONDecodeError as e:
    print(f"JSON Decode Error: {e}")
```
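
Since each chunk emitted by the server is a newline-terminated JSON object, `response.iter_lines()` is a slightly more robust way to consume the stream (one iteration always yields a complete line). A variant of the loop above:

```python
# Variant of the consumption loop using newline framing.
for line in response.iter_lines():
    if line:  # skip empty lines
        print(json.loads(line)["content"], end="", flush=True)
```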

141 changes: 141 additions & 0 deletions python/samples/core_streaming_response_fastapi/app.py
@@ -0,0 +1,141 @@
import asyncio
import json
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List

import aiofiles
import yaml
from autogen_core import (
    AgentId,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    message_handler,
)
from autogen_core.models import AssistantMessage, ChatCompletionClient, LLMMessage, SystemMessage, UserMessage
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse


@dataclass
class AgentResponse:
"""
Represents the final accumulated response content from the LLM agent.
Note: The 'content' field hold the final response content.
"""

content: str


@dataclass
class UserRequest:
"""
Represents the chat history, containing a list of messages.
Each message is expected to be a dictionary with 'source' and 'content' keys.
"""

messages: List[Dict[str, str]]


# Runtime for the agent.
runtime = SingleThreadedAgentRuntime()

# Queue for streaming results from the agent back to the request handler
response_queue: asyncio.Queue[str | object] = asyncio.Queue()

# Sentinel object to signal the end of the stream
STREAM_DONE = object()


class MyAgent(RoutedAgent):
    def __init__(self, name: str, model_client: ChatCompletionClient) -> None:
        super().__init__(name)
        self._system_messages = [SystemMessage(content="You are a helpful assistant.")]
        self._model_client = model_client
        self._response_queue = response_queue

    @message_handler
    async def handle_user_message(self, message: UserRequest, ctx: MessageContext) -> AgentResponse:
        accumulated_content = ""  # To store the full response.
        try:
            user_messages: List[LLMMessage] = []
            # Convert each turn of the incoming chat history into an LLM message.
            for m in message.messages:
                if m["source"] == "user":
                    user_messages.append(UserMessage(content=m["content"], source=m["source"]))
                else:
                    user_messages.append(AssistantMessage(content=m["content"], source=m["source"]))
            # Create a stream of messages from the model client.
            async for i in self._model_client.create_stream(user_messages, cancellation_token=ctx.cancellation_token):
                if isinstance(i, str):
                    accumulated_content += i
                    await self._response_queue.put(i)
                else:
                    break
            await self._response_queue.put(STREAM_DONE)
            return AgentResponse(content=accumulated_content)
        except Exception as e:
            await self._response_queue.put("ERROR:" + str(e))
            return AgentResponse(content=str(e))


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    # Get model client from config.
    async with aiofiles.open("model_config.yaml", "r") as file:
        model_config = yaml.safe_load(await file.read())
    model_client = ChatCompletionClient.load_component(model_config)

    # Register the agent with the runtime.
    await MyAgent.register(
        runtime,
        "simple_agent",
        lambda: MyAgent(
            "myagent",
            model_client=model_client,
        ),
    )

    # Start the agent runtime.
    runtime.start()
    yield
    await runtime.stop()


app = FastAPI(lifespan=lifespan)


@app.post("/chat/completions")
async def chat_completions_stream(request: Request):
    json_data = await request.json()
    messages = json_data.get("messages", "")
    if not isinstance(messages, list):
        raise HTTPException(status_code=400, detail="Invalid input: 'messages' must be a list.")
    user_request = UserRequest(messages=messages)  # type: ignore

    async def response_stream() -> AsyncGenerator[str, None]:
        task1 = asyncio.create_task(runtime.send_message(user_request, AgentId("simple_agent", "default")))
        # Consume items from the response queue until the stream ends or an error occurs.
        while True:
            item = await response_queue.get()
            if item is STREAM_DONE:
                print(f"{time.time():.2f} - MAIN: Received STREAM_DONE. Exiting loop.")
                break
            elif isinstance(item, str) and item.startswith("ERROR:"):
                print(f"{time.time():.2f} - MAIN: Received error message from agent: {item}")
                break
            else:
                yield json.dumps({"content": item}) + "\n"

        # Wait for the agent task to finish.
        await task1

    return StreamingResponse(response_stream(), media_type="text/plain")  # type: ignore


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8501)
26 changes: 26 additions & 0 deletions python/samples/core_streaming_response_fastapi/model_config_template.yaml
@@ -0,0 +1,26 @@
# Use Open AI with key
provider: autogen_ext.models.openai.OpenAIChatCompletionClient
config:
  model: gpt-4o
  api_key: REPLACE_WITH_YOUR_API_KEY
# Use Azure Open AI with key
# provider: autogen_ext.models.openai.AzureOpenAIChatCompletionClient
# config:
#   model: gpt-4o
#   azure_endpoint: https://{your-custom-endpoint}.openai.azure.com/
#   azure_deployment: {your-azure-deployment}
#   api_version: {your-api-version}
#   api_key: REPLACE_WITH_YOUR_API_KEY
# Use Azure OpenAI with AD token provider.
# provider: autogen_ext.models.openai.AzureOpenAIChatCompletionClient
# config:
#   model: gpt-4o
#   azure_endpoint: https://{your-custom-endpoint}.openai.azure.com/
#   azure_deployment: {your-azure-deployment}
#   api_version: {your-api-version}
#   azure_ad_token_provider:
#     provider: autogen_ext.auth.azure.AzureTokenProvider
#     config:
#       provider_kind: DefaultAzureCredential
#       scopes:
#         - https://cognitiveservices.azure.com/.default
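# Note: the Azure AD token provider option above typically also requires the Azure
# extras (e.g. `pip install "autogen-ext[azure]"`), which provide DefaultAzureCredential.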