Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async run and deprecate run_once #234

Merged
merged 3 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/controlflow/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
handle_tool_call_async,
)
from controlflow.utilities.context import ctx
from controlflow.utilities.types import ControlFlowModel, hash_objects
from controlflow.utilities.general import ControlFlowModel, hash_objects

from .memory import Memory
from .names import AGENTS
Expand Down Expand Up @@ -245,7 +245,7 @@ async def _run_async(self, context: "AgentContext"):
context.add_tools(self.get_tools())
context.add_instructions(get_instructions())
messages = context.compile_messages(agent=self)
async for event in await self._run_model(
async for event in self._run_model_async(
messages=messages, tools=context.tools
):
context.handle_event(event)
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/agents/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pydantic import Field

from controlflow.utilities.context import ctx
from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel

if TYPE_CHECKING:
from controlflow.tools import Tool
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import controlflow.utilities
import controlflow.utilities.logging
from controlflow.llm.models import BaseChatModel
from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel

from .agents import Agent
from .events.history import History, InMemoryHistory
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/events/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from pydantic import Field

from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel

if TYPE_CHECKING:
from controlflow.agents.agent import BaseAgent
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/events/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import controlflow
from controlflow.events.base import Event
from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel

# This is a global variable that will be shared between all instances of InMemoryStore
IN_MEMORY_STORE = {}
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/flows/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
from controlflow.flows.graph import Graph
from controlflow.tasks.task import Task
from controlflow.utilities.context import ctx
from controlflow.utilities.general import ControlFlowModel
from controlflow.utilities.logging import get_logger
from controlflow.utilities.prefect import prefect_flow_context
from controlflow.utilities.types import ControlFlowModel

if TYPE_CHECKING:
from controlflow.orchestration.agent_context import AgentContext
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/llm/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from langchain_openai import AzureChatOpenAI, ChatOpenAI

from controlflow.llm.models import BaseChatModel
from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel


class LLMRules(ControlFlowModel):
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/orchestration/agent_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from controlflow.tasks.task import Task
from controlflow.tools.tools import Tool, as_tools
from controlflow.utilities.context import ctx
from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel

__all__ = [
"AgentContext",
Expand Down
38 changes: 37 additions & 1 deletion src/controlflow/orchestration/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
create_task_success_tool,
)
from controlflow.tools.tools import Tool
from controlflow.utilities.general import ControlFlowModel
from controlflow.utilities.prefect import prefect_task as prefect_task
from controlflow.utilities.types import ControlFlowModel

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -120,6 +120,42 @@ def run(self, steps: Optional[int] = None):
self.handle_event(OrchestratorEnd(orchestrator=self))
i += 1

async def run_async(self, steps: Optional[int] = None):
from controlflow.events.orchestrator_events import (
OrchestratorEnd,
OrchestratorError,
OrchestratorStart,
)

i = 0
while any(t.is_incomplete() for t in self.tasks) and i < (steps or math.inf):
self.handle_event(OrchestratorStart(orchestrator=self))

try:
ready_tasks = self.get_ready_tasks()
if not ready_tasks:
return
agent = self.get_agent(task=ready_tasks[0])
tasks = self.get_agent_tasks(agent=agent, ready_tasks=ready_tasks)
tools = self.get_tools(tasks=tasks)

context = AgentContext(
flow=self.flow,
tasks=tasks,
agents=[agent],
tools=tools,
handlers=self.handlers,
)
with context:
await agent._run_async(context=context)

except Exception as exc:
self.handle_event(OrchestratorError(orchestrator=self, error=exc))
raise
finally:
self.handle_event(OrchestratorEnd(orchestrator=self))
i += 1

def get_ready_tasks(self) -> list[Task]:
all_tasks = self.flow.graph.upstream_tasks(self.tasks)
ready_tasks = [t for t in all_tasks if t.is_ready()]
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/orchestration/prompt_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from controlflow.flows import Flow
from controlflow.orchestration.agent_context import AgentContext
from controlflow.tasks.task import Task
from controlflow.utilities.general import ControlFlowModel
from controlflow.utilities.jinja import prompt_env
from controlflow.utilities.types import ControlFlowModel


class Template(ControlFlowModel):
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/planning/auto_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from controlflow.agents import Agent
from controlflow.tasks.task import Task
from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel

ToolLiteral = TypeVar("ToolLiteral", bound=str)

Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/planning/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from controlflow.flows import Flow
from controlflow.tasks.task import Task
from controlflow.tools import Tool, as_tools
from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel

ToolLiteral = TypeVar("ToolLiteral", bound=str)

Expand Down
37 changes: 23 additions & 14 deletions src/controlflow/tasks/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@
from controlflow.tools import Tool
from controlflow.tools.talk_to_user import talk_to_user
from controlflow.utilities.context import ctx
from controlflow.utilities.logging import get_logger
from controlflow.utilities.general import (
NOTSET,
ControlFlowModel,
hash_objects,
)
from controlflow.utilities.logging import deprecated, get_logger
from controlflow.utilities.prefect import PrefectTrackingTask
from controlflow.utilities.prefect import prefect_task as prefect_task
from controlflow.utilities.tasks import (
collect_tasks,
visit_task_collection,
)
from controlflow.utilities.types import (
NOTSET,
ControlFlowModel,
hash_objects,
)

if TYPE_CHECKING:
from controlflow.flows import Flow
Expand Down Expand Up @@ -89,7 +89,6 @@ class Task(ControlFlowModel):
agent=AgentTeam(agents=[...]) instead. The agents assigned to the
task. If not provided, agents "will be inferred from the parent
task, flow, or global default.""",
validate_default=True,
)
context: dict = Field(
default_factory=dict,
Expand Down Expand Up @@ -165,10 +164,10 @@ def __init__(
).strip()
if agents:
if kwargs.get("agent") is None:
logger.warning(
logger.warn(
'Passing a list of agents to the "agents" argument is '
"deprecated and will be removed in future versions. "
"Please provide a single agent or team of agents instead."
"deprecated as of version 0.9, and will be removed in future versions. "
"Please provide a single agent or team of agents instead.",
)
from controlflow.agents.teams import Team

Expand Down Expand Up @@ -351,8 +350,8 @@ def run(
else:
if steps:
logger.warning(
"It is not recommended to call Task.run() without a flow "
"argument when steps are provided, because the history will be lost."
"Running a task with a steps argument but no flow is not "
"recommended, because the agent's history will be lost."
)
flow = Flow()

Expand Down Expand Up @@ -391,8 +390,8 @@ async def run_async(
else:
if steps:
logger.warning(
"It is not recommended to call Task.run_async() without a flow "
"argument when steps are provided, because the history will be lost."
"Running a task with a steps argument but no flow is not "
"recommended, because the agent's history will be lost."
)
flow = Flow()
from controlflow.orchestration import Orchestrator
Expand Down Expand Up @@ -547,6 +546,16 @@ def generate_subtasks(self, instructions: str = None, agent: "Agent" = None):
context=self.context,
)

# Deprecated ---------------------------

@deprecated("Use Task.run(steps=1) instead.", version="0.9")
def run_once(self, *args, **kwargs):
return self.run(*args, steps=1, **kwargs)

@deprecated("Use Task.run_async(steps=1) instead.", version="0.9")
async def run_once_async(self, *args, **kwargs):
return await self.run_async(*args, steps=1, **kwargs)


def validate_result(result: Any, result_type: type[T]) -> T:
if result_type is None and result is not None:
Expand Down
2 changes: 1 addition & 1 deletion src/controlflow/tools/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from pydantic import Field, PydanticSchemaGenerationError, TypeAdapter

import controlflow
from controlflow.utilities.general import ControlFlowModel
from controlflow.utilities.prefect import create_markdown_artifact, prefect_task
from controlflow.utilities.types import ControlFlowModel

TOOL_CALL_FUNCTION_RESULT_TEMPLATE = """
# Tool call: {name}
Expand Down
29 changes: 29 additions & 0 deletions src/controlflow/utilities/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,32 @@ def get_logger(name: Optional[str] = None) -> logging.Logger:
logger = parent_logger

return logger


def deprecated(message: str, version: str):
"""
Decorator to mark a function as deprecated.

Args:
message (str): The deprecation message.
version (str): The version in which the function is deprecated.

Returns:
function: The decorated function.

Example:
@deprecated("This function is deprecated", "1.0")
def my_function():
pass
"""

def decorator(func):
def wrapper(*args, **kwargs):
get_logger(__file__).warn(
f"WARNING: {func.__name__} is deprecated as of version {version}. {message}".strip(),
)
return func(*args, **kwargs)

return wrapper

return decorator
2 changes: 1 addition & 1 deletion src/controlflow/utilities/prefect.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from pydantic import TypeAdapter

import controlflow
from controlflow.utilities.types import ControlFlowModel
from controlflow.utilities.general import ControlFlowModel


def prefect_task(*args, **kwargs):
Expand Down
13 changes: 13 additions & 0 deletions tests/tasks/test_deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from controlflow.utilities.testing import SimpleTask


def test_run_once(default_fake_llm, caplog):
default_fake_llm.set_responses(["Hello"])
SimpleTask().run_once()
assert "run_once is deprecated" in caplog.text


async def test_run_once_async(default_fake_llm, caplog):
default_fake_llm.set_responses(["Hello"])
await SimpleTask().run_once_async()
assert "run_once_async is deprecated" in caplog.text
20 changes: 20 additions & 0 deletions tests/tasks/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,26 @@ def test_task_loads_agent_from_parent_before_flow():
assert child.get_agent() == agent2


class TestDeprecated:
def test_warn_on_steps_without_flow(self, default_fake_llm, caplog):
default_fake_llm.set_responses(["Hi."])
task = SimpleTask()
task.run(steps=1)
assert (
"Running a task with a steps argument but no flow is not recommended"
in caplog.text
)

async def test_warn_on_steps_without_flow_async(self, default_fake_llm, caplog):
default_fake_llm.set_responses(["Hi."])
task = SimpleTask()
await task.run_async(steps=1)
assert (
"Running a task with a steps argument but no flow is not recommended"
in caplog.text
)


class TestFlowRegistration:
def test_task_tracking(self):
with Flow() as flow:
Expand Down