-
Notifications
You must be signed in to change notification settings - Fork 8.1k
Description
What happened?
Describe the bug
We are trying to create a multi team (Supervisor) pattern, in which a supervisor should orchestrate and communicate between multiple teams to complete the job. For that We have used combination of MagenticOneOrchestrator, SoceityOfMindAgent and SelectorGroupChat, code is attached below. We see the teams are initialized dynamically and the supervisor is channeling the user query to the relevant team. When the job is done we see below exception.
Please suggest what is wrong.
Error processing publish message for GeneralAssistantSupervisor_2ae9d9d0-41f3-4916-9f91-4c36856a1242/2ae9d9d0-41f3-4916-9f91-4c36856a1242
Traceback (most recent call last):
File "/home/azureuser/clientdemo/magenticone/.venv/lib/python3.12/site-packages/autogen_core/_single_threaded_agent_runtime.py", line 510, in _on_message
return await agent.on_message(
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/azureuser/clientdemo/magenticone/.venv/lib/python3.12/site-packages/autogen_core/_base_agent.py", line 113, in on_message
return await self.on_message_impl(message, ctx)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/azureuser/clientdemo/magenticone/.venv/lib/python3.12/site-packages/autogen_agentchat/teams/_group_chat/_sequential_routed_agent.py", line 67, i
n on_message_impl
return await super().on_message_impl(message, ctx)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/azureuser/clientdemo/magenticone/.venv/lib/python3.12/site-packages/autogen_core/_routed_agent.py", line 485, in on_message_impl
return await h(self, message, ctx)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/azureuser/clientdemo/magenticone/.venv/lib/python3.12/site-packages/autogen_core/_routed_agent.py", line 268, in wrapper
return_value = await func(self, message, ctx) # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/azureuser/clientdemo/magenticone/.venv/lib/python3.12/site-packages/autogen_agentchat/teams/_group_chat/_chat_agent_container.py", line 69, in h
andle_request
async for msg in self._agent.on_messages_stream(self._message_buffer, ctx.cancellation_token):
File "/home/azureuser/clientdemo/magenticone/.venv/lib/python3.12/site-packages/autogen_agentchat/agents/_society_of_mind_agent.py", line 148, in on_message
s_stream
async for inner_msg in self._team.run_stream(task=task, cancellation_token=cancellation_token):
File "/home/azureuser/clientdemo/magenticone/.venv/lib/python3.12/site-packages/autogen_agentchat/teams/_group_chat/_base_group_chat.py", line 400, in run_s
tream
raise ValueError("Task list cannot be empty.")
ValueError: Task list cannot be empty.
To Reproduce
Use the code below.
Expected behavior
Required task should be shared between agent and MagenticOneOrchestrator, SoceityOfMindAgent and SelectorGroupChat the there should not be exception
Screenshots
NA
Additional context
import json
import logging
import os
from typing import Any, Awaitable, Callable, Optional
import aiofiles
import yaml
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent, SocietyOfMindAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.messages import TextMessage, UserInputRequestedEvent
from autogen_agentchat.teams import MagenticOneGroupChat, SelectorGroupChat
from autogen_core import CancellationToken
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from tools.onboarding_offboarding_tools import *
from tools.incident_mitigation_tools import *
from tools.wawa_tools import *
logger = logging.getLogger(__name__)
app = FastAPI()
# Load team configurations from external JSON file
CONFIG_FILE = "teams_config.json"
def load_team_config():
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r") as file:
return json.load(file)
return []
TEAM_CONFIG = load_team_config()
def get_team_config(request: str):
for team in TEAM_CONFIG:
if team.get("teamName", "").lower() == request.lower():
return team # ✅ Ensure it returns the correct team config
return {}
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
model_config_path = "model_config.yaml"
state_path = "team_state.json"
history_path = "team_history.json"
# Serve static files
app.mount("/static", StaticFiles(directory="."), name="static")
class ConnectionManager:
def __init__(self):
#self.active_connections: list[WebSocket] = []
self.active_connections: dict(str, WebSocket) = {}
async def connect(self, websocket: WebSocket, clientId):
await websocket.accept()
#self.active_connections.append(websocket)
self.active_connections[clientId] = websocket
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_admin_message(self, message: str):
websocket = self.active_connections.get("admin")
await websocket.send_json(message)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_json(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_json(message)
manager = ConnectionManager()
text_termination = TextMentionTermination("TERMINATE")
max_msg_termination = MaxMessageTermination(max_messages=100)
combined_termination = max_msg_termination | text_termination
model_client = AzureOpenAIChatCompletionClient(
model="gpt-4o", temperature=0.0, top_p=0.0, frequency_penalty=0.0, presence_penalty=0.0
)
@app.get("/")
async def root():
"""Serve the chat interface HTML file."""
return FileResponse("app_user.html")
async def load_agents_from_config(teamType: str) -> list[AssistantAgent]:
"""Load agent configurations from JSON files."""
team_config = get_team_config(teamType)
if not team_config:
print(f"❌ No team config found for '{teamType}'")
return []
agents_config_file = team_config.get("config", "general_agents_config.json") # Get correct config file
print(f"📂 Loading agent config file: {agents_config_file}")
if not os.path.exists(agents_config_file):
print(f"❌ File '{agents_config_file}' does not exist!")
return []
try:
async with aiofiles.open(agents_config_file, "r") as file:
file_content = await file.read()
print("📄 Loaded File Content (Preview):", file_content[:200]) # Print first 200 chars for debugging
agents_data = json.loads(file_content)
except json.JSONDecodeError as e:
print(f"❌ JSON Error in '{agents_config_file}': {e}")
return []
except Exception as e:
print(f"❌ Unexpected error while reading '{agents_config_file}': {e}")
return []
agents = []
for agent in agents_data.get("agents", []):
tool_functions = [globals().get(tool) for tool in agent.get("tools", []) if globals().get(tool)]
if len(tool_functions) != len(agent.get("tools", [])):
missing_tools = [tool for tool in agent.get("tools", []) if tool not in globals()]
logger.warning(f"⚠️ Missing tools: {missing_tools} for agent {agent['name']}")
agents.append(
AssistantAgent(
name=agent["name"],
model_client=model_client,
tools=tool_functions,
system_message=agent["system_message"],
description=agent["description"],
reflect_on_tool_use=True,
model_client_stream=False,
)
)
print(f"✅ Successfully loaded {len(agents)} agents from '{agents_config_file}'")
return agents
async def get_team(user_input_func: Callable[[str, Optional[CancellationToken]], Awaitable[str]], request: str) -> MagenticOneGroupChat:
"""Retrieve or create a team based on request type."""
user_proxy = UserProxyAgent(name="User", input_func=user_input_func)
team_config = get_team_config(request)
print("::::team_config", team_config)
team_type = team_config.get("teamName", "GeneralAssistantSupervisor")
print("::::team_type", team_type)
agent_list = await load_agents_from_config(team_type)
agent_list.append(user_proxy)
"""
Args:
participants (List[ChatAgent]): The participants in the group chat.
model_client (ChatCompletionClient): The model client used for generating responses.
termination_condition (TerminationCondition, optional): The termination condition for the group chat. Defaults to None.
Without a termination condition, the group chat will run based on the orchestrator logic or until the maximum number of turns is reached.
max_turns (int, optional): The maximum number of turns in the group chat before stopping. Defaults to 20.
max_stalls (int, optional): The maximum number of stalls allowed before re-planning. Defaults to 3.
final_answer_prompt (str, optional): The LLM prompt used to generate the final answer or response from the team's transcript. A default (sensible for GPT-4o class models) is provided.
"""
team = MagenticOneGroupChat(
agent_list,
model_client=model_client,
termination_condition=combined_termination,
max_turns=100,
max_stalls=30
)
# Load state from file.
"""if not os.path.exists(state_path):
return team
async with aiofiles.open(state_path, "r") as file:
state = json.loads(await file.read())
await team.load_state(state)"""
return team
async def get_history() -> list[dict[str, Any]]:
"""Get chat history from file."""
if not os.path.exists(history_path):
return []
async with aiofiles.open(history_path, "r") as file:
return json.loads(await file.read())
@app.get("/history")
async def history() -> list[dict[str, Any]]:
"""Fetch chat history from storage."""
try:
# return await get_history()
return []
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
@app.websocket("/ws/chat/{client_id}")
async def chat(websocket: WebSocket,client_id: str):
"""WebSocket handler for real-time chat."""
global model_client
await manager.connect(websocket, client_id)
# User input function used by the team.
async def _user_input(prompt: str, cancellation_token: CancellationToken | None) -> str:
data = await websocket.receive_json()
return TextMessage.model_validate(data).content
try:
while True:
try:
# Get user message.
data = await websocket.receive_json()
request = TextMessage.model_validate(data)
print("TEAM_CONFIG:", TEAM_CONFIG)
teams = {}
for team in TEAM_CONFIG:
print("Current team:", team) # Debugging
print("Type:", type(team))
if isinstance(team, dict) and "teamName" in team:
teams[team["teamName"]] = await get_team(_user_input, team["teamName"])
else:
print(f"Invalid team format: {team}") # Debugging line
agents = {}
for team_name, team_instance in teams.items():
if isinstance(team_instance, MagenticOneGroupChat): # Ensure it's a valid team instance
agents[team_name] = SocietyOfMindAgent(team_name, team=team_instance, model_client=model_client, description="An agent that uses an inner team of agents to generate responses. Strictly - Reponse should be in 20 - 30 words only.")
else:
print(f"Skipping invalid team instance for {team_name}") # Debugging line
selector_prompt = (
"Select the most suitable agent to perform the next task. {roles}\n\n"
"Current conversation context: {history}\n\n"
"- Prioritize expertise.\n"
"- Planner agent assigns tasks before execution.\n"
"- General tasks: GeneralAssistantSupervisor.\n"
"- Pump issues: FuelStationSupervisor.\n"
"- Coordinate across teams when necessary.\n"
"- Select only one agent at a time."
)
orchestrator = SelectorGroupChat(
list(agents.values()),
model_client=model_client,
termination_condition=combined_termination,
selector_prompt=selector_prompt,
allow_repeated_speaker=True
)
history = await get_history()
async for message in orchestrator.run_stream(task=request):
if isinstance(message, TaskResult):
continue
await websocket.send_json(message.model_dump())
await manager.send_admin_message(message.model_dump())
if not isinstance(message, UserInputRequestedEvent):
history.append(message.model_dump())
"""
# Save team state to file.
async with aiofiles.open(state_path, "w") as file:
state = await team.save_state()
await file.write(json.dumps(state))
# Save chat history to file.
async with aiofiles.open(history_path, "w") as file:
await file.write(json.dumps(history))
"""
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
await websocket.send_json({"type": "error", "content": f"Unexpected error: {str(e)}", "source": "system"})
except WebSocketDisconnect:
logger.info("Client disconnected")
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
await websocket.send_json({"type": "error", "content": f"Unexpected error: {str(e)}", "source": "system"})
# Example usage
if __name__ == "__main__":
import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8002, timeout_keep_alive=1200.0, ws_ping_interval=3000.0, ws_ping_timeout=3000.0)
uvicorn.run(
app,
host="0.0.0.0",
port=8003,
log_level="debug", # Same as '--log-level debug'
timeout_keep_alive=600, # Keeps idle HTTP connections for 10 minutes
ws_ping_interval=300, # Sends a WebSocket ping every 300 seconds
ws_ping_timeout=300 # Waits 5 minutes for a pong before closing
)
Sample team config file
{
"agents": [
{
"name": "GeneralAssistantAgent",
"system_message": "You are a helpful assistant.",
"description": "You are a helpful assistant.",
"tools": []
},
{
"name": "WeatherAgent",
"system_message": "You are a helpful assistant to answer weather queries.",
"description": "You are a helpful assistant to answer weather queries.",
"tools": ["get_weather"]
}
]
}
Which packages was the bug in?
Python AgentChat (autogen-agentchat>=0.4.0)
AutoGen library version.
Python 0.4.7
Other library version.
No response
Model used
gpt-4o
Model provider
None
Other model provider
No response
Python version
None
.NET version
None
Operating system
None