From e568059686505bfdd7b7752f4b1702226f0cb7ed Mon Sep 17 00:00:00 2001 From: Alex Raber Date: Fri, 9 May 2025 22:41:56 -0700 Subject: [PATCH] adding PocketFlow --- README.md | 2 + .../01_shared_state___shared__dictionary__.md | 208 +++++++++++ ...node___basenode____node____asyncnode___.md | 245 +++++++++++++ docs/PocketFlow/03_actions___transitions_.md | 249 +++++++++++++ .../04_flow___flow____asyncflow___.md | 323 +++++++++++++++++ ..._processing___asyncnode____asyncflow___.md | 238 ++++++++++++ ..._batchflow____asyncparallelbatchnode___.md | 321 +++++++++++++++++ ...gent_to_agent__communication_framework_.md | 338 ++++++++++++++++++ docs/PocketFlow/index.md | 61 ++++ 9 files changed, 1985 insertions(+) create mode 100644 docs/PocketFlow/01_shared_state___shared__dictionary__.md create mode 100644 docs/PocketFlow/02_node___basenode____node____asyncnode___.md create mode 100644 docs/PocketFlow/03_actions___transitions_.md create mode 100644 docs/PocketFlow/04_flow___flow____asyncflow___.md create mode 100644 docs/PocketFlow/05_asynchronous_processing___asyncnode____asyncflow___.md create mode 100644 docs/PocketFlow/06_batch_processing___batchnode____batchflow____asyncparallelbatchnode___.md create mode 100644 docs/PocketFlow/07_a2a__agent_to_agent__communication_framework_.md create mode 100644 docs/PocketFlow/index.md diff --git a/README.md b/README.md index 9ed5ff1b..d98dfbb4 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,8 @@ This is a tutorial project of [Pocket Flow](https://github.com/The-Pocket/Pocket - [OpenManus](https://the-pocket.github.io/PocketFlow-Tutorial-Codebase-Knowledge/OpenManus) - Build AI agents with digital brains that think, learn, and use tools just like humans do! +- [PocketFlow](https://the-pocket.github.io/PocketFlow-Tutorial-Codebase-Knowledge/PocketFlow) - Pocket Flow: 100-line LLM framework. Let Agents build Agents! 
+ - [Pydantic Core](https://the-pocket.github.io/PocketFlow-Tutorial-Codebase-Knowledge/Pydantic%20Core) - Validate data at rocket speed with just Python type hints! - [Requests](https://the-pocket.github.io/PocketFlow-Tutorial-Codebase-Knowledge/Requests) - Talk to the internet in Python with code so simple it feels like cheating! diff --git a/docs/PocketFlow/01_shared_state___shared__dictionary__.md b/docs/PocketFlow/01_shared_state___shared__dictionary__.md new file mode 100644 index 00000000..e0d0fda0 --- /dev/null +++ b/docs/PocketFlow/01_shared_state___shared__dictionary__.md @@ -0,0 +1,208 @@ +# Chapter 1: Shared State (`shared` dictionary) + +Welcome to your first step into the world of PocketFlow! Building powerful AI applications often involves breaking down complex tasks into smaller, manageable steps. But how do these steps communicate with each other? How does one part of your AI know what another part has done or figured out? That's where the **`shared` dictionary** comes into play. + +Imagine you're building a simple AI assistant. +1. First, it needs to get your question (e.g., "What's the weather like in London?"). +2. Then, it might need to search the web for "weather in London." +3. Finally, it uses your original question and the search results to give you an answer. + +For this to work, the "question understanding" step needs to pass the question to the "web searching" step. Then, both the original question and the search results need to be available to the "answering" step. The `shared` dictionary is the magic message board that lets all these steps share information. + +## What is the `shared` Dictionary? + +At its heart, the `shared` dictionary is a standard Python dictionary (`dict`). Think of it like a **communal backpack** or a **shared whiteboard**. 
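Because `shared` is an ordinary Python `dict`, every operation you will see in this chapter is plain dictionary usage. Here is a tiny self-contained preview (the keys `"question"` and `"context"` are made up for illustration, not required names):

```python
# A minimal preview of how nodes interact with `shared`.
# The keys ("question", "context") are illustrative only.
shared = {"question": "What's the weather like in London?"}  # initial data

question = shared["question"]                # read a value that must exist
context = shared.get("context", "none yet")  # read safely with a default
shared["context"] = "London: 15C, cloudy"    # add or update an entry

print(question, "|", context, "|", shared["context"])
```

That is all there is to the mechanics; the rest of this chapter is about *when* and *why* each step reads or writes.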
+As your PocketFlow application (which we call a [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md)) runs, different components (which we call [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md)) can: +* **Put things into it** (write data). +* **Look at what's inside** (read data). +* **Update things** that are already there. + +This `shared` dictionary becomes the primary way for different parts of your workflow to pass data, intermediate results, and context to each other. It's available throughout the entire lifecycle of a single execution of a [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md). + +## How to Use the `shared` Dictionary + +Let's see how this works with a few simple examples. + +**1. Initializing `shared` with Starting Data** + +Before your [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) even starts, you usually prepare some initial data. This data is placed into the `shared` dictionary. + +Consider this snippet from one of our example projects (`cookbook/pocketflow-node/main.py`): +```python +# This is how we can start with some data +text_to_summarize = """ +PocketFlow is a minimalist LLM framework... +""" +shared = {"data": text_to_summarize} + +# Later, this 'shared' dictionary is passed when running the flow: +# flow.run(shared) +``` +In this code: +* We have some `text_to_summarize`. +* We create a Python dictionary named `shared`. +* We add an entry to this dictionary: the key is `"data"` and its value is our `text_to_summarize`. +When the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) starts, this `shared` dictionary will be its starting point. + +Here's another example from `cookbook/pocketflow-a2a/main.py` where a question is put into `shared`: +```python +# Default question or one from command line +question = "Who won the Nobel Prize in Physics 2024?" 
+ +# Process the question +shared = {"question": question} +# agent_flow.run(shared) +``` +Here, the `shared` dictionary is initialized with the `question` under the key `"question"`. + +**2. A [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) Reading from `shared`** + +[Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) are the workers in your [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md). They often need to read data from the `shared` dictionary to know what to do. This usually happens in a Node's `prep` method. + +Let's look at the `Summarize` [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) from `cookbook/pocketflow-node/flow.py`: +```python +# Inside the Summarize Node class +# def prep(self, shared): +# """Read and preprocess data from shared store.""" +# return shared["data"] # Accesses the 'data' we set earlier +``` +When this `Summarize` [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) is about to run, its `prep` method is called. PocketFlow automatically passes the current `shared` dictionary to this method. +The line `shared["data"]` retrieves the value associated with the key `"data"` – which is the text we want to summarize. + +Another example from `cookbook/pocketflow-a2a/nodes.py`, in the `DecideAction` [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md): +```python +# Inside the DecideAction Node's prep method +# def prep(self, shared): +# Get the current context (default if none exists) +context = shared.get("context", "No previous search") +# Get the question from the shared store +question = shared["question"] +return question, context +``` +This `prep` method reads two items: +* `shared.get("context", "No previous search")`: This tries to get the value for the key `"context"`. 
If `"context"` isn't found (maybe it's the first time this runs), it defaults to `"No previous search"`. Using `.get()` is a safe way to read, as it prevents errors if a key might be missing. +* `shared["question"]`: This directly retrieves the value for the key `"question"`, assuming it will always be there. + +**3. A [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) Writing Results Back to `shared`** + +After a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) does its work (e.g., summarizes text, gets search results), it often needs to save its findings back into the `shared` dictionary. This typically happens in a Node's `post` method. + +Continuing with our `Summarize` [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) (`cookbook/pocketflow-node/flow.py`): +```python +# Inside the Summarize Node class +# 'exec_res' below is the result from the Node's main task +# def post(self, shared, prep_res, exec_res): +# """Store the summary in shared store.""" +# shared["summary"] = exec_res # Stores the result +``` +Here, `exec_res` holds the summary generated by the [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md). The line `shared["summary"] = exec_res` creates a new key `"summary"` in the `shared` dictionary (or updates it if it already exists) and stores the summary there. Now, subsequent [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) can access this summary! + +Similarly, in `DecideAction`'s `post` method (`cookbook/pocketflow-a2a/nodes.py`): +```python +# Inside DecideAction Node's post method +# def post(self, shared, prep_res, exec_res): +# 'exec_res' contains the decision made by an LLM +if exec_res["action"] == "search": + shared["search_query"] = exec_res["search_query"] + # ... +else: + shared["context"] = exec_res["answer"] + # ... +# ... 
+``` +Depending on the `action` decided, this `post` method writes either a `"search_query"` or an updated `"context"` (which is the answer) into the `shared` dictionary. + +**4. Modifying Existing Data in `shared`** + +Sometimes, a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) needs to update or add to existing information in `shared`. For example, in a chat application, you maintain a history of messages. + +From `cookbook/pocketflow-chat/main.py`, the `ChatNode`'s `prep` method does this: +```python +# Inside ChatNode's prep method +# def prep(self, shared): +# Initialize messages if this is the first run +if "messages" not in shared: + shared["messages"] = [] # Create an empty list if no history + +# ... user_input is obtained ... + +# Add user message to history +shared["messages"].append({"role": "user", "content": user_input}) +# ... +``` +Here: +1. It checks if `"messages"` (our chat history) exists in `shared`. If not, it initializes `shared["messages"]` as an empty list. +2. It then appends the new user message to this list. The `shared["messages"]` list grows with each turn of the conversation. + +**5. Accessing Final Results from `shared`** + +Once your [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) has completed all its steps, the `shared` dictionary will contain the final outputs and any important intermediate data you chose to store. You can then access these results from your main script. + +Back to `cookbook/pocketflow-node/main.py`: +```python +# After the flow.run(shared) call: +# The 'shared' dictionary now contains the summary + +print("\nSummary:", shared["summary"]) +``` +This line simply prints the value associated with the key `"summary"` from the `shared` dictionary, which was put there by the `Summarize` [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md). 
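Putting the five steps above together, here is a tiny run you can execute without PocketFlow installed. Plain functions stand in for a Node's `prep`/`exec`/`post` phases, and the fake "summarizer" is invented for illustration:

```python
# Stand-ins for a Node's three phases, all operating on ONE shared dict.
def prep(shared):
    return shared["data"]                 # read the input

def exec_step(text):
    return f"Summary: {text[:20]}..."     # pretend to call an LLM

def post(shared, prep_res, exec_res):
    shared["summary"] = exec_res          # write the result back

shared = {"data": "PocketFlow is a minimalist LLM framework."}
prep_res = prep(shared)
exec_res = exec_step(prep_res)
post(shared, prep_res, exec_res)
print(shared["summary"])  # now available to later steps and the caller
```

After the run, `shared` holds both the original `"data"` and the new `"summary"`, exactly like the cookbook examples above.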
+ +## Key Characteristics of `shared` + +* **It's a Python Dictionary:** This makes it incredibly flexible and easy to use. If you know how to use dictionaries in Python (e.g., `my_dict['key'] = value`, `value = my_dict['key']`, `my_dict.get('key', default_value)`), you already know how to interact with `shared`. +* **Scoped to a Single Flow Execution:** Each time you run a [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) (e.g., by calling `flow.run(shared_input)`), it operates on its own instance of the `shared` dictionary. If you run the same [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) twice, even simultaneously for different requests, they will have completely separate `shared` dictionaries. They won't interfere with each other. Think of it like two people filling out their own copies of the same form. +* **Persistent Throughout One Flow Execution:** The `shared` dictionary is created (or you provide an initial one) when a [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) starts. The *exact same* dictionary object is then passed from one [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) to the next. Any modifications made by one [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) are visible to all subsequent [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md). + +## What Happens Under the Hood? (A Simplified View) + +You don't need to manage the passing of the `shared` dictionary yourself; PocketFlow handles it for you. Here's a simplified step-by-step: + +1. **You start a Flow:** You call something like `my_flow.run(initial_shared_data)`. `initial_shared_data` is the dictionary you've prepared. +2. 
**PocketFlow takes over:** It takes your `initial_shared_data` and passes it to the first [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) in your [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md). +3. **Node executes:** + * The [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md)'s `prep` method is called with the `shared` dictionary. It can read from it. + * The [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md)'s `exec` method (the main workhorse) is called. + * The [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md)'s `post` method is called with the `shared` dictionary. It can write results back to it. +4. **Pass it on:** PocketFlow determines the next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) to run and passes the *same, possibly modified*, `shared` dictionary to it. +5. **Repeat:** Steps 3 and 4 repeat until there are no more [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) to run in the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md). +6. **Flow ends:** The `run` method finishes, and the `shared` dictionary you originally passed in now contains all the updates made by the [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md). 
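The hand-off described in these six steps can be sketched as a tiny driver loop. This is only an illustration of the idea, not PocketFlow's actual engine; the `EchoNode` class and its `next_node` wiring are simplified stand-ins:

```python
# Illustrative engine loop: one shared dict threaded through every node.
class EchoNode:
    def __init__(self, key, value, next_node=None):
        self.key, self.value, self.next_node = key, value, next_node

    def run(self, shared):
        shared[self.key] = self.value  # each node mutates the SAME dict
        return self.next_node          # hand over to the next worker

def run_flow(start, shared):
    node = start
    while node is not None:            # repeat until no node is left
        node = node.run(shared)
    return shared

second = EchoNode("answer", "42")
first = EchoNode("question", "life?", next_node=second)
shared = {}
run_flow(first, shared)
print(shared)  # both nodes' writes are visible afterwards
```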
+ +Here's a visual way to think about it: + +```mermaid +sequenceDiagram + participant You + participant PocketFlowEngine as PocketFlow Engine + participant NodeA as First Node + participant NodeB as Second Node + participant SharedDict as Shared Dictionary + + You->>PocketFlowEngine: my_flow.run(initial_shared) + PocketFlowEngine->>SharedDict: Initialize with initial_shared + PocketFlowEngine->>NodeA: process(SharedDict) + NodeA->>SharedDict: Reads input (e.g., shared['question']) + NodeA->>SharedDict: Writes output (e.g., shared['data_from_A'] = ...) + PocketFlowEngine->>NodeB: process(SharedDict) + NodeB->>SharedDict: Reads input (e.g., shared['data_from_A']) + NodeB->>SharedDict: Writes output (e.g., shared['final_answer'] = ...) + PocketFlowEngine->>You: Flow complete (initial_shared is now updated) +``` + +## Analogy Time! + +Think of the `shared` dictionary as: + +* **A Relay Race Baton (but smarter!):** Each runner ([Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md)) takes the baton (`shared` dictionary), maybe adds a small note or a sticker to it, and then passes it to the next runner. By the end of the race, the baton has collected contributions from everyone. +* **A Project's Shared Folder:** Imagine a team working on a project. They have a shared folder (`shared` dictionary) on a server. The first person creates a document (initial data). The next person opens it, adds their part, and saves it. The next person does the same. Everyone works on the same set of files in that folder. + +## Conclusion + +You've now learned about the `shared` dictionary, the backbone of communication within a PocketFlow [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md). It's a simple yet powerful Python dictionary that allows different [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) to share data and context seamlessly. 
By reading from and writing to `shared`, your [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) can collaborate to achieve complex tasks. + +Now that you understand how data is passed around, you're probably wondering about the "workers" themselves – the [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md). What are they, and how do you build them? Let's dive into that in the next chapter! + +Next up: [Chapter 2: Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) + +--- + +Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) \ No newline at end of file diff --git a/docs/PocketFlow/02_node___basenode____node____asyncnode___.md b/docs/PocketFlow/02_node___basenode____node____asyncnode___.md new file mode 100644 index 00000000..fdeb16ad --- /dev/null +++ b/docs/PocketFlow/02_node___basenode____node____asyncnode___.md @@ -0,0 +1,245 @@ +# Chapter 2: Node (`BaseNode`, `Node`, `AsyncNode`) + +In [Chapter 1: Shared State (`shared` dictionary)](01_shared_state___shared__dictionary__.md), we learned how different parts of a PocketFlow workflow can communicate using the `shared` dictionary. Now, let's meet the actual "workers" that perform the tasks and use this shared information: **Nodes**. + +## What are Nodes and Why Do We Need Them? + +Imagine you're building an AI that helps you write a story. This process might involve several steps: +1. Generate a story idea. +2. Write an outline based on the idea. +3. Write the first draft of a chapter using the outline. +4. Review and edit the chapter. + +Each of these steps is a distinct task. In PocketFlow, each such task would be handled by a **Node**. + +A **Node** is the fundamental building block in PocketFlow. It represents a single, atomic step in your workflow. Think of it as a highly specialized worker on an assembly line, responsible for one specific job. 
This job could be: +* Calling a Large Language Model (LLM) to generate text. +* Searching the web for information. +* Making a decision based on some data. +* Reading user input. +* Saving results to a file. + +By breaking down a complex process into a series of Nodes, we make our AI applications: +* **Modular:** Each Node focuses on one thing, making it easier to develop, test, and understand. +* **Reusable:** A Node designed for web search can be used in many different AI applications. +* **Manageable:** It's easier to build and debug a sequence of simple steps than one giant, monolithic piece of code. + +## The Anatomy of a Node: `prep`, `exec`, and `post` + +Most Nodes in PocketFlow have a similar structure, typically involving three key methods: + +1. **`prep(self, shared)` (Prepare):** + * **Purpose:** This method is called *before* the Node does its main work. Its job is to get any necessary input data from the [shared dictionary](01_shared_state___shared__dictionary__.md). + * **Analogy:** An assembly line worker picking up the necessary parts from a shared bin before starting their task. + * **Input:** It receives the `shared` dictionary. + * **Output:** It usually returns the specific data the Node needs for its core logic. + +2. **`exec(self, prep_res)` (Execute):** + * **Purpose:** This is where the Node performs its core task. This is the "brain" or "muscle" of the Node. + * **Analogy:** The assembly line worker actually assembling the parts or performing their specialized action. + * **Input:** It receives the result from the `prep` method (`prep_res`). + * **Output:** It returns the result of its execution (e.g., a summary, search results, a decision). + +3. **`post(self, shared, prep_res, exec_res)` (Post-process):** + * **Purpose:** This method is called *after* the Node has finished its main work. Its jobs are: + * To process the results from `exec`. 
+      * To update the [shared dictionary](01_shared_state___shared__dictionary__.md) with these results or any other new information.
+      * To decide what should happen next in the workflow (this is crucial for [Actions / Transitions](03_actions___transitions_.md), which we'll cover in the next chapter).
+   * **Analogy:** The assembly line worker placing the finished component onto the conveyor belt (updating `shared`) and signaling if the item needs to go to a different station next (deciding the next action).
+   * **Input:** It receives the `shared` dictionary, the result from `prep` (`prep_res`), and the result from `exec` (`exec_res`).
+   * **Output:** It can return an "action" string that tells the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) which Node to execute next. If it returns nothing (or `None`), a default transition is usually followed.
+
+Let's make this concrete with a simple example: a `SummarizeNode` whose job is to take some text and produce a short summary.
+
+```python
+# This is a conceptual Node, actual implementation details might vary slightly
+from pocketflow import Node  # We'll import the base class
+
+class SummarizeNode(Node):
+    def prep(self, shared):
+        # 1. Prepare: Get the text to summarize from 'shared'
+        print("SummarizeNode: Preparing...")
+        text_to_summarize = shared.get("document_text", "No text found.")
+        return text_to_summarize
+
+    def exec(self, text_input):
+        # 2. Execute: Perform the summarization (e.g., call an LLM)
+        print(f"SummarizeNode: Executing with text: '{text_input[:30]}...'")
+        if not text_input or text_input == "No text found.":
+            return "Cannot summarize empty or missing text."
+        # In a real scenario, this would call an LLM or a summarization library
+        summary = f"This is a summary of: {text_input[:20]}..."
+        return summary
+
+    def post(self, shared, prep_res, exec_res):
+        # 3. 
Post-process: Store the summary in 'shared' + print(f"SummarizeNode: Posting summary: '{exec_res}'") + shared["summary_output"] = exec_res + # We might decide the next step here, e.g., return "summarization_done" + # For now, we'll just let it end by returning nothing (None) +``` + +Let's imagine how this `SummarizeNode` would work: + +1. **Initialization:** You'd start with some text in the `shared` dictionary. + ```python + shared_data = {"document_text": "PocketFlow is a cool framework for building AI."} + ``` + +2. **Running the Node (simplified):** + * **`prep(shared_data)` is called:** It looks into `shared_data` and finds `"PocketFlow is a cool framework for building AI."`. It returns this text. + * **`exec("PocketFlow is a cool framework...")` is called:** It takes the text and (in our simplified example) creates a summary like `"This is a summary of: PocketFlow is a cool..."`. It returns this summary. + * **`post(shared_data, text_from_prep, summary_from_exec)` is called:** It takes the `shared_data` and the `summary_from_exec`. It then adds a new entry: `shared_data["summary_output"] = "This is a summary of: PocketFlow is a cool..."`. + +After the Node runs, `shared_data` would look like this: +``` +{ + "document_text": "PocketFlow is a cool framework for building AI.", + "summary_output": "This is a summary of: PocketFlow is a cool..." +} +``` +The summary is now available in the `shared` dictionary for other Nodes or for final output! + +## Types of Nodes: `BaseNode`, `Node`, `AsyncNode` + +PocketFlow provides a few variations of Nodes, built on top of each other: + +* **`BaseNode`:** + * This is the most fundamental type of Node. It provides the basic structure with `prep`, `exec`, and `post` methods. + * It's like the basic blueprint for any worker. + +* **`Node` (inherits from `BaseNode`):** + * This is the standard synchronous Node you'll often use. "Synchronous" means it performs its task and waits for it to complete before anything else happens. 
+  * It adds helpful features on top of `BaseNode`, like automatic **retries** if the `exec` method fails (e.g., a network error when calling an LLM) and an `exec_fallback` method that can be called if all retries fail.
+  * From `cookbook/pocketflow-node/flow.py`, our `Summarize` Node is an example of `Node`:
+    ```python
+    from pocketflow import Node
+    # ... other imports ...
+
+    class Summarize(Node):  # Inherits from Node
+        # ... prep, exec, post methods ...
+        def exec_fallback(self, prep_res, exc):
+            """Provide a simple fallback instead of crashing."""
+            return "There was an error processing your request."
+    ```
+    This `Summarize` Node, if its `exec` method fails (e.g., `call_llm` raises an error), will retry (the default `max_retries=1` means a single attempt; allow retries with, e.g., `Summarize(max_retries=3)`). If all retries fail, `exec_fallback` is called.
+
+* **`AsyncNode` (inherits from `Node`):**
+  * This type of Node is for **asynchronous** tasks. Asynchronous tasks are those that might take some time to complete (like waiting for a web request or a user to type something) but don't need to block the entire program while they wait. They can "pause" and let other things run.
+  * `AsyncNode` uses `async` and `await` keywords from Python's `asyncio` library.
+  * It has asynchronous versions of the core methods: `prep_async`, `exec_async`, and `post_async`.
+  * We'll dive much deeper into asynchronous operations in [Chapter 5: Asynchronous Processing (`AsyncNode`, `AsyncFlow`)](05_asynchronous_processing___asyncnode____asyncflow___.md). For now, just know it exists for tasks that involve waiting.
+  * Example from `cookbook/pocketflow-async-basic/nodes.py`:
+    ```python
+    from pocketflow import AsyncNode
+    # ... other imports ...
+
+    class FetchRecipes(AsyncNode):  # Inherits from AsyncNode
+        async def prep_async(self, shared):
+            # ... prepare input asynchronously ...
+            ingredient = await get_user_input("Enter ingredient: ")  # get_user_input is async
+            return ingredient
+
+        async def exec_async(self, ingredient):
+            # ... execute task asynchronously ...
+            recipes = await fetch_recipes(ingredient)  # fetch_recipes is async
+            return recipes
+
+        async def post_async(self, shared, prep_res, recipes):
+            # ... post-process asynchronously ...
+            shared["recipes"] = recipes
+            return "suggest"  # Action for the next step
+    ```
+    Notice the `async def` and `await` keywords. This `FetchRecipes` Node can wait for user input and web requests without freezing the application.
+
+## How a Node Runs: Under the Hood (Simplified)
+
+When PocketFlow decides it's time for a particular Node to run (as part of a [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md)), it essentially orchestrates the calling of its `prep`, `exec`, and `post` methods in sequence.
+
+Here's a simplified view of what happens when a synchronous `Node`'s internal `_run` method is invoked:
+
+1. **Call `prep`:** `prep_result = self.prep(shared)`
+   * Your Node's `prep` method is called with the current `shared` dictionary.
+   * Whatever `prep` returns is stored.
+
+2. **Call `_exec` (which internally calls your `exec` with retries):** `exec_result = self._exec(prep_result)`
+   * The Node's `_exec` method is called with the `prep_result`.
+   * This `_exec` method in the `Node` class handles the retry logic. It will try to call your `exec(prep_result)` method.
+   * If your `exec` succeeds, its result is stored.
+   * If your `exec` raises an exception, `_exec` might wait and try again (up to `max_retries`).
+   * If all retries fail, `exec_fallback(prep_res, exception)` is called, and its result is used as `exec_result`.
+
+3. **Call `post`:** `action = self.post(shared, prep_result, exec_result)`
+   * Your Node's `post` method is called with the `shared` dictionary, the `prep_result`, and the `exec_result`.
+   * `post` can modify `shared` and returns an action string (or `None`).
+
+4. **Return Action:** The `action` returned by `post` is then used by the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) to determine the next Node to run.
+
+Let's visualize this with a sequence diagram:
+
+```mermaid
+sequenceDiagram
+    participant FlowEngine as PocketFlow Engine
+    participant YourNode as Your Node Instance
+    participant SharedDict as Shared Dictionary
+
+    FlowEngine->>YourNode: _run(SharedDict)
+    YourNode->>YourNode: prep(SharedDict)
+    Note right of YourNode: Reads from SharedDict
+    YourNode-->>SharedDict: Access data (e.g., shared['input'])
+    YourNode->>YourNode: _exec(prep_result)
+    Note right of YourNode: Calls your exec(), handles retries/fallback
+    YourNode->>YourNode: post(SharedDict, prep_result, exec_result)
+    Note right of YourNode: Writes to SharedDict, decides next action
+    YourNode-->>SharedDict: Update data (e.g., shared['output'] = ...)
+    YourNode-->>FlowEngine: Returns action string
+```
+
+**Code Glimpse (from `pocketflow/__init__.py`):**
+
+The `BaseNode` class defines the fundamental execution flow in its `_run` method (this is a direct, slightly simplified version):
+```python
+# Inside BaseNode class from pocketflow/__init__.py
+def _run(self, shared):
+    prep_output = self.prep(shared)
+    exec_output = self._exec(prep_output)  # _exec calls self.exec
+    action = self.post(shared, prep_output, exec_output)
+    return action
+```
+This is the core loop for a single Node's execution.
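To see the `prep -> exec -> post` sequence end to end, here is a runnable miniature that mirrors the `_run` loop described above. It is a simplified re-implementation for illustration only, not the real `pocketflow` classes:

```python
# Miniature BaseNode mirroring the prep/exec/post loop described above.
class MiniNode:
    def prep(self, shared): return None
    def exec(self, prep_res): return None
    def post(self, shared, prep_res, exec_res): return None

    def _run(self, shared):
        prep_output = self.prep(shared)
        exec_output = self.exec(prep_output)
        return self.post(shared, prep_output, exec_output)

class Shout(MiniNode):
    def prep(self, shared):
        return shared["text"]            # read input from shared
    def exec(self, text):
        return text.upper()              # the "work" of this node
    def post(self, shared, prep_res, exec_res):
        shared["shouted"] = exec_res     # write result back
        return "done"                    # the action string a Flow would use

shared = {"text": "hello pocketflow"}
action = Shout()._run(shared)
print(action, shared["shouted"])  # -> done HELLO POCKETFLOW
```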
+
+The `Node` class (which inherits from `BaseNode`) overrides `_exec` to add retry and fallback logic:
+```python
+# Simplified concept from Node class in pocketflow/__init__.py
+def _exec(self, prep_res):
+    for self.cur_retry in range(self.max_retries):  # Loop for retries
+        try:
+            return self.exec(prep_res)  # Call your Node's exec method
+        except Exception as e:
+            if self.cur_retry == self.max_retries - 1:  # If last retry
+                return self.exec_fallback(prep_res, e)  # Call fallback
+            if self.wait > 0:
+                time.sleep(self.wait)  # Wait before retrying
+```
+This shows how `Node` makes your worker more robust by automatically handling temporary failures.
+
+For `AsyncNode`, the methods are `prep_async`, `exec_async`, `post_async`, and they are `await`ed, allowing other tasks to run while waiting for I/O operations. This will be detailed in [Chapter 5](05_asynchronous_processing___asyncnode____asyncflow___.md).
+
+## Conclusion
+
+You've now been introduced to **Nodes**, the workhorses of PocketFlow!
+* They represent **single, atomic steps** in your workflow.
+* They typically follow a **`prep` -> `exec` -> `post`** lifecycle.
+* `prep` gets data from the [shared dictionary](01_shared_state___shared__dictionary__.md).
+* `exec` performs the core logic.
+* `post` updates the `shared` dictionary and can decide what happens next.
+* **`Node`** provides synchronous execution with retries and fallbacks.
+* **`AsyncNode`** provides asynchronous execution for I/O-bound tasks.
+
+Nodes are the building blocks you'll use to define the individual capabilities of your AI agents and applications. But how do these Nodes connect to form a sequence or a more complex workflow? And how does the `post` method's return value actually control the flow? That's where [Actions / Transitions](03_actions___transitions_.md) come in, which we'll explore in the next chapter!
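Before moving on, the retry-and-fallback behavior described in this chapter can be tried out in isolation. This standalone sketch copies the shape of the simplified `_exec` loop shown earlier; the `FlakyWorker` class and its failure pattern are invented for the demonstration:

```python
import time

# Standalone sketch of Node-style retries: call exec() up to max_retries
# times, then fall back instead of crashing.
class FlakyWorker:
    def __init__(self, max_retries=3, wait=0):
        self.max_retries, self.wait = max_retries, wait
        self.calls = 0

    def exec(self, prep_res):
        self.calls += 1
        if self.calls < 3:                       # fail the first two attempts
            raise RuntimeError("transient error")
        return f"processed {prep_res}"

    def exec_fallback(self, prep_res, exc):
        return f"fallback after error: {exc}"

    def _exec(self, prep_res):
        for cur_retry in range(self.max_retries):
            try:
                return self.exec(prep_res)
            except Exception as e:
                if cur_retry == self.max_retries - 1:   # last attempt failed
                    return self.exec_fallback(prep_res, e)
                if self.wait > 0:
                    time.sleep(self.wait)               # back off, then retry

print(FlakyWorker(max_retries=3)._exec("data"))  # third attempt succeeds
print(FlakyWorker(max_retries=2)._exec("data"))  # attempts exhausted -> fallback
```

With three attempts allowed, the worker eventually returns `processed data`; with only two, the fallback message is returned instead of an exception propagating.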
+
+Next up: [Chapter 3: Actions / Transitions](03_actions___transitions_.md)
+
+---
+
+Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
\ No newline at end of file
diff --git a/docs/PocketFlow/03_actions___transitions_.md b/docs/PocketFlow/03_actions___transitions_.md
new file mode 100644
index 00000000..b68d4f32
--- /dev/null
+++ b/docs/PocketFlow/03_actions___transitions_.md
@@ -0,0 +1,249 @@
+# Chapter 3: Actions / Transitions
+
+In [Chapter 2: Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md), we learned that Nodes are the individual workers in our PocketFlow application, each performing a specific task. We also touched upon the `post` method of a Node, mentioning that it can return an "action" string. Now, it's time to explore exactly what these "actions" are and how they create "transitions," guiding the workflow dynamically.
+
+Imagine you're building an AI research assistant. After the AI receives your question, it needs to decide: should it search the web for more information, or does it already have enough context to answer? This decision point, and acting upon it, is where Actions and Transitions shine.
+
+## What are Actions and Transitions?
+
+**Actions** and **Transitions** are the mechanism by which a PocketFlow [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) determines the next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) to execute.
+
+* An **Action** is usually a simple string (e.g., `"search"`, `"answer"`, `"proceed"`) returned by a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md)'s `post` method after it completes its work. This string signals the outcome or a desired next step.
+* A **Transition** is the rule defined within the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) that says, "If *this* Node returns *this* action, then go to *that* Node next." + +Think of it like a "Choose Your Own Adventure" book. At the end of a section (a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) finishing its task), you might be told, "If you want to open the door, turn to page 42. If you want to look under the bed, turn to page 55." The "open the door" part is the "action," and "turn to page 42" is the "transition." + +This allows your workflow to be dynamic and intelligent, not just a fixed sequence of steps. + +## How to Use Actions and Transitions + +Let's break down how you implement this, using our AI research assistant idea from `cookbook/pocketflow-a2a/`. + +**1. A Node Returns an Action String** + +The `post` method of a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) is where the decision for the next action is typically made and returned. + +Consider the `DecideAction` [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) from `cookbook/pocketflow-a2a/nodes.py`. Its job is to decide whether to search the web or try to answer the question directly. + +```python +# Inside DecideAction Node class (cookbook/pocketflow-a2a/nodes.py) +# ... (prep and exec methods are here) ... + +class DecideAction(Node): + # ... + def post(self, shared, prep_res, exec_res): + """Save the decision and determine the next step in the flow.""" + # 'exec_res' is a dictionary like {"action": "search", "search_query": "..."} + # or {"action": "answer", "answer": "..."} + if exec_res["action"] == "search": + shared["search_query"] = exec_res["search_query"] + print(f"🔍 Agent decided to search for: {exec_res['search_query']}") + else: + # ... store answer if action is "answer" ... 
+ print(f"💡 Agent decided to answer the question") + + # Return the action string to guide the Flow + return exec_res["action"] # This could be "search" or "answer" +``` +In this `post` method: +* It first updates the [shared dictionary](01_shared_state___shared__dictionary__.md) based on the decision made in `exec_res`. +* Crucially, it returns `exec_res["action"]`. If the LLM in the `exec` method decided to search, this will be the string `"search"`. If it decided to answer, it will be `"answer"`. This returned string is the **action**. + +**2. Defining Transitions in a Flow** + +Now that our `DecideAction` [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) can return an action like `"search"` or `"answer"`, we need to tell the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) what to do for each of these actions. This is done when you set up your [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md). + +PocketFlow uses a very intuitive syntax: `current_node - "action_string" >> next_node`. + +Let's look at `cookbook/pocketflow-a2a/flow.py`: +```python +# From cookbook/pocketflow-a2a/flow.py +from pocketflow import Flow +from nodes import DecideAction, SearchWeb, AnswerQuestion + +# Create instances of each node +decide = DecideAction() +search = SearchWeb() +answer = AnswerQuestion() + +# Connect the nodes using actions +# If DecideAction returns "search", go to SearchWeb node +decide - "search" >> search + +# If DecideAction returns "answer", go to AnswerQuestion node +decide - "answer" >> answer + +# After SearchWeb completes and returns "decide", go back to DecideAction +search - "decide" >> decide + +# Create the flow, starting with the DecideAction node +agent_flow = Flow(start=decide) +``` +Here's what's happening: +* `decide - "search" >> search`: This line says, "If the `decide` Node returns the action string `"search"`, then the *next* Node to execute should be the `search` Node." 
+* `decide - "answer" >> answer`: Similarly, "If `decide` returns `"answer"`, then go to the `answer` Node." +* `search - "decide" >> decide`: This creates a loop! After the `search` Node (which performs a web search) completes, its `post` method returns `"decide"`. This transition sends the control *back* to the `decide` Node, perhaps with new search results in the [shared dictionary](01_shared_state___shared__dictionary__.md), to re-evaluate. + +When `agent_flow.run(shared_data)` is called: +1. The `decide` Node runs. Let's say its `post` method returns `"search"`. +2. The `Flow` sees this action. It looks at the transitions defined for `decide`. It finds `decide - "search" >> search`. +3. So, the `search` Node runs next. +4. Let's say the `search` Node's `post` method returns `"decide"`. +5. The `Flow` sees this. It finds `search - "decide" >> decide`. +6. The `decide` Node runs again. This time, with the search results in `shared`, it might return `"answer"`. +7. The `Flow` finds `decide - "answer" >> answer`. +8. The `answer` Node runs, generates the final answer, and its `post` method might return `"done"` (or `None`). If `"done"` isn't a defined transition for the `answer` Node, the flow might end. + +**3. Default Transitions** + +What if a Node's `post` method returns `None` (i.e., nothing), or it returns an action string for which you haven't defined a specific transition (e.g., `decide` returns `"unknown_action"`)? + +Often, you'll define a **default transition**. This is like the "else" in an if-else statement. If no specific action matches, the default transition is taken. + +The syntax for a default transition is simpler: `current_node >> next_node_for_default_action`. + +Let's look at `cookbook/pocketflow-supervisor/flow.py`: +```python +# From cookbook/pocketflow-supervisor/flow.py +# ... (agent_flow is an inner Flow, supervisor is a SupervisorNode) ... 
+ 

# Connect the components
# After agent_flow completes, go to supervisor (this is a default transition)
agent_flow >> supervisor

# If supervisor rejects the answer (returns "retry"), go back to agent_flow
supervisor - "retry" >> agent_flow

# Create and return the outer flow
supervised_flow = Flow(start=agent_flow)
```
Here:
* `agent_flow >> supervisor`: The entire `agent_flow` is treated as a single step here (a nested Flow acts like one big Node inside the outer flow). When it finishes, the outer flow checks the action it produced. Because the outer flow wires up no specific action for `agent_flow`, the **default** transition fires and control moves to the `supervisor` Node.
* `supervisor - "retry" >> agent_flow`: This is a specific action-based transition. If the `supervisor` Node's `post` method returns `"retry"`, the flow goes back to `agent_flow`.

If a Node's `post` returns `None`, and there's a default transition defined (e.g., `node1 >> node2`), then `node2` will be executed. If there's no specific transition for the returned action *and* no default transition, the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) typically ends for that branch.

## What Happens Under the Hood? (A Simplified View)

1. **Node Execution:** Your [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) runs a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md).
2. **`post` Method Returns Action:** The `post` method of that [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) completes and returns an action string (e.g., `"search"`).
3. **Flow Receives Action:** The [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) (specifically, its orchestrator logic) gets this action string.
4. 
**Lookup Successor:** The [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) looks at the current [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) and checks its defined successors. Each [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) object internally stores a dictionary called `successors`. This dictionary maps action strings to the next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) object. + * The syntax `node1 - "actionX" >> node2` effectively does `node1.successors["actionX"] = node2`. + * The syntax `node1 >> node2` effectively does `node1.successors["default"] = node2`. +5. **Find Next Node:** The [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) tries to find an entry in `current_node.successors` for the returned action string. If not found, it tries to find an entry for `"default"`. +6. **Transition or End:** + * If a next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) is found, the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) prepares to execute it. + * If no matching transition (neither specific nor default) is found, that path of the [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) typically concludes. + +Here's a sequence diagram illustrating this: + +```mermaid +sequenceDiagram + participant FlowEngine + participant CurrentNode as Current Node + participant NextNodeSearch as Next Node (for "search") + participant NextNodeAnswer as Next Node (for "answer") + + FlowEngine->>CurrentNode: _run(shared_data) + Note over CurrentNode: prep(), exec() run... 
+ CurrentNode->>CurrentNode: post() method executes + CurrentNode-->>FlowEngine: Returns action_string (e.g., "search") + FlowEngine->>FlowEngine: get_next_node(CurrentNode, "search") + Note over FlowEngine: Looks up "search" in CurrentNode.successors + FlowEngine->>NextNodeSearch: _run(shared_data) +``` + +**Diving into the Code (from `pocketflow/__init__.py`):** + +1. **Storing Transitions:** + The `BaseNode` class (which all [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) and [Flows (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) inherit from) has a `next` method to define successors: + ```python + # Inside BaseNode class (pocketflow/__init__.py) + class BaseNode: + def __init__(self): + self.successors = {} # Stores action -> next_node mapping + # ... other initializations ... + + def next(self, node, action="default"): + # ... (warning if overwriting) ... + self.successors[action] = node + return node # Allows chaining + ``` + The cool `node - "action" >> next_node` syntax is made possible by Python's special methods (`__sub__` for `-` and `__rshift__` for `>>`): + ```python + # Inside BaseNode class (pocketflow/__init__.py) + def __sub__(self, action_str): # When you do 'node - "action_str"' + if isinstance(action_str, str): + return _ConditionalTransition(self, action_str) + # ... error handling ... + + class _ConditionalTransition: # A temporary helper object + def __init__(self, src_node, action_name): + self.src_node, self.action_name = src_node, action_name + + def __rshift__(self, target_node): # When you do '... 
>> target_node' + return self.src_node.next(target_node, self.action_name) + ``` + And for the default transition `node1 >> node2`: + ```python + # Inside BaseNode class (pocketflow/__init__.py) + def __rshift__(self, other_node): # When you do 'node1 >> other_node' + return self.next(other_node) # Calls .next() with action="default" + ``` + So, these operators are just convenient ways to populate the `successors` dictionary of a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md). + +2. **Flow Orchestration and Using Actions:** + The `Flow` class has an `_orch` (orchestration) method that manages running [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) in sequence. + ```python + # Inside Flow class (pocketflow/__init__.py) + class Flow(BaseNode): + # ... + def get_next_node(self, current_node, action_str): + # Tries the specific action, then "default" + next_node = current_node.successors.get(action_str) + if not next_node: # If specific action not found + next_node = current_node.successors.get("default") + + # ... (warning if action not found and successors exist) ... + return next_node + + def _orch(self, shared, params=None): + current_node = self.start_node + last_action = None + while current_node: + # ... (set params for current_node) ... + last_action = current_node._run(shared) # Node returns an action + current_node = self.get_next_node(current_node, last_action) + return last_action # Returns the last action from the entire flow + ``` + The `_orch` method: + * Starts with the `self.start_node`. + * In a loop, it runs the `current_node` (whose `_run` method calls `prep`, `exec`, and `post`). The `post` method's return value becomes `last_action`. + * It then calls `self.get_next_node(current_node, last_action)` to determine the next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md). 
+ * If `get_next_node` returns a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md), the loop continues. If it returns `None` (no transition found), the loop (and thus the flow for that path) ends. + +## Analogy: A Mail Sorter + +Think of a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) as a mail processing station. +* It receives a package (data via `prep` from the [shared dictionary](01_shared_state___shared__dictionary__.md)). +* It processes the package (its `exec` method). +* Then, its `post` method looks at the package and decides which destination bin it should go to next. It writes a "destination code" (the action string like `"LOCAL_DELIVERY"` or `"INTERNATIONAL_FORWARD"`) on the package. +* The [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) is like the conveyor belt system. It reads the "destination code" and uses its routing rules (the `node - "code" >> next_station` definitions) to send the package to the correct next station. If there's no specific code, it might send it to a "default processing" station. + +## Conclusion + +Actions and Transitions are the control flow mechanism in PocketFlow. They allow you to build dynamic and responsive workflows where the path of execution can change based on the outcomes of individual [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md). +* A [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md)'s `post` method returns an **action string**. +* The [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) uses this action string to find a **transition rule** (e.g., `current_node - "action" >> next_node` or a default `current_node >> next_node`). +* This determines the next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) to execute. 
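The whole routing mechanism fits in a few lines of Python. Here is a self-contained, runnable sketch (the `Mini*` and `_Transition` names are invented for illustration, not the real library) that reproduces the `successors` dictionary, the `-`/`>>` operators, and the orchestration loop described above:

```python
# Illustrative re-implementation of the transition mechanics described above.
# Names are hypothetical; the real classes live in pocketflow/__init__.py.
class MiniNode:
    """Bare-bones stand-in for BaseNode: just successors + operators."""
    def __init__(self, name):
        self.name, self.successors = name, {}

    def next(self, node, action="default"):
        self.successors[action] = node
        return node

    def __rshift__(self, other):        # node1 >> node2  (default transition)
        return self.next(other)

    def __sub__(self, action):          # node - "action"
        return _Transition(self, action)

    def post(self, shared):             # return an action string
        return "default"

class _Transition:
    def __init__(self, src, action):
        self.src, self.action = src, action

    def __rshift__(self, target):       # (node - "action") >> target
        return self.src.next(target, self.action)

def run_flow(start, shared):
    """Tiny orchestration loop: run node, read action, follow transition."""
    node, visited = start, []
    while node:
        visited.append(node.name)
        action = node.post(shared)
        node = node.successors.get(action) or node.successors.get("default")
    return visited

class Decide(MiniNode):
    def post(self, shared):
        return "answer" if shared.get("searched") else "search"

class Search(MiniNode):
    def post(self, shared):
        shared["searched"] = True
        return "decide"

decide, search, answer = Decide("decide"), Search("search"), MiniNode("answer")
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide

print(run_flow(decide, {}))  # ['decide', 'search', 'decide', 'answer']
```

The loop ends at `answer` because its `successors` dictionary is empty: no matching transition, so that branch of the flow concludes, exactly as described above.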
+ +By mastering actions and transitions, you can design sophisticated logic for your AI agents, enabling them to make decisions and navigate complex tasks. + +Now that we understand how individual [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) are defined, how they share data using the [shared dictionary](01_shared_state___shared__dictionary__.md), and how they connect using Actions and Transitions, we're ready to look at the bigger picture: the container that orchestrates all of this. + +Next up: [Chapter 4: Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) + +--- + +Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) \ No newline at end of file diff --git a/docs/PocketFlow/04_flow___flow____asyncflow___.md b/docs/PocketFlow/04_flow___flow____asyncflow___.md new file mode 100644 index 00000000..adb0996a --- /dev/null +++ b/docs/PocketFlow/04_flow___flow____asyncflow___.md @@ -0,0 +1,323 @@ +# Chapter 4: Flow (`Flow`, `AsyncFlow`) + +In [Chapter 3: Actions / Transitions](03_actions___transitions__.md), we saw how individual [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) can decide what should happen next by returning "action" strings, and how these actions lead to "transitions" between Nodes. But what actually manages this sequence? What's the conductor of this orchestra of Nodes? That's where **Flows** come in! + +## What Problem Do Flows Solve? Meet the Orchestrator! + +Imagine you're building a simple AI application that interacts with a user: +1. **Greet User Node**: Displays a welcome message. +2. **Get Name Node**: Asks the user for their name and stores it. +3. **Personalized Message Node**: Uses the name to give a personalized response. + +Each step is a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md). But how do you ensure they run in the correct order? 
How does the "Get Name Node" know to run after "Greet User Node", and how is the name passed along? This is the job of a **Flow**. + +A **Flow** is like the **blueprint** or the **manager** of an assembly line. It defines the sequence of operations by connecting multiple [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) into a complete workflow. It dictates: +* Which [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) starts the process. +* How to move from one [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) to another based on the [Actions / Transitions](03_actions___transitions__.md) we learned about. +* It ensures the [shared dictionary](01_shared_state___shared__dictionary__.md) is passed along, so all [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) have access to the data they need. + +PocketFlow offers two main types of Flows: +* **`Flow`**: For workflows that consist primarily of synchronous [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) (tasks that run one after another, blocking until complete). +* **`AsyncFlow`**: For workflows that include asynchronous [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) (tasks that can "pause" and let other operations run, like waiting for user input or a network request). + +Let's see how to build and use them! + +## Building Your First `Flow` + +Let's create a simple text transformation workflow using `Flow`. It will: +1. Get text input from the user. +2. Offer transformation choices (uppercase, lowercase, etc.). +3. Transform the text. +4. Ask if the user wants to do another transformation or exit. + +This example is inspired by `cookbook/pocketflow-flow/flow.py`. 
+ +**Step 1: Define Your Nodes** + +First, we need our worker [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md). (We'll use conceptual Node definitions here for brevity; refer to [Chapter 2](02_node___basenode____node____asyncnode__.md) for Node details). + +```python +# Assume these Nodes are defined (simplified from cookbook/pocketflow-flow/flow.py) +# from pocketflow import Node + +class TextInput(Node): # Gets input and choice + def post(self, shared, prep_res, exec_res): + # ... (gets user input for text and choice) ... + # shared["text"] = user_text + # shared["choice"] = user_choice + if shared["choice"] == "5": # Exit choice + return "exit" + return "transform" # Action to proceed to transformation + +class TextTransform(Node): # Transforms text based on choice + def post(self, shared, prep_res, exec_res): + # ... (transforms text, prints result) ... + # shared["transformed_text"] = result + if input("Convert another? (y/n): ") == 'y': + shared.pop("text", None) # Clear for next input + return "input" # Action to go back to TextInput + return "exit" # Action to end + +class EndNode(Node): # A simple Node to mark the end + pass +``` +* `TextInput`: Its `post` method will return `"transform"` to move to the `TextTransform` Node, or `"exit"`. +* `TextTransform`: Its `post` method will return `"input"` to loop back to `TextInput`, or `"exit"`. + +**Step 2: Instantiate Your Nodes** + +Create an instance of each Node class: +```python +text_input = TextInput() +text_transform = TextTransform() +end_node = EndNode() +``` + +**Step 3: Connect Nodes Using Transitions** + +Now, tell PocketFlow how these [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) connect based on the actions they return. We learned this in [Chapter 3: Actions / Transitions](03_actions___transitions__.md). 
+ 

```python
# If text_input returns "transform", go to text_transform
text_input - "transform" >> text_transform

# If text_input returns "exit", go to the dedicated end node
text_input - "exit" >> end_node

# If text_transform returns "input", go back to text_input
text_transform - "input" >> text_input
# If text_transform returns "exit", go to end_node
text_transform - "exit" >> end_node
```
(If a Node ever returns an action with no matching transition, the Flow simply ends at that point; a dedicated `end_node` is optional, but it makes the exit path explicit.)

**Step 4: Create the `Flow`**

Now, create an instance of the `Flow` class, telling it which [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) to start with.

```python
from pocketflow import Flow

# Create the flow, starting with the text_input node
app_flow = Flow(start=text_input)
```
And that's it! `app_flow` is now a complete, runnable workflow.

**Step 5: Run the `Flow`**

To execute your workflow, you call its `run` method, usually with an initial [shared dictionary](01_shared_state___shared__dictionary__.md).

```python
initial_shared_data = {} # Start with an empty shared dictionary
app_flow.run(initial_shared_data)

# After the flow finishes, initial_shared_data might contain final results
# if your nodes were designed to store them there.
print("Flow finished!")
```
When you run this:
1. `app_flow` will start with `text_input`.
2. `text_input` will execute (prompting you for text and choice).
3. Based on the action returned by `text_input` (e.g., `"transform"`), the `Flow` will look at the transitions you defined and execute the next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) (e.g., `text_transform`).
4. 
This continues until a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) returns an action for which no transition is defined, or it transitions to a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) like `end_node` that doesn't lead anywhere else. + +## Orchestrating Asynchronous Tasks with `AsyncFlow` + +What if your workflow involves tasks that wait for external operations, like fetching data from a website or waiting for a user to type something slowly? If you use a regular `Flow` and synchronous [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) for these, your whole application would freeze during these waits. + +This is where `AsyncFlow` and [Asynchronous Processing (`AsyncNode`, `AsyncFlow`)](05_asynchronous_processing___asyncnode____asyncflow___.md) come in. `AsyncFlow` is designed to work with `AsyncNode`s, which can perform tasks asynchronously. + +Let's look at a conceptual recipe finder flow (inspired by `cookbook/pocketflow-async-basic/flow.py`). + +**Step 1: Define Your AsyncNodes** +You'd define [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) using `AsyncNode` and `async def` methods. + +```python +# from pocketflow import AsyncNode, Node + +class FetchRecipes(AsyncNode): # Gets ingredient & fetches recipes (async) + async def post_async(self, shared, prep_res, exec_res): + # ... (stores recipes in shared) ... + return "suggest" # Action to suggest a recipe + +class SuggestRecipe(Node): # Suggests a recipe (can be sync) + def post(self, shared, prep_res, exec_res): + # ... (prints suggestion) ... + return "approve" # Action to get approval + +class GetApproval(AsyncNode): # Gets user approval (async) + async def post_async(self, shared, prep_res, exec_res): + # ... (gets approval) ... 
+ if approved: return "accept" + return "retry" # Action to suggest another + +class EndFlowNode(Node): pass # Simple synchronous end node +``` + +**Step 2 & 3: Instantiate and Connect** +This is very similar to `Flow`: + +```python +fetch_recipes = FetchRecipes() +suggest_recipe = SuggestRecipe() +get_approval = GetApproval() +end_node = EndFlowNode() + +fetch_recipes - "suggest" >> suggest_recipe +suggest_recipe - "approve" >> get_approval +get_approval - "retry" >> suggest_recipe # Loop back +get_approval - "accept" >> end_node +``` + +**Step 4: Create the `AsyncFlow`** + +```python +from pocketflow import AsyncFlow + +recipe_flow = AsyncFlow(start=fetch_recipes) +``` +Notice we use `AsyncFlow` here. + +**Step 5: Run the `AsyncFlow`** + +Running an `AsyncFlow` involves `async` and `await` because the flow itself is asynchronous. + +```python +import asyncio + +async def main(): + initial_shared = {} + await recipe_flow.run_async(initial_shared) # Use run_async() + print("Recipe flow finished!") + +# To run the main async function +# asyncio.run(main()) +``` +The `AsyncFlow` will manage the `AsyncNode`s, allowing them to `await` their operations without blocking the entire event loop (if you're running other async tasks). We'll explore this more in [Chapter 5: Asynchronous Processing (`AsyncNode`, `AsyncFlow`)](05_asynchronous_processing___asyncnode____asyncflow___.md). + +## Nesting Flows: Managing Complexity + +What if your workflow becomes very large and complex? You can break it down! A **Flow can itself be treated as a Node and nested within another Flow.** This is like having a project manager who oversees several team leads, and each team lead manages their own team's tasks. + +Consider the `cookbook/pocketflow-supervisor/flow.py` example. It has an `agent_inner_flow` which handles research, and then an outer `Flow` that uses this `agent_inner_flow` as a step, followed by a `SupervisorNode` to check the agent's work. 
+ +```python +# Conceptual: from cookbook/pocketflow-supervisor/flow.py +# agent_inner_flow is a complete Flow instance itself +agent_inner_flow = create_agent_inner_flow() +supervisor = SupervisorNode() + +# The inner flow is treated like a node in the outer flow's transitions +agent_inner_flow >> supervisor # Default transition +supervisor - "retry" >> agent_inner_flow + +supervised_flow = Flow(start=agent_inner_flow) +``` +Here, `agent_inner_flow` runs completely. When it finishes, the `supervised_flow` transitions to the `supervisor` Node. This is a powerful way to create hierarchical and modular workflows. + +## Under the Hood: How Do Flows Orchestrate? + +At its core, a `Flow` (or `AsyncFlow`) runs a loop that: +1. Identifies the current [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) to run (starting with its `start_node`). +2. Executes this [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) (which involves its `prep`, `exec`, and `post` methods). +3. Gets the "action" string returned by the Node's `post` method. +4. Uses this action string to look up the *next* [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) based on the transitions you defined (e.g., `current_node - "action" >> next_node`). +5. If a next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) is found, it becomes the current [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md), and the loop continues. +6. If no next [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) is found (no matching transition), the flow (or that branch of it) ends. 
+ +Here's a simplified sequence diagram: + +```mermaid +sequenceDiagram + participant You + participant MyFlow as Flow Object + participant NodeA as Start Node + participant NodeB as Next Node + participant SharedDict as Shared Dictionary + + You->>MyFlow: flow.run(initial_shared) + MyFlow->>SharedDict: Initialize with initial_shared + MyFlow->>NodeA: _run(SharedDict) + NodeA-->>MyFlow: returns action_A (from NodeA's post method) + MyFlow->>MyFlow: get_next_node(NodeA, action_A) + Note right of MyFlow: Finds NodeB based on NodeA's transitions + MyFlow->>NodeB: _run(SharedDict) + NodeB-->>MyFlow: returns action_B (from NodeB's post method) + MyFlow->>MyFlow: get_next_node(NodeB, action_B) + Note right of MyFlow: No more nodes or no transition found. Flow ends. + MyFlow-->>You: Flow execution complete +``` + +**A Glimpse into the Code (`pocketflow/__init__.py`):** + +The `Flow` class inherits from `BaseNode`, so it also has `prep`, `exec`, `post` methods. Its main job is done in its orchestration logic. + +1. **Initialization:** When you create a `Flow`, you give it a starting [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md). + ```python + # Inside Flow class + def __init__(self, start=None): + super().__init__() # Initialize BaseNode parts + self.start_node = start # Store the starting node + ``` + +2. **Getting the Next Node:** The `get_next_node` method is crucial. It checks the current node's `successors` dictionary (which was populated by your transition definitions like `nodeA - "action" >> nodeB`). + ```python + # Inside Flow class + def get_next_node(self, current_node, action_str): + # Try specific action, then "default" + next_node = current_node.successors.get(action_str) + if not next_node: # If specific action's successor not found + next_node = current_node.successors.get("default") + # ... (warnings if no successor found but some exist) ... + return next_node + ``` + +3. 
**The Orchestration Loop (`_orch`):** This is the heart of the `Flow`. + ```python + # Inside Flow class (simplified) + def _orch(self, shared, params=None): + current_node = self.start_node + last_action = None + while current_node: + # ... (set parameters for current_node if any) ... + last_action = current_node._run(shared) # Run the node + # Get the next node based on the action from the current one + current_node = self.get_next_node(current_node, last_action) + return last_action # Returns the very last action from the flow + ``` + The `current_node._run(shared)` call is what executes the `prep -> exec -> post` cycle of that [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md). + +For `AsyncFlow`, the structure is very similar. It has an `_orch_async` method: +```python +# Inside AsyncFlow class (conceptual) +async def _orch_async(self, shared, params=None): + current_node = self.start_node + last_action = None + while current_node: + # ... + if isinstance(current_node, AsyncNode): + last_action = await current_node._run_async(shared) # Await async nodes + else: + last_action = current_node._run(shared) # Run sync nodes normally + current_node = self.get_next_node(current_node, last_action) + return last_action +``` +The key difference is that it `await`s the `_run_async` method of `AsyncNode`s, allowing for non-blocking execution. + +## Conclusion + +You've now learned about **`Flow`** and **`AsyncFlow`**, the orchestrators that bring your [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) together to form complete, dynamic workflows! +* Flows define the sequence and logic of how [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) are executed. 
+* They use the "action" strings returned by [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) and the transition rules you define (e.g., `nodeA - "action" >> nodeB`) to decide the path of execution. +* `Flow` is for synchronous workflows, while `AsyncFlow` handles workflows with asynchronous tasks using `AsyncNode`s. +* Flows can be nested to manage complexity. + +With Flows, you can build anything from simple linear sequences to complex, branching, and looping AI applications. + +In the next chapter, we'll take a much deeper dive into the world of asynchronous operations specifically, exploring how `AsyncNode` and `AsyncFlow` enable you to build responsive, I/O-bound applications efficiently. + +Next up: [Chapter 5: Asynchronous Processing (`AsyncNode`, `AsyncFlow`)](05_asynchronous_processing___asyncnode____asyncflow___.md) + +--- + +Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) \ No newline at end of file diff --git a/docs/PocketFlow/05_asynchronous_processing___asyncnode____asyncflow___.md b/docs/PocketFlow/05_asynchronous_processing___asyncnode____asyncflow___.md new file mode 100644 index 00000000..52fb4917 --- /dev/null +++ b/docs/PocketFlow/05_asynchronous_processing___asyncnode____asyncflow___.md @@ -0,0 +1,238 @@ +# Chapter 5: Asynchronous Processing (`AsyncNode`, `AsyncFlow`) + +In [Chapter 4: Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow__.md), we learned how `Flow` and `AsyncFlow` orchestrate sequences of [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) to create complete applications. Now, we're going to zoom in on a powerful feature that `AsyncFlow` enables: **Asynchronous Processing**. This is key to building AI applications that feel responsive and can handle tasks that involve waiting, like calling web APIs or interacting with users. + +## The Problem: Waiting Can Be Wasteful! 
+ +Imagine you're building an AI assistant that needs to: +1. Ask the user for a city name. +2. Fetch the current weather for that city from an online weather service (this involves a network request, which can take a few seconds). +3. Tell the user the weather. + +If we build this "synchronously" (one step strictly after the other, waiting for each to finish), your application would *freeze* while it's waiting for the weather service. The user can't do anything else; the app just hangs. This isn't a great experience! + +This is where asynchronous processing helps. It's like a skilled chef in a busy kitchen. +* A **synchronous chef** would prepare one dish from start to finish: chop vegetables, put it on the stove, wait for it to simmer, then plate it. Only *after* that one dish is completely done would they start the next. If simmering takes 20 minutes, they're just standing there waiting! +* An **asynchronous chef** is much more efficient! They can start chopping vegetables for dish A, put it on the stove to simmer, and *while it's simmering* (a waiting period), they can start preparing dish B, or perhaps clean up. They don't idly wait; they switch to other tasks that can be done. + +PocketFlow's `AsyncNode` and `AsyncFlow` let your AI application be like that efficient, asynchronous chef. + +## What is Asynchronous Processing? + +Asynchronous processing allows your program to start a potentially long-running task (like an API call or waiting for user input) and then, instead of freezing and waiting for it to complete, it can switch to doing other work. When the long-running task eventually finishes, the program can pick up where it left off with that task. + +This is especially crucial for **I/O-bound tasks**. "I/O" stands for Input/Output, like: +* Reading/writing files from a disk. +* Making requests over a network (e.g., to an LLM API, a database, or a web service). +* Waiting for user input. 
+ +These tasks often involve waiting for something external to the program itself. Asynchronous processing ensures your application remains responsive and can handle multiple things (seemingly) at once, improving overall throughput and user experience. + +In Python, this is often achieved using the `async` and `await` keywords. +* `async def` is used to define an asynchronous function (also called a "coroutine"). +* `await` is used inside an `async` function to pause its execution until an awaited task (another coroutine or an I/O operation) completes. While paused, other asynchronous tasks can run. + +## Meet `AsyncNode`: The Asynchronous Worker + +In PocketFlow, an `AsyncNode` is a special type of [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md) designed for asynchronous operations. It looks very similar to a regular `Node`, but its core methods (`prep`, `exec`, `post`) are defined as `async` functions: + +* `async def prep_async(self, shared)` +* `async def exec_async(self, prep_res)` +* `async def post_async(self, shared, prep_res, exec_res)` + +Inside these methods, you can use `await` to call other asynchronous functions or perform non-blocking I/O operations. + +Let's create a simple `AsyncNode` that simulates fetching data from a website. We'll use `asyncio.sleep()` to mimic the delay of a network request. 
+ +```python +import asyncio +from pocketflow import AsyncNode + +class WeatherFetcherNode(AsyncNode): + async def prep_async(self, shared): + city = shared.get("city_name", "Unknown city") + print(f"WeatherFetcherNode: Preparing to fetch weather for {city}.") + return city + + async def exec_async(self, city): + print(f"WeatherFetcherNode: Calling weather API for {city}...") + await asyncio.sleep(2) # Simulate a 2-second API call + weather_data = f"Sunny in {city}" + print(f"WeatherFetcherNode: Got weather: {weather_data}") + return weather_data + + async def post_async(self, shared, prep_res, exec_res): + shared["weather_report"] = exec_res + print(f"WeatherFetcherNode: Weather report stored in shared.") + return "done" # Action to signify completion +``` +In this `WeatherFetcherNode`: +* All methods are `async def`. +* `exec_async` uses `await asyncio.sleep(2)` to pause for 2 seconds. If this were a real application, it might be `await http_client.get(...)`. While this `await` is active, other asynchronous tasks in your program could run. + +## Orchestrating with `AsyncFlow` + +To run `AsyncNode`s, you need an `AsyncFlow`. As we saw in [Chapter 4: Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow__.md), an `AsyncFlow` can manage both `AsyncNode`s and regular `Node`s. When it encounters an `AsyncNode`, it will correctly `await` its asynchronous methods. + +Let's set up an `AsyncFlow` to use our `WeatherFetcherNode`. + +**1. Instantiate your Node(s):** +```python +weather_node = WeatherFetcherNode() +# You could have other nodes here, sync or async +``` + +**2. (Optional) Define Transitions:** +If you have multiple nodes, you define transitions as usual. Since we only have one node, its returned action `"done"` will simply end this branch of the flow. + +```python +# Example: weather_node - "done" >> some_other_node +# For this example, we'll let it end. +``` + +**3. 
Create the `AsyncFlow`:** +```python +from pocketflow import AsyncFlow + +weather_flow = AsyncFlow(start=weather_node) +``` + +**4. Run the `AsyncFlow`:** +Running an `AsyncFlow` requires `await` because the flow itself is an asynchronous operation. You'll typically do this inside an `async` function. + +```python +# main.py +import asyncio + +# Assume WeatherFetcherNode is defined as above +# Assume weather_flow is created as above + +async def main(): + shared_data = {"city_name": "London"} + print("Starting weather flow...") + await weather_flow.run_async(shared_data) # Use run_async() + print("Weather flow finished.") + print(f"Final shared data: {shared_data}") + +if __name__ == "__main__": + asyncio.run(main()) # Standard way to run an async main function +``` + +**Expected Output/Behavior:** + +When you run `main.py`: +1. "Starting weather flow..." is printed. +2. `WeatherFetcherNode: Preparing to fetch weather for London.` is printed. +3. `WeatherFetcherNode: Calling weather API for London...` is printed. +4. The program will *pause* here for about 2 seconds (due to `await asyncio.sleep(2)`). If other `async` tasks were scheduled, Python's event loop could run them during this time. +5. After 2 seconds: + * `WeatherFetcherNode: Got weather: Sunny in London` is printed. + * `WeatherFetcherNode: Weather report stored in shared.` is printed. +6. "Weather flow finished." is printed. +7. `Final shared data: {'city_name': 'London', 'weather_report': 'Sunny in London'}` is printed. + +The key is that during the 2-second "API call," a well-structured asynchronous application wouldn't be frozen. It could be handling other user requests, updating a UI, or performing other background tasks. + +## What Happens Under the Hood? + +When an `AsyncFlow` runs an `AsyncNode`, it leverages Python's `asyncio` event loop. + +1. **`AsyncFlow` starts:** You call `await my_async_flow.run_async(shared)`. +2. 
**Node Execution:** The `AsyncFlow`'s orchestrator (`_orch_async`) identifies the current node. +3. **Calling `_run_async`:** If the current node is an `AsyncNode` (like our `WeatherFetcherNode`), the `AsyncFlow` calls `await current_node._run_async(shared)`. +4. **Inside `AsyncNode`:** + * `_run_async` calls `await self.prep_async(shared)`. + * Then, `await self._exec(prep_result)` (which internally calls `await self.exec_async(prep_result)`). + * Finally, `await self.post_async(shared, prep_result, exec_result)`. +5. **The `await` Keyword:** When an `AsyncNode`'s method encounters an `await` statement (e.g., `await asyncio.sleep(2)` or `await some_api_call()`), execution of *that specific node's task* pauses. Control is yielded back to the `asyncio` event loop. +6. **Event Loop Magic:** The event loop can then run other pending asynchronous tasks. It keeps track of the paused task. +7. **Task Resumes:** When the awaited operation completes (e.g., `asyncio.sleep(2)` finishes, or the API responds), the event loop resumes the paused `AsyncNode` task from where it left off. +8. **Action and Next Node:** The `AsyncNode`'s `post_async` eventually returns an action, and the `AsyncFlow` determines the next node, continuing the process. 
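The event-loop hand-off described in steps 5 through 7 can be seen in a tiny standalone sketch using plain `asyncio`, independent of PocketFlow (`fake_api_call` below is a made-up stand-in for real I/O, not part of the framework). Two simulated API calls overlap their waits, so the total runtime is roughly the longest single wait rather than the sum:

```python
import asyncio
import time

async def fake_api_call(name: str, delay: float) -> str:
    # The await below yields control to the event loop,
    # just like an AsyncNode pausing inside exec_async.
    await asyncio.sleep(delay)
    return f"{name} finished"

async def main() -> None:
    start = time.perf_counter()
    # Both simulated 1-second calls run concurrently; the event
    # loop switches between them while each one is waiting.
    results = await asyncio.gather(
        fake_api_call("weather", 1.0),
        fake_api_call("news", 1.0),
    )
    elapsed = time.perf_counter() - start
    print(results)                  # ['weather finished', 'news finished']
    print(f"took ~{elapsed:.1f}s")  # ~1.0s total, not ~2.0s

asyncio.run(main())
```

An `AsyncFlow` relies on exactly this behavior: while one `AsyncNode` is paused at an `await`, the event loop is free to make progress on other scheduled work.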
+ +Here's a sequence diagram to visualize it: + +```mermaid +sequenceDiagram + participant UserApp as Your main() + participant AFlow as AsyncFlow + participant ANode as AsyncNode (e.g., WeatherFetcherNode) + participant IOSim as Simulated I/O (e.g., asyncio.sleep) + participant EventLoop as Python Event Loop + + UserApp->>AFlow: await flow.run_async(shared) + AFlow->>ANode: await node._run_async(shared) + ANode->>ANode: await self.prep_async(shared) + ANode->>ANode: await self.exec_async(prep_res) + Note over ANode,IOSim: e.g., await asyncio.sleep(2) + ANode->>IOSim: Start sleep operation + Note over ANode, EventLoop: Task yields control to Event Loop + EventLoop->>EventLoop: (Runs other tasks, if any) + IOSim-->>ANode: Sleep operation complete + Note over ANode, EventLoop: Task resumes + ANode->>ANode: await self.post_async(shared, exec_res) + ANode-->>AFlow: Returns action (e.g., "done") + AFlow-->>UserApp: Flow complete (shared is updated) +``` + +**Diving into PocketFlow's Code (Simplified):** + +* **`AsyncNode`'s Execution (`pocketflow/__init__.py`):** + The `AsyncNode` has an `_run_async` method: + ```python + # Inside AsyncNode class + async def _run_async(self, shared): + p = await self.prep_async(shared) + e = await self._exec(p) # _exec calls exec_async with retries + return await self.post_async(shared, p, e) + ``` + And its `_exec` method handles calling `exec_async` (and retries, similar to `Node` but `async`): + ```python + # Inside AsyncNode class (simplified _exec) + async def _exec(self, prep_res): + # ... (retry loop) ... + try: + return await self.exec_async(prep_res) # Key: await exec_async + except Exception as e: + # ... (fallback logic) ... 
+ ``` + +* **`AsyncFlow`'s Orchestration (`pocketflow/__init__.py`):** + The `AsyncFlow` has an `_orch_async` method that handles running nodes: + ```python + # Inside AsyncFlow class + async def _orch_async(self, shared, params=None): + curr, p, last_action = self.start_node, (params or {}), None + while curr: + # ... (set params for current node) ... + if isinstance(curr, AsyncNode): + last_action = await curr._run_async(shared) # AWAIT AsyncNode + else: + last_action = curr._run(shared) # Run sync Node normally + curr = self.get_next_node(curr, last_action) + return last_action + ``` + Notice how it checks if `curr` is an `AsyncNode` and uses `await curr._run_async(shared)` if it is. Otherwise, for regular synchronous [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode__.md), it calls `curr._run(shared)` directly. + +## Benefits of Asynchronous Processing + +1. **Responsiveness:** Your application doesn't freeze while waiting for I/O tasks. This is vital for user interfaces or servers handling multiple requests. +2. **Improved Throughput:** For applications with many I/O-bound tasks (e.g., making multiple API calls), asynchronous processing allows these tasks to overlap their waiting periods, leading to faster overall completion. Imagine our chef preparing multiple simmering dishes at once! +3. **Efficient Resource Usage:** Threads can be resource-intensive. `asyncio` often uses a single thread more efficiently by switching between tasks during their I/O wait times. + +Use `AsyncNode` and `AsyncFlow` when your workflow involves tasks that spend significant time waiting for external operations. + +## Conclusion + +You've now unlocked the power of asynchronous processing in PocketFlow with `AsyncNode` and `AsyncFlow`! +* Asynchronous operations prevent your application from freezing during I/O-bound tasks like API calls. 
+* `AsyncNode` defines its logic with `async def` methods (`prep_async`, `exec_async`, `post_async`) and uses `await` for non-blocking waits. +* `AsyncFlow` orchestrates these `AsyncNode`s (and regular `Node`s) using `await flow.run_async()`. +* This approach leads to more responsive and efficient applications, especially when dealing with network requests or user interactions. + +This "asynchronous chef" model is incredibly useful. What if you have many similar items to process, perhaps even asynchronously and in parallel? That's where batch processing comes in. + +Next up: [Chapter 6: Batch Processing (`BatchNode`, `BatchFlow`, `AsyncParallelBatchNode`)](06_batch_processing___batchnode____batchflow____asyncparallelbatchnode___.md) + +--- + +Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) \ No newline at end of file diff --git a/docs/PocketFlow/06_batch_processing___batchnode____batchflow____asyncparallelbatchnode___.md b/docs/PocketFlow/06_batch_processing___batchnode____batchflow____asyncparallelbatchnode___.md new file mode 100644 index 00000000..d8896b58 --- /dev/null +++ b/docs/PocketFlow/06_batch_processing___batchnode____batchflow____asyncparallelbatchnode___.md @@ -0,0 +1,321 @@ +# Chapter 6: Batch Processing (`BatchNode`, `BatchFlow`, `AsyncParallelBatchNode`) + +In [Chapter 5: Asynchronous Processing (`AsyncNode`, `AsyncFlow`)](05_asynchronous_processing___asyncnode____asyncflow___.md), we explored how `AsyncNode` and `AsyncFlow` help build responsive applications that can handle waiting for tasks like API calls. Now, what if you need to perform a similar operation on *many* different items? For example, imagine you have a document, and you want to translate it into ten different languages. Doing this one by one, or even coordinating many asynchronous calls manually, can be cumbersome. PocketFlow provides specialized tools for exactly this: **Batch Processing**. 
+ +Batch processing in PocketFlow allows you to efficiently apply a piece of logic to a collection of items, simplifying the code and often improving performance, especially with parallel execution. + +Our main use case for this chapter will be: **Translating a single document into multiple target languages.** + +Let's explore the tools PocketFlow offers for this: + +## 1. `BatchNode`: The Sequential Worker for Batches + +A `BatchNode` is designed to process a list of items one after the other (sequentially). It's like a meticulous librarian who takes a stack of books and processes each one individually before moving to the next. + +**How it Works:** +1. **`prep(self, shared)`**: This method is responsible for preparing your list of individual items to be processed. It should return an iterable (like a list) where each element is a single item for processing. +2. **`exec(self, item)`**: This method is called *for each individual item* returned by `prep`. It contains the logic to process that single `item`. +3. **`post(self, shared, prep_res, exec_res_list)`**: After all items have been processed by `exec`, this method is called. `exec_res_list` will be a list containing the results from each call to `exec`, in the same order as the input items. + +**Example: Processing a Large CSV in Chunks** + +Let's look at `CSVProcessor` from `cookbook/pocketflow-batch-node/nodes.py`. It reads a large CSV file not all at once, but in smaller "chunks" (batches of rows). 
```python
# cookbook/pocketflow-batch-node/nodes.py
import pandas as pd
from pocketflow import BatchNode

class CSVProcessor(BatchNode):
    def __init__(self, chunk_size=1000):
        super().__init__()
        self.chunk_size = chunk_size

    def prep(self, shared):
        # Returns an iterator of DataFrame chunks
        chunks = pd.read_csv(
            shared["input_file"], chunksize=self.chunk_size
        )
        return chunks  # Each 'chunk' is an item

    def exec(self, chunk):  # Called for each chunk
        # Process one chunk (a pandas DataFrame)
        return {
            "total_sales": chunk["amount"].sum(),
            # ... more stats ...
        }

    def post(self, shared, prep_res, exec_res_list):
        # exec_res_list contains results from all chunks
        # ... (combine statistics from all chunks) ...
        shared["statistics"] = {
            # ... final aggregated stats ...
        }
        return "show_stats"
```
* `prep`: Reads the CSV specified in `shared["input_file"]` and returns an iterator where each item is a `DataFrame` (a chunk of rows).
* `exec`: Takes one `chunk` (a `DataFrame`) and calculates some statistics for it. This method will be called multiple times, once for each chunk from `prep`.
* `post`: Receives `exec_res_list`, which is a list of dictionaries (one from each `exec` call). It then aggregates these results and stores the final statistics in `shared`.

This `BatchNode` processes each chunk sequentially.

## 2. `AsyncParallelBatchNode`: The Concurrent Worker for Batches

What if processing each item involves waiting (like an API call), and you want to do them concurrently to save time? That's where `AsyncParallelBatchNode` comes in. It's like `BatchNode` but for asynchronous operations that can run in parallel. Imagine a team of librarians, each given a book from the stack, processing them all at the same time.

**How it Works:**
1. **`async def prep_async(self, shared)`**: Similar to `BatchNode.prep`, but asynchronous. It returns a list of items to be processed.
2.
**`async def exec_async(self, item)`**: This asynchronous method is called for each item. PocketFlow will use `asyncio.gather` to run these `exec_async` calls concurrently for all items. +3. **`async def post_async(self, shared, prep_res, exec_res_list)`**: Called after all `exec_async` calls have completed. `exec_res_list` contains their results. + +**Solving Our Use Case: Translating a Document into Multiple Languages** + +The `AsyncParallelBatchNode` is perfect for our document translation task. Let's look at `TranslateTextNodeParallel` from `cookbook/pocketflow-parallel-batch/main.py`. + +```python +# cookbook/pocketflow-parallel-batch/main.py (simplified) +from pocketflow import AsyncFlow, AsyncParallelBatchNode +# from utils import call_llm # Assumed async LLM call + +class TranslateTextNodeParallel(AsyncParallelBatchNode): + async def prep_async(self, shared): + text = shared.get("text", "") + languages = shared.get("languages", []) + # Create a list of (text_to_translate, target_language) tuples + return [(text, lang) for lang in languages] + + async def exec_async(self, data_tuple): + text, language = data_tuple # One (text, language) pair + # prompt = f"Translate '{text}' to {language}..." + # result = await call_llm(prompt) # Call LLM API + print(f"Translated to {language}") # Simplified + return {"language": language, "translation": f"Translated: {language}"} + + async def post_async(self, shared, prep_res, exec_res_list): + # exec_res_list has all translation results + # ... (code to save each translation to a file) ... 
        print(f"All {len(exec_res_list)} translations processed.")
        return "default"  # Or some other action

# To run this, you'd typically wrap it in an AsyncFlow:
# translate_node = TranslateTextNodeParallel()
# translation_flow = AsyncFlow(start=translate_node)
# await translation_flow.run_async(shared_data_with_text_and_languages)
```
In this example:
* `prep_async`: Takes the document `text` and a list of `languages` from `shared`. It returns a list of tuples, e.g., `[(original_text, "Spanish"), (original_text, "French"), ...]`. Each tuple is an "item" for `exec_async`.
* `exec_async`: Takes one `(text, language)` tuple, calls an asynchronous LLM function (`call_llm`) to perform the translation, and returns a dictionary with the result. Because this is an `AsyncParallelBatchNode`, PocketFlow will try to run these LLM calls for all languages concurrently!
* `post_async`: Gets the list of all translation results and, in the full example, saves them to files.

This drastically speeds up the overall translation process compared to doing them one by one.

## 3. `BatchFlow`: Running a Sub-Workflow Multiple Times

Sometimes, the "logic" you want to apply to a collection isn't just a single `exec` method, but a whole sub-workflow (which could be a single [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) or a more complex [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md)). You want to run this sub-workflow multiple times, each time with slightly different *parameters*. This is what `BatchFlow` is for.

Think of a film director who has a specific scene (the sub-workflow) and wants to shoot it multiple times, but each time with different actors or lighting (the parameters).

**How it Works:**
1.
The `BatchFlow` is initialized with a `start` component, which is the sub-workflow (a [Node (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) or [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md)) to be run multiple times.
2. **`prep(self, shared)`**: This method of the `BatchFlow` itself should return a list of parameter dictionaries. Each dictionary represents one "run" of the sub-workflow.
3. For each parameter dictionary from `prep`, the `BatchFlow` executes its `start` component (the sub-workflow). The parameters from that dictionary are made available to the sub-workflow for that particular run, usually by merging them into the node `params` for that run.
4. **`post(self, shared, prep_res, exec_res)`**: This is called after all batch executions of the sub-workflow are done. Note: `exec_res` here is often `None` because the results of each sub-workflow execution are typically handled within those sub-workflows by writing to `shared`.

**Example: Applying Different Filters to Multiple Images**

Consider `cookbook/pocketflow-batch-flow/flow.py`. We want to process several images, applying a different filter to each (or multiple filters to each image).
+ +First, a base [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow__.md) defines how to process *one* image with *one* filter: +```python +# cookbook/pocketflow-batch-flow/flow.py (simplified base_flow) +# from pocketflow import Flow +# from nodes import LoadImage, ApplyFilter, SaveImage + +def create_base_flow(): # This is our sub-workflow + load = LoadImage() + filter_node = ApplyFilter() + save = SaveImage() + + load - "apply_filter" >> filter_node + filter_node - "save" >> save + return Flow(start=load) # Base flow for one image-filter pair +``` + +Now, the `ImageBatchFlow`: +```python +# cookbook/pocketflow-batch-flow/flow.py (ImageBatchFlow) +# from pocketflow import BatchFlow + +class ImageBatchFlow(BatchFlow): + def prep(self, shared): + images = ["cat.jpg", "dog.jpg"] + filters = ["grayscale", "blur"] + params = [] # List of parameter dictionaries + for img in images: + for f in filters: + # Each dict is one set of params for the base_flow + params.append({"input_image_path": img, "filter_type": f}) + return params + +# How to use it: +# base_processing_logic = create_base_flow() +# image_processor = ImageBatchFlow(start=base_processing_logic) +# image_processor.run(initial_shared_data) +``` +* `ImageBatchFlow.prep`: Generates a list of parameter dictionaries. Each dictionary specifies an input image and a filter type, e.g., `[{"input_image_path": "cat.jpg", "filter_type": "grayscale"}, {"input_image_path": "cat.jpg", "filter_type": "blur"}, ...]`. +* When `image_processor.run()` is called, the `base_processing_logic` ([Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow__.md)) will be executed for *each* of these parameter dictionaries. The `LoadImage` node inside `base_processing_logic` would then use `params["input_image_path"]`, and `ApplyFilter` would use `params["filter_type"]`. + +## 4. 
`AsyncParallelBatchFlow`: Running Sub-Workflows in Parallel + +Just as `AsyncParallelBatchNode` is the concurrent version of `BatchNode`, `AsyncParallelBatchFlow` is the concurrent version of `BatchFlow`. It runs the multiple executions of its sub-workflow *in parallel*. + +This is like having multiple film crews, each with their own set, shooting different variations of the same scene (sub-workflow with different parameters) all at the same time. + +**How it Works:** +Similar to `BatchFlow`, but: +1. Uses `async def prep_async(self, shared)` to generate the list of parameter dictionaries. +2. When run with `await my_flow.run_async()`, it executes the sub-workflow for each parameter set concurrently using `asyncio.gather`. + +**Example: Parallel Image Processing with Filters** +The `cookbook/pocketflow-parallel-batch-flow/flow.py` shows an `ImageParallelBatchFlow`. +```python +# cookbook/pocketflow-parallel-batch-flow/flow.py (Conceptual) +# from pocketflow import AsyncParallelBatchFlow +# from nodes import LoadImageAsync, ApplyFilterAsync, SaveImageAsync +# (assuming async versions of nodes for the base async flow) + +# def create_async_base_flow(): ... returns an AsyncFlow ... + +class ImageParallelBatchFlow(AsyncParallelBatchFlow): + async def prep_async(self, shared): + # ... (generates list of param dicts like before) ... + # params.append({"image_path": img, "filter": f_type}) + return params + +# How to use it: +# async_base_logic = create_async_base_flow() # An AsyncFlow +# parallel_processor = ImageParallelBatchFlow(start=async_base_logic) +# await parallel_processor.run_async(initial_shared_data) +``` +This would run the `async_base_logic` for each image-filter combination in parallel, potentially speeding up processing if the sub-workflow involves `await`able operations. + +## Under the Hood: A Glimpse + +Let's briefly see how these batch components achieve their magic, using simplified logic. 
+ +**`BatchNode`** +Its `_exec` method essentially loops through the items from `prep` and calls its parent's `_exec` (which eventually calls your `exec` method) for each one. +```python +# pocketflow/__init__.py (BatchNode simplified) +class BatchNode(Node): + def _exec(self, items_from_prep): + results = [] + for item in (items_from_prep or []): + # Calls Node._exec(item), which calls self.exec(item) + result_for_item = super(BatchNode, self)._exec(item) + results.append(result_for_item) + return results # This list becomes exec_res_list in post() +``` + +**`AsyncParallelBatchNode`** +Its `_exec` method uses `asyncio.gather` to run the processing of all items concurrently. +```python +# pocketflow/__init__.py (AsyncParallelBatchNode simplified) +class AsyncParallelBatchNode(AsyncNode, BatchNode): # Inherits from AsyncNode + async def _exec(self, items_from_prep_async): + tasks = [] + for item in items_from_prep_async: + # Create a task for super()._exec(item) + # super()._exec eventually calls self.exec_async(item) + task = super(AsyncParallelBatchNode, self)._exec(item) + tasks.append(task) + return await asyncio.gather(*tasks) # Run all tasks concurrently +``` +```mermaid +sequenceDiagram + participant UserApp + participant APBN as AsyncParallelBatchNode + participant Item1Proc as exec_async(item1) + participant Item2Proc as exec_async(item2) + participant EventLoop + + UserApp->>APBN: await node.run_async(shared) + APBN->>APBN: await self.prep_async(shared) + Note right of APBN: Returns [item1, item2] + APBN->>APBN: await self._exec([item1, item2]) + APBN->>EventLoop: asyncio.gather(exec_async(item1), exec_async(item2)) + EventLoop-->>Item1Proc: Start + EventLoop-->>Item2Proc: Start + Note over Item1Proc, Item2Proc: Both run concurrently + Item1Proc-->>EventLoop: Done (result1) + Item2Proc-->>EventLoop: Done (result2) + EventLoop-->>APBN: Returns [result1, result2] + APBN->>APBN: await self.post_async(shared, ..., [result1, result2]) + APBN-->>UserApp: Final 
action +``` + +**`BatchFlow`** +Its `_run` method iterates through the parameter dictionaries from `prep` and, for each one, calls `_orch` (the standard [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow__.md) orchestration method) to run its `start` component with those parameters. +```python +# pocketflow/__init__.py (BatchFlow simplified) +class BatchFlow(Flow): + def _run(self, shared): + param_list = self.prep(shared) or [] + for param_set in param_list: + # Run the entire sub-workflow (self.start_node) + # with current param_set merged. + # self.params are the BatchFlow's own params. + merged_params = {**self.params, **param_set} + self._orch(shared, merged_params) # _orch runs the sub-flow + return self.post(shared, param_list, None) +``` +```mermaid +sequenceDiagram + participant UserApp + participant BF as BatchFlow + participant SubFlowOrch as Sub-Workflow Orchestration (_orch) + + UserApp->>BF: flow.run(shared) + BF->>BF: self.prep(shared) + Note right of BF: Returns [params1, params2] + BF->>SubFlowOrch: _orch(shared, params1) + Note right of SubFlowOrch: Sub-workflow runs with params1 + SubFlowOrch-->>BF: Completes + BF->>SubFlowOrch: _orch(shared, params2) + Note right of SubFlowOrch: Sub-workflow runs with params2 + SubFlowOrch-->>BF: Completes + BF->>BF: self.post(shared, ...) + BF-->>UserApp: Final action +``` + +**`AsyncParallelBatchFlow`** +Its `_run_async` method is similar to `BatchFlow._run` but uses `asyncio.gather` to run all the `_orch_async` calls (for its sub-workflow) in parallel. 
+```python +# pocketflow/__init__.py (AsyncParallelBatchFlow simplified) +class AsyncParallelBatchFlow(AsyncFlow, BatchFlow): + async def _run_async(self, shared): + param_list = await self.prep_async(shared) or [] + tasks = [] + for param_set in param_list: + merged_params = {**self.params, **param_set} + # Create a task for each sub-workflow run + task = self._orch_async(shared, merged_params) + tasks.append(task) + await asyncio.gather(*tasks) # Run all sub-workflow instances concurrently + return await self.post_async(shared, param_list, None) +``` + +## Conclusion + +Batch processing tools in PocketFlow—`BatchNode`, `AsyncParallelBatchNode`, `BatchFlow`, and `AsyncParallelBatchFlow`—provide powerful and convenient ways to handle collections of items or run workflows multiple times with varying parameters. +* Use **`BatchNode`** for sequential processing of a list of items where `exec` defines the logic for one item. +* Use **`AsyncParallelBatchNode`** for concurrent processing of items, ideal for I/O-bound tasks like multiple API calls (our translation example). +* Use **`BatchFlow`** when you have a sub-workflow that needs to be run multiple times sequentially, each time with different parameters. +* Use **`AsyncParallelBatchFlow`** to run instances of a sub-workflow concurrently with different parameters. + +These abstractions help keep your code clean, manage complexity, and leverage concurrency for better performance. + +So far, we've seen how individual agents or flows can be constructed. But what if you need multiple, distinct AI agents to collaborate and communicate with each other? 
Next up: [Chapter 7: A2A (Agent-to-Agent) Communication Framework](07_a2a__agent_to_agent__communication_framework_.md)

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) \ No newline at end of file diff --git a/docs/PocketFlow/07_a2a__agent_to_agent__communication_framework_.md b/docs/PocketFlow/07_a2a__agent_to_agent__communication_framework_.md new file mode 100644 index 00000000..e754c143 --- /dev/null +++ b/docs/PocketFlow/07_a2a__agent_to_agent__communication_framework_.md @@ -0,0 +1,338 @@ +# Chapter 7: A2A (Agent-to-Agent) Communication Framework

Welcome to the final chapter of our PocketFlow journey! In [Chapter 6: Batch Processing (`BatchNode`, `BatchFlow`, `AsyncParallelBatchNode`)](06_batch_processing___batchnode____batchflow____asyncparallelbatchnode___.md), we saw how to process multiple items or run workflows repeatedly. Now, we'll explore how to make your PocketFlow agents available to the wider world, allowing them to communicate with other systems or agents using a standard "language."

## The Challenge: Making Your Agent a Team Player

Imagine you've built a fantastic PocketFlow agent that can research topics and answer questions. It's great for your own use, but what if:
* Another team in your company wants their AI system to ask questions of your agent?
* You want to offer your agent's capabilities as a service that other applications can call?
* You want to build a larger system composed of multiple specialized agents that need to collaborate?

These scenarios require a **standardized way for agents to talk to each other**. Simply sharing Python code or relying on custom integrations isn't scalable or interoperable. This is where the **A2A (Agent-to-Agent) Communication Framework** comes in.
+ +**Our Use Case:** We want to take the PocketFlow-based research agent we've been conceptualizing (which uses [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) and a [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md)) and make it accessible via a standard A2A interface. Another program (a client) should be able to send it a question (e.g., "What is PocketFlow?") and receive an answer, all using this A2A standard. + +The A2A framework in PocketFlow provides components that wrap your agent, allowing it to understand and speak the A2A JSON-RPC specification. Think of it like giving your agent a universal translator and a public phone line. + +## Key Components of the A2A Framework + +The A2A framework in `PocketFlow` consists of a few main parts that work together: + +1. **A2A JSON-RPC Specification (The "Language")**: This isn't code, but a standard agreement on how agents communicate. It uses JSON-RPC, a lightweight remote procedure call protocol using JSON. It defines methods like `tasks/send` (to give an agent a job) and `tasks/get` (to check on a job), and the structure of messages. PocketFlow's A2A components adhere to this spec. + * **Analogy**: If agents are from different countries, JSON-RPC is the agreed-upon common language (like Esperanto or English as a lingua franca) they'll use to talk. + +2. **Common `types` (The "Vocabulary and Grammar")**: These are pre-defined Python Pydantic models (found in `cookbook/pocketflow-a2a/common/types.py`) that represent the structure of all A2A messages. This includes `Task`, `Message`, `Artifact`, `TextPart`, `JSONRPCRequest`, `JSONRPCResponse`, etc. Using these types ensures that both the client and server understand the format of the data being exchanged. + * **Analogy**: These are the specific words and sentence structures within the agreed-upon language. + +3. **`A2AServer` (The "Receptionist")**: This component hosts your PocketFlow agent. 
It listens for incoming A2A requests over HTTP, understands the A2A JSON-RPC protocol, and passes the work to your agent (via the `TaskManager`).
+    * **Analogy**: The `A2AServer` is like the public-facing receptionist for your PocketFlow agent. It answers the "phone" (HTTP requests) and speaks the standard A2A language.
+
+4. **`TaskManager` (The "Internal Translator")**: This is the crucial bridge. It receives instructions from the `A2AServer` (which are in the A2A format), translates them into something your PocketFlow [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) can understand (typically by preparing the [shared dictionary](01_shared_state___shared__dictionary__.md)), runs your PocketFlow [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md), and then takes the results from the [shared dictionary](01_shared_state___shared__dictionary__.md) and packages them back into the A2A format for the `A2AServer` to send out.
+    * **Analogy**: If your PocketFlow agent only speaks "PocketFlow-ese," the `TaskManager` is the internal assistant who translates A2A language from the receptionist into PocketFlow-ese and vice versa.
+
+5. **`A2AClient` (The "Caller")**: This component allows you (or another system) to interact with an agent hosted by an `A2AServer`. It knows how to formulate A2A JSON-RPC requests and understand the responses.
+    * **Analogy**: The `A2AClient` is someone using the public phone line to call your agent's receptionist.
+
+Let's see how to use these to make our PocketFlow research agent accessible.
+
+## Making Your PocketFlow Agent A2A-Compatible
+
+Let's assume you've already built your core PocketFlow agent using [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md) and a [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md), perhaps similar to the one in `cookbook/pocketflow-a2a/flow.py` that can take a question and produce an answer.
The main function in `flow.py` to get this flow is `create_agent_flow()`.
+
+**Step 1: Create Your `TaskManager`**
+
+The `TaskManager` connects the A2A world to your PocketFlow [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md). We'll create a `PocketFlowTaskManager` that inherits from a base `InMemoryTaskManager` (which handles storing task states).
+
+Here's a simplified look at `PocketFlowTaskManager` from `cookbook/pocketflow-a2a/task_manager.py`:
+
+```python
+# In task_manager.py
+from common.server.task_manager import InMemoryTaskManager
+from common.types import ( # A2A standard message types
+    SendTaskRequest, SendTaskResponse, TaskState, TaskStatus,
+    TextPart, Artifact, Message
+)
+from flow import create_agent_flow # Your PocketFlow agent logic
+
+class PocketFlowTaskManager(InMemoryTaskManager):
+    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
+        # 1. Get the question from the A2A request
+        query = self._get_user_query(request.params) # Helper to extract text
+        if not query:
+            # ... handle error: no query found (simplified here) ...
+            raise ValueError("No text query found in the A2A message")
+
+        # 2. Prepare shared data for your PocketFlow agent
+        shared_data = {"question": query}
+        agent_flow = create_agent_flow() # Get your PocketFlow flow
+
+        # 3. Run your PocketFlow agent
+        try:
+            agent_flow.run(shared_data) # This modifies shared_data
+            # 'shared_data' now contains the answer, e.g., shared_data["answer"]
+        except Exception as e:
+            # ... handle agent execution error (simplified here) ...
+            shared_data["answer"] = f"Agent error: {e}"
+
+        # 4. Package the result into A2A format
+        answer_text = shared_data.get("answer", "No answer.")
+        final_status = TaskStatus(state=TaskState.COMPLETED)
+        final_artifact = Artifact(parts=[TextPart(text=answer_text)])
+
+        # Store final task details (InMemoryTaskManager helps here)
+        final_task = await self.update_store(
+            request.params.id, final_status, [final_artifact]
+        )
+        return SendTaskResponse(id=request.id, result=final_task)
+
+    def _get_user_query(self, task_params) -> str | None:
+        # Simplified: Extracts text from the A2A message parts
+        # (Actual code in common/types.py & task_manager.py is more robust)
+        if task_params.message and task_params.message.parts:
+            for part in task_params.message.parts:
+                if part.type == "text": # Assuming part is a Pydantic model
+                    return part.text
+        return None
+```
+**Explanation:**
+* `on_send_task`: This method is called when the `A2AServer` receives a `tasks/send` request.
+* It extracts the user's question from the A2A request's `message.parts` (using `_get_user_query`).
+* It prepares the [shared dictionary](01_shared_state___shared__dictionary__.md) (`shared_data`) for your PocketFlow [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md).
+* It runs your `agent_flow` with this `shared_data`. The `agent_flow` does its work and puts the answer back into `shared_data["answer"]`.
+* It retrieves the answer from `shared_data` and packages it into an A2A `Artifact` with a `TextPart`.
+* It updates the task's status to `COMPLETED` and returns an A2A `SendTaskResponse` containing the final `Task` object (which includes the answer artifact).
+
+**Step 2: Set Up the `A2AServer`**
+
+Now, we need to host our `PocketFlowTaskManager` using `A2AServer`. This involves defining an `AgentCard` (metadata about your agent) and starting the server.
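To see what that metadata looks like on the wire, here is a rough, illustrative sketch of the JSON a client might fetch from the server's `/.well-known/agent.json` endpoint. The field names mirror the `AgentCard` and `AgentSkill` attributes used in the server code; the concrete values are made up for this example:

```python
import json

# A rough, illustrative sketch of an agent card serialized as JSON.
# Field names mirror the AgentCard/AgentSkill attributes; the values
# are made up for this example.
agent_card = {
    "name": "PocketFlow Research Agent (A2A)",
    "url": "http://localhost:10003/",
    "capabilities": {"streaming": False},
    "skills": [
        {
            "id": "web_research_qa",
            "name": "Web Research and Answering",
            "inputModes": ["text"],
            "outputModes": ["text"],
        }
    ],
}

# A client fetches and parses this to discover what the agent offers.
discovered = json.loads(json.dumps(agent_card))
print(discovered["skills"][0]["name"])
```

Because the card is just JSON at a well-known URL, any A2A-aware client can discover your agent without sharing Python code.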
+ +A simplified `main` function from `cookbook/pocketflow-a2a/a2a_server.py`: +```python +# In a2a_server.py +from common.server import A2AServer +from common.types import AgentCard, AgentCapabilities, AgentSkill # For metadata +from task_manager import PocketFlowTaskManager # Your task manager +import os + +def main(host="localhost", port=10003): + # (Error checking for API keys like OPENAI_API_KEY happens here) + + # 1. Define Agent's "Business Card" (AgentCard) + capabilities = AgentCapabilities(streaming=False) # Our agent isn't streaming + skill = AgentSkill( + id="web_research_qa", name="Web Research and Answering", + # ... (more skill details: description, examples) ... + inputModes=["text"], outputModes=["text"] + ) + agent_card = AgentCard( + name="PocketFlow Research Agent (A2A)", + url=f"http://{host}:{port}/", # Where clients connect + # ... (more card details: description, version, skills) ... + capabilities=capabilities, skills=[skill] + ) + + # 2. Initialize TaskManager and Server + task_manager = PocketFlowTaskManager() + server = A2AServer( + agent_card=agent_card, + task_manager=task_manager, + host=host, port=port, + ) + + print(f"Starting PocketFlow A2A server on http://{host}:{port}") + server.start() # This starts the HTTP server (e.g., Uvicorn) + +if __name__ == "__main__": + # This would typically call main() + # For example: main() + pass +``` +**Explanation:** +* `AgentCard`: This provides metadata about your agent (name, URL, capabilities, skills offered). Other A2A systems can fetch this card (from `/.well-known/agent.json`) to learn about your agent. +* We instantiate our `PocketFlowTaskManager`. +* We create an `A2AServer`, giving it the `agent_card`, our `task_manager`, and the `host`/`port` to listen on. +* `server.start()` launches the web server. Now your PocketFlow agent is listening for A2A requests! + +**Step 3: Interact Using an `A2AClient`** + +With the server running, other programs can now "call" your agent. 
The `A2AClient` helps with this. + +A simplified CLI client from `cookbook/pocketflow-a2a/a2a_client.py`: +```python +# In a2a_client.py +import asyncio +from common.client import A2AClient # The A2A client utility +from common.types import TextPart # To structure our question + +async def run_client(agent_url="http://localhost:10003"): + client = A2AClient(url=agent_url) + + # Get question from user + question_text = input("Enter your question: ") + if not question_text: return + + # 1. Prepare the A2A request payload (matches TaskSendParams) + # This is a simplified representation of the common.types.TaskSendParams + payload = { + "id": "some_unique_task_id", # Each task needs an ID + "message": { + "role": "user", + "parts": [{"type": "text", "text": question_text}], # Our question + }, + "acceptedOutputModes": ["text"], # We want text back + } + + print("Sending task to agent...") + try: + # 2. Send the task to the server + response = await client.send_task(payload) # This makes the HTTP call + + # 3. Process the response + if response.error: + print(f"Error from agent: {response.error.message}") + elif response.result and response.result.artifacts: + # Extract answer from the first text part of the first artifact + answer_part = response.result.artifacts[0].parts[0] + if isinstance(answer_part, TextPart) or answer_part.type == "text": + print(f"Agent Answer: {answer_part.text}") + else: + print("Agent did not return a clear answer.") + + except Exception as e: + print(f"Client error: {e}") + +# To run this: +# if __name__ == "__main__": +# asyncio.run(run_client()) +``` +**Explanation:** +* An `A2AClient` is initialized with the server's URL. +* A `payload` dictionary is created. This structure matches the A2A specification for sending a task (specifically, `TaskSendParams` from `common.types`). Our question is placed in `message.parts` as a `TextPart`. +* `client.send_task(payload)` sends the JSON-RPC request to the `A2AServer`. 
+* The response (an A2A `Task` object) is processed. The answer is typically found in the `artifacts` of the `Task`.
+
+**Example Interaction:**
+1. You run `a2a_server.py`. It starts listening on `http://localhost:10003`.
+2. You run `a2a_client.py`.
+3. Client prompts: `Enter your question:`
+4. You type: `What is PocketFlow?`
+5. Client sends this to the server.
+6. Server (via `PocketFlowTaskManager` and your `agent_flow`) processes it.
+7. Client receives the response and might print: `Agent Answer: PocketFlow is a minimalist LLM framework...`
+
+Your PocketFlow agent is now communicating via a standard A2A interface!
+
+## Under the Hood: The A2A Conversation Flow
+
+Let's trace a request from client to server and back:
+
+1. **Client Prepares**: The `A2AClient` takes your input (e.g., a question) and constructs a JSON object according to the A2A spec. This is a JSON-RPC request, often for the method `tasks/send`.
+    * `A2AClient._send_request` (from `common/client/client.py`) assembles this. It uses `httpx` to make an HTTP POST request to the server's URL, with the JSON-RPC payload.
+
+2. **Server Receives**: The `A2AServer` (built with Starlette) receives the HTTP POST request.
+    * `A2AServer._process_request` (from `common/server/server.py`) handles this. It parses the JSON body into an `A2ARequest` Pydantic model (e.g., `SendTaskRequest`).
+
+3. **Server Routes to TaskManager**: Based on the JSON-RPC method in the request (e.g., `tasks/send`), the `A2AServer` calls the corresponding method on your `TaskManager`.
+    * E.g., for `tasks/send`, it calls `task_manager.on_send_task(request_model)`.
+
+4. **TaskManager -> PocketFlow**: Your `PocketFlowTaskManager`'s `on_send_task` method:
+    * Extracts relevant data (like the question) from the `request_model`.
+    * Prepares the [shared dictionary](01_shared_state___shared__dictionary__.md) for your PocketFlow [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md).
+    * Calls `your_pocketflow_flow.run(shared)`.
+
+5. **PocketFlow Executes**: Your PocketFlow [Flow (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) runs its [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md), using and updating the [shared dictionary](01_shared_state___shared__dictionary__.md). The final answer is placed in `shared` (e.g., `shared["answer"]`).
+
+6. **PocketFlow -> TaskManager**: Control returns to `PocketFlowTaskManager`. It:
+    * Retrieves the result (e.g., `shared["answer"]`).
+    * Constructs an A2A `Task` object (from `common.types`), including `Artifacts` containing the answer.
+
+7. **TaskManager -> Server**: The `TaskManager` returns the populated `Task` object (wrapped in a `JSONRPCResponse` model) to the `A2AServer`.
+
+8. **Server Responds**: The `A2AServer` serializes the `JSONRPCResponse` (which contains the `Task` with the answer) back into a JSON string.
+    * It sends this JSON as the body of an HTTP 200 OK response back to the client.
+
+9. **Client Processes**: The `A2AClient` receives the HTTP response.
+    * It parses the JSON body into its own Pydantic models (e.g., `SendTaskResponse` containing the `Task`).
+    * It extracts the answer from the `Task`'s artifacts for you to see.
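Steps 8 and 9 are easy to picture with plain dicts: a rough sketch of the JSON envelope the server sends back and the unwrapping the client performs. The field names follow the `Task` and `Artifact` shapes used earlier in this chapter; the concrete values are illustrative:

```python
import json

# Rough sketch of the JSON-RPC envelope from step 8, and the unwrapping
# the client does in step 9. Field names follow the Task/Artifact shapes
# shown earlier; the concrete values are illustrative.
raw_http_body = json.dumps({
    "jsonrpc": "2.0",
    "id": 1,
    "result": {  # the A2A Task object
        "id": "some_unique_task_id",
        "status": {"state": "completed"},
        "artifacts": [
            {"parts": [{"type": "text",
                        "text": "PocketFlow is a minimalist LLM framework..."}]}
        ],
    },
})

task = json.loads(raw_http_body)["result"]
answer = task["artifacts"][0]["parts"][0]["text"]  # first text part of first artifact
print(answer)
```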
+ +Here's a simplified sequence diagram of this interaction: + +```mermaid +sequenceDiagram + participant UserApp as User App (e.g., CLI) + participant Client as A2AClient + participant Server as A2AServer + participant TaskMgr as PocketFlowTaskManager + participant PF_Flow as Your PocketFlow Flow + + UserApp->>Client: User provides question + Client->>Server: HTTP POST / (JSON-RPC: tasks/send {question}) + Server->>TaskMgr: on_send_task(a2a_request_with_question) + TaskMgr->>PF_Flow: flow.run(shared={"question": ...}) + Note over PF_Flow: Flow processes, puts answer in shared + PF_Flow-->>TaskMgr: Returns (shared modified with answer) + TaskMgr->>TaskMgr: Creates A2A Task object with answer + TaskMgr-->>Server: Returns A2A Task object + Server-->>Client: HTTP 200 OK (JSON-RPC response {A2A Task with answer}) + Client->>UserApp: Displays answer from A2A Task +``` + +**Key Code Snippets (Highly Simplified):** + +* **`A2AClient` sending request (from `common/client/client.py`):** + ```python + # Inside A2AClient + async def _send_request(self, request_model: JSONRPCRequest) -> dict: + # request_model is e.g., SendTaskRequest + payload = request_model.model_dump(exclude_none=True) + # self.fetchImpl is an httpx.AsyncClient + http_response = await self.fetchImpl.post(self.url, json=payload) + http_response.raise_for_status() # Check for HTTP errors + return http_response.json() # Return parsed JSON response + ``` + This shows the client converting a Pydantic model to a dictionary (`payload`) and sending it via HTTP. 
+
+* **`A2AServer` processing request (from `common/server/server.py`):**
+    ```python
+    # Inside A2AServer
+    async def _process_request(self, http_request: Request):
+        raw_body = await http_request.body()
+        parsed_body = json.loads(raw_body)
+        # A2ARequest.validate_python converts dict to Pydantic model
+        a2a_request_model = A2ARequest.validate_python(parsed_body)
+
+        if isinstance(a2a_request_model, SendTaskRequest):
+            # self.task_manager is your PocketFlowTaskManager
+            result_model = await self.task_manager.on_send_task(a2a_request_model)
+        # ... (other request types like GetTaskRequest) ...
+
+        # result_model is e.g., SendTaskResponse
+        return JSONResponse(result_model.model_dump(exclude_none=True))
+    ```
+    This shows the server parsing the incoming JSON, converting it to a Pydantic model, and calling the appropriate `TaskManager` method.
+
+## Conclusion: Your Agent is Now a Global Citizen!
+
+You've reached the end of our PocketFlow tutorial series! With the **A2A (Agent-to-Agent) Communication Framework**, you've learned how to:
+* Understand the roles of `A2AServer`, `A2AClient`, and `TaskManager`.
+* Wrap your existing PocketFlow [Flows (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md) with a `TaskManager` to handle A2A requests and responses.
+* Host your agent using `A2AServer`, making it accessible via a standard JSON-RPC interface.
+* Use `A2AClient` to interact with A2A-compatible agents.
+
+This framework transforms your PocketFlow agent from a standalone application into a component that can integrate with larger systems and collaborate with other agents, regardless of how they are built internally, as long as they speak the A2A language.
+
+**Reflecting on Your PocketFlow Journey:**
+
+Throughout this tutorial, you've explored the core concepts of PocketFlow:
+* Managing data with the [Shared State (`shared` dictionary)](01_shared_state___shared__dictionary__.md).
+* Building modular tasks with [Nodes (`BaseNode`, `Node`, `AsyncNode`)](02_node___basenode____node____asyncnode___.md).
+* Creating dynamic workflows with [Actions / Transitions](03_actions___transitions_.md).
+* Orchestrating nodes into powerful [Flows (`Flow`, `AsyncFlow`)](04_flow___flow____asyncflow___.md).
+* Handling I/O-bound tasks efficiently with [Asynchronous Processing (`AsyncNode`, `AsyncFlow`)](05_asynchronous_processing___asyncnode____asyncflow___.md).
+* Processing collections of data using [Batch Processing (`BatchNode`, `BatchFlow`, `AsyncParallelBatchNode`)](06_batch_processing___batchnode____batchflow____asyncparallelbatchnode___.md).
+* And finally, enabling standardized inter-agent communication with the A2A framework.
+
+You now have a solid foundation to build sophisticated, modular, and interoperable AI applications with PocketFlow. The world of intelligent agents awaits your creativity! Happy building!
+
+---
+
+Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
\ No newline at end of file
diff --git a/docs/PocketFlow/index.md b/docs/PocketFlow/index.md
new file mode 100644
index 00000000..ac672488
--- /dev/null
+++ b/docs/PocketFlow/index.md
@@ -0,0 +1,61 @@
+# Tutorial: PocketFlow
+
+PocketFlow is a *Python framework* for building modular workflows and AI agents.
+It allows you to define complex processes by connecting individual **Nodes**, which represent *atomic tasks* like calling an LLM or searching the web.
+A **Flow** then *orchestrates* these Nodes, guiding the execution sequence based on **Actions** (string identifiers) returned by each Node.
+Data is passed between Nodes and managed throughout the workflow execution via a **Shared State** (a Python dictionary).
+PocketFlow also offers advanced features like **Batch Processing** for efficiently handling collections of items, and **Asynchronous Processing** for non-blocking operations crucial for I/O-bound tasks.
+Additionally, it demonstrates an **A2A (Agent-to-Agent) Communication Framework** to wrap PocketFlow agents, enabling them to communicate with other systems using a standardized JSON-RPC protocol. + + +**Source Repository:** [https://github.com/The-Pocket/PocketFlow](https://github.com/The-Pocket/PocketFlow) + +```mermaid +flowchart TD + A0["Node (`BaseNode`, `Node`, `AsyncNode`) +"] + A1["Flow (`Flow`, `AsyncFlow`) +"] + A2["Shared State (`shared` dictionary) +"] + A3["Actions / Transitions +"] + A4["Batch Processing (`BatchNode`, `BatchFlow`, `AsyncParallelBatchNode`) +"] + A5["Asynchronous Processing (`AsyncNode`, `AsyncFlow`) +"] + A6["A2A (Agent-to-Agent) Communication Framework +"] + A1 -- "Orchestrates Nodes" --> A0 + A0 -- "Accesses Shared State" --> A2 + A0 -- "Returns Action" --> A3 + A1 -- "Uses Action for dispatch" --> A3 + A4 -- "Specializes Node (batch)" --> A0 + A4 -- "Specializes Flow (batch)" --> A1 + A5 -- "Specializes Node (async)" --> A0 + A5 -- "Specializes Flow (async)" --> A1 + A6 -- "Executes Flow" --> A1 + A6 -- "Initializes Shared State" --> A2 +``` + +## Chapters + +1. [Shared State (`shared` dictionary) +](01_shared_state___shared__dictionary__.md) +2. [Node (`BaseNode`, `Node`, `AsyncNode`) +](02_node___basenode____node____asyncnode___.md) +3. [Actions / Transitions +](03_actions___transitions_.md) +4. [Flow (`Flow`, `AsyncFlow`) +](04_flow___flow____asyncflow___.md) +5. [Asynchronous Processing (`AsyncNode`, `AsyncFlow`) +](05_asynchronous_processing___asyncnode____asyncflow___.md) +6. [Batch Processing (`BatchNode`, `BatchFlow`, `AsyncParallelBatchNode`) +](06_batch_processing___batchnode____batchflow____asyncparallelbatchnode___.md) +7. [A2A (Agent-to-Agent) Communication Framework +](07_a2a__agent_to_agent__communication_framework_.md) + + +--- + +Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) \ No newline at end of file