diff --git a/src/crewai/agent.py b/src/crewai/agent.py index d10b768d4c..aff8b9dc8b 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -1,7 +1,7 @@ import re import shutil import subprocess -from typing import Any, Dict, List, Literal, Optional, Sequence, Union +from typing import Any, Dict, List, Literal, Optional, Sequence, Union, cast from pydantic import Field, InstanceOf, PrivateAttr, model_validator @@ -50,6 +50,7 @@ class Agent(BaseAgent): max_rpm: Maximum number of requests per minute for the agent execution to be respected. verbose: Whether the agent execution should be in verbose mode. allow_delegation: Whether the agent is allowed to delegate tasks to other agents. + delegate_to: List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents. tools: Tools at agents disposal step_callback: Callback to be executed after each step of the agent execution. knowledge_sources: Knowledge sources for the agent. @@ -342,10 +343,17 @@ def create_agent_executor( callbacks=[TokenCalcHandler(self._token_process)], ) - def get_delegation_tools(self, agents: List[BaseAgent]): - agent_tools = AgentTools(agents=agents) - tools = agent_tools.tools() - return tools + def get_delegation_tools(self, agents: Sequence[BaseAgent]) -> Sequence[BaseTool]: + # If delegate_to is specified, use those agents instead of all agents + agents_to_use: List[BaseAgent] + if self.delegate_to is not None: + agents_to_use = cast(List[BaseAgent], list(self.delegate_to)) + else: + agents_to_use = list(agents) # Convert to list to match expected type + + agent_tools = AgentTools(agents=agents_to_use) + delegation_tools = agent_tools.tools() + return delegation_tools def get_multimodal_tools(self) -> Sequence[BaseTool]: from crewai.tools.agent_tools.add_image_tool import AddImageTool diff --git a/src/crewai/agents/agent_builder/base_agent.py b/src/crewai/agents/agent_builder/base_agent.py index 47515d0876..0d4f9af511 100644 --- a/src/crewai/agents/agent_builder/base_agent.py +++ b/src/crewai/agents/agent_builder/base_agent.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from copy import copy as shallow_copy from hashlib import md5 -from typing import Any, Dict, List, Optional, TypeVar +from typing import Any, Dict, List, Optional, Sequence, TypeVar from pydantic import ( UUID4, @@ -42,6 +42,7 @@ class BaseAgent(ABC, BaseModel): verbose (bool): Verbose mode for the Agent Execution. max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution. allow_delegation (bool): Allow delegation of tasks to agents. + delegate_to (Optional[List["BaseAgent"]]): List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents. tools (Optional[List[Any]]): Tools at the agent's disposal. max_iter (int): Maximum iterations for an agent to execute a task. agent_executor (InstanceOf): An instance of the CrewAgentExecutor class. @@ -63,7 +64,7 @@ class BaseAgent(ABC, BaseModel): Abstract method to create an agent executor. _parse_tools(tools: List[BaseTool]) -> List[Any]: Abstract method to parse tools. - get_delegation_tools(agents: List["BaseAgent"]): + get_delegation_tools(agents: Sequence["BaseAgent"]) -> Sequence[BaseTool]: Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew. get_output_converter(llm, model, instructions): Abstract method to get the converter class for the agent to create json/pydantic outputs. @@ -113,6 +114,10 @@ class BaseAgent(ABC, BaseModel): default=False, description="Enable agent to delegate and ask questions among each other.", ) + delegate_to: Optional[List["BaseAgent"]] = Field( + default=None, + description="List of agents this agent can delegate to. If None and allow_delegation is True, can delegate to all agents.", + ) tools: Optional[List[BaseTool]] = Field( default_factory=list, description="Tools at agents' disposal" ) @@ -258,7 +263,7 @@ def _parse_tools(self, tools: List[BaseTool]) -> List[BaseTool]: pass @abstractmethod - def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]: + def get_delegation_tools(self, agents: Sequence["BaseAgent"]) -> Sequence[BaseTool]: """Set the task tools that init BaseAgenTools class.""" pass @@ -285,6 +290,7 @@ def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with "knowledge_sources", "knowledge_storage", "knowledge", + "delegate_to", } # Copy llm @@ -310,6 +316,10 @@ def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with copied_source.storage = shared_storage existing_knowledge_sources.append(copied_source) + existing_delegate_to = None + if self.delegate_to: + existing_delegate_to = list(self.delegate_to) + copied_data = self.model_dump(exclude=exclude) copied_data = {k: v for k, v in copied_data.items() if v is not None} copied_agent = type(self)( @@ -319,6 +329,7 @@ def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with knowledge_sources=existing_knowledge_sources, knowledge=copied_knowledge, knowledge_storage=copied_knowledge_storage, + delegate_to=existing_delegate_to, ) return copied_agent diff --git a/src/crewai/crew.py b/src/crewai/crew.py index c4216fb61c..9fc1dbb7b3 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -6,7 +6,7 @@ from concurrent.futures import Future from copy import copy as shallow_copy from hashlib import md5 -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union from pydantic import ( UUID4, @@ -36,6 +36,7 @@ from crewai.task import Task from crewai.tasks.conditional_task import ConditionalTask from crewai.tasks.task_output import TaskOutput +from crewai.tools import BaseTool from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.tools.base_tool import Tool from crewai.types.usage_metrics import UsageMetrics @@ -759,22 +760,27 @@ def _run_hierarchical_process(self) -> CrewOutput: def _create_manager_agent(self): i18n = I18N(prompt_file=self.prompt_file) if self.manager_agent is not None: + # Ensure delegation is enabled for the manager agent self.manager_agent.allow_delegation = True + + # Set the delegate_to property to all agents in the crew + # If delegate_to is already set, it will be used instead of all agents + if self.manager_agent.delegate_to is None: + self.manager_agent.delegate_to = self.agents + manager = self.manager_agent - if manager.tools is not None and len(manager.tools) > 0: - self._logger.log( - "warning", "Manager agent should not have tools", color="orange" - ) - manager.tools = [] - raise Exception("Manager agent should not have tools") else: self.manager_llm = create_llm(self.manager_llm) + # Create delegation tools + delegation_tools = AgentTools(agents=self.agents).tools() + manager = Agent( role=i18n.retrieve("hierarchical_manager_agent", "role"), goal=i18n.retrieve("hierarchical_manager_agent", "goal"), backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), - tools=AgentTools(agents=self.agents).tools(), + tools=delegation_tools, allow_delegation=True, + delegate_to=self.agents, llm=self.manager_llm, verbose=self.verbose, ) @@ -818,8 +824,8 @@ def _execute_tasks( ) # Determine which tools to use - task tools take precedence over agent tools - tools_for_task = task.tools or agent_to_use.tools or [] - tools_for_task = self._prepare_tools(agent_to_use, task, tools_for_task) + initial_tools = task.tools or agent_to_use.tools or [] + prepared_tools = self._prepare_tools(agent_to_use, task, initial_tools) self._log_task_start(task, agent_to_use.role) @@ -838,7 +844,7 @@ def _execute_tasks( future = task.execute_async( agent=agent_to_use, context=context, - tools=tools_for_task, + tools=prepared_tools, ) futures.append((task, future, task_index)) else: @@ -850,7 +856,7 @@ def _execute_tasks( task_output = task.execute_sync( agent=agent_to_use, context=context, - tools=tools_for_task, + tools=prepared_tools, ) task_outputs.append(task_output) self._process_task_result(task, task_output) @@ -888,8 +894,8 @@ def _handle_conditional_task( return None def _prepare_tools( - self, agent: BaseAgent, task: Task, tools: List[Tool] - ) -> List[Tool]: + self, agent: BaseAgent, task: Task, tools: Sequence[BaseTool] + ) -> list[BaseTool]: # Add delegation tools if agent allows delegation if agent.allow_delegation: if self.process == Process.hierarchical: @@ -904,13 +910,15 @@ def _prepare_tools( tools = self._add_delegation_tools(task, tools) # Add code execution tools if agent allows code execution - if agent.allow_code_execution: + if hasattr(agent, "allow_code_execution") and getattr( + agent, "allow_code_execution", False + ): tools = self._add_code_execution_tools(agent, tools) - if agent and agent.multimodal: + if hasattr(agent, "multimodal") and getattr(agent, "multimodal", False): tools = self._add_multimodal_tools(agent, tools) - return tools + return list(tools) def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]: if self.process == Process.hierarchical: @@ -918,8 +926,8 @@ def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]: return task.agent def _merge_tools( - self, existing_tools: List[Tool], new_tools: List[Tool] - ) -> List[Tool]: + self, existing_tools: Sequence[BaseTool], new_tools: Sequence[BaseTool] + ) -> Sequence[BaseTool]: """Merge new tools into existing tools list, avoiding duplicates by tool name.""" if not new_tools: return existing_tools @@ -936,21 +944,42 @@ def _merge_tools( return tools def _inject_delegation_tools( - self, tools: List[Tool], task_agent: BaseAgent, agents: List[BaseAgent] + self, + tools: Sequence[BaseTool], + task_agent: BaseAgent, + agents: Sequence[BaseAgent], ): delegation_tools = task_agent.get_delegation_tools(agents) return self._merge_tools(tools, delegation_tools) - def _add_multimodal_tools(self, agent: BaseAgent, tools: List[Tool]): - multimodal_tools = agent.get_multimodal_tools() - return self._merge_tools(tools, multimodal_tools) + def _add_multimodal_tools( + self, agent: BaseAgent, tools: Sequence[BaseTool] + ) -> Sequence[BaseTool]: + if hasattr(agent, "get_multimodal_tools"): + multimodal_tools = getattr(agent, "get_multimodal_tools")() + return self._merge_tools(tools, multimodal_tools) + return tools - def _add_code_execution_tools(self, agent: BaseAgent, tools: List[Tool]): - code_tools = agent.get_code_execution_tools() - return self._merge_tools(tools, code_tools) + def _add_code_execution_tools( + self, agent: BaseAgent, tools: Sequence[BaseTool] + ) -> Sequence[BaseTool]: + if hasattr(agent, "get_code_execution_tools"): + code_tools = getattr(agent, "get_code_execution_tools")() + return self._merge_tools(tools, code_tools) + return tools + + def _add_delegation_tools( + self, task: Task, tools: Sequence[BaseTool] + ) -> Sequence[BaseTool]: + # If the agent has specific agents to delegate to, use those + if task.agent and task.agent.delegate_to is not None: + agents_for_delegation = task.agent.delegate_to + else: + # Otherwise use all agents except the current one + agents_for_delegation = [ + agent for agent in self.agents if agent != task.agent + ] - def _add_delegation_tools(self, task: Task, tools: List[Tool]): - agents_for_delegation = [agent for agent in self.agents if agent != task.agent] if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent: if not tools: tools = [] @@ -965,7 +994,7 @@ def _log_task_start(self, task: Task, role: str = "None"): task_name=task.name, task=task.description, agent=role, status="started" ) - def _update_manager_tools(self, task: Task, tools: List[Tool]): + def _update_manager_tools(self, task: Task, tools: Sequence[BaseTool]): if self.manager_agent: if task.agent: tools = self._inject_delegation_tools(tools, task.agent, [task.agent]) diff --git a/src/crewai/tools/agent_tools/agent_tools.py b/src/crewai/tools/agent_tools/agent_tools.py index 77d3c2d891..d94aa8986f 100644 --- a/src/crewai/tools/agent_tools/agent_tools.py +++ b/src/crewai/tools/agent_tools/agent_tools.py @@ -1,3 +1,5 @@ +from typing import List + from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.tools.base_tool import BaseTool from crewai.utilities import I18N @@ -9,11 +11,11 @@ class AgentTools: """Manager class for agent-related tools""" - def __init__(self, agents: list[BaseAgent], i18n: I18N = I18N()): + def __init__(self, agents: List[BaseAgent], i18n: I18N = I18N()): self.agents = agents self.i18n = i18n - def tools(self) -> list[BaseTool]: + def tools(self) -> List[BaseTool]: """Get all available agent tools""" coworkers = ", ".join([f"{agent.role}" for agent in self.agents]) diff --git a/src/crewai/tools/agent_tools/base_agent_tools.py b/src/crewai/tools/agent_tools/base_agent_tools.py index b00fbb7b56..851b38b232 100644 --- a/src/crewai/tools/agent_tools/base_agent_tools.py +++ b/src/crewai/tools/agent_tools/base_agent_tools.py @@ -1,5 +1,5 @@ import logging -from typing import Optional +from typing import Optional, Sequence from pydantic import Field @@ -14,7 +14,7 @@ class BaseAgentTool(BaseTool): """Base class for agent-related tools""" - agents: list[BaseAgent] = Field(description="List of available agents") + agents: Sequence[BaseAgent] = Field(description="List of available agents") i18n: I18N = Field( default_factory=I18N, description="Internationalization settings" ) @@ -47,10 +47,7 @@ def _get_coworker(self, coworker: Optional[str], **kwargs) -> Optional[str]: return coworker def _execute( - self, - agent_name: Optional[str], - task: str, - context: Optional[str] = None + self, agent_name: Optional[str], task: str, context: Optional[str] = None ) -> str: """ Execute delegation to an agent with case-insensitive and whitespace-tolerant matching. @@ -77,33 +74,43 @@ def _execute( # when it should look like this: # {"task": "....", "coworker": "...."} sanitized_name = self.sanitize_agent_name(agent_name) - logger.debug(f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'") + logger.debug( + f"Sanitized agent name from '{agent_name}' to '{sanitized_name}'" + ) available_agents = [agent.role for agent in self.agents] logger.debug(f"Available agents: {available_agents}") - agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None") + agent = [ # type: ignore # Incompatible types in assignment (expression has type "Sequence[BaseAgent]", variable has type "str | None") available_agent for available_agent in self.agents if self.sanitize_agent_name(available_agent.role) == sanitized_name ] - logger.debug(f"Found {len(agent)} matching agents for role '{sanitized_name}'") + logger.debug( + f"Found {len(agent)} matching agents for role '{sanitized_name}'" + ) except (AttributeError, ValueError) as e: # Handle specific exceptions that might occur during role name processing return self.i18n.errors("agent_tool_unexisting_coworker").format( coworkers="\n".join( - [f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents] + [ + f"- {self.sanitize_agent_name(agent.role)}" + for agent in self.agents + ] ), - error=str(e) + error=str(e), ) if not agent: # No matching agent found after sanitization return self.i18n.errors("agent_tool_unexisting_coworker").format( coworkers="\n".join( - [f"- {self.sanitize_agent_name(agent.role)}" for agent in self.agents] + [ + f"- {self.sanitize_agent_name(agent.role)}" + for agent in self.agents + ] ), - error=f"No agent found with role '{sanitized_name}'" + error=f"No agent found with role '{sanitized_name}'", ) agent = agent[0] @@ -114,11 +121,12 @@ def _execute( expected_output=agent.i18n.slice("manager_request"), i18n=agent.i18n, ) - logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}") + logger.debug( + f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}" + ) return agent.execute_task(task_with_assigned_agent, context) except Exception as e: # Handle task creation or execution errors return self.i18n.errors("agent_tool_execution_error").format( - agent_role=self.sanitize_agent_name(agent.role), - error=str(e) + agent_role=self.sanitize_agent_name(agent.role), error=str(e) ) diff --git a/src/crewai/tools/base_tool.py b/src/crewai/tools/base_tool.py index b3c0f997cb..35eafe051c 100644 --- a/src/crewai/tools/base_tool.py +++ b/src/crewai/tools/base_tool.py @@ -248,13 +248,18 @@ def to_langchain( def tool(*args): """ Decorator to create a tool from a function. + Ensures the decorated function is always wrapped as a BaseTool. """ - def _make_with_name(tool_name: str) -> Callable: + def _make_with_name(tool_name: str) -> Callable[[Callable], BaseTool]: def _make_tool(f: Callable) -> BaseTool: + # If f is already a BaseTool, return it + if isinstance(f, BaseTool): + return f + if f.__doc__ is None: raise ValueError("Function must have a docstring") - if f.__annotations__ is None: + if not f.__annotations__: raise ValueError("Function must have type annotations") class_name = "".join(tool_name.split()).title() @@ -278,7 +283,10 @@ def _make_tool(f: Callable) -> BaseTool: return _make_tool if len(args) == 1 and callable(args[0]): + if isinstance(args[0], BaseTool): + return args[0] return _make_with_name(args[0].__name__)(args[0]) - if len(args) == 1 and isinstance(args[0], str): + elif len(args) == 1 and isinstance(args[0], str): return _make_with_name(args[0]) - raise ValueError("Invalid arguments") + else: + raise ValueError("Invalid arguments") diff --git a/tests/agent_test.py b/tests/agent_test.py index b5b3aae931..31ef570f6a 100644 --- a/tests/agent_test.py +++ b/tests/agent_test.py @@ -1797,3 +1797,136 @@ def test_litellm_anthropic_error_handling(): # Verify the LLM call was only made once (no retries) mock_llm_call.assert_called_once() + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_agent_delegation_to_specific_agents(): + """Test that an agent can delegate to specific agents using the delegate_to property.""" + # Create agents in order so we can reference them in delegate_to + agent2 = Agent( + role="Agent 2", + goal="Goal for Agent 2", + backstory="Backstory for Agent 2", + allow_delegation=True, + ) + + agent3 = Agent( + role="Agent 3", + goal="Goal for Agent 3", + backstory="Backstory for Agent 3", + allow_delegation=True, + ) + + # Create agent1 without specific delegation first to test default behavior + agent1 = Agent( + role="Agent 1", + goal="Goal for Agent 1", + backstory="Backstory for Agent 1", + allow_delegation=True, + ) + + # Test default behavior (delegate to all agents) + all_agents = [agent1, agent2, agent3] + delegation_tools = agent1.get_delegation_tools(all_agents) + + # Verify that tools for all agents are returned + assert len(delegation_tools) == 2 # Delegate and Ask tools + + # Check that the tools can delegate to all agents + delegate_tool = delegation_tools[0] + ask_tool = delegation_tools[1] + + # Verify the tools description includes all agents + assert "Agent 1" in delegate_tool.description + assert "Agent 2" in delegate_tool.description + assert "Agent 3" in delegate_tool.description + assert "Agent 1" in ask_tool.description + assert "Agent 2" in ask_tool.description + assert "Agent 3" in ask_tool.description + + # Test delegation to specific agents by creating a new agent with delegate_to + agent1_with_specific_delegation = Agent( + role="Agent 1", + goal="Goal for Agent 1", + backstory="Backstory for Agent 1", + allow_delegation=True, + delegate_to=[agent2], # Only delegate to agent2 + ) + + specific_delegation_tools = agent1_with_specific_delegation.get_delegation_tools( + all_agents + ) + + # Verify that tools for only the specified agent are returned + assert len(specific_delegation_tools) == 2 # Delegate and Ask tools + + # Check that the tools can only delegate to agent2 + specific_delegate_tool = specific_delegation_tools[0] + specific_ask_tool = specific_delegation_tools[1] + + # Verify the tools description includes only agent2 + assert "Agent 2" in specific_delegate_tool.description + assert "Agent 1" not in specific_delegate_tool.description + assert "Agent 3" not in specific_delegate_tool.description + assert "Agent 2" in specific_ask_tool.description + assert "Agent 1" not in specific_ask_tool.description + assert "Agent 3" not in specific_ask_tool.description + + +def test_agent_copy_with_delegate_to(): + """Test that the delegate_to attribute is properly copied when copying an agent.""" + # Create a few agents for delegation + agent1 = Agent( + role="Researcher", + goal="Research topics", + backstory="Experienced researcher", + ) + + agent2 = Agent( + role="Writer", + goal="Write content", + backstory="Professional writer", + ) + + agent3 = Agent( + role="Manager", + goal="Manage the team", + backstory="Expert manager", + allow_delegation=True, + delegate_to=[agent1, agent2], # This manager can delegate to agent1 and agent2 + ) + + # Make a copy of the manager agent + copied_agent3 = agent3.copy() + + # Verify the copied agent has the same delegation settings + assert copied_agent3.allow_delegation == agent3.allow_delegation + assert ( + copied_agent3.delegate_to is not agent3.delegate_to + ) # Should be different objects + assert copied_agent3.delegate_to is not None + assert agent3.delegate_to is not None + assert len(copied_agent3.delegate_to) == len(agent3.delegate_to) + assert all(a in copied_agent3.delegate_to for a in agent3.delegate_to) + + # Modify the original agent's delegate_to list + assert agent3.delegate_to is not None + agent3.delegate_to.pop() + + # Verify the copied agent's delegate_to list is not affected + assert copied_agent3.delegate_to is not None + assert agent3.delegate_to is not None + assert len(copied_agent3.delegate_to) == 2 + assert len(agent3.delegate_to) == 1 + + # Test copying an agent with delegate_to=None + agent4 = Agent( + role="Solo Worker", + goal="Work independently", + backstory="Independent worker", + allow_delegation=False, + delegate_to=None, + ) + + copied_agent4 = agent4.copy() + assert copied_agent4.delegate_to == agent4.delegate_to diff --git a/tests/crew_test.py b/tests/crew_test.py index 39a3e9a081..2a26814e48 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -724,13 +724,14 @@ def _run(self, query: str) -> str: crew.kickoff() # Verify task tools override agent tools + assert task.tools is not None assert len(task.tools) == 1 # AnotherTestTool assert any(isinstance(tool, AnotherTestTool) for tool in task.tools) assert not any(isinstance(tool, TestTool) for tool in task.tools) # Verify agent tools remain unchanged + assert new_researcher.tools is not None assert len(new_researcher.tools) == 1 - assert isinstance(new_researcher.tools[0], TestTool) @pytest.mark.vcr(filter_headers=["authorization"]) @@ -868,11 +869,17 @@ def test_crew_verbose_output(capsys): event_listener.formatter.verbose = False crew.kickoff() captured = capsys.readouterr() + + # Filter out event listener logs, escape codes, and now also 'tools:' lines filtered_output = "\n".join( line for line in captured.out.split("\n") - if not line.startswith("[") and line.strip() and not line.startswith("\x1b") + if not line.startswith("[") + and line.strip() + and not line.startswith("\x1b") + and not "tools:" in line.lower() # Exclude 'tools:' lines ) + assert filtered_output == "" @@ -1599,6 +1606,8 @@ def look_up_greeting() -> str: crew = Crew(agents=[agent1], tasks=[essay]) result = crew.kickoff() assert result.raw == "Howdy!" + assert agent1.tools is not None + assert len(agent1.tools) == 1 @pytest.mark.vcr(filter_headers=["authorization"]) @@ -4025,3 +4034,442 @@ def test_crew_with_knowledge_sources_works_with_copy(): assert len(crew_copy.tasks) == len(crew.tasks) assert len(crew_copy.tasks) == len(crew.tasks) + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_crew_with_specific_delegation(): + """Test that agents in a crew can delegate to specific agents using the delegate_to property.""" + # Create editor agent first since it will be referenced in writer's delegate_to + editor = Agent( + role="Editor", + goal="Edit content", + backstory="You're an expert editor", + allow_delegation=True, + ) + + # Create writer with delegate_to set during initialization + writer = Agent( + role="Writer", + goal="Write content", + backstory="You're an expert writer", + allow_delegation=True, + delegate_to=[editor], # Writer can only delegate to Editor + ) + + # Create researcher with delegate_to set during initialization + researcher = Agent( + role="Researcher", + goal="Research information", + backstory="You're an expert researcher", + allow_delegation=True, + delegate_to=[writer], # Researcher can only delegate to Writer + ) + + # Create tasks + task1 = Task( + description="Research a topic", + expected_output="Research results", + agent=researcher, + ) + + task2 = Task( + description="Write an article", + expected_output="Written article", + agent=writer, + ) + + # Create crew + crew = Crew( + agents=[researcher, writer, editor], + tasks=[task1, task2], + ) + + # Test that the _add_delegation_tools method respects the delegate_to property + tools = [] + tools_with_delegation = crew._add_delegation_tools(task1, tools) + + # Verify that delegation tools are added + assert len(tools_with_delegation) > 0 + + # Find the delegation tool + delegate_tool = None + for tool in tools_with_delegation: + if "Delegate" in tool.name: + delegate_tool = tool + break + + assert delegate_tool is not None + + # Verify that the delegation tool only includes the writer + assert "Writer" in delegate_tool.description + assert "Editor" not in delegate_tool.description + assert "Researcher" not in delegate_tool.description + + # Test delegation for the writer + tools = [] + tools_with_delegation = crew._add_delegation_tools(task2, tools) + + # Find the delegation tool + delegate_tool = None + for tool in tools_with_delegation: + if "Delegate" in tool.name: + delegate_tool = tool + break + + assert delegate_tool is not None + + # Verify that the delegation tool only includes the editor + assert "Editor" in delegate_tool.description + assert "Writer" not in delegate_tool.description + assert "Researcher" not in delegate_tool.description + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_manager_agent_with_tools_and_delegation(): + """Test that a manager agent can have tools and still delegate to all agents.""" + from crewai.tools.base_tool import BaseTool + + # Create a simple tool for the manager + class SimpleTestTool(BaseTool): + name: str = "Simple Test Tool" + description: str = "A simple test tool" + + def _run(self) -> str: + return "Tool executed" + + # Create agents + researcher = Agent( + role="Researcher", + goal="Research information", + backstory="You're an expert researcher", + ) + + writer = Agent( + role="Writer", + goal="Write content", + backstory="You're an expert writer", + ) + + # Create a manager agent with tools + manager = Agent( + role="Manager", + goal="Manage the team", + backstory="You're an expert manager", + tools=[SimpleTestTool()], + allow_delegation=True, + ) + + # Create a crew with the manager agent + crew = Crew( + agents=[researcher, writer], + manager_agent=manager, + process=Process.hierarchical, + ) + + # Explicitly call _create_manager_agent to set up delegation + crew._create_manager_agent() + + # Verify that the manager agent has tools + assert manager.tools is not None + assert len(manager.tools) == 1 + assert manager.tools[0].name == "Simple Test Tool" + + # Verify that the manager agent can delegate to all agents + assert manager.allow_delegation is True + assert manager.delegate_to == crew.agents + + # Create a task + task = Task( + description="Complete a project", + expected_output="Project completed", + ) + + # Create a crew with the task + crew = Crew( + agents=[researcher, writer], + manager_agent=manager, + tasks=[task], + process=Process.hierarchical, + ) + + # Mock the execute_task method to avoid actual execution + with patch.object(Agent, "execute_task", return_value="Task executed"): + # Run the crew + result = crew.kickoff() + + # Verify that the result is as expected + assert result.raw == "Task executed" + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_crew_with_default_delegation(): + """Test that an agent with allow_delegation=True but without delegate_to specified can delegate to all agents in the crew.""" + # Create agents + researcher = Agent( + role="Researcher", + goal="Research information", + backstory="You're an expert researcher", + allow_delegation=True, # Allow delegation but don't specify delegate_to + ) + + writer = Agent( + role="Writer", + goal="Write content", + backstory="You're an expert writer", + allow_delegation=True, # Allow delegation but don't specify delegate_to + ) + + editor = Agent( + role="Editor", + goal="Edit content", + backstory="You're an expert editor", + allow_delegation=True, # Allow delegation but don't specify delegate_to + ) + + # Create tasks + task1 = Task( + description="Research a topic", + expected_output="Research results", + agent=researcher, + ) + + task2 = Task( + description="Write content based on research", + expected_output="Written content", + agent=writer, + ) + + task3 = Task( + description="Edit the content", + expected_output="Edited content", + agent=editor, + ) + + # Create crew + crew = Crew( + agents=[researcher, writer, editor], + tasks=[task1, task2, task3], + ) + + # Verify that all agents have allow_delegation=True + for agent in crew.agents: + assert agent.allow_delegation is True + # Verify that delegate_to is None (default delegation to all) + assert agent.delegate_to is None + + # Get delegation tools for researcher + delegation_tools = researcher.get_delegation_tools(crew.agents) + + # Verify that tools for all agents are returned + assert len(delegation_tools) == 2 # Delegate and Ask tools + + # Check that the tools can delegate to all agents + delegate_tool = delegation_tools[0] + ask_tool = delegation_tools[1] + + # Verify the tools description includes all agents + assert "Researcher" in delegate_tool.description + assert "Writer" in delegate_tool.description + assert "Editor" in delegate_tool.description + assert "Researcher" in ask_tool.description + assert "Writer" in ask_tool.description + assert "Editor" in ask_tool.description + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_update_manager_tools_functionality(): + """Test that _update_manager_tools correctly adds delegation tools to the manager agent.""" + # Create agents + researcher = Agent( + role="Researcher", + goal="Research information", + backstory="You're an expert researcher", + ) + + writer = Agent( + role="Writer", + goal="Write content", + backstory="You're an expert writer", + ) + + # Create a manager agent + manager = Agent( + role="Manager", + goal="Manage the team", + backstory="You're an expert manager", + allow_delegation=True, + ) + + # Create a crew with the manager agent + crew = Crew( + agents=[researcher, writer], + manager_agent=manager, + process=Process.hierarchical, + ) + + # Ensure the manager agent is set up + crew._create_manager_agent() + + # Case 1: Task with an assigned agent + task_with_agent = Task( + description="Research a topic", + expected_output="Research results", + agent=researcher, + ) + + # Create an initial set of tools + from crewai.tools.base_tool import BaseTool + + class TestTool(BaseTool): + name: str = "Test Tool" + description: str = "A test tool" + + def _run(self) -> str: + return "Tool executed" + + initial_tools = [TestTool()] + + # Test _update_manager_tools with a task that has an agent + updated_tools = crew._update_manager_tools(task_with_agent, initial_tools) + + # Verify that delegation tools for the task's agent were added + assert len(updated_tools) > len(initial_tools) + assert any( + f"Delegate a specific task to one of the following coworkers: {researcher.role}" + in tool.description + for tool in updated_tools + ) + assert any( + f"Ask a specific question to one of the following coworkers: {researcher.role}" + in tool.description + for tool in updated_tools + ) + + # Case 2: Task without an assigned agent + task_without_agent = Task( + description="General task", + expected_output="Task completed", + ) + + # Test _update_manager_tools with a task that doesn't have an agent + updated_tools = crew._update_manager_tools(task_without_agent, initial_tools) + + # Verify that delegation tools for all agents were added + assert len(updated_tools) > len(initial_tools) + assert any( + f"Delegate a specific task to one of the following coworkers: {researcher.role}, {writer.role}" + in tool.description + for tool in updated_tools + ) + assert any( + f"Ask a specific question to one of the following coworkers: {researcher.role}, {writer.role}" + in tool.description + for tool in updated_tools + ) + + +@pytest.mark.vcr(filter_headers=["authorization"]) +def test_manager_tools_during_task_execution(): + """Test that manager tools are correctly added during task execution in a hierarchical process.""" + # Create agents + researcher = Agent( + role="Researcher", + goal="Research information", + backstory="You're an expert researcher", + ) + + writer = Agent( + role="Writer", + goal="Write content", + backstory="You're an expert writer", + ) + + # Create tasks + task_with_agent = Task( + description="Research a topic", + expected_output="Research results", + agent=researcher, + ) + + task_without_agent = Task( + description="General task", + expected_output="Task completed", + ) + + # Create a crew with hierarchical process + crew_with_agent_task = Crew( + agents=[researcher, writer], + tasks=[task_with_agent], + process=Process.hierarchical, + manager_llm="gpt-4o", + ) + + crew_without_agent_task = Crew( + agents=[researcher, writer], + tasks=[task_without_agent], + process=Process.hierarchical, + manager_llm="gpt-4o", + ) + + # Mock task execution to capture the tools + mock_task_output = TaskOutput( + description="Mock description", raw="mocked output", agent="mocked agent" + ) + + # Test case 1: Task with an assigned agent + with patch.object( + Task, "execute_sync", return_value=mock_task_output + ) as mock_execute_sync: + # Set the output attribute to avoid None errors + task_with_agent.output = mock_task_output + + # Execute the crew + crew_with_agent_task.kickoff() + + # Verify execute_sync was called + mock_execute_sync.assert_called_once() + + # Get the tools argument from the call + _, kwargs = mock_execute_sync.call_args + tools = kwargs["tools"] + + # Verify that delegation tools for the task's agent were added + assert any( + f"Delegate a specific task to one of the following coworkers: {researcher.role}" + in tool.description + for tool in tools + ) + assert any( + f"Ask a specific question to one of the following coworkers: {researcher.role}" + in tool.description + for tool in tools + ) + + # Test case 2: Task without an assigned agent + with patch.object( + Task, "execute_sync", return_value=mock_task_output + ) as mock_execute_sync: + # Set the output attribute to avoid None errors + task_without_agent.output = mock_task_output + + # Execute the crew + crew_without_agent_task.kickoff() + + # Verify execute_sync was called + mock_execute_sync.assert_called_once() + + # Get the tools argument from the call + _, kwargs = mock_execute_sync.call_args + tools = kwargs["tools"] + + # Verify that delegation tools for all agents were added + assert any( + f"Delegate a specific task to one of the following coworkers: {researcher.role}, {writer.role}" + in tool.description + for tool in tools + ) + assert any( + f"Ask a specific question to one of the following coworkers: {researcher.role}, {writer.role}" + in tool.description + for tool in tools + )