Skip to content

Commit abcc987

Browse files
JPPhotolstein
andauthored
Rework graph.py (#8642)
* Rework graph, add documentation * Minor fixes to README.md * Updated schema * Fixed test to match behavior - all nodes executed, parents before children * Update invokeai/app/services/shared/graph.py Cleaned up code Co-authored-by: Lincoln Stein <lincoln.stein@gmail.com> * Change silent corrections to enforcing invariants --------- Co-authored-by: Lincoln Stein <lincoln.stein@gmail.com>
1 parent 36e400d commit abcc987

File tree

4 files changed

+367
-105
lines changed

4 files changed

+367
-105
lines changed
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
# InvokeAI Graph - Design Overview
2+
3+
High-level design for the graph module. Focuses on responsibilities, data flow, and how traversal works.
4+
5+
## 1) Purpose
6+
7+
Provide a typed, acyclic workflow model (**Graph**) plus a runtime scheduler (**GraphExecutionState**) that expands
8+
iterator patterns, tracks readiness via indegree (the number of incoming edges to a node in the directed graph), and
9+
executes nodes in class-grouped batches. Source graphs remain immutable during a run; runtime expansion happens in a
10+
separate execution graph.
11+
12+
## 2) Major Data Types
13+
14+
### EdgeConnection
15+
16+
* Fields: `node_id: str`, `field: str`.
17+
* Hashable; printed as `node.field` for readable diagnostics.
18+
19+
### Edge
20+
21+
* Fields: `source: EdgeConnection`, `destination: EdgeConnection`.
22+
* One directed connection from a specific output port to a specific input port.
23+
24+
### AnyInvocation / AnyInvocationOutput
25+
26+
* Pydantic wrappers that carry concrete invocation models and outputs.
27+
* No registry logic in this file; they are permissive containers for heterogeneous nodes.
28+
29+
### IterateInvocation / CollectInvocation
30+
31+
* Control nodes used by validation and execution:
32+
33+
* **IterateInvocation**: input `collection`, outputs include `item` (and index/total).
34+
* **CollectInvocation**: many `item` inputs aggregated to one `collection` output.
35+
36+
## 3) Graph (author-time model)
37+
38+
A container for declared nodes and edges. Does **not** perform iteration expansion.
39+
40+
### 3.1 Data
41+
42+
* `nodes: dict[str, AnyInvocation]` - key must equal `node.id`.
43+
* `edges: list[Edge]` - zero or more.
44+
* Utility: `_get_input_edges(node_id, field?)`, `_get_output_edges(node_id, field?)`
45+
These scan `self.edges` (no adjacency indices in the current code).
46+
47+
### 3.2 Validation (`validate_self`)
48+
49+
Runs a sequence of checks:
50+
51+
1. **Node ID uniqueness**
52+
No duplicate IDs; map key equals `node.id`.
53+
2. **Endpoint existence**
54+
Source and destination node IDs must exist.
55+
3. **Port existence**
56+
Input ports must exist on the node class; output ports on the node's output model.
57+
4. **Type compatibility**
58+
`get_output_field_type` vs `get_input_field_type` and `are_connection_types_compatible`.
59+
5. **DAG constraint**
60+
Build a *flat* `DiGraph` (no runtime expansion) and assert acyclicity.
61+
6. **Iterator / collector structure**
62+
Enforce special rules:
63+
64+
* Iterator's input must be `collection`; its outgoing edges use `item`.
65+
* Collector accepts many `item` inputs; outputs a single `collection`.
66+
* Edge fan-in to a non-collector input is rejected.
67+
68+
### 3.3 Edge admission (`_validate_edge`)
69+
70+
Checks a single prospective edge before insertion:
71+
72+
* Endpoints/ports exist.
73+
* Destination port is not already occupied unless it's a collector `item`.
74+
* Adding the edge to the flat DAG must keep it acyclic.
75+
* Iterator/collector constraints re-checked when the edge creates relevant patterns.
76+
77+
### 3.4 Topology utilities
78+
79+
* `nx_graph()` - DiGraph of declared nodes and edges.
80+
* `nx_graph_with_data()` - includes node/edge attributes.
81+
* `nx_graph_flat()` - "flattened" DAG (still author-time; no runtime copies).
82+
Used in validation and in `_prepare()` during execution planning.
83+
84+
### 3.5 Mutation helpers
85+
86+
* `add_node`, `update_node` (preserve edges, rewrite endpoints if id changes), `delete_node`.
87+
* `add_edge`, `delete_edge` (with validation).
88+
89+
## 4) GraphExecutionState (runtime)
90+
91+
Holds the state for a single run. Keeps the source graph intact; materializes a separate execution graph.
92+
93+
### 4.1 Data
94+
95+
* `graph: Graph` - immutable source during a run.
96+
* `execution_graph: Graph` - materialized runtime nodes/edges.
97+
* `executed: set[str]`, `executed_history: list[str]`.
98+
* `results: dict[str, AnyInvocationOutput]`, `errors: dict[str, str]`.
99+
* `prepared_source_mapping: dict[str, str]` - exec id → source id.
100+
* `source_prepared_mapping: dict[str, set[str]]` - source id → exec ids.
101+
* `indegree: dict[str, int]` - unmet inputs per exec node.
102+
* **Ready queues grouped by class** (private attrs):
103+
`_ready_queues: dict[class_name, deque[str]]`, `_active_class: Optional[str]`. Optional `ready_order: list[str]` to
104+
prioritize classes.
105+
106+
### 4.2 Core methods
107+
108+
* `next()`
109+
Returns the next ready exec node. If none, calls `_prepare()` to materialize more, then retries. Before returning a
110+
node, `_prepare_inputs()` deep-copies inbound values into the node fields.
111+
* `complete(node_id, output)`
112+
Record result; mark exec node executed; if all exec copies of the same **source** are done, mark the source executed.
113+
For each outgoing exec edge, decrement child indegree and enqueue when it reaches zero.
114+
115+
### 4.3 Preparation (`_prepare()`)
116+
117+
* Build a flat DAG from the **source** graph.
118+
* Choose the **next source node** in topological order that:
119+
120+
1. has not been prepared,
121+
2. if it is an iterator, *its inputs are already executed*,
122+
3. it has *no unexecuted iterator ancestors*.
123+
* If the node is a **CollectInvocation**: collapse all prepared parents into one mapping and create **one** exec node.
124+
* Otherwise: compute all combinations of prepared iterator ancestors. For each combination, pick the matching prepared parent per upstream and create **one** exec node.
125+
* For each new exec node:
126+
127+
* Deep-copy the source node; assign a fresh ID (and `index` for iterators).
128+
* Wire edges from chosen prepared parents.
129+
* Set `indegree = number of unmet inputs` (i.e., parents not yet executed).
130+
* If `indegree == 0`, enqueue into its class queue.
131+
132+
### 4.4 Readiness and batching
133+
134+
* `_enqueue_if_ready(nid)` enqueues by class name only when `indegree == 0` and not executed.
135+
* `_get_next_node()` drains the `_active_class` queue FIFO; when empty, selects the next nonempty class queue (by `ready_order` if set, else alphabetical), and continues. Optional fairness knobs can limit batch size per class; default is drain fully.
136+
137+
#### 4.4.1 Indegree (what it is and how it's used)
138+
139+
**Indegree** is the number of incoming edges to a node in the execution graph that are still unmet. In this engine:
140+
* For every materialized exec node, `indegree[node]` equals the count of its prerequisite parents that have **not** finished yet.
141+
* A node is "ready" exactly when `indegree[node] == 0`; only then is it enqueued.
142+
* When a node completes, the scheduler decrements `indegree[child]` for each outgoing edge. Any child that reaches 0 is enqueued.
143+
144+
Example: edges `A→C`, `B→C`, `C→D`. Start: `A:0, B:0, C:2, D:1`. Run `A``C:1`. Run `B``C:0` → enqueue `C`. Run `C`
145+
`D:0` → enqueue `D`. Run `D` → done.
146+
147+
### 4.5 Input hydration (`_prepare_inputs()`)
148+
149+
* For **CollectInvocation**: gather all incoming `item` values into `collection`.
150+
* For all others: deep-copy each incoming edge's value into the destination field.
151+
This prevents cross-node mutation through shared references.
152+
153+
## 5) Traversal Summary
154+
155+
1. Author builds a valid **Graph**.
156+
2. Create **GraphExecutionState** with that graph.
157+
3. Loop:
158+
159+
* `node = state.next()` → may trigger `_prepare()` expansion.
160+
* Execute node externally → `output`.
161+
* `state.complete(node.id, output)` → updates indegrees and queues.
162+
4. Finish when `next()` returns `None`.
163+
164+
The source graph is never mutated; all expansion occurs in `execution_graph` with traceability back to source nodes.
165+
166+
## 6) Invariants
167+
168+
* Source **Graph** remains a DAG and type-consistent.
169+
* `execution_graph` remains a DAG.
170+
* Nodes are enqueued only when `indegree == 0`.
171+
* `results` and `errors` are keyed by **exec node id**.
172+
* Collectors only aggregate `item` inputs; other inputs behave one-to-one.
173+
174+
## 7) Extensibility
175+
176+
* **New node types**: implement as Pydantic models with typed fields and outputs. Register per your invocation system; this file accepts them as `AnyInvocation`.
177+
* **Scheduling policy**: adjust `ready_order` to batch by class; add a batch cap for fairness without changing complexity.
178+
* **Dynamic behaviors** (future): can be added in `GraphExecutionState` by creating exec nodes and edges at `complete()` time, as long as the DAG invariant holds.
179+
180+
## 8) Error Model (selected)
181+
182+
* `DuplicateNodeIdError`, `NodeAlreadyInGraphError`
183+
* `NodeNotFoundError`, `NodeFieldNotFoundError`
184+
* `InvalidEdgeError`, `CyclicalGraphError`
185+
* `NodeInputError` (raised when preparing inputs for execution)
186+
187+
Messages favor short, precise diagnostics (node id, field, and failing condition).
188+
189+
## 9) Rationale
190+
191+
* **Two-graph approach** isolates authoring from execution expansion and keeps validation simple.
192+
* **Indegree + queues** gives O(1) scheduling decisions with clear batching semantics.
193+
* **Iterator/collector separation** keeps fan-out/fan-in explicit and testable.
194+
* **Deep-copy hydration** avoids incidental aliasing bugs between nodes.

0 commit comments

Comments
 (0)