Skip to content

Commit

Permalink
Merge pull request #214 from PrefectHQ/run
Browse files Browse the repository at this point in the history
Change run_once to steps
  • Loading branch information
jlowin authored Jul 5, 2024
2 parents db81d1d + 75d2904 commit d36ed8e
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 146 deletions.
8 changes: 3 additions & 5 deletions docs/concepts/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ This is useful for testing tasks in isolation or running them as standalone unit

### Controlling Iteration

The `run()` method starts a loop and orchestrates the execution of the task, its subtasks, and dependencies until the task is completed. If you need more fine-grained control over task execution, you can use the `run_once()` method to execute only a single step of the graph.
The `run()` method starts a loop and orchestrates the execution of the task, its subtasks, and dependencies until the task is completed. If you need more fine-grained control over task execution, you can provide a `steps` argument to control the number of iterations of the [agentic loop](/guides/agentic-loop).

```python
from controlflow import Flow, Task
Expand All @@ -272,8 +272,6 @@ poem_task = Task(

with Flow():
while poem_task.is_incomplete():
poem_task.run_once()
poem_task.run(steps=1)
print(poem_task.result)
```

Note that run_once requires the task to be run within a flow context, as it relies on the flow to manage the task's execution and history over each invocation.
```
22 changes: 8 additions & 14 deletions docs/guides/agentic-loop.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ In this way, tasks act as contracts between the developer and the agents. The de

Another challenge in agentic workflows is controlling the execution of the loop - including starting it! Developers need the ability to run the loop until completion or step through it iteratively for finer control and debugging. Since there is no single software object that represents the loop itself, ControlFlow ensures that developers have a variety of tools for managing its execution.

Most ControlFlow objects provide two methods for executing the agentic loop: `run()` and `run_once()`:

- `run()`: Executes the loop until the object is in a completed state. For tasks, this means running until that task is complete; For flows, it means running until all tasks within the flow are complete. At each step, the system will make decisions about what to do next based on the current state of the workflow.
- `run_once()`: Executes a single iteration of the loop, allowing developers to step through the workflow incrementally. For example, developers could use this method to control exactly which agent is invoked at each step, or to provide ad-hoc instructions to the agent that only last for a single iteration.
Most ControlFlow objects have a `run()` method that can be used to start the loop. This method will run the loop until the object is in a completed state. For tasks, this means running until that task is complete; For flows, it means running until all tasks within the flow are complete. At each step, the system will make decisions about what to do next based on the current state of the workflow.

Consider the following illustrative setup, which involves two dependent tasks in a flow:
```python
Expand All @@ -70,26 +67,23 @@ with cf.Flow() as flow:
t1 = cf.Task('Choose a language')
t2 = cf.Task('Say hello', context=dict(language=t1))
```

### The `run()` Method
Now, let's explore how the `run()` and `run_once()` methods can be used to control the execution of the loop. First, the behavior of the various `run()` methods:
Here is how the various `run()` methods would behave in this scenario:

- Calling `t1.run()` would execute the loop until `t1` is complete.
- Calling `t2.run()` would execute the loop until both `t2` is complete, which would also require completing `t1` because it is a dependency of `t2`.
- Calling `flow.run()` would execute the loop until both `t1` and `t2` are complete.

In general, `run()` tells the system to run the loop until the object is complete (and has a result available). It is the most common way to eagerly run workflows using the [imperative API](/guides/apis).

### The `run_once()` Method
Next, the behavior of the various `run_once()` methods:
### Running a specific number of steps
Sometimes, you may want to run the loop for a specific number of steps, rather than until completion. You

- Calling `t1.run_once()` would execute a single iteration of the loop, starting with `t1`.
- Calling `t2.run_once()` would execute a single iteration of the loop, starting with `t1`.
- Calling `flow.run_once()` would execute a single iteration of the loop, starting with `t1`.
- Calling `t1.run(steps=1)` would execute a single iteration of the loop, starting with `t1`.
- Calling `t2.run(steps=2)` would execute two iterations of the loop, starting with `t1`.
- Calling `flow.run(steps=1)` would execute a single iteration of the loop, starting with `t1`.

Note that since `run_once()` always runs a single iteration, in all three cases it would focus on the first task in the flow, which is `t1`. However, the behavior of these three calls in practice could be different. For example, you could call `t1.run_once()` before `t2` was created, in which case knowledge of `t2` would not be included in the prompt. This could lead to different behavior than if you called `t2.run_once()`, even though both methods would start by running `t1`.
Note that all three cases begin with the first task in the flow, `t1`. However, in practice the behavior of these three calls could be different. For example, you could call `t1.run(steps=1)` before `t2` was created, in which case knowledge of `t2` would not be included in the prompt. This could lead to different behavior than if you called `t2.run(steps=1)`, even though both methods would start by running `t1`.

By offering these execution methods, ControlFlow gives developers the flexibility to either let the loop run autonomously or manually guide its execution, depending on their specific requirements.

<Tip>
Note that when using the `@task` and `@flow` decorators in the [functional API](/guides/apis), the `run()` method is automatically called when the decorated function is invoked. This is because the functional API uses [eager execution](/guides/execution-modes) by default.
Expand Down
2 changes: 1 addition & 1 deletion docs/guides/execution-modes.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Lazy execution is generally recommended because it permits the orchestration eng

This can lead to more efficient execution, especially in complex workflows with many dependencies.

In addition, lazy execution allows you to exercise more precise control over how and when tasks are executed. Instead of running tasks to completion, you can use `run_once()` to run a single step or the agentic loop, or assign a specific agent to work on the task.
In addition, lazy execution allows you to exercise more precise control over how and when tasks are executed. Instead of running tasks to completion, you can use `run(steps=1)` to run a single step or the agentic loop, or assign a specific agent to work on the task.

### When Do Lazy Tasks Run?

Expand Down
10 changes: 5 additions & 5 deletions docs/patterns/instructions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ The effect of the instructions depends on whether you're creating or running tas

### Providing instructions for specific agent interactions

You can use `instructions()` with `run_once()` in a loop to provide targeted, temporary guidance for specific agent interactions within a task. This is particularly useful when you want to steer the agent's behavior based on the current state of the task or external conditions.
You can use `instructions()` in a loop to provide targeted, temporary guidance for specific agent interactions within a task. This is particularly useful when you want to steer the agent's behavior based on the current state of the task or external conditions.

```python
import controlflow as cf
Expand All @@ -59,12 +59,12 @@ def guided_conversation_flow():
while conversation.is_incomplete():
if some_condition:
with cf.instructions("Steer the conversation towards travel"):
conversation.run_once()
conversation.run(steps=1)
elif some_other_condition:
with cf.instructions("Try to wrap up the conversation politely"):
conversation.run_once()
conversation.run(steps=1)
else:
conversation.run_once()
conversation.run(steps=1)

