Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions python/packages/autogen-ext/src/autogen_ext/teams/magentic_one.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,61 @@
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.models.openai._openai_client import BaseOpenAIChatCompletionClient

# Docker imports for default code executor
try:
import docker
from docker.errors import DockerException

from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor

_docker_available = True
except ImportError:
docker = None # type: ignore
DockerException = Exception # type: ignore
DockerCommandLineCodeExecutor = None # type: ignore
_docker_available = False

SyncInputFunc = Callable[[str], str]
AsyncInputFunc = Callable[[str, Optional[CancellationToken]], Awaitable[str]]
InputFuncType = Union[SyncInputFunc, AsyncInputFunc]


def _is_docker_available() -> bool:
"""Check if Docker is available and running."""
if not _docker_available:
return False

try:
if docker is not None:
client = docker.from_env()
client.ping() # type: ignore
return True
except DockerException:
return False

return False


def _create_default_code_executor() -> CodeExecutor:
"""Create the default code executor, preferring Docker if available."""
if _is_docker_available() and DockerCommandLineCodeExecutor is not None:
try:
return DockerCommandLineCodeExecutor()
except Exception:
# Fallback to local if Docker fails to initialize
pass

# Issue warning and use local executor if Docker is not available
warnings.warn(
"Docker is not available or not running. Using LocalCommandLineCodeExecutor instead of the recommended DockerCommandLineCodeExecutor. "
"For security, it is recommended to install Docker and ensure it's running before using MagenticOne. "
"To install Docker, visit: https://docs.docker.com/get-docker/",
UserWarning,
stacklevel=3,
)
return LocalCommandLineCodeExecutor()


