diff --git a/apps/sim/executor/execution/edge-manager.ts b/apps/sim/executor/execution/edge-manager.ts index 0b707dacd2..ec69512e70 100644 --- a/apps/sim/executor/execution/edge-manager.ts +++ b/apps/sim/executor/execution/edge-manager.ts @@ -86,6 +86,27 @@ export class EdgeManager { this.deactivatedEdges.clear() } + /** + * Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration). + * This ensures error/success edges can be re-evaluated on each iteration. + */ + clearDeactivatedEdgesForNodes(nodeIds: Set): void { + const edgesToRemove: string[] = [] + for (const edgeKey of this.deactivatedEdges) { + // Edge key format is "sourceId-targetId-handle" + // Check if either source or target is in the nodeIds set + for (const nodeId of nodeIds) { + if (edgeKey.startsWith(`${nodeId}-`) || edgeKey.includes(`-${nodeId}-`)) { + edgesToRemove.push(edgeKey) + break + } + } + } + for (const edgeKey of edgesToRemove) { + this.deactivatedEdges.delete(edgeKey) + } + } + private shouldActivateEdge(edge: DAGEdge, output: NormalizedBlockOutput): boolean { const handle = edge.sourceHandle @@ -180,7 +201,7 @@ export class EdgeManager { const sourceNode = this.dag.nodes.get(sourceId) if (!sourceNode) continue - for (const [_, edge] of sourceNode.outgoingEdges) { + for (const [, edge] of sourceNode.outgoingEdges) { if (edge.target === node.id) { const edgeKey = this.createEdgeKey(sourceId, edge.target, edge.sourceHandle) if (!this.deactivatedEdges.has(edgeKey)) { diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 68c471d8cb..10eb0114c0 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -279,6 +279,14 @@ export class ExecutionEngine { }) this.addMultipleToQueue(readyNodes) + + // Check for dynamically added nodes (e.g., from parallel expansion) + if (this.context.pendingDynamicNodes && this.context.pendingDynamicNodes.length > 0) { + const dynamicNodes = this.context.pendingDynamicNodes + this.context.pendingDynamicNodes = [] + logger.info('Adding dynamically expanded parallel nodes', { dynamicNodes }) + this.addMultipleToQueue(dynamicNodes) + } } private buildPausedResult(startTime: number): ExecutionResult { diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index fad1ffb1ad..8a2bf4fc9a 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -64,9 +64,11 @@ export class DAGExecutor { const resolver = new VariableResolver(this.workflow, this.workflowVariables, state) const loopOrchestrator = new LoopOrchestrator(dag, state, resolver) const parallelOrchestrator = new ParallelOrchestrator(dag, state) + parallelOrchestrator.setResolver(resolver) const allHandlers = createBlockHandlers() const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state) const edgeManager = new EdgeManager(dag) + loopOrchestrator.setEdgeManager(edgeManager) const nodeOrchestrator = new NodeExecutionOrchestrator( dag, state, diff --git a/apps/sim/executor/execution/state.ts b/apps/sim/executor/execution/state.ts index f5c608af69..e3737d5b3b 100644 --- a/apps/sim/executor/execution/state.ts +++ b/apps/sim/executor/execution/state.ts @@ -22,6 +22,7 @@ export interface ParallelScope { branchOutputs: Map completedCount: number totalExpectedNodes: number + items?: any[] } export class ExecutionState implements BlockStateController { diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index 3796786626..2e3d6b81e4 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -1,6 +1,7 @@ import { createLogger } from '@/lib/logs/console/logger' import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants' import type { DAG } from '@/executor/dag/builder' +import type { EdgeManager } from '@/executor/execution/edge-manager' import type { LoopScope } from '@/executor/execution/state' import type { BlockStateController } from '@/executor/execution/types' import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types' @@ -26,12 +27,18 @@ export interface LoopContinuationResult { } export class LoopOrchestrator { + private edgeManager: EdgeManager | null = null + constructor( private dag: DAG, private state: BlockStateController, private resolver: VariableResolver ) {} + setEdgeManager(edgeManager: EdgeManager): void { + this.edgeManager = edgeManager + } + initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope { const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined if (!loopConfig) { @@ -216,7 +223,11 @@ export class LoopOrchestrator { const loopNodes = loopConfig.nodes const allLoopNodeIds = new Set([sentinelStartId, sentinelEndId, ...loopNodes]) - let restoredCount = 0 + // Clear deactivated edges for loop nodes so error/success edges can be re-evaluated + if (this.edgeManager) { + this.edgeManager.clearDeactivatedEdgesForNodes(allLoopNodeIds) + } + for (const nodeId of allLoopNodeIds) { const nodeToRestore = this.dag.nodes.get(nodeId) if (!nodeToRestore) continue @@ -224,7 +235,7 @@ export class LoopOrchestrator { for (const [potentialSourceId, potentialSourceNode] of this.dag.nodes) { if (!allLoopNodeIds.has(potentialSourceId)) continue - for (const [_, edge] of potentialSourceNode.outgoingEdges) { + for (const [, edge] of potentialSourceNode.outgoingEdges) { if (edge.target === nodeId) { const isBackwardEdge = edge.sourceHandle === EDGE.LOOP_CONTINUE || @@ -232,7 +243,6 @@ export class LoopOrchestrator { if (!isBackwardEdge) { nodeToRestore.incomingEdges.add(potentialSourceId) - restoredCount++ } } } diff --git a/apps/sim/executor/orchestrators/node.ts b/apps/sim/executor/orchestrators/node.ts index c3e50e9570..2157807f3f 100644 --- a/apps/sim/executor/orchestrators/node.ts +++ b/apps/sim/executor/orchestrators/node.ts @@ -53,6 +53,20 @@ export class NodeExecutionOrchestrator { } } + // Initialize parallel scope BEFORE execution so can be resolved + const parallelId = node.metadata.parallelId + if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) { + const totalBranches = node.metadata.branchTotal || 1 + const parallelConfig = this.dag.parallelConfigs.get(parallelId) + const nodesInParallel = (parallelConfig as any)?.nodes?.length || 1 + this.parallelOrchestrator.initializeParallelScope( + ctx, + parallelId, + totalBranches, + nodesInParallel + ) + } + if (node.metadata.isSentinel) { const output = this.handleSentinel(ctx, node) const isFinalOutput = node.outgoingEdges.size === 0 diff --git a/apps/sim/executor/orchestrators/parallel.ts b/apps/sim/executor/orchestrators/parallel.ts index d3707523eb..9be1b012cb 100644 --- a/apps/sim/executor/orchestrators/parallel.ts +++ b/apps/sim/executor/orchestrators/parallel.ts @@ -1,15 +1,17 @@ import { createLogger } from '@/lib/logs/console/logger' -import type { DAG } from '@/executor/dag/builder' +import type { DAG, DAGNode } from '@/executor/dag/builder' import type { ParallelScope } from '@/executor/execution/state' import type { BlockStateWriter } from '@/executor/execution/types' import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types' import type { ParallelConfigWithNodes } from '@/executor/types/parallel' import { + buildBranchNodeId, calculateBranchCount, extractBaseBlockId, extractBranchIndex, parseDistributionItems, } from '@/executor/utils/subflow-utils' +import type { VariableResolver } from '@/executor/variables/resolver' import type { SerializedParallel } from '@/serializer/types' const logger = createLogger('ParallelOrchestrator') @@ -29,31 +31,325 @@ export interface ParallelAggregationResult { } export class ParallelOrchestrator { + private resolver: VariableResolver | null = null + constructor( private dag: DAG, private state: BlockStateWriter ) {} + setResolver(resolver: VariableResolver): void { + this.resolver = resolver + } + initializeParallelScope( ctx: ExecutionContext, parallelId: string, totalBranches: number, terminalNodesCount = 1 ): ParallelScope { + const parallelConfig = this.dag.parallelConfigs.get(parallelId) + const items = parallelConfig ? this.resolveDistributionItems(ctx, parallelConfig) : undefined + + // If we have more items than pre-built branches, expand the DAG + const actualBranchCount = items && items.length > totalBranches ? items.length : totalBranches + const scope: ParallelScope = { parallelId, - totalBranches, + totalBranches: actualBranchCount, branchOutputs: new Map(), completedCount: 0, - totalExpectedNodes: totalBranches * terminalNodesCount, + totalExpectedNodes: actualBranchCount * terminalNodesCount, + items, } if (!ctx.parallelExecutions) { ctx.parallelExecutions = new Map() } ctx.parallelExecutions.set(parallelId, scope) + + // Dynamically expand DAG if needed + if (items && items.length > totalBranches && parallelConfig) { + logger.info('Dynamically expanding parallel branches', { + parallelId, + existingBranches: totalBranches, + targetBranches: items.length, + itemsCount: items.length, + }) + + const newEntryNodes = this.expandParallelBranches( + parallelId, + parallelConfig, + totalBranches, + items.length + ) + + logger.info('Parallel expansion complete', { + parallelId, + newEntryNodes, + totalNodesInDag: this.dag.nodes.size, + }) + + // Add new entry nodes to pending dynamic nodes so the engine can schedule them + if (newEntryNodes.length > 0) { + if (!ctx.pendingDynamicNodes) { + ctx.pendingDynamicNodes = [] + } + ctx.pendingDynamicNodes.push(...newEntryNodes) + } + } else { + logger.info('No parallel expansion needed', { + parallelId, + itemsLength: items?.length, + totalBranches, + hasParallelConfig: !!parallelConfig, + }) + } + return scope } + /** + * Dynamically expand the DAG to include additional branch nodes when + * the resolved item count exceeds the pre-built branch count. + */ + private expandParallelBranches( + parallelId: string, + config: SerializedParallel, + existingBranchCount: number, + targetBranchCount: number + ): string[] { + // Get all blocks that are part of this parallel + const blocksInParallel = config.nodes + const blocksInParallelSet = new Set(blocksInParallel) + + // Step 1: Create all new nodes first + for (const blockId of blocksInParallel) { + const branch0NodeId = buildBranchNodeId(blockId, 0) + const templateNode = this.dag.nodes.get(branch0NodeId) + + if (!templateNode) { + logger.warn('Template node not found for parallel expansion', { blockId, branch0NodeId }) + continue + } + + for (let branchIndex = existingBranchCount; branchIndex < targetBranchCount; branchIndex++) { + const newNodeId = buildBranchNodeId(blockId, branchIndex) + + const newNode: DAGNode = { + id: newNodeId, + block: { + ...templateNode.block, + id: newNodeId, + }, + incomingEdges: new Set(), + outgoingEdges: new Map(), + metadata: { + ...templateNode.metadata, + branchIndex, + branchTotal: targetBranchCount, + originalBlockId: blockId, + }, + } + + this.dag.nodes.set(newNodeId, newNode) + } + } + + // Step 2: Wire edges between the new branch nodes + this.wireExpandedBranchEdges( + parallelId, + blocksInParallel, + existingBranchCount, + targetBranchCount + ) + + // Step 3: Update metadata on existing nodes to reflect new total + this.updateExistingBranchMetadata(blocksInParallel, existingBranchCount, targetBranchCount) + + // Step 4: Identify entry nodes AFTER edges are wired + // Entry nodes are those with no INTERNAL incoming edges (edges from outside parallel don't count) + const newEntryNodes: string[] = [] + for (const blockId of blocksInParallel) { + const branch0NodeId = buildBranchNodeId(blockId, 0) + const templateNode = this.dag.nodes.get(branch0NodeId) + if (!templateNode) continue + + // Check if template has any INTERNAL incoming edges + let hasInternalIncoming = false + for (const incomingId of templateNode.incomingEdges) { + const baseIncomingId = extractBaseBlockId(incomingId) + if (blocksInParallelSet.has(baseIncomingId)) { + hasInternalIncoming = true + break + } + } + + // If no internal incoming edges, the new branches of this block are entry nodes + if (!hasInternalIncoming) { + for ( + let branchIndex = existingBranchCount; + branchIndex < targetBranchCount; + branchIndex++ + ) { + newEntryNodes.push(buildBranchNodeId(blockId, branchIndex)) + } + } + } + + return newEntryNodes + } + + /** + * Wire edges between expanded branch nodes by replicating the edge pattern from branch 0. + * Handles both internal edges (within the parallel) and exit edges (to blocks after the parallel). + */ + private wireExpandedBranchEdges( + parallelId: string, + blocksInParallel: string[], + existingBranchCount: number, + targetBranchCount: number + ): void { + const blocksInParallelSet = new Set(blocksInParallel) + + // For each block, look at branch 0's outgoing edges and replicate for new branches + for (const blockId of blocksInParallel) { + const branch0NodeId = buildBranchNodeId(blockId, 0) + const branch0Node = this.dag.nodes.get(branch0NodeId) + + if (!branch0Node) continue + + // Replicate outgoing edges for each new branch + for (const [, edge] of branch0Node.outgoingEdges) { + // Use edge.target (the actual target node ID), not the Map key which may be a formatted edge ID + const actualTargetNodeId = edge.target + + // Extract the base target block ID + const baseTargetId = extractBaseBlockId(actualTargetNodeId) + + // Check if target is inside or outside the parallel + const isInternalEdge = blocksInParallelSet.has(baseTargetId) + + for ( + let branchIndex = existingBranchCount; + branchIndex < targetBranchCount; + branchIndex++ + ) { + const sourceNodeId = buildBranchNodeId(blockId, branchIndex) + const sourceNode = this.dag.nodes.get(sourceNodeId) + + if (!sourceNode) continue + + if (isInternalEdge) { + // Internal edge: wire to the corresponding branch of the target + const newTargetNodeId = buildBranchNodeId(baseTargetId, branchIndex) + const targetNode = this.dag.nodes.get(newTargetNodeId) + + if (targetNode) { + sourceNode.outgoingEdges.set(newTargetNodeId, { + target: newTargetNodeId, + sourceHandle: edge.sourceHandle, + targetHandle: edge.targetHandle, + }) + targetNode.incomingEdges.add(sourceNodeId) + } + } else { + // Exit edge: wire to the same external target (blocks after the parallel) + // All branches point to the same external node + const externalTargetNode = this.dag.nodes.get(actualTargetNodeId) + + if (externalTargetNode) { + sourceNode.outgoingEdges.set(actualTargetNodeId, { + target: actualTargetNodeId, + sourceHandle: edge.sourceHandle, + targetHandle: edge.targetHandle, + }) + // Add incoming edge from this new branch to the external node + externalTargetNode.incomingEdges.add(sourceNodeId) + } + } + } + } + } + } + + /** + * Update existing branch nodes' metadata to reflect the new total branch count. + */ + private updateExistingBranchMetadata( + blocksInParallel: string[], + existingBranchCount: number, + targetBranchCount: number + ): void { + for (const blockId of blocksInParallel) { + for (let branchIndex = 0; branchIndex < existingBranchCount; branchIndex++) { + const nodeId = buildBranchNodeId(blockId, branchIndex) + const node = this.dag.nodes.get(nodeId) + if (node) { + node.metadata.branchTotal = targetBranchCount + } + } + } + } + + /** + * Resolve distribution items at runtime, handling references like + * This mirrors how LoopOrchestrator.resolveForEachItems works. + */ + private resolveDistributionItems(ctx: ExecutionContext, config: SerializedParallel): any[] { + const rawItems = config.distribution + + if (rawItems === undefined || rawItems === null) { + return [] + } + + // Already an array - return as-is + if (Array.isArray(rawItems)) { + return rawItems + } + + // Object - convert to entries array (consistent with loop forEach behavior) + if (typeof rawItems === 'object') { + return Object.entries(rawItems) + } + + // String handling + if (typeof rawItems === 'string') { + // Resolve references at runtime using the variable resolver + if (rawItems.startsWith('<') && rawItems.endsWith('>') && this.resolver) { + const resolved = this.resolver.resolveSingleReference(ctx, '', rawItems) + if (Array.isArray(resolved)) { + return resolved + } + if (typeof resolved === 'object' && resolved !== null) { + return Object.entries(resolved) + } + logger.warn('Distribution reference did not resolve to array or object', { + rawItems, + resolved, + }) + return [] + } + + // Try to parse as JSON + try { + const normalized = rawItems.replace(/'/g, '"') + const parsed = JSON.parse(normalized) + if (Array.isArray(parsed)) { + return parsed + } + if (typeof parsed === 'object' && parsed !== null) { + return Object.entries(parsed) + } + return [] + } catch (error) { + logger.error('Failed to parse distribution items', { rawItems, error }) + return [] + } + } + + return [] + } + handleParallelBranchCompletion( ctx: ExecutionContext, parallelId: string, diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index dbc46005f5..6c2d270c0a 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -190,6 +190,7 @@ export interface ExecutionContext { completedCount: number totalExpectedNodes: number parallelType?: 'count' | 'collection' + items?: any[] } > @@ -223,6 +224,9 @@ export interface ExecutionContext { // Cancellation support isCancelled?: boolean + + // Dynamically added nodes that need to be scheduled (e.g., from parallel expansion) + pendingDynamicNodes?: string[] } export interface ExecutionResult { diff --git a/apps/sim/executor/variables/resolvers/parallel.ts b/apps/sim/executor/variables/resolvers/parallel.ts index 651b059908..78fca1f9f6 100644 --- a/apps/sim/executor/variables/resolvers/parallel.ts +++ b/apps/sim/executor/variables/resolvers/parallel.ts @@ -49,7 +49,10 @@ export class ParallelResolver implements Resolver { return undefined } - const distributionItems = this.getDistributionItems(parallelConfig) + // First try to get items from the parallel scope (resolved at runtime) + // This is the same pattern as LoopResolver reading from loopScope.items + const parallelScope = context.executionContext.parallelExecutions?.get(parallelId) + const distributionItems = parallelScope?.items ?? this.getDistributionItems(parallelConfig) let value: any switch (property) {