feat: Add conversation variable persistence layer #11
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path. - `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`. - `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes. - `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected. Tests not run. Next steps (optional): 1) `make lint` 2) `make type-check` 3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args. - `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes. No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and new values. When the engine receives the command, it will update the corresponding variable in the variable pool. If it does not exist, it will add it; if it does, it will overwrite it. Both behaviors should be treated the same and do not need to be distinguished.
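The add-or-overwrite behavior described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: `ToyVariablePool` and `handle_update_variables` are hypothetical stand-ins, and the fields of `VariableUpdate` and `UpdateVariablesCommand` are assumed from the description rather than copied from the code.

```python
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class VariableUpdate:
    # Assumed shape: one selector plus the value to store under it.
    selector: Sequence[str]
    value: Any


@dataclass(frozen=True)
class UpdateVariablesCommand:
    # Assumed shape: a group of updates applied together.
    updates: Sequence[VariableUpdate]


class ToyVariablePool:
    """Hypothetical stand-in for the engine's variable pool."""

    def __init__(self) -> None:
        self._values: dict[tuple[str, ...], Any] = {}

    def add(self, selector: Sequence[str], value: Any) -> None:
        # Plain assignment: adding and overwriting are the same operation.
        self._values[tuple(selector)] = value

    def get(self, selector: Sequence[str]) -> Any:
        return self._values.get(tuple(selector))


def handle_update_variables(pool: ToyVariablePool, command: UpdateVariablesCommand) -> None:
    # The handler does not check whether a variable already exists.
    for update in command.updates:
        pool.add(update.selector, update.value)


pool = ToyVariablePool()
pool.add(["conversation", "counter"], 1)
handle_update_variables(
    pool,
    UpdateVariablesCommand(
        updates=[
            VariableUpdate(["conversation", "counter"], 2),       # overwrites
            VariableUpdate(["conversation", "topic"], "billing"),  # adds
        ]
    ),
)
```

Because the pool write is a plain keyed assignment, the handler needs no separate branch for the "exists" and "does not exist" cases.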
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which is used to persist the received ConversationVariables to the database. It can retrieve the currently processing ConversationId from the engine's variable pool. It captures the successful execution event of each node and determines whether the type of this node is VariableAssigner(v1 and v2). If so, it retrieves the variable name and value that need to be updated from the node's outputs. This layer is only used in the Advanced Chat. It should be placed outside of Core.Workflow package.
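A rough sketch of such a layer is shown below. All names here are hypothetical stand-ins (`NodeSucceeded`, `RecordingUpdater`, `PersistLayerSketch`, the node-type strings, and the `updated_variables` key), and the conversation ID is passed in directly rather than read from the variable pool. Following the later commits in this PR, the sketch reads updated selectors from `process_data` rather than from node outputs.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

# Hypothetical identifiers; the real node types and prefix live in the codebase.
ASSIGNER_NODE_TYPES = {"variable-assigner-v1", "variable-assigner-v2"}
CONVERSATION_PREFIX = "conversation"


@dataclass
class NodeSucceeded:
    """Stand-in for a node-succeeded event."""
    node_type: str
    process_data: dict[str, Any] = field(default_factory=dict)


class RecordingUpdater:
    """Stand-in updater that queues updates and records what was flushed."""

    def __init__(self) -> None:
        self.pending: list[tuple[str, Any]] = []
        self.flushed: list[tuple[str, Any]] = []

    def update(self, conversation_id: str, variable: Any) -> None:
        self.pending.append((conversation_id, variable))

    def flush(self) -> None:
        self.flushed.extend(self.pending)
        self.pending.clear()


class PersistLayerSketch:
    def __init__(self, updater: RecordingUpdater, pool: dict[tuple[str, ...], Any],
                 conversation_id: str | None) -> None:
        self._updater = updater
        self._pool = pool
        self._conversation_id = conversation_id

    def on_event(self, event: object) -> None:
        # Only successful runs of variable-assigner nodes are of interest.
        if not isinstance(event, NodeSucceeded) or event.node_type not in ASSIGNER_NODE_TYPES:
            return
        if self._conversation_id is None:
            return
        for item in event.process_data.get("updated_variables", []):
            selector = tuple(item["selector"])
            # Skip anything that is not a conversation-scoped variable.
            if len(selector) < 2 or selector[0] != CONVERSATION_PREFIX:
                continue
            variable = self._pool.get(selector)
            if variable is not None:
                self._updater.update(self._conversation_id, variable)

    def on_graph_end(self) -> None:
        self._updater.flush()


pool = {("conversation", "topic"): "billing", ("env", "api_key"): "secret"}
updater = RecordingUpdater()
layer = PersistLayerSketch(updater, pool, conversation_id="conv-1")
layer.on_event(NodeSucceeded("variable-assigner-v2", {"updated_variables": [
    {"selector": ["conversation", "topic"]},
    {"selector": ["env", "api_key"]},
]}))
layer.on_event(NodeSucceeded("llm", {"updated_variables": [{"selector": ["conversation", "topic"]}]}))
layer.on_graph_end()
```

Keeping the database-facing updater behind `update` and `flush` is what lets the layer sit outside the workflow package: the nodes only emit events, and the layer decides what to persist.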
…rs/conversation_variable_persist_layer.py` to satisfy SIM118 - chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall) - chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting - test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage - refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage Tests not run.
…n and remove value type validation - test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances - fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool Tests not run.
…e handling for v1/v2 process_data - refactor(app-layer): read updated variables from process_data in conversation variable persistence layer - test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads Tests not run.
…nce reads from process_data
…fter venv changes) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies) Details: - `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py` - refactor(app): update advanced chat app runner and conversation service to import the new updater factory Tests not run.
…-linter module missing) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs) Details: - `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
Walkthrough
This PR refactors conversation variable persistence from node-level database updates to an event-driven persistence layer. It moves …
Changes
Sequence Diagram
sequenceDiagram
participant VA as VariableAssigner<br/>Node
participant Graph as Workflow Graph
participant Layer as Conversation<br/>VariablePersistence<br/>Layer
participant Updater as Conversation<br/>VariableUpdater
participant DB as Database
VA->>Graph: emit NodeRunSucceededEvent<br/>(with process_data)
Graph->>Layer: on_event(NodeRunSucceededEvent)
activate Layer
Layer->>Layer: extract conversation_id<br/>from system_variables
Layer->>Layer: validate selector<br/>refers to conversation var
Layer->>Graph: variable_pool.get(selector)
Graph-->>Layer: Variable instance
Layer->>Updater: update(conversation_id,<br/>variable)
deactivate Layer
Note over Updater: enqueue update
Graph->>Layer: on_graph_end()
activate Layer
Layer->>Updater: flush()
activate Updater
loop for each enqueued update
Updater->>DB: query ConversationVariable<br/>by id and conversation_id
DB-->>Updater: result
Updater->>DB: update row data
DB-->>Updater: commit
end
deactivate Updater
deactivate Layer
Estimated Code Review Effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@api/core/workflow/nodes/variable_assigner/v1/node.py`:
- Line 91: The code calls common_helpers.variable_to_processed_data with
original_variable instead of the newly updated variable, causing stale values;
update the call in the v1 flow to pass the updated variable (updated_variable)
or retrieve the variable from the pool after the write (as v2 does) before
calling variable_to_processed_data; locate the creation of updated_variable (and
the updated_variables list) and replace original_variable with updated_variable
(or pool.fetch the variable and pass that result) so variable_to_processed_data
receives the up-to-date value.
In `@api/services/conversation_variable_updater.py`:
- Around line 13-31: The flush method in ConversationVariableUpdaterImpl replays
prior updates because it never clears self._pending_updates; modify the flush
method to clear or reset self._pending_updates (e.g., assign [] or call clear())
after successfully iterating/committing updates (and preferably in a finally
block to ensure clearing even on errors) so repeated calls to flush do not
duplicate previous writes.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- api/.importlinter
- api/core/app/apps/advanced_chat/app_runner.py
- api/core/app/layers/conversation_variable_persist_layer.py
- api/core/workflow/nodes/node_factory.py
- api/core/workflow/nodes/variable_assigner/common/impl.py
- api/core/workflow/nodes/variable_assigner/v1/node.py
- api/core/workflow/nodes/variable_assigner/v2/node.py
- api/core/workflow/runtime/graph_runtime_state_protocol.py
- api/core/workflow/runtime/read_only_wrappers.py
- api/services/conversation_service.py
- api/services/conversation_variable_updater.py
- api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py
- api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py
- api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py
- api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py
💤 Files with no reviewable changes (3)
- api/core/workflow/nodes/node_factory.py
- api/core/workflow/nodes/variable_assigner/common/impl.py
- api/.importlinter
🧰 Additional context used
🧬 Code graph analysis (9)
api/core/app/apps/advanced_chat/app_runner.py (3)
- api/core/app/layers/conversation_variable_persist_layer.py (1): ConversationVariablePersistenceLayer(14-56)
- api/services/conversation_variable_updater.py (1): conversation_variable_updater_factory(33-34)
- api/core/workflow/graph_engine/graph_engine.py (1): layer(215-218)

api/core/workflow/runtime/graph_runtime_state_protocol.py (3)
- api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1): get(22-25)
- api/core/workflow/runtime/read_only_wrappers.py (1): get(21-24)
- api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (1): get(70-78)

api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py (2)
- api/core/workflow/nodes/node_factory.py (2): DifyNodeFactory(29-130), create_node(66-130)
- api/core/workflow/nodes/variable_assigner/v2/node.py (1): VariableAssignerNode(53-248)

api/core/workflow/nodes/variable_assigner/v2/node.py (4)
- api/core/workflow/enums.py (1): NodeType(36-85)
- api/core/workflow/nodes/base/node.py (2): Node(61-721), graph_init_params(244-245)
- api/core/workflow/runtime/graph_runtime_state.py (1): GraphRuntimeState(132-478)
- api/tests/unit_tests/core/workflow/nodes/llm/test_node.py (2): graph_init_params(73-83), graph_runtime_state(87-95)

api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (6)
- api/core/app/layers/conversation_variable_persist_layer.py (1): on_event(22-53)
- api/core/workflow/enums.py (1): NodeType(36-85)
- api/core/workflow/graph_events/node.py (1): NodeRunSucceededEvent(38-39)
- api/core/workflow/runtime/graph_runtime_state_protocol.py (8): ReadOnlyGraphRuntimeState(25-83), system_variable(35-35), get(12-14), get_all_by_node(16-18), get_by_prefix(20-22), variable_pool(38-40), outputs(58-60), start_at(43-45)
- api/core/workflow/runtime/read_only_wrappers.py (7): system_variable(47-48), get(21-24), get_all_by_node(26-32), get_by_prefix(34-36), variable_pool(51-52), outputs(67-68), start_at(55-56)
- api/core/workflow/system_variable.py (3): SystemVariable(11-118), conversation_id(159-160), as_view(117-118)

api/services/conversation_service.py (1)
- api/services/conversation_variable_updater.py (1): conversation_variable_updater_factory(33-34)

api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (3)
- api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1): get(22-25)
- api/core/workflow/runtime/graph_runtime_state_protocol.py (1): get(12-14)
- api/core/workflow/runtime/read_only_wrappers.py (1): get(21-24)

api/core/workflow/runtime/read_only_wrappers.py (3)
- api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1): get(22-25)
- api/core/workflow/runtime/graph_runtime_state_protocol.py (1): get(12-14)
- api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (1): get(70-78)

api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py (2)
- api/core/workflow/graph_events/node.py (1): NodeRunSucceededEvent(38-39)
- api/core/workflow/nodes/base/node.py (1): run(267-349)
🔇 Additional comments (24)
api/core/workflow/nodes/variable_assigner/v2/node.py (3)
25-28: LGTM! Proper use of TYPE_CHECKING for forward references.
Using `TYPE_CHECKING` to import `GraphInitParams` and `GraphRuntimeState` avoids circular imports at runtime while maintaining type safety for static analysis.
56-68: LGTM! Constructor properly delegates to base class.
The explicit `__init__` correctly passes all required parameters to the parent `Node` class, aligning with the base class signature per the relevant context.
108-211: LGTM! Clean separation of validation and execution logic.
The `_run` method properly:
- Validates variables before modification
- Updates the variable pool after successful operations
- Handles deduplication of updated selectors (line 191)
- Constructs appropriate `NodeRunResult` with process_data for downstream consumers

The removal of direct `ConversationVariableUpdater` integration aligns with the PR's shift to event-driven persistence via `ConversationVariablePersistenceLayer`.

api/core/workflow/nodes/variable_assigner/v1/node.py (1)
22-34: LGTM! Constructor aligned with v2 pattern.
The constructor properly delegates to the base `Node` class with all required parameters.

api/core/app/apps/advanced_chat/app_runner.py (3)
23-23: LGTM! New persistence layer import.
The import correctly references the new `ConversationVariablePersistenceLayer` from the layers module.
44-44: LGTM! Factory import from services module.
The import path correctly references the relocated `conversation_variable_updater_factory` in the services module.
204-206: LGTM! Conversation variable persistence layer integration.
The `ConversationVariablePersistenceLayer` is correctly instantiated with a fresh updater from the factory and added to the graph engine before the general `WorkflowPersistenceLayer`. This ensures conversation variable updates are captured and persisted during workflow execution via the event-driven pattern.

api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py (1)
395-431: LGTM! Good test coverage for node factory integration.
The test verifies that `DifyNodeFactory` correctly creates a `VariableAssignerNode` instance from a configuration with `version: "2"`. This ensures the factory-to-node wiring works correctly after the constructor changes.

api/services/conversation_service.py (1)
20-20: LGTM! Import path updated to reflect module relocation.
The import correctly references the new location of `conversation_variable_updater_factory` in the services module, consistent with the PR's refactoring to move the updater implementation out of `core.workflow`.

api/core/workflow/runtime/read_only_wrappers.py (2)
3-3: LGTM! Added Sequence import for new selector type.
The import supports the updated method signature using `Sequence[str]` for selectors.
21-24: LGTM! Signature updated to match protocol.
The `get` method signature now accepts a `Sequence[str]` selector, aligning with the `ReadOnlyVariablePool` protocol defined in `graph_runtime_state_protocol.py`. The positional-only parameter (`/`) enforces the calling convention and the `deepcopy` preserves read-only semantics.

api/core/workflow/runtime/graph_runtime_state_protocol.py (1)
12-12: Selector-based accessor signature looks consistent.
The updated signature matches the new selector-based access pattern.

api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (7)
18-31: Mock variable pool selector handling is aligned.
The selector guard and tuple lookup mirror the new pool access pattern.
34-41: Graph runtime state helper is clean and focused.
Simple setup for variable pool + system variable is sufficient.
44-60: Event builder is straightforward and reusable.
Encapsulates node run result creation cleanly.
63-85: Covers the happy path clearly.
Assertions validate both update and flush behavior as expected.
88-108: Good negative case for missing outputs.
Skips are verified without updater interaction.
110-119: Non-assigner nodes are correctly ignored.
Test aligns with layer filtering.
122-144: Non-conversation selector case is validated.
Ensures environment-scoped variables are skipped.

api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (1)
70-78: Selector-based get implementation is consistent.
Guard + tuple access aligns with new `ReadOnlyVariablePool.get` contract.

api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py (3)
106-111: Event-driven assertions look solid.
The test now validates updated variable payloads from the node event.
202-207: Append flow assertions are correct.
Validates updated variable data via the event process_data.
287-292: Clear flow assertions are correct.
Event-driven update payload check matches the new behavior.

api/core/app/layers/conversation_variable_persist_layer.py (1)
22-53: No issue: the pool guarantees `Variable` instances despite the broad type annotation.
The `VariablePool.get()` method is annotated to return `Segment | None`, but its implementation always stores and returns `Variable` instances. The `add()` method converts any input (Variable, Segment, or raw value) to a `Variable` before storage using `segment_to_variable()`, and the pool's type is `VariableUnion`, which is a union of all Variable subclasses. The `isinstance(variable, Variable)` check in the layer will always pass for any non-None result, so persistence will not be silently skipped. The type annotation is conservatively broad compared to the runtime guarantee, but this is not a functional defect.
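The reasoning in that last comment can be illustrated with a toy model. The classes below are hypothetical stand-ins, not the real `Segment`, `Variable`, or `VariablePool`; the sketch only assumes, as the comment states, that `add()` normalizes every input to a `Variable` before storing it.

```python
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any


@dataclass
class Segment:
    """Toy stand-in for a typed value."""
    value: Any


@dataclass
class Variable(Segment):
    """Toy stand-in: a segment that also carries a name."""
    name: str = ""


class ToyVariablePool:
    def __init__(self) -> None:
        self._store: dict[tuple[str, ...], Variable] = {}

    def add(self, selector: Sequence[str], value: Any) -> None:
        # Normalize on write: whatever comes in, a Variable is stored.
        if isinstance(value, Variable):
            variable = value
        elif isinstance(value, Segment):
            variable = Variable(value=value.value, name=selector[-1])
        else:
            variable = Variable(value=value, name=selector[-1])
        self._store[tuple(selector)] = variable

    def get(self, selector: Sequence[str]) -> Segment | None:
        # The annotation is broader than what is actually stored.
        return self._store.get(tuple(selector))


pool = ToyVariablePool()
pool.add(["conversation", "raw"], 42)
pool.add(["conversation", "segment"], Segment(value="text"))
pool.add(["conversation", "variable"], Variable(value=[1, 2], name="variable"))

results = [pool.get(["conversation", key]) for key in ("raw", "segment", "variable")]
all_variables = all(isinstance(item, Variable) for item in results)
```

Under that assumption, an `isinstance(variable, Variable)` guard on a non-None result never filters anything out, even though the declared return type would allow a plain segment.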
    conv_var_updater.flush()
    updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)]

    updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, original_variable)]
🧩 Analysis chain
🏁 Script executed:
find . -type f -name "node.py" | grep "variable_assigner" | head -20
Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 192
🏁 Script executed:
git ls-files | grep -E "variable_assigner.*node\.py"
Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 188
🏁 Script executed:
sed -n '80,100p' api/core/workflow/nodes/variable_assigner/v1/node.py
Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 1329
🏁 Script executed:
sed -n '190,210p' api/core/workflow/nodes/variable_assigner/v2/node.py
Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 1049
🏁 Script executed:
rg -n "def variable_to_processed_data" --type py
Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 235
🏁 Script executed:
rg -B3 -A10 "def variable_to_processed_data" --type py
Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 1342
🏁 Script executed:
sed -n '20,40p' api/core/workflow/nodes/variable_assigner/common/helpers.py
Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 635
Pass updated_variable to variable_to_processed_data instead of original_variable.
Line 91 passes original_variable to variable_to_processed_data, but updated_variable (created on line 89) contains the new value that was just written to the pool. Since variable_to_processed_data extracts the .value property to populate new_value in the returned UpdatedVariable, using original_variable causes stale data to appear in process_data.
The v2 implementation (lines 194-197) correctly fetches the updated variable from the pool after the update before passing it to variable_to_processed_data. Either pass updated_variable directly or fetch from the pool to match v2's approach.
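The effect of passing the wrong object can be reproduced in isolation. The snippet below is a simplified sketch: `ToyVariable` is a hypothetical stand-in, and this `variable_to_processed_data` only mimics the behavior described above (copying `.value` into `new_value`), not the real helper's full output.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class ToyVariable:
    """Hypothetical stand-in for a pool variable."""
    name: str
    value: Any


def variable_to_processed_data(selector: list[str], variable: ToyVariable) -> dict[str, Any]:
    # Mimics the described behavior: new_value is taken from variable.value.
    return {"name": variable.name, "selector": list(selector), "new_value": variable.value}


selector = ["conversation", "topic"]
original_variable = ToyVariable(name="topic", value="billing")
# The node writes a copy carrying the new value; the original is unchanged.
updated_variable = replace(original_variable, value="refunds")

stale = variable_to_processed_data(selector, original_variable)
fresh = variable_to_processed_data(selector, updated_variable)
```

Here `stale["new_value"]` still holds the pre-update value while `fresh["new_value"]` holds the written one, which is why a consumer of `process_data` would see outdated data if `original_variable` is passed.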
    class ConversationVariableUpdaterImpl:
        def __init__(self) -> None:
            self._pending_updates: list[tuple[str, Variable]] = []

        def update(self, conversation_id: str, variable: Variable) -> None:
            self._pending_updates.append((conversation_id, variable))

        def flush(self) -> None:
            for conversation_id, variable in self._pending_updates:
                stmt = select(ConversationVariable).where(
                    ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
                )
                with Session(db.engine) as session:
                    row = session.scalar(stmt)
                    if not row:
                        raise ConversationVariableNotFoundError("conversation variable not found in the database")
                    row.data = variable.model_dump_json()
                    session.commit()
Clear pending updates after flush to avoid duplicate writes.
_pending_updates is never cleared, so every flush replays prior updates; with per-update flushes this can cause repeated DB writes.
🛠️ Proposed fix
         def flush(self) -> None:
             for conversation_id, variable in self._pending_updates:
                 stmt = select(ConversationVariable).where(
                     ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
                 )
                 with Session(db.engine) as session:
                     row = session.scalar(stmt)
                     if not row:
                         raise ConversationVariableNotFoundError("conversation variable not found in the database")
                     row.data = variable.model_dump_json()
                     session.commit()
    +        self._pending_updates.clear()
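The replay behavior is easy to demonstrate without a database. The following sketch uses a hypothetical `InMemoryUpdater` that records writes in a list instead of committing rows; the `clear_after_flush` switch exists only to compare the two behaviors side by side.

```python
from typing import Any


class InMemoryUpdater:
    """Hypothetical stand-in that records writes instead of touching a database."""

    def __init__(self, clear_after_flush: bool) -> None:
        self._pending: list[tuple[str, Any]] = []
        self._clear_after_flush = clear_after_flush
        self.writes: list[tuple[str, Any]] = []

    def update(self, conversation_id: str, value: Any) -> None:
        self._pending.append((conversation_id, value))

    def flush(self) -> None:
        for conversation_id, value in self._pending:
            self.writes.append((conversation_id, value))  # stands in for a DB commit
        if self._clear_after_flush:
            self._pending.clear()


def run(clear_after_flush: bool) -> list[tuple[str, Any]]:
    updater = InMemoryUpdater(clear_after_flush)
    updater.update("conv-1", "a")
    updater.flush()
    updater.update("conv-1", "b")
    updater.flush()
    return updater.writes


without_clear = run(clear_after_flush=False)
with_clear = run(clear_after_flush=True)
```

Without clearing, the second flush writes the first update again, so two updates produce three writes; with clearing, each update is written once.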
Benchmark PR from qodo-benchmark#167