class MagenticOne(MagenticOneGroupChat):
"""
MagenticOne is a specialized group chat class that integrates various agents
Expand Down Expand Up @@ -77,7 +127,7 @@ class MagenticOne(MagenticOneGroupChat):

async def example_usage():
client = OpenAIChatCompletionClient(model="gpt-4o")
m1 = MagenticOne(client=client)
m1 = MagenticOne(client=client) # Uses DockerCommandLineCodeExecutor by default
task = "Write a Python script to fetch data from an API."
result = await Console(m1.run_stream(task=task))
print(result)
Expand All @@ -89,20 +139,22 @@ async def example_usage():

.. code-block:: python

# Enable human-in-the-loop mode
# Enable human-in-the-loop mode with explicit Docker executor
import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.teams.magentic_one import MagenticOne
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_agentchat.ui import Console


async def example_usage_hil():
client = OpenAIChatCompletionClient(model="gpt-4o")
# to enable human-in-the-loop mode, set hil_mode=True
m1 = MagenticOne(client=client, hil_mode=True)
task = "Write a Python script to fetch data from an API."
result = await Console(m1.run_stream(task=task))
print(result)
# Explicitly specify Docker code executor for better security
async with DockerCommandLineCodeExecutor() as code_executor:
m1 = MagenticOne(client=client, hil_mode=True, code_executor=code_executor)
task = "Write a Python script to fetch data from an API."
result = await Console(m1.run_stream(task=task))
print(result)


if __name__ == "__main__":
Expand Down Expand Up @@ -134,11 +186,11 @@ def __init__(

if code_executor is None:
warnings.warn(
"Instantiating MagenticOne without a code_executor is deprecated. Provide a code_executor to clear this warning (e.g., code_executor=LocalCommandLineCodeExecutor() ).",
"Instantiating MagenticOne without a code_executor is deprecated. Provide a code_executor to clear this warning (e.g., code_executor=DockerCommandLineCodeExecutor() ).",
DeprecationWarning,
stacklevel=2,
)
code_executor = LocalCommandLineCodeExecutor()
code_executor = _create_default_code_executor()

fs = FileSurfer("FileSurfer", model_client=client)
ws = MultimodalWebSurfer("WebSurfer", model_client=client)
Expand Down
1 change: 1 addition & 0 deletions python/packages/autogen-ext/tests/teams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Init file for teams tests."""
138 changes: 138 additions & 0 deletions python/packages/autogen-ext/tests/teams/test_magentic_one.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Tests for MagenticOne team."""

import os
import warnings
from unittest.mock import Mock, patch

import pytest
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_core.models import ChatCompletionClient
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.teams.magentic_one import MagenticOne


def docker_tests_enabled() -> bool:
"""Check if Docker tests should be enabled."""
if os.environ.get("SKIP_DOCKER", "unset").lower() == "true":
return False

try:
import docker
from docker.errors import DockerException
except ImportError:
return False

try:
client = docker.from_env()
client.ping() # type: ignore
return True
except DockerException:
return False


def _is_docker_available() -> bool:
"""Local implementation of Docker availability check."""
return docker_tests_enabled()


@pytest.fixture
def mock_chat_client() -> Mock:
"""Create a mock chat completion client."""
mock_client = Mock(spec=ChatCompletionClient)
mock_client.model_info = {"function_calling": True, "json_output": True, "vision": True}
return mock_client


@pytest.mark.skipif(not docker_tests_enabled(), reason="Docker is not available")
def test_magentic_one_uses_docker_by_default(mock_chat_client: Mock) -> None:
"""Test that MagenticOne uses Docker code executor by default when Docker is available."""
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor

with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)

m1 = MagenticOne(client=mock_chat_client)

# Find the CodeExecutorAgent in the participants list
code_executor_agent = None
for agent in m1._participants: # type: ignore[reportPrivateUsage]
if isinstance(agent, CodeExecutorAgent):
code_executor_agent = agent
break

assert code_executor_agent is not None, "CodeExecutorAgent not found"
assert isinstance(
code_executor_agent._code_executor, # type: ignore[reportPrivateUsage]
DockerCommandLineCodeExecutor, # type: ignore[reportPrivateUsage]
), f"Expected DockerCommandLineCodeExecutor, got {type(code_executor_agent._code_executor)}" # type: ignore[reportPrivateUsage]


def test_docker_availability_check() -> None:
"""Test the Docker availability check function."""
# This test should pass regardless of Docker availability
result = _is_docker_available()
assert isinstance(result, bool)


@patch("autogen_ext.teams.magentic_one._is_docker_available")
def test_magentic_one_falls_back_to_local_when_docker_unavailable(
mock_docker_check: Mock, mock_chat_client: Mock
) -> None:
"""Test that MagenticOne falls back to local executor when Docker is not available."""
mock_docker_check.return_value = False

with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")

m1 = MagenticOne(client=mock_chat_client)

# Find the CodeExecutorAgent in the participants list
code_executor_agent = None
for agent in m1._participants: # type: ignore[reportPrivateUsage]
if isinstance(agent, CodeExecutorAgent):
code_executor_agent = agent
break

assert code_executor_agent is not None, "CodeExecutorAgent not found"
assert isinstance(
code_executor_agent._code_executor, # type: ignore[reportPrivateUsage]
LocalCommandLineCodeExecutor, # type: ignore[reportPrivateUsage]
), f"Expected LocalCommandLineCodeExecutor, got {type(code_executor_agent._code_executor)}" # type: ignore[reportPrivateUsage]

# Check that appropriate warnings were issued
warning_messages = [str(warning.message) for warning in w]
docker_warning_found = any("Docker is not available" in msg for msg in warning_messages)
deprecated_warning_found = any(
"Instantiating MagenticOne without a code_executor is deprecated" in msg for msg in warning_messages
)

assert docker_warning_found, f"Docker unavailable warning not found in: {warning_messages}"
assert deprecated_warning_found, f"Deprecation warning not found in: {warning_messages}"


def test_magentic_one_with_explicit_code_executor(mock_chat_client: Mock) -> None:
"""Test that MagenticOne uses the provided code executor when explicitly given."""
explicit_executor = LocalCommandLineCodeExecutor()

with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")

m1 = MagenticOne(client=mock_chat_client, code_executor=explicit_executor)

# Find the CodeExecutorAgent in the participants list
code_executor_agent = None
for agent in m1._participants: # type: ignore[reportPrivateUsage]
if isinstance(agent, CodeExecutorAgent):
code_executor_agent = agent
break

assert code_executor_agent is not None, "CodeExecutorAgent not found"
assert code_executor_agent._code_executor is explicit_executor, "Expected the explicitly provided code executor" # type: ignore[reportPrivateUsage]

# No deprecation warning should be issued when explicitly providing a code executor
warning_messages = [str(warning.message) for warning in w]
deprecated_warning_found = any(
"Instantiating MagenticOne without a code_executor is deprecated" in msg for msg in warning_messages
)

assert not deprecated_warning_found, f"Unexpected deprecation warning found: {warning_messages}"
Loading