return conversation.result
```
Expand Down Expand Up @@ -93,4 +93,4 @@ In these cases, the `instructions()` context manager provides a flexible way to
4. Use instructions judiciously. Overusing ad-hoc instructions can lead to inconsistent agent behavior and make your workflow harder to understand and maintain.
5. Prefer the `instructions` parameter for truly permanent guidance. Only use the context manager when you need the added flexibility or temporary nature.

By using the `instructions()` context manager appropriately, you can fine-tune agent behavior on the fly, adapting to the dynamic needs of your workflow without sacrificing the structure and reusability provided by well-defined tasks. The ability to provide targeted guidance for specific agent interactions using `run_once()` further enhances the power and flexibility of this feature.
By using the `instructions()` context manager appropriately, you can fine-tune agent behavior on the fly, adapting to the dynamic needs of your workflow without sacrificing the structure and reusability provided by well-defined tasks.
4 changes: 2 additions & 2 deletions docs/patterns/stopping-tasks-early.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def iterative_task_flow():
task = cf.Task("Generate a comprehensive report on AI trends")

while task.is_incomplete():
task.run_once()
task.run(steps=1)

# Optionally, you can add logic here to modify the task,
# create new tasks, or make decisions based on other results
Expand All @@ -36,4 +36,4 @@ This pattern is particularly useful when:
- You want to implement custom logic between iterations
- You need to dynamically adjust the workflow based on other results

By using a while loop with `task.is_incomplete()`, you ensure that the agent continues working until the task is either marked as complete or fails. The `run_once()` method allows for step-by-step execution, giving you control over each iteration of the agentic loop.
By using a while loop with `task.is_incomplete()`, you ensure that the agent continues working until the task is either marked as complete or fails.
2 changes: 1 addition & 1 deletion docs/reference/task-decorator.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The `lazy` parameter determines whether the task should be executed eagerly or l

The default `lazy` behavior is determined by the global `eager_mode` setting in ControlFlow. Eager mode is enabled by default, which means that tasks are executed immediately. The `lazy` parameter allows you to override this behavior for a specific task.

When `lazy` is set to `True`, the task is not executed immediately. Instead, a `Task` instance is returned, representing the deferred execution of the task. The task can be run later using the `run()` or `run_once()` methods.
When `lazy` is set to `True`, the task is not executed immediately. Instead, a `Task` instance is returned, representing the deferred execution of the task. The task can be run later using the `run()` method.

When `lazy` is set to `False` (default), the task is executed immediately when the decorated function is called. Setting `lazy=False` ensures the task is executed eagerly, even if the global `eager_mode` is disabled.
</ParamField>
Expand Down
14 changes: 4 additions & 10 deletions src/controlflow/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,11 @@ def __enter__(self):
def __exit__(self, *exc_info):
return self._cm_stack.pop().__exit__(*exc_info)

def run_once(self, task: "Task"):
return task.run_once(agents=[self])
def run(self, task: "Task", steps: Optional[int] = None):
return task.run(agents=[self], steps=steps)

async def run_once_async(self, task: "Task"):
return await task.run_once_async(agents=[self])

def run(self, task: "Task"):
return task.run(agents=[self])

async def run_async(self, task: "Task"):
return await task.run_async(agents=[self])
async def run_async(self, task: "Task", steps: Optional[int] = None):
return await task.run_async(agents=[self], steps=steps)

def _run_model(
self,
Expand Down
10 changes: 5 additions & 5 deletions src/controlflow/flows/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,23 @@ def create_context(self, create_prefect_flow_context: bool = True):
with ctx(**ctx_args), prefect_ctx:
yield self

def run_once(self):
def run(self, steps: Optional[int] = None):
"""
Runs one step of the flow.
Runs the flow.
"""
from controlflow.orchestration import Controller

controller = Controller(flow=self)
controller.run_once()
controller.run(steps=steps)

def run(self):
async def run_async(self, steps: Optional[int] = None):
"""
Runs the flow.
"""
from controlflow.orchestration import Controller

controller = Controller(flow=self)
controller.run()
await controller.run_async(steps=steps)


def get_flow() -> Optional[Flow]:
Expand Down
96 changes: 44 additions & 52 deletions src/controlflow/orchestration/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,52 +91,6 @@ def handle_event(
if event.persist:
self.flow.add_events([event])

def run_once(self):
"""
Core pipeline for running the controller.
"""
from controlflow.events.controller_events import (
ControllerEnd,
ControllerError,
ControllerStart,
)

self.handle_event(ControllerStart(controller=self))

try:
ready_tasks = self.get_ready_tasks()
context = self.get_agent_context(ready_tasks=ready_tasks)
context.run()

except Exception as exc:
self.handle_event(ControllerError(controller=self, error=exc))
raise
finally:
self.handle_event(ControllerEnd(controller=self))

async def run_once_async(self):
"""
Core pipeline for running the controller.
"""
from controlflow.events.controller_events import (
ControllerEnd,
ControllerError,
ControllerStart,
)

self.handle_event(ControllerStart(controller=self))

try:
ready_tasks = self.get_ready_tasks()
context = self.get_agent_context(ready_tasks=ready_tasks)
await context.run_async()

except Exception as exc:
self.handle_event(ControllerError(controller=self, error=exc))
raise
finally:
self.handle_event(ControllerEnd(controller=self))

def get_agent_context(self, ready_tasks: list[Task]) -> "AgentContext":
# select an agent
agent = self.get_agent(ready_tasks=ready_tasks)
Expand All @@ -151,13 +105,51 @@ def get_agent_context(self, ready_tasks: list[Task]) -> "AgentContext":
)
return context

def run(self):
while any(t.is_incomplete() for t in self.tasks):
self.run_once()
def run(self, steps: Optional[int] = None):
from controlflow.events.controller_events import (
ControllerEnd,
ControllerError,
ControllerStart,
)

i = 0
while any(t.is_incomplete() for t in self.tasks) and i < (steps or math.inf):
self.handle_event(ControllerStart(controller=self))

try:
ready_tasks = self.get_ready_tasks()
context = self.get_agent_context(ready_tasks=ready_tasks)
context.run()

except Exception as exc:
self.handle_event(ControllerError(controller=self, error=exc))
raise
finally:
self.handle_event(ControllerEnd(controller=self))
i += 1

async def run_async(self, steps: Optional[int] = None):
from controlflow.events.controller_events import (
ControllerEnd,
ControllerError,
ControllerStart,
)

async def run_async(self):
while any(t.is_incomplete() for t in self.tasks):
await self.run_once_async()
i = 0
while any(t.is_incomplete() for t in self.tasks) and i < (steps or math.inf):
self.handle_event(ControllerStart(controller=self))

try:
ready_tasks = self.get_ready_tasks()
context = self.get_agent_context(ready_tasks=ready_tasks)
await context.run_async()

except Exception as exc:
self.handle_event(ControllerError(controller=self, error=exc))
raise
finally:
self.handle_event(ControllerEnd(controller=self))
i += 1

def get_ready_tasks(self) -> list[Task]:
all_tasks = self.flow.graph.upstream_tasks(self.tasks)
Expand Down
Loading

0 comments on commit d36ed8e

Please sign in to comment.