diff --git a/python/samples/core_streaming_handoffs_fastapi/.gitignore b/python/samples/core_streaming_handoffs_fastapi/.gitignore
new file mode 100644
index 000000000000..96aa01c30888
--- /dev/null
+++ b/python/samples/core_streaming_handoffs_fastapi/.gitignore
@@ -0,0 +1 @@
+model_config.yaml
\ No newline at end of file
diff --git a/python/samples/core_streaming_handoffs_fastapi/README.md b/python/samples/core_streaming_handoffs_fastapi/README.md
new file mode 100644
index 000000000000..612d30b4a5b0
--- /dev/null
+++ b/python/samples/core_streaming_handoffs_fastapi/README.md
@@ -0,0 +1,144 @@
+# AutoGen-Core Streaming Chat with Multi-Agent Handoffs via FastAPI
+
+This sample demonstrates how to build a streaming chat API featuring multi-agent handoffs and persistent conversation history using `autogen-core` and FastAPI. For more details on the handoff pattern, see the [AutoGen documentation](https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/design-patterns/handoffs.html).
+
+Inspired by `@ToryPan`'s example for streaming with Core API.
+
+## Key Features
+
+1. **Streaming Response**: Implements real-time streaming of agent responses using FastAPI's `StreamingResponse`, `autogen-core`'s asynchronous features, and an `asyncio.Queue` to manage the data stream.
+2. **Multi-Agent Handoffs**: Showcases a system where different agents (Triage, Sales, Issues & Repairs) handle specific parts of a conversation, using tools (`delegate_tools`) to transfer the conversation between agents based on the context.
+3. **Persistent Multi-Turn Conversation**: Agents receive and process conversation history, enabling context-aware interactions. History is saved per conversation ID in JSON files within the `chat_history` directory, allowing conversations to resume across sessions.
+4. **Simple Web UI**: Includes a basic web interface (served via FastAPI's static files) for easy interaction with the chat system directly from a browser.
+
+## File Structure
+
+* `app.py`: Main FastAPI application code, including API endpoints, agent definitions, runtime setup, handoff logic, and streaming.
+* `agent_user.py`: Defines the `UserAgent` responsible for interacting with the human user and saving chat history.
+* `agent_base.py`: Defines the base `AIAgent` class used by specialized agents.
+* `models.py`: Contains data models used for communication (e.g., `UserTask`, `AgentResponse`).
+* `topics.py`: Defines topic types used for routing messages between agents.
+* `tools.py`: Defines tools that agents can execute (e.g., `execute_order_tool`).
+* `tools_delegate.py`: Defines tools specifically for delegating/transferring the conversation to other agents.
+* `README.md`: (This document) Project introduction and usage instructions.
+* `static/`: Contains static files for the web UI (e.g., `index.html`).
+* `model_config_template.yaml`: Template for the model configuration file.
+
+## Installation
+
+First, ensure you have Python installed (3.10 or higher, as required by `autogen-core`). Then, install the necessary libraries:
+
+```bash
+pip install "fastapi" "uvicorn[standard]" "autogen-core" "autogen-ext[openai]" "PyYAML" "aiofiles"
+```
+
+## Configuration
+
+Create a new file named `model_config.yaml` in the same directory as this README file to configure your language model settings (e.g., OpenAI or Azure OpenAI details). Use `model_config_template.yaml` as a starting point.
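+
+For example, a minimal OpenAI configuration (mirroring the default entry in `model_config_template.yaml`) looks like this:
+
+```yaml
+provider: autogen_ext.models.openai.OpenAIChatCompletionClient
+config:
+  model: gpt-4o
+  api_key: REPLACE_WITH_YOUR_API_KEY
+```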
+
+**Note**: For production, manage API keys securely using environment variables or other secrets management tools instead of hardcoding them in the configuration file.
+
+## Running the Application
+
+In the directory containing `app.py`, run the following command to start the FastAPI application:
+
+```bash
+uvicorn app:app --host 0.0.0.0 --port 8501 --reload
+```
+
+The application includes a simple web interface. After starting the server, navigate to `http://localhost:8501` in your browser.
+
+The API endpoint for chat completions will be available at `http://localhost:8501/chat/completions`.
+
+## Using the API
+
+You can interact with the agent system by sending a POST request to the `/chat/completions` endpoint. The request body must be in JSON format and contain a `message` field (the user's input) and a `conversation_id` field to track the chat session.
+
+**Request Body Format**:
+
+```json
+{
+    "message": "I need a refund for a product.",
+    "conversation_id": "user123_session456"
+}
+```
+
+**Example (using curl)**:
+
+```bash
+curl -N -X POST http://localhost:8501/chat/completions \
+-H "Content-Type: application/json" \
+-d '{
+    "message": "Hi, I bought a rocket-powered unicycle and it exploded.",
+    "conversation_id": "wile_e_coyote_1"
+}'
+```
+
+**Example (using Python requests)**:
+
+```python
+import requests
+import json
+import uuid
+
+url = "http://localhost:8501/chat/completions"
+conversation_id = f"conv-{uuid.uuid4()}"  # Generate a unique conversation ID; reuse the same ID to continue an existing session.
+
+def send_message(message_text):
+    data = {
+        'message': message_text,
+        'conversation_id': conversation_id
+    }
+    headers = {'Content-Type': 'application/json'}
+    try:
+        print(f"\n>>> User: {message_text}")
+        print("<<< Assistant: ", end="", flush=True)
+        response = requests.post(url, json=data, headers=headers, stream=True)
+        response.raise_for_status()
+        full_response = ""
+        for chunk in response.iter_content(chunk_size=None):
+            if chunk:
+                try:
+                    # Decode the chunk
+                    chunk_str = chunk.decode('utf-8')
+                    # Handle potential multiple JSON objects in a single chunk
+                    for line in chunk_str.strip().split('\n'):
+                        if line:
+                            data = json.loads(line)
+                            # Each line is a JSON object of the form {"content": {"type": ..., "message": ...}}
+                            if 'content' in data and isinstance(data['content'], dict) and 'message' in data['content']:
+                                message_content = data['content']['message']
+                                message_type = data['content'].get('type', 'string')  # Default to string if type is missing
+
+                                # Print based on type (optional, could just print message_content)
+                                if message_type == 'function':
+                                    print(f"[{message_type.upper()}] {message_content}", end='\n', flush=True)  # Print function calls on new lines for clarity
+                                    print("<<< Assistant: ", end="", flush=True)  # Reprint prefix for next string part
+                                else:
+                                    print(message_content, end='', flush=True)
+
+                                full_response += message_content  # Append only the message part
+                            else:
+                                print(f"\nUnexpected chunk format: {line}")
+
+                except json.JSONDecodeError:
+                    print(f"\nError decoding chunk/line: '{line if 'line' in locals() else chunk_str}'")
+
+        print("\n--- End of Response ---")
+        return full_response
+
+    except requests.exceptions.RequestException as e:
+        print(f"\nError: {e}")
+    except Exception as e:
+        print(f"\nAn unexpected error occurred: {e}")
+
+# Start conversation
+send_message("I want a refund")
+# Continue conversation (example)
+# send_message("I want the rocket my friend Amith bought.")
+# send_message("They are the SpaceX 3000s")
+# send_message("That sounds great, I'll take it!")
+# send_message("Yes, I agree to the price and the 
caveat.") + + +``` \ No newline at end of file diff --git a/python/samples/core_streaming_handoffs_fastapi/agent_base.py b/python/samples/core_streaming_handoffs_fastapi/agent_base.py new file mode 100644 index 000000000000..d49d5933c026 --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/agent_base.py @@ -0,0 +1,134 @@ +import json +from typing import List, Tuple + +from autogen_core import ( + FunctionCall, + MessageContext, + RoutedAgent, + TopicId, + message_handler, +) +from autogen_core.models import ( + AssistantMessage, + ChatCompletionClient, + FunctionExecutionResult, + FunctionExecutionResultMessage, + SystemMessage +) +from autogen_core.tools import Tool +from models import UserTask,AgentResponse +import asyncio + + + +class AIAgent(RoutedAgent): + def __init__( + self, + description: str, + system_message: SystemMessage, + model_client: ChatCompletionClient, + tools: List[Tool], + delegate_tools: List[Tool], + agent_topic_type: str, + user_topic_type: str, + response_queue : asyncio.Queue[str | object] + ) -> None: + super().__init__(description) + self._system_message = system_message + self._model_client = model_client + self._tools = dict([(tool.name, tool) for tool in tools]) + self._tool_schema = [tool.schema for tool in tools] + self._delegate_tools = dict([(tool.name, tool) for tool in delegate_tools]) + self._delegate_tool_schema = [tool.schema for tool in delegate_tools] + self._agent_topic_type = agent_topic_type + self._user_topic_type = user_topic_type + self._response_queue = response_queue + + @message_handler + async def handle_task(self, message: UserTask, ctx: MessageContext) -> None: + # Start streaming LLM responses + llm_stream = self._model_client.create_stream( + messages=[self._system_message] + message.context, + tools=self._tool_schema + self._delegate_tool_schema, + cancellation_token=ctx.cancellation_token + ) + final_response = None + async for chunk in llm_stream: + if isinstance(chunk, str): + await self._response_queue.put({'type': "string", 'message': chunk}) + else: + final_response = chunk + assert final_response is not None, "No response from model" + print(f"{'-'*80}\n{self.id.type}:\n{final_response.content}", flush=True) + # Process the LLM result. + while isinstance(final_response.content, list) and all(isinstance(m, FunctionCall) for m in final_response.content): + tool_call_results: List[FunctionExecutionResult] = [] + delegate_targets: List[Tuple[str, UserTask]] = [] + # Process each function call. + for call in final_response.content: + arguments = json.loads(call.arguments) + await self._response_queue.put({"type":"function","message":f"Executing {call.name}"}) + if call.name in self._tools: + # Execute the tool directly. + result = await self._tools[call.name].run_json(arguments, ctx.cancellation_token) + result_as_str = self._tools[call.name].return_value_as_string(result) + tool_call_results.append( + FunctionExecutionResult(call_id=call.id, content=result_as_str, is_error=False, name=call.name) + ) + elif call.name in self._delegate_tools: + # Execute the tool to get the delegate agent's topic type. + result = await self._delegate_tools[call.name].run_json(arguments, ctx.cancellation_token) + topic_type = self._delegate_tools[call.name].return_value_as_string(result) + # Create the context for the delegate agent, including the function call and the result. 
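+                    # The synthetic "Transferred to {topic_type}. Adopt persona immediately." result below is what cues the receiving agent to continue the conversation in its own persona.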
+ delegate_messages = list(message.context) + [ + AssistantMessage(content=[call], source=self.id.type), + FunctionExecutionResultMessage( + content=[ + FunctionExecutionResult( + call_id=call.id, + content=f"Transferred to {topic_type}. Adopt persona immediately.", + is_error=False, + name=call.name, + ) + ] + ), + ] + delegate_targets.append((topic_type, UserTask(context=delegate_messages))) + else: + raise ValueError(f"Unknown tool: {call.name}") + if len(delegate_targets) > 0: + # Delegate the task to other agents by publishing messages to the corresponding topics. + for topic_type, task in delegate_targets: + print(f"{'-'*80}\n{self.id.type}:\nDelegating to {topic_type}", flush=True) + await self._response_queue.put({"type":"function","message":f"You are now talking to {topic_type}"}) + await self.publish_message(task, topic_id=TopicId(topic_type, source=self.id.key)) + if len(tool_call_results) > 0: + print(f"{'-'*80}\n{self.id.type}:\n{tool_call_results}", flush=True) + # Make another LLM call with the results. + message.context.extend([ + AssistantMessage(content=final_response.content, source=self.id.type), + FunctionExecutionResultMessage(content=tool_call_results), + ]) + llm_stream = self._model_client.create_stream( + messages=[self._system_message] + message.context, + tools=self._tool_schema + self._delegate_tool_schema, + cancellation_token=ctx.cancellation_token + ) + final_response = None + async for chunk in llm_stream: + if isinstance(chunk, str): + await self._response_queue.put({'type': 'string', 'message': chunk}) + else: + final_response = chunk + assert final_response is not None, "No response from model" + print(f"{'-'*80}\n{self.id.type}:\n{final_response.content}", flush=True) + else: + # The task has been delegated, so we are done. + return + # The task has been completed, publish the final result. 
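+        # final_response.content is now plain text: append it to the context and publish an AgentResponse to the user topic, where the UserAgent saves the history and ends the stream.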
+ assert isinstance(final_response.content, str) + message.context.append(AssistantMessage(content=final_response.content, source=self.id.type)) + await self.publish_message( + AgentResponse(context=message.context, reply_to_topic_type=self._agent_topic_type), + topic_id=TopicId(self._user_topic_type, source=self.id.key), + ) diff --git a/python/samples/core_streaming_handoffs_fastapi/agent_user.py b/python/samples/core_streaming_handoffs_fastapi/agent_user.py new file mode 100644 index 000000000000..79f4007cfccd --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/agent_user.py @@ -0,0 +1,44 @@ +from autogen_core import ( + MessageContext, + RoutedAgent, + message_handler, +) + +from autogen_core.model_context import BufferedChatCompletionContext + +from models import AgentResponse +import asyncio +import json +import os + + + +class UserAgent(RoutedAgent): + def __init__(self, + description: str, + user_topic_type: str, + agent_topic_type: str, + response_queue : asyncio.Queue[str | object], + stream_done : object) -> None: + super().__init__(description) + self._user_topic_type = user_topic_type + self._agent_topic_type = agent_topic_type + self._response_queue = response_queue + self._STREAM_DONE = stream_done + + @message_handler + async def handle_task_result(self, message: AgentResponse, ctx: MessageContext) -> None: + #Save chat history + context = BufferedChatCompletionContext(buffer_size=10,initial_messages=message.context) + save_context = await context.save_state() + # Save context to JSON file + chat_history_dir = "chat_history" + if ctx.topic_id is None: + raise ValueError("MessageContext.topic_id is None, cannot save chat history") + file_path = os.path.join(chat_history_dir, f"history-{ctx.topic_id.source}.json") + with open(file_path, 'w') as f: + json.dump(save_context, f, indent=4) + + #End stream + await self._response_queue.put(self._STREAM_DONE) + diff --git a/python/samples/core_streaming_handoffs_fastapi/app.py b/python/samples/core_streaming_handoffs_fastapi/app.py new file mode 100644 index 000000000000..bae26846a43d --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/app.py @@ -0,0 +1,286 @@ +import json +import time +import os +import re + +from autogen_core import ( + SingleThreadedAgentRuntime, + TypeSubscription, + TopicId +) +from autogen_core.models import ( + SystemMessage, + UserMessage, + AssistantMessage +) + +from autogen_core.model_context import BufferedChatCompletionContext +from autogen_core.models import ChatCompletionClient +from agent_user import UserAgent +from agent_base import AIAgent + +from models import UserTask +from topics import ( + triage_agent_topic_type, + user_topic_type, + sales_agent_topic_type, + issues_and_repairs_agent_topic_type, +) + +from tools import ( + execute_order_tool, + execute_refund_tool, + look_up_item_tool, +) + +from tools_delegate import ( + transfer_to_issues_and_repairs_tool, + transfer_to_sales_agent_tool, + transfer_back_to_triage_tool +) + +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import StreamingResponse, FileResponse +from fastapi.staticfiles import StaticFiles +from contextlib import asynccontextmanager +from typing import AsyncGenerator +import aiofiles +import yaml +import asyncio + + +# Runtime for the agent. 
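+# A single embedded runtime hosts all agents in this process; messages are routed between them via the topic subscriptions registered in the lifespan handler below.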
+runtime = SingleThreadedAgentRuntime() + +# Queue for streaming results from the agent back to the request handler +response_queue: asyncio.Queue[str | object] = asyncio.Queue() + +# Sentinel object to signal the end of the stream +STREAM_DONE = object() + +@asynccontextmanager +async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: + # Create chat_history directory if it doesn't exist + chat_history_dir = "chat_history" + if not os.path.exists(chat_history_dir): + os.makedirs(chat_history_dir) + + # Get model client from config. + async with aiofiles.open("model_config.yaml", "r") as file: + model_config = yaml.safe_load(await file.read()) + model_client = ChatCompletionClient.load_component(model_config) + + # Register the triage agent. + triage_agent_type = await AIAgent.register( + runtime, + type=triage_agent_topic_type, # Using the topic type as the agent type. + factory=lambda: AIAgent( + description="A triage agent.", + system_message=SystemMessage( + content="You are a customer service bot for ACME Inc. " + "Introduce yourself. Always be very brief. " + "Gather information to direct the customer to the right department. " + "But make your questions subtle and natural." + ), + model_client=model_client, + tools=[], + delegate_tools=[ + transfer_to_issues_and_repairs_tool, + transfer_to_sales_agent_tool + ], + agent_topic_type=triage_agent_topic_type, + user_topic_type=user_topic_type, + response_queue=response_queue + ), + ) + # Add subscriptions for the triage agent: it will receive messages published to its own topic only. + await runtime.add_subscription(TypeSubscription(topic_type=triage_agent_topic_type, agent_type=triage_agent_type.type)) + + # Register the sales agent. + sales_agent_type = await AIAgent.register( + runtime, + type=sales_agent_topic_type, # Using the topic type as the agent type. + factory=lambda: AIAgent( + description="A sales agent.", + system_message=SystemMessage( + content="You are a sales agent for ACME Inc." + "Always answer in a sentence or less." + "Follow the following routine with the user:" + "1. Ask them about any problems in their life related to catching roadrunners.\n" + "2. Casually mention one of ACME's crazy made-up products can help.\n" + " - Don't mention price.\n" + "3. Once the user is bought in, drop a ridiculous price.\n" + "4. Only after everything, and if the user says yes, " + "tell them a crazy caveat and execute their order.\n" + "" + ), + model_client=model_client, + tools=[execute_order_tool], + delegate_tools=[transfer_back_to_triage_tool], + agent_topic_type=sales_agent_topic_type, + user_topic_type=user_topic_type, + response_queue=response_queue + ), + ) + # Add subscriptions for the sales agent: it will receive messages published to its own topic only. + await runtime.add_subscription(TypeSubscription(topic_type=sales_agent_topic_type, agent_type=sales_agent_type.type)) + + # Register the issues and repairs agent. + issues_and_repairs_agent_type = await AIAgent.register( + runtime, + type=issues_and_repairs_agent_topic_type, # Using the topic type as the agent type. + factory=lambda: AIAgent( + description="An issues and repairs agent.", + system_message=SystemMessage( + content="You are a customer support agent for ACME Inc." + "Always answer in a sentence or less." + "Follow the following routine with the user:" + "1. First, ask probing questions and understand the user's problem deeper.\n" + " - unless the user has already provided a reason.\n" + "2. Propose a fix (make one up).\n" + "3. 
ONLY if not satisfied, offer a refund.\n" + "4. If accepted, search for the ID and then execute refund." + ), + model_client=model_client, + tools=[ + execute_refund_tool, + look_up_item_tool, + ], + delegate_tools=[transfer_back_to_triage_tool], + agent_topic_type=issues_and_repairs_agent_topic_type, + user_topic_type=user_topic_type, + response_queue=response_queue + ), + ) + # Add subscriptions for the issues and repairs agent: it will receive messages published to its own topic only. + await runtime.add_subscription( + TypeSubscription(topic_type=issues_and_repairs_agent_topic_type, agent_type=issues_and_repairs_agent_type.type) + ) + + # Register the user agent. + user_agent_type = await UserAgent.register( + runtime, + type=user_topic_type, + factory=lambda: UserAgent( + description="A user agent.", + user_topic_type=user_topic_type, + agent_topic_type=triage_agent_topic_type, + response_queue=response_queue, + stream_done = STREAM_DONE + ) + ) + # Add subscriptions for the user agent: it will receive messages published to its own topic only. + await runtime.add_subscription(TypeSubscription(topic_type=user_topic_type, agent_type=user_agent_type.type)) + + # Start the agent runtime. + runtime.start() + yield + await runtime.stop() + + +app = FastAPI(lifespan=lifespan) + +# Mount static files directory +app.mount("/static", StaticFiles(directory="static"), name="static") + + +@app.get("/") +async def read_index(): + # Serve the index.html file + return FileResponse('static/index.html') + + +@app.post("/chat/completions") +async def chat_completions_stream(request: Request): + json_data = await request.json() + message = json_data.get("message", "") + conversation_id = json_data.get("conversation_id", "conv_id") + + if not isinstance(message, str): + raise HTTPException(status_code=400, detail="Invalid input: 'message' must be a string.") + + if not isinstance(conversation_id, str): + raise HTTPException(status_code=400, detail="Invalid input: 'conversation_id' must be a string.") + + # Validate conversation_id to prevent path traversal attacks + if not re.match(r'^[A-Za-z0-9_-]+$', conversation_id): + raise HTTPException(status_code=400, detail="Invalid input: 'conversation_id' contains invalid characters.") + + chat_history_dir = "chat_history" + base_dir = os.path.abspath(chat_history_dir) + full_path = os.path.normpath(os.path.join(base_dir, f"history-{conversation_id}.json")) + if not full_path.startswith(base_dir + os.sep): + raise HTTPException(status_code=400, detail="Invalid input: 'conversation_id' leads to invalid path.") + chat_history_file = full_path + + messages = [] + # Initialize chat_history and route_agent with default values + chat_history = {} + route_agent = triage_agent_topic_type + + # Load chat history if it exists. + # Chat history is saved inside the UserAgent. Use redis if possible. + # There may be a better way to do this. 
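+    # The source of the last AssistantMessage in the saved history identifies the agent that replied most recently, so the new user message is routed back to that agent's topic and the conversation resumes with the same persona.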
+ if os.path.exists(chat_history_file): + context = BufferedChatCompletionContext(buffer_size=15) + try: + async with aiofiles.open(chat_history_file, "r") as f: + content = await f.read() + if content: # Check if file is not empty + chat_history = json.loads(content) + await context.load_state(chat_history) # Load state only if history is loaded + loaded_messages = await context.get_messages() + if loaded_messages: + messages = loaded_messages + last_message = messages[-1] + if isinstance(last_message, AssistantMessage) and isinstance(last_message.source, str): + route_agent = last_message.source + except json.JSONDecodeError: + print(f"Error decoding JSON from {chat_history_file}. Starting with empty history.") + # Reset to defaults if loading fails + messages = [] + route_agent = triage_agent_topic_type + chat_history = {} + except Exception as e: + print(f"Error loading chat history for {conversation_id}: {e}") + # Reset to defaults on other errors + messages = [] + route_agent = triage_agent_topic_type + chat_history = {} + # else: route_agent remains the default triage_agent_topic_type if file doesn't exist + + messages.append(UserMessage(content=message,source="User")) + + + + async def response_stream() -> AsyncGenerator[str, None]: + task1 = asyncio.create_task(runtime.publish_message( + UserTask(context=messages), + topic_id=TopicId(type=route_agent, source=conversation_id), # Explicitly use 'type' parameter + )) + # Consume items from the response queue until the stream ends or an error occurs + while True: + item = await response_queue.get() + if item is STREAM_DONE: + print(f"{time.time():.2f} - MAIN: Received STREAM_DONE. Exiting loop.") + break + elif isinstance(item, str) and item.startswith("ERROR:"): + print(f"{time.time():.2f} - MAIN: Received error message from agent: {item}") + break + # Ensure item is serializable before yielding + else: + yield json.dumps({"content": item}) + "\n" + + # Wait for the task to finish. + await task1 + + return StreamingResponse(response_stream(), media_type="text/plain") # type: ignore + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8501) + + + diff --git a/python/samples/core_streaming_handoffs_fastapi/chat_history/history-wile_e_coyote_1.json b/python/samples/core_streaming_handoffs_fastapi/chat_history/history-wile_e_coyote_1.json new file mode 100644 index 000000000000..fd6194b26509 --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/chat_history/history-wile_e_coyote_1.json @@ -0,0 +1,49 @@ +{ + "messages": [ + { + "content": "Hi, I bought a rocket-powered unicycle and it exploded.", + "source": "User", + "type": "UserMessage" + }, + { + "content": "Hi, I'm here to help. That sounds serious. Are you looking to report the issue or perhaps discuss a refund?", + "thought": null, + "source": "TriageAgent", + "type": "AssistantMessage" + }, + { + "content": "Hi, I bought a rocket-powered unicycle and it exploded.", + "source": "User", + "type": "UserMessage" + }, + { + "content": [ + { + "id": "call_iRnHdOjAk80LBNwPxMdxwwqM", + "arguments": "{}", + "name": "transfer_to_issues_and_repairs" + } + ], + "thought": null, + "source": "TriageAgent", + "type": "AssistantMessage" + }, + { + "content": [ + { + "content": "Transferred to IssuesAndRepairsAgent. 
Adopt persona immediately.", + "name": "transfer_to_issues_and_repairs", + "call_id": "call_iRnHdOjAk80LBNwPxMdxwwqM", + "is_error": false + } + ], + "type": "FunctionExecutionResultMessage" + }, + { + "content": "Could you please describe what happened with your rocket-powered unicycle before it exploded?", + "thought": null, + "source": "IssuesAndRepairsAgent", + "type": "AssistantMessage" + } + ] +} \ No newline at end of file diff --git a/python/samples/core_streaming_handoffs_fastapi/model_config_template.yaml b/python/samples/core_streaming_handoffs_fastapi/model_config_template.yaml new file mode 100644 index 000000000000..c43f0a5bdd13 --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/model_config_template.yaml @@ -0,0 +1,26 @@ +# Use Open AI with key +provider: autogen_ext.models.openai.OpenAIChatCompletionClient +config: + model: gpt-4o + api_key: REPLACE_WITH_YOUR_API_KEY +# Use Azure Open AI with key +# provider: autogen_ext.models.openai.AzureOpenAIChatCompletionClient +# config: +# model: gpt-4o +# azure_endpoint: https://{your-custom-endpoint}.openai.azure.com/ +# azure_deployment: {your-azure-deployment} +# api_version: {your-api-version} +# api_key: REPLACE_WITH_YOUR_API_KEY +# Use Azure OpenAI with AD token provider. +# provider: autogen_ext.models.openai.AzureOpenAIChatCompletionClient +# config: +# model: gpt-4o +# azure_endpoint: https://{your-custom-endpoint}.openai.azure.com/ +# azure_deployment: {your-azure-deployment} +# api_version: {your-api-version} +# azure_ad_token_provider: +# provider: autogen_ext.auth.azure.AzureTokenProvider +# config: +# provider_kind: DefaultAzureCredential +# scopes: +# - https://cognitiveservices.azure.com/.default \ No newline at end of file diff --git a/python/samples/core_streaming_handoffs_fastapi/models.py b/python/samples/core_streaming_handoffs_fastapi/models.py new file mode 100644 index 000000000000..d67c3c3a6fb7 --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/models.py @@ -0,0 +1,14 @@ +from typing import List +from autogen_core.models import LLMMessage +from pydantic import BaseModel + + +class UserLogin(BaseModel): + pass + +class UserTask(BaseModel): + context: List[LLMMessage] + +class AgentResponse(BaseModel): + reply_to_topic_type: str + context: List[LLMMessage] diff --git a/python/samples/core_streaming_handoffs_fastapi/requirements.txt b/python/samples/core_streaming_handoffs_fastapi/requirements.txt new file mode 100644 index 000000000000..869c43be2c85 --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/requirements.txt @@ -0,0 +1,5 @@ +autogen-core>=0.5.4 +autogen-ext[openai,azure]>=0.5.4 +fastapi==0.115.12 +uvicorn==0.34.2 +PyYAML==6.0.2 diff --git a/python/samples/core_streaming_handoffs_fastapi/static/index.html b/python/samples/core_streaming_handoffs_fastapi/static/index.html new file mode 100644 index 000000000000..646fdb169a61 --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/static/index.html @@ -0,0 +1,215 @@ + + + + ACME Agent Chat + + + +
[static/index.html (215 lines, not reproduced here): markup, styles, and client-side script for the sample's chat page, titled "ACME Agent Chat" with the heading "Chat with ACME Agent".]
+ + + + diff --git a/python/samples/core_streaming_handoffs_fastapi/tools.py b/python/samples/core_streaming_handoffs_fastapi/tools.py new file mode 100644 index 000000000000..b6a3d8237226 --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/tools.py @@ -0,0 +1,32 @@ +from typing import Dict, Union +from autogen_core.tools import FunctionTool + + +def execute_order(product: str, price: int) -> Dict[str, Union[str, int]]: + print("\n\n=== Order Summary ===") + print(f"Product: {product}") + print(f"Price: ${price}") + print("=================\n") + return {"product":product,"price":price} + + + +def look_up_item(search_query: str) -> Dict[str, str]: + item_id = "item_132612938" + return {"item_id":item_id,"status":"found"} + + +def execute_refund(item_id: str, reason: str = "not provided") -> Dict[str, str]: + print("\n\n=== Refund Summary ===") + print(f"Item ID: {item_id}") + print(f"Reason: {reason}") + print("=================\n") + print("Refund execution successful!") + return {"item_id":item_id, "reason":reason, "refund_status":"Successful"} + + +execute_order_tool = FunctionTool(execute_order, description="Price should be in USD.") +look_up_item_tool = FunctionTool( + look_up_item, description="Use to find item ID.\nSearch query can be a description or keywords." +) +execute_refund_tool = FunctionTool(execute_refund, description="") diff --git a/python/samples/core_streaming_handoffs_fastapi/tools_delegate.py b/python/samples/core_streaming_handoffs_fastapi/tools_delegate.py new file mode 100644 index 000000000000..440cd484196a --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/tools_delegate.py @@ -0,0 +1,25 @@ +from autogen_core.tools import FunctionTool +from topics import sales_agent_topic_type, issues_and_repairs_agent_topic_type, triage_agent_topic_type + +def transfer_to_sales_agent() -> str: + return sales_agent_topic_type + + +def transfer_to_issues_and_repairs() -> str: + return issues_and_repairs_agent_topic_type + + +def transfer_back_to_triage() -> str: + return triage_agent_topic_type + + +transfer_to_sales_agent_tool = FunctionTool( + transfer_to_sales_agent, description="Use for anything sales or buying related." +) +transfer_to_issues_and_repairs_tool = FunctionTool( + transfer_to_issues_and_repairs, description="Use for issues, repairs, or refunds." +) +transfer_back_to_triage_tool = FunctionTool( + transfer_back_to_triage, + description="Call this if the user brings up a topic outside of your purview,\nincluding escalating to human.", +) diff --git a/python/samples/core_streaming_handoffs_fastapi/topics.py b/python/samples/core_streaming_handoffs_fastapi/topics.py new file mode 100644 index 000000000000..41aaf364503f --- /dev/null +++ b/python/samples/core_streaming_handoffs_fastapi/topics.py @@ -0,0 +1,5 @@ +sales_agent_topic_type = "SalesAgent" +issues_and_repairs_agent_topic_type = "IssuesAndRepairsAgent" +triage_agent_topic_type = "TriageAgent" +user_topic_type = "User" +