Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion apps/sim/executor/execution/edge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ export class EdgeManager {
this.deactivatedEdges.clear()
}

/**
* Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration).
* This ensures error/success edges can be re-evaluated on each iteration.
*/
clearDeactivatedEdgesForNodes(nodeIds: Set<string>): void {
const edgesToRemove: string[] = []
for (const edgeKey of this.deactivatedEdges) {
// Edge key format is "sourceId-targetId-handle"
// Check if either source or target is in the nodeIds set
for (const nodeId of nodeIds) {
if (edgeKey.startsWith(`${nodeId}-`) || edgeKey.includes(`-${nodeId}-`)) {
edgesToRemove.push(edgeKey)
break
}
}
}
for (const edgeKey of edgesToRemove) {
this.deactivatedEdges.delete(edgeKey)
}
}

private shouldActivateEdge(edge: DAGEdge, output: NormalizedBlockOutput): boolean {
const handle = edge.sourceHandle

Expand Down Expand Up @@ -180,7 +201,7 @@ export class EdgeManager {
const sourceNode = this.dag.nodes.get(sourceId)
if (!sourceNode) continue

for (const [_, edge] of sourceNode.outgoingEdges) {
for (const [, edge] of sourceNode.outgoingEdges) {
if (edge.target === node.id) {
const edgeKey = this.createEdgeKey(sourceId, edge.target, edge.sourceHandle)
if (!this.deactivatedEdges.has(edgeKey)) {
Expand Down
8 changes: 8 additions & 0 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ export class ExecutionEngine {
})

this.addMultipleToQueue(readyNodes)

// Check for dynamically added nodes (e.g., from parallel expansion)
if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) {
const dynamicNodes = this.context.pendingDynamicNodes
this.context.pendingDynamicNodes = []
logger.info('Adding dynamically expanded parallel nodes', { dynamicNodes })
this.addMultipleToQueue(dynamicNodes)
}
}

private buildPausedResult(startTime: number): ExecutionResult {
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ export class DAGExecutor {
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
loopOrchestrator.setEdgeManager(edgeManager)
const nodeOrchestrator = new NodeExecutionOrchestrator(
dag,
state,
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/execution/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface ParallelScope {
branchOutputs: Map<number, NormalizedBlockOutput[]>
completedCount: number
totalExpectedNodes: number
items?: any[]
}

export class ExecutionState implements BlockStateController {
Expand Down
16 changes: 13 additions & 3 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createLogger } from '@/lib/logs/console/logger'
import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import type { LoopScope } from '@/executor/execution/state'
import type { BlockStateController } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
Expand All @@ -26,12 +27,18 @@ export interface LoopContinuationResult {
}

export class LoopOrchestrator {
private edgeManager: EdgeManager | null = null

constructor(
private dag: DAG,
private state: BlockStateController,
private resolver: VariableResolver
) {}

setEdgeManager(edgeManager: EdgeManager): void {
this.edgeManager = edgeManager
}

initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
if (!loopConfig) {
Expand Down Expand Up @@ -216,23 +223,26 @@ export class LoopOrchestrator {
const loopNodes = loopConfig.nodes
const allLoopNodeIds = new Set([sentinelStartId, sentinelEndId, ...loopNodes])

let restoredCount = 0
// Clear deactivated edges for loop nodes so error/success edges can be re-evaluated
if (this.edgeManager) {
this.edgeManager.clearDeactivatedEdgesForNodes(allLoopNodeIds)
}

for (const nodeId of allLoopNodeIds) {
const nodeToRestore = this.dag.nodes.get(nodeId)
if (!nodeToRestore) continue

for (const [potentialSourceId, potentialSourceNode] of this.dag.nodes) {
if (!allLoopNodeIds.has(potentialSourceId)) continue

for (const [_, edge] of potentialSourceNode.outgoingEdges) {
for (const [, edge] of potentialSourceNode.outgoingEdges) {
if (edge.target === nodeId) {
const isBackwardEdge =
edge.sourceHandle === EDGE.LOOP_CONTINUE ||
edge.sourceHandle === EDGE.LOOP_CONTINUE_ALT

if (!isBackwardEdge) {
nodeToRestore.incomingEdges.add(potentialSourceId)
restoredCount++
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions apps/sim/executor/orchestrators/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ export class NodeExecutionOrchestrator {
}
}

// Initialize parallel scope BEFORE execution so <parallel.currentItem> can be resolved
const parallelId = node.metadata.parallelId
if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
const totalBranches = node.metadata.branchTotal || 1
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const nodesInParallel = (parallelConfig as any)?.nodes?.length || 1
this.parallelOrchestrator.initializeParallelScope(
ctx,
parallelId,
totalBranches,
nodesInParallel
)
}

if (node.metadata.isSentinel) {
const output = this.handleSentinel(ctx, node)
const isFinalOutput = node.outgoingEdges.size === 0
Expand Down
Loading