diff --git a/apps/sim/app/changelog/components/timeline-list.tsx b/apps/sim/app/changelog/components/timeline-list.tsx index ecb306c56f..4a6cb7c96b 100644 --- a/apps/sim/app/changelog/components/timeline-list.tsx +++ b/apps/sim/app/changelog/components/timeline-list.tsx @@ -98,9 +98,40 @@ export default function ChangelogList({ initialEntries }: Props) {
{entries.map((entry) => (
-
-
- {entry.tag} +
+
+
+ {entry.tag} +
+ {entry.contributors && entry.contributors.length > 0 && ( +
+ {entry.contributors.slice(0, 5).map((contributor) => ( + + + + {contributor.slice(0, 2).toUpperCase()} + + + ))} + {entry.contributors.length > 5 && ( +
+ +{entry.contributors.length - 5} +
+ )} +
+ )}
{new Date(entry.date).toLocaleDateString('en-US', { @@ -184,26 +215,6 @@ export default function ChangelogList({ initialEntries }: Props) { {cleanMarkdown(entry.content)}
- - {entry.contributors && entry.contributors.length > 0 && ( -
- {entry.contributors.slice(0, 5).map((contributor) => ( - - - {contributor.slice(0, 2).toUpperCase()} - - ))} - {entry.contributors.length > 5 && ( -
- +{entry.contributors.length - 5} -
- )} -
- )}
))} diff --git a/apps/sim/executor/handlers/loop/loop-handler.test.ts b/apps/sim/executor/handlers/loop/loop-handler.test.ts index c86da68b82..928da4bdcc 100644 --- a/apps/sim/executor/handlers/loop/loop-handler.test.ts +++ b/apps/sim/executor/handlers/loop/loop-handler.test.ts @@ -82,35 +82,29 @@ describe('LoopBlockHandler', () => { it('should initialize loop on first execution', async () => { const result = await handler.execute(mockBlock, {}, mockContext) - // After execution, the counter is incremented for the next iteration expect(mockContext.loopIterations.get('loop-1')).toBe(1) expect(mockContext.activeExecutionPath.has('inner-block')).toBe(true) - // Type guard to check if result has the expected structure if (typeof result === 'object' && result !== null) { const response = result as any - expect(response.currentIteration).toBe(0) // Still shows current iteration as 0 + expect(response.currentIteration).toBe(1) expect(response.maxIterations).toBe(3) expect(response.completed).toBe(false) } }) it('should activate loop-end-source when iterations complete', async () => { - // Set to last iteration - mockContext.loopIterations.set('loop-1', 3) + mockContext.loopIterations.set('loop-1', 4) const result = await handler.execute(mockBlock, {}, mockContext) - // The loop handler no longer marks loops as completed - that's handled by the loop manager expect(mockContext.completedLoops.has('loop-1')).toBe(false) - // The loop handler also doesn't activate end connections anymore expect(mockContext.activeExecutionPath.has('after-loop')).toBe(false) - // But it should not activate the inner block either since we're at max iterations expect(mockContext.activeExecutionPath.has('inner-block')).toBe(false) if (typeof result === 'object' && result !== null) { const response = result as any - expect(response.completed).toBe(false) // Not completed until all blocks execute + expect(response.completed).toBe(false) expect(response.message).toContain('Final iteration') } }) @@ -131,7 +125,7 @@ describe('LoopBlockHandler', () => { if (typeof result === 'object' && result !== null) { const response = result as any expect(response.loopType).toBe('forEach') - expect(response.maxIterations).toBe(3) // Limited by items length + expect(response.maxIterations).toBe(3) } }) @@ -153,28 +147,26 @@ describe('LoopBlockHandler', () => { }) it('should limit forEach loops by collection size, not iterations parameter', async () => { - // This tests the fix for the bug where forEach loops were using the iterations count - // instead of the actual collection size mockContext.workflow!.loops['loop-1'] = { id: 'loop-1', nodes: ['inner-block'], - iterations: 10, // High iteration count + iterations: 10, loopType: 'forEach', - forEachItems: ['a', 'b'], // Only 2 items + forEachItems: ['a', 'b'], } - // First execution let result = await handler.execute(mockBlock, {}, mockContext) expect(mockContext.loopIterations.get('loop-1')).toBe(1) expect(mockContext.loopItems.get('loop-1')).toBe('a') if (typeof result === 'object' && result !== null) { const response = result as any - expect(response.maxIterations).toBe(2) // Should be limited to 2, not 10 + expect(response.maxIterations).toBe(2) expect(response.completed).toBe(false) } - // Second execution + mockContext.loopIterations.set('loop-1', 2) + result = await handler.execute(mockBlock, {}, mockContext) expect(mockContext.loopIterations.get('loop-1')).toBe(2) expect(mockContext.loopItems.get('loop-1')).toBe('b') @@ -184,7 +176,10 @@ describe('LoopBlockHandler', () => { expect(response.completed).toBe(false) } - // Third execution should complete the loop + // Manually increment iteration for third execution (exceeds max) + mockContext.loopIterations.set('loop-1', 3) + + // Third execution should exceed the loop limit result = await handler.execute(mockBlock, {}, mockContext) // The loop handler no longer marks loops as completed - that's handled by the loop manager expect(mockContext.completedLoops.has('loop-1')).toBe(false) @@ -196,7 +191,7 @@ describe('LoopBlockHandler', () => { nodes: ['inner-block'], iterations: 5, loopType: 'forEach', - forEachItems: '', // Empty collection + forEachItems: '', } await expect(handler.execute(mockBlock, {}, mockContext)).rejects.toThrow( @@ -210,7 +205,7 @@ describe('LoopBlockHandler', () => { nodes: ['inner-block'], iterations: 5, loopType: 'forEach', - forEachItems: [], // Empty array + forEachItems: [], } await expect(handler.execute(mockBlock, {}, mockContext)).rejects.toThrow( @@ -223,12 +218,10 @@ describe('LoopBlockHandler', () => { it('should activate children when in active path', async () => { const handlerWithPathTracker = new LoopBlockHandler(undefined, mockPathTracker as any) - // Mock PathTracker to return true (block is in active path) mockPathTracker.isInActivePath.mockReturnValue(true) await handlerWithPathTracker.execute(mockBlock, {}, mockContext) - // Should activate children when in active path expect(mockContext.activeExecutionPath.has('inner-block')).toBe(true) expect(mockPathTracker.isInActivePath).toHaveBeenCalledWith('loop-1', mockContext) }) @@ -236,12 +229,10 @@ describe('LoopBlockHandler', () => { it('should not activate children when not in active path', async () => { const handlerWithPathTracker = new LoopBlockHandler(undefined, mockPathTracker as any) - // Mock PathTracker to return false (block is not in active path) mockPathTracker.isInActivePath.mockReturnValue(false) await handlerWithPathTracker.execute(mockBlock, {}, mockContext) - // Should not activate children when not in active path expect(mockContext.activeExecutionPath.has('inner-block')).toBe(false) expect(mockPathTracker.isInActivePath).toHaveBeenCalledWith('loop-1', mockContext) }) @@ -249,14 +240,12 @@ describe('LoopBlockHandler', () => { it('should handle PathTracker errors gracefully', async () => { const handlerWithPathTracker = new LoopBlockHandler(undefined, mockPathTracker as any) - // Mock PathTracker to throw error mockPathTracker.isInActivePath.mockImplementation(() => { throw new Error('PathTracker error') }) await handlerWithPathTracker.execute(mockBlock, {}, mockContext) - // Should default to activating children when PathTracker fails expect(mockContext.activeExecutionPath.has('inner-block')).toBe(true) }) }) diff --git a/apps/sim/executor/handlers/loop/loop-handler.ts b/apps/sim/executor/handlers/loop/loop-handler.ts index 47f42c0a38..e2df1a5be5 100644 --- a/apps/sim/executor/handlers/loop/loop-handler.ts +++ b/apps/sim/executor/handlers/loop/loop-handler.ts @@ -32,7 +32,6 @@ export class LoopBlockHandler implements BlockHandler { ): Promise { logger.info(`Executing loop block: ${block.id}`) - // Get the loop configuration from the workflow const loop = context.workflow?.loops?.[block.id] if (!loop) { logger.error(`Loop configuration not found for block ${block.id}`, { @@ -43,13 +42,12 @@ export class LoopBlockHandler implements BlockHandler { throw new Error(`Loop configuration not found for block ${block.id}`) } - // Initialize loop iteration if not already done if (!context.loopIterations.has(block.id)) { - context.loopIterations.set(block.id, 0) - logger.info(`Initialized loop ${block.id} with 0 iterations`) + context.loopIterations.set(block.id, 1) + logger.info(`Initialized loop ${block.id} starting at iteration 1`) } - const currentIteration = context.loopIterations.get(block.id) || 0 + const currentIteration = context.loopIterations.get(block.id) || 1 let maxIterations: number let forEachItems: any[] | Record | null = null if (loop.loopType === 'forEach') { @@ -75,7 +73,6 @@ export class LoopBlockHandler implements BlockHandler { ) } - // For forEach, max iterations = items length const itemsLength = Array.isArray(forEachItems) ? forEachItems.length : Object.keys(forEachItems).length @@ -94,12 +91,9 @@ export class LoopBlockHandler implements BlockHandler { `Loop ${block.id} - Current iteration: ${currentIteration}, Max iterations: ${maxIterations}` ) - // Check if we've reached the maximum iterations - if (currentIteration >= maxIterations) { + if (currentIteration > maxIterations) { logger.info(`Loop ${block.id} has reached maximum iterations (${maxIterations})`) - // Don't mark as completed here - let the loop manager handle it after all blocks execute - // Just return that this is the final iteration return { loopId: block.id, currentIteration: currentIteration - 1, // Report the actual last iteration number @@ -110,28 +104,20 @@ export class LoopBlockHandler implements BlockHandler { } as Record } - // For forEach loops, set the current item BEFORE incrementing if (loop.loopType === 'forEach' && forEachItems) { - // Store the full items array for access via context.loopItems.set(`${block.id}_items`, forEachItems) + const arrayIndex = currentIteration - 1 const currentItem = Array.isArray(forEachItems) - ? forEachItems[currentIteration] - : Object.entries(forEachItems)[currentIteration] + ? forEachItems[arrayIndex] + : Object.entries(forEachItems)[arrayIndex] context.loopItems.set(block.id, currentItem) logger.info( - `Loop ${block.id} - Set current item for iteration ${currentIteration}:`, + `Loop ${block.id} - Set current item for iteration ${currentIteration} (index ${arrayIndex}):`, currentItem ) } - // Increment the iteration counter for the NEXT iteration - // This happens AFTER we've set up the current iteration's data - context.loopIterations.set(block.id, currentIteration + 1) - logger.info( - `Loop ${block.id} - Incremented counter for next iteration: ${currentIteration + 1}` - ) - // Use routing strategy to determine if this block requires active path checking const blockType = block.metadata?.id if (Routing.requiresActivePathCheck(blockType || '')) { @@ -141,12 +127,10 @@ export class LoopBlockHandler implements BlockHandler { isInActivePath = this.pathTracker.isInActivePath(block.id, context) } catch (error) { logger.warn(`PathTracker check failed for ${blockType} block ${block.id}:`, error) - // Default to true to maintain existing behavior if PathTracker fails isInActivePath = true } } - // Only activate child nodes if this block is in the active execution path if (isInActivePath) { this.activateChildNodes(block, context, currentIteration) } else { @@ -155,17 +139,18 @@ export class LoopBlockHandler implements BlockHandler { ) } } else { - // Regular blocks always activate their children this.activateChildNodes(block, context, currentIteration) } + context.loopIterations.set(block.id, currentIteration) + return { loopId: block.id, currentIteration, maxIterations, loopType: loop.loopType || 'for', completed: false, - message: `Starting iteration ${currentIteration + 1} of ${maxIterations}`, + message: `Starting iteration ${currentIteration} of ${maxIterations}`, } as Record } @@ -177,7 +162,6 @@ export class LoopBlockHandler implements BlockHandler { context: ExecutionContext, currentIteration: number ): void { - // Loop is still active, activate the loop-start-source connection const loopStartConnections = context.workflow?.connections.filter( (conn) => conn.source === block.id && conn.sourceHandle === 'loop-start-source' diff --git a/apps/sim/executor/index.test.ts b/apps/sim/executor/index.test.ts index 31fd17b629..ea84f4e872 100644 --- a/apps/sim/executor/index.test.ts +++ b/apps/sim/executor/index.test.ts @@ -1451,4 +1451,103 @@ describe('Executor', () => { } }) }) + + describe('Parallel Execution Ordering', () => { + it('should handle missing parallel block mapping gracefully', () => { + const executor = new Executor(createMinimalWorkflow()) + const context = createMockContext() + + // Test isIterationComplete with missing parallel config + const result = (executor as any).isIterationComplete('nonexistent-parallel', 0, null, context) + expect(result).toBe(true) // Should return true for safety + }) + + it('should correctly identify incomplete iterations', () => { + const executor = new Executor(createMinimalWorkflow()) + const context = createMockContext() + + const parallelConfig = { + nodes: ['function-1', 'function-2'], + } + + // Add some executed blocks + context.executedBlocks.add('function-1_parallel_parallel-1_iteration_0') + // function-2 iteration 0 is missing + + const result = (executor as any).isIterationComplete('parallel-1', 0, parallelConfig, context) + expect(result).toBe(false) + + // Add the missing block + context.executedBlocks.add('function-2_parallel_parallel-1_iteration_0') + + const completedResult = (executor as any).isIterationComplete( + 'parallel-1', + 0, + parallelConfig, + context + ) + expect(completedResult).toBe(true) + }) + + it('should detect when no more parallel work is available', () => { + const executor = new Executor(createMinimalWorkflow()) + const context = createMockContext() + + // Add parallel execution state with completed parallel + context.parallelExecutions = new Map([ + [ + 'parallel-1', + { + parallelCount: 3, + currentIteration: 3, + distributionItems: null, + completedExecutions: 3, + executionResults: new Map(), + activeIterations: new Set(), + }, + ], + ]) + + context.completedLoops.add('parallel-1') + + const hasWork = (executor as any).hasMoreParallelWork(context) + expect(hasWork).toBe(false) + }) + + it('should handle empty parallel execution context', () => { + const executor = new Executor(createMinimalWorkflow()) + const context = createMockContext() + + // No parallel executions + context.parallelExecutions = undefined + + const hasWork = (executor as any).hasMoreParallelWork(context) + expect(hasWork).toBe(false) + }) + + it('should identify complete iterations correctly', () => { + const executor = new Executor(createMinimalWorkflow()) + const context = createMockContext() + + const parallelConfig = { + nodes: ['function-1', 'function-2', 'function-3'], + } + + // All blocks executed for iteration 1 + context.executedBlocks.add('function-1_parallel_parallel-1_iteration_1') + context.executedBlocks.add('function-2_parallel_parallel-1_iteration_1') + context.executedBlocks.add('function-3_parallel_parallel-1_iteration_1') + + const result = (executor as any).isIterationComplete('parallel-1', 1, parallelConfig, context) + expect(result).toBe(true) + }) + + it('should handle undefined parallel configuration safely', () => { + const executor = new Executor(createMinimalWorkflow()) + const context = createMockContext() + + const result = (executor as any).isIterationComplete('parallel-1', 0, undefined, context) + expect(result).toBe(true) + }) + }) }) diff --git a/apps/sim/executor/index.ts b/apps/sim/executor/index.ts index 324a05efb9..9c16b5551f 100644 --- a/apps/sim/executor/index.ts +++ b/apps/sim/executor/index.ts @@ -250,7 +250,7 @@ export class Executor { } else { // Normal execution without debug mode if (nextLayer.length === 0) { - hasMoreLayers = false + hasMoreLayers = this.hasMoreParallelWork(context) } else { const outputs = await this.executeLayer(nextLayer, context) @@ -1100,46 +1100,6 @@ export class Executor { // If block is inside a parallel, handle multiple instances if (insideParallel && activeParallels.has(insideParallel)) { - const parallelState = activeParallels.get(insideParallel) - - // Create virtual instances for each unprocessed iteration - const virtualBlockIds = this.parallelManager.createVirtualBlockInstances( - block, - insideParallel, - parallelState, - executedBlocks, - context.activeExecutionPath - ) - - for (const virtualBlockId of virtualBlockIds) { - // Check dependencies for this virtual instance - const incomingConnections = this.actualWorkflow.connections.filter( - (conn) => conn.target === block.id - ) - - const iterationIndex = Number.parseInt(virtualBlockId.split('_iteration_')[1]) - const allDependenciesMet = this.checkDependencies( - incomingConnections, - executedBlocks, - context, - insideParallel, - iterationIndex - ) - - if (allDependenciesMet) { - pendingBlocks.add(virtualBlockId) - - // Store mapping for virtual block - if (!context.parallelBlockMapping) { - context.parallelBlockMapping = new Map() - } - context.parallelBlockMapping.set(virtualBlockId, { - originalBlockId: block.id, - parallelId: insideParallel, - iterationIndex: iterationIndex, - }) - } - } } else if (insideParallel) { // Block is inside a parallel but the parallel is not active // Check if all virtual instances have been executed @@ -1203,9 +1163,197 @@ export class Executor { } } + this.processParallelBlocks(activeParallels, context, pendingBlocks) + return Array.from(pendingBlocks) } + /** + * Process all active parallel blocks with proper dependency ordering within iterations. + * This ensures that blocks with dependencies within the same iteration are executed + * in the correct order, preventing race conditions. Only processes one iteration at a time + * to maintain proper execution order. + * + * @param activeParallels - Map of active parallel executions + * @param context - Execution context + * @param pendingBlocks - Set to add ready blocks to + */ + private processParallelBlocks( + activeParallels: Map, + context: ExecutionContext, + pendingBlocks: Set + ): void { + for (const [parallelId, parallelState] of activeParallels) { + const parallel = this.actualWorkflow.parallels?.[parallelId] + if (!parallel) continue + + // Process all incomplete iterations concurrently + // Each iteration maintains proper dependency order internally + for (let iteration = 0; iteration < parallelState.parallelCount; iteration++) { + if (this.isIterationComplete(parallelId, iteration, parallel, context)) { + continue // This iteration is already complete + } + + // Process this iteration - all iterations run concurrently + this.processParallelIteration(parallelId, iteration, parallel, context, pendingBlocks) + } + } + } + + /** + * Check if a specific parallel iteration is complete (all blocks executed). + * + * @param parallelId - ID of the parallel block + * @param iteration - Iteration index to check + * @param parallel - Parallel configuration + * @param context - Execution context + * @returns Whether the iteration is complete + */ + private isIterationComplete( + parallelId: string, + iteration: number, + parallel: any, + context: ExecutionContext + ): boolean { + if (!parallel || !parallel.nodes) { + return true + } + + for (const nodeId of parallel.nodes) { + const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${iteration}` + if (!context.executedBlocks.has(virtualBlockId)) { + return false + } + } + return true + } + + /** + * Check if there are more parallel iterations to process. + * This ensures the execution loop continues when iterations are being processed sequentially. + */ + private hasMoreParallelWork(context: ExecutionContext): boolean { + if (!context.parallelExecutions) { + return false + } + + for (const [parallelId, parallelState] of context.parallelExecutions) { + // Skip completed parallels + if (context.completedLoops.has(parallelId)) { + continue + } + + // Check if this parallel is active + if ( + parallelState.currentIteration > 0 && + parallelState.currentIteration <= parallelState.parallelCount + ) { + const parallel = this.actualWorkflow.parallels?.[parallelId] + if (!parallel) continue + + // Check if there are incomplete iterations + for (let iteration = 0; iteration < parallelState.parallelCount; iteration++) { + if (!this.isIterationComplete(parallelId, iteration, parallel, context)) { + return true + } + } + } + } + + return false + } + + /** + * Process a single parallel iteration with topological ordering of dependencies. + * + * @param parallelId - ID of the parallel block + * @param iteration - Current iteration index + * @param parallel - Parallel configuration + * @param context - Execution context + * @param pendingBlocks - Set to add ready blocks to + */ + private processParallelIteration( + parallelId: string, + iteration: number, + parallel: any, + context: ExecutionContext, + pendingBlocks: Set + ): void { + const iterationBlocks = new Map< + string, + { + virtualBlockId: string + originalBlockId: string + dependencies: string[] + isExecuted: boolean + } + >() + + // Build dependency graph for this iteration + for (const nodeId of parallel.nodes) { + const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${iteration}` + const isExecuted = context.executedBlocks.has(virtualBlockId) + + if (isExecuted) { + continue // Skip already executed blocks + } + + const block = this.actualWorkflow.blocks.find((b) => b.id === nodeId) + if (!block || !block.enabled) continue + + // Find dependencies within this iteration + const incomingConnections = this.actualWorkflow.connections.filter( + (conn) => conn.target === nodeId + ) + + const dependencies: string[] = [] + for (const conn of incomingConnections) { + // Check if the source is within the same parallel + if (parallel.nodes.includes(conn.source)) { + const sourceDependencyId = `${conn.source}_parallel_${parallelId}_iteration_${iteration}` + dependencies.push(sourceDependencyId) + } else { + // External dependency - check if it's met + const isExternalDepMet = this.checkDependencies([conn], context.executedBlocks, context) + if (!isExternalDepMet) { + // External dependency not met, skip this block for now + return + } + } + } + + iterationBlocks.set(virtualBlockId, { + virtualBlockId, + originalBlockId: nodeId, + dependencies, + isExecuted, + }) + } + + // Find blocks with no unmet dependencies within this iteration + for (const [virtualBlockId, blockInfo] of iterationBlocks) { + const unmetDependencies = blockInfo.dependencies.filter((depId) => { + // Check if dependency is executed OR not in this iteration (external) + return !context.executedBlocks.has(depId) && iterationBlocks.has(depId) + }) + + if (unmetDependencies.length === 0) { + // All dependencies within this iteration are met + pendingBlocks.add(virtualBlockId) + + // Store mapping for virtual block + if (!context.parallelBlockMapping) { + context.parallelBlockMapping = new Map() + } + context.parallelBlockMapping.set(virtualBlockId, { + originalBlockId: blockInfo.originalBlockId, + parallelId: parallelId, + iterationIndex: iteration, + }) + } + } + } + /** * Checks if all dependencies for a block are met. * Handles special cases for different connection types. @@ -1501,6 +1649,14 @@ export class Executor { if (parallelInfo) { blockLog.blockId = blockId blockLog.blockName = `${block.metadata?.name || ''} (iteration ${parallelInfo.iterationIndex + 1})` + } else { + const containingLoopId = this.resolver.getContainingLoopId(block.id) + if (containingLoopId) { + const currentIteration = context.loopIterations.get(containingLoopId) + if (currentIteration !== undefined) { + blockLog.blockName = `${block.metadata?.name || ''} (iteration ${currentIteration})` + } + } } const addConsole = useConsoleStore.getState().addConsole diff --git a/apps/sim/executor/loops/loops.ts b/apps/sim/executor/loops/loops.ts index 56c68c15dd..7218e2a624 100644 --- a/apps/sim/executor/loops/loops.ts +++ b/apps/sim/executor/loops/loops.ts @@ -45,15 +45,9 @@ export class LoopManager { // Check if all blocks in the loop have been executed const allBlocksInLoopExecuted = this.allBlocksExecuted(loop.nodes, context) - logger.info(`Loop ${loopId} - Processing iteration check`) - logger.info(` Loop block executed: ${loopBlockExecuted}`) - logger.info(` All blocks executed: ${allBlocksInLoopExecuted}`) - logger.info(` Blocks in loop: ${loop.nodes.join(', ')}`) - logger.info(` Current iteration: ${context.loopIterations.get(loopId) || 0}`) - if (allBlocksInLoopExecuted) { // All blocks in the loop have been executed - const currentIteration = context.loopIterations.get(loopId) || 0 + const currentIteration = context.loopIterations.get(loopId) || 1 // Store the results from this iteration before potentially resetting blocks const iterationResults: any[] = [] @@ -70,7 +64,7 @@ export class LoopManager { this.storeIterationResult( context, loopId, - currentIteration - 1, + currentIteration - 1, // Convert back to 0-based for storage 'iteration', iterationResults ) @@ -109,21 +103,16 @@ export class LoopManager { logger.info(`Loop ${loopId} - Current: ${currentIteration}, Max: ${maxIterations}`) // Check if we've completed all iterations - // The loop handler increments the counter after setting up each iteration - // So if currentIteration equals maxIterations, we've completed all iterations if (currentIteration >= maxIterations) { - // This was the last iteration hasLoopReachedMaxIterations = true logger.info(`Loop ${loopId} has completed all ${maxIterations} iterations`) - // Aggregate results from all iterations using stored results const results = [] const loopState = context.loopExecutions?.get(loopId) if (loopState) { for (let i = 0; i < maxIterations; i++) { const result = loopState.executionResults.get(`iteration_${i}`) if (result) { - // If result is an array (from multiple blocks in the loop), flatten it if (Array.isArray(result)) { results.push(...result) } else { @@ -133,7 +122,6 @@ export class LoopManager { } } - // Store the aggregated results in the loop block's state so subsequent blocks can reference them const aggregatedOutput = { loopId, currentIteration: maxIterations - 1, // Last iteration index @@ -144,17 +132,14 @@ export class LoopManager { message: `Completed all ${maxIterations} iterations`, } - // Store the aggregated results in context so blocks connected to loop-end-source can access them context.blockStates.set(loopId, { output: aggregatedOutput, executed: true, - executionTime: 0, // Loop coordination doesn't have meaningful execution time + executionTime: 0, }) - // Mark this loop as completed context.completedLoops.add(loopId) - // Activate the loop-end-source connections to continue workflow after loop const loopEndConnections = context.workflow?.connections.filter( (conn) => conn.source === loopId && conn.sourceHandle === 'loop-end-source' @@ -167,14 +152,15 @@ export class LoopManager { logger.info(`Loop ${loopId} - Completed and activated end connections`) } else { - // More iterations to go - reset the blocks inside the loop + context.loopIterations.set(loopId, currentIteration + 1) + logger.info(`Loop ${loopId} - Incremented counter to ${currentIteration + 1}`) + this.resetLoopBlocks(loopId, loop, context) - // Reset the loop block itself so it can execute again context.executedBlocks.delete(loopId) context.blockStates.delete(loopId) - logger.info(`Loop ${loopId} - Reset for iteration ${currentIteration}`) + logger.info(`Loop ${loopId} - Reset for iteration ${currentIteration + 1}`) } } } @@ -193,7 +179,6 @@ export class LoopManager { return Object.keys(forEachItems).length } if (typeof forEachItems === 'string') { - // Try to parse if it's a JSON string try { const parsed = JSON.parse(forEachItems) if (Array.isArray(parsed)) { @@ -202,9 +187,7 @@ export class LoopManager { if (typeof parsed === 'object' && parsed !== null) { return Object.keys(parsed).length } - } catch { - // Not valid JSON - } + } catch {} } return 0 } @@ -217,28 +200,17 @@ export class LoopManager { * @param context - Current execution context */ private resetLoopBlocks(loopId: string, loop: SerializedLoop, context: ExecutionContext): void { - logger.info(`Resetting blocks for loop ${loopId}`) - // Reset all blocks in the loop for (const nodeId of loop.nodes) { - // Remove from executed blocks context.executedBlocks.delete(nodeId) - // Clear the block state context.blockStates.delete(nodeId) - // Remove from active execution path context.activeExecutionPath.delete(nodeId) - // Clear any routing decisions for this block context.decisions.router.delete(nodeId) context.decisions.condition.delete(nodeId) - - logger.info(`Reset block ${nodeId} in loop ${loopId} for next iteration`) } - - logger.info(`After reset - executed blocks: ${Array.from(context.executedBlocks).join(', ')}`) - logger.info(`After reset - active paths: ${Array.from(context.activeExecutionPath).join(', ')}`) } /** diff --git a/apps/sim/executor/path/path.ts b/apps/sim/executor/path/path.ts index 87bb69c3b8..e8ac8a8359 100644 --- a/apps/sim/executor/path/path.ts +++ b/apps/sim/executor/path/path.ts @@ -43,8 +43,6 @@ export class PathTracker { * @param context - Current execution context */ updateExecutionPaths(executedBlockIds: string[], context: ExecutionContext): void { - logger.info(`Updating paths for blocks: ${executedBlockIds.join(', ')}`) - for (const blockId of executedBlockIds) { const block = this.getBlock(blockId) if (!block) continue diff --git a/apps/sim/executor/resolver/resolver.test.ts b/apps/sim/executor/resolver/resolver.test.ts index 8cd9a08508..e03246b7f6 100644 --- a/apps/sim/executor/resolver/resolver.test.ts +++ b/apps/sim/executor/resolver/resolver.test.ts @@ -75,8 +75,11 @@ describe('InputResolver', () => { workflowId: 'test-workflow', workflow: sampleWorkflow, blockStates: new Map([ - ['starter-block', { output: { input: 'Hello World', type: 'text' } }], - ['function-block', { output: { result: '42' } }], // String value as it would be in real app + [ + 'starter-block', + { output: { input: 'Hello World', type: 'text' }, executed: true, executionTime: 0 }, + ], + ['function-block', { output: { result: '42' }, executed: true, executionTime: 0 }], // String value as it would be in real app ]), activeExecutionPath: new Set(['starter-block', 'function-block']), blockLogs: [], @@ -2885,7 +2888,7 @@ describe('InputResolver', () => { workflow: extendedWorkflow, blockStates: new Map([ ...mockContext.blockStates, - ['testblock', { output: { result: 'test result' } }], + ['testblock', { output: { result: 'test result' }, executed: true, executionTime: 0 }], ]), activeExecutionPath: new Set([...mockContext.activeExecutionPath, 'testblock']), } @@ -3028,4 +3031,323 @@ describe('InputResolver', () => { expect(result.content3).toBe('result = 10 + 5') }) }) + + describe('Virtual Block Reference Resolution (Parallel Execution)', () => { + let parallelWorkflow: SerializedWorkflow + let parallelContext: ExecutionContext + let resolver: InputResolver + + beforeEach(() => { + parallelWorkflow = { + version: '2.0', + blocks: [ + { + id: 'start-block', + metadata: { id: BlockType.STARTER, name: 'Start', category: 'triggers' }, + position: { x: 0, y: 0 }, + config: { tool: BlockType.STARTER, params: {} }, + inputs: {}, + outputs: {}, + enabled: true, + }, + { + id: 'parallel-block', + metadata: { id: BlockType.PARALLEL, name: 'Parallel' }, + position: { x: 200, y: 0 }, + config: { tool: BlockType.PARALLEL, params: { count: 3 } }, + inputs: {}, + outputs: {}, + enabled: true, + }, + { + id: 'function1-block', + metadata: { id: BlockType.FUNCTION, name: 'Function 1' }, + position: { x: 100, y: 100 }, + config: { + tool: BlockType.FUNCTION, + params: { code: 'return ' }, + }, + inputs: {}, + outputs: {}, + enabled: true, + }, + { + id: 'function2-block', + metadata: { id: BlockType.FUNCTION, name: 'Function 2' }, + position: { x: 300, y: 100 }, + config: { + tool: BlockType.FUNCTION, + params: { code: 'return * 2' }, + }, + inputs: {}, + outputs: {}, + enabled: true, + }, + ], + connections: [ + { source: 'start-block', target: 'parallel-block', sourceHandle: 'source' }, + { + source: 'parallel-block', + target: 'function1-block', + sourceHandle: 'parallel-start-source', + }, + { source: 'function1-block', target: 'function2-block', sourceHandle: 'source' }, + ], + loops: {}, + parallels: { + 'parallel-block': { + id: 'parallel-block', + nodes: ['function1-block', 'function2-block'], + count: 3, + }, + }, + } + + parallelContext = { + workflowId: 'test-parallel-workflow', + workflow: parallelWorkflow, + blockStates: new Map([ + [ + 'function1-block', + { output: { result: 'should-not-use-this' }, executed: true, executionTime: 0 }, + ], + + [ + 'function1-block_parallel_parallel-block_iteration_0', + { output: { result: 0 }, executed: true, executionTime: 0 }, + ], + [ + 'function2-block_parallel_parallel-block_iteration_0', + { output: { result: 0 }, executed: true, executionTime: 0 }, + ], + + [ + 'function1-block_parallel_parallel-block_iteration_1', + { output: { result: 1 }, executed: true, executionTime: 0 }, + ], + [ + 'function2-block_parallel_parallel-block_iteration_1', + { output: { result: 2 }, executed: true, executionTime: 0 }, + ], + + [ + 'function1-block_parallel_parallel-block_iteration_2', + { output: { result: 2 }, executed: true, executionTime: 0 }, + ], + [ + 'function2-block_parallel_parallel-block_iteration_2', + { output: { result: 4 }, executed: true, executionTime: 0 }, + ], + ]), + activeExecutionPath: new Set([ + 'start-block', + 'parallel-block', + 'function1-block', + 'function2-block', + ]), + blockLogs: [], + metadata: { startTime: new Date().toISOString(), duration: 0 }, + environmentVariables: {}, + decisions: { router: new Map(), condition: new Map() }, + loopIterations: new Map(), + loopItems: new Map(), + completedLoops: new Set(), + executedBlocks: new Set(['start-block']), + parallelExecutions: new Map([ + [ + 'parallel-block', + { + parallelCount: 3, + currentIteration: 3, + distributionItems: null, + completedExecutions: 3, + executionResults: new Map(), + activeIterations: new Set(), + }, + ], + ]), + parallelBlockMapping: new Map([ + [ + 'function2-block_parallel_parallel-block_iteration_0', + { + originalBlockId: 'function2-block', + parallelId: 'parallel-block', + iterationIndex: 0, + }, + ], + [ + 'function2-block_parallel_parallel-block_iteration_1', + { + originalBlockId: 'function2-block', + parallelId: 'parallel-block', + iterationIndex: 1, + }, + ], + [ + 'function2-block_parallel_parallel-block_iteration_2', + { + originalBlockId: 'function2-block', + parallelId: 'parallel-block', + iterationIndex: 2, + }, + ], + ]), + } + + resolver = new InputResolver(parallelWorkflow, {}) + }) + + it('should resolve references to blocks within same parallel iteration', () => { + const function2Block = parallelWorkflow.blocks[3] // function2-block + + parallelContext.currentVirtualBlockId = 'function2-block_parallel_parallel-block_iteration_0' + + const result = resolver.resolveInputs(function2Block, parallelContext) + + expect(result.code).toBe('return 0 * 2') + }) + + it('should resolve references correctly for different iterations', () => { + const function2Block = parallelWorkflow.blocks[3] // function2-block + + parallelContext.currentVirtualBlockId = 'function2-block_parallel_parallel-block_iteration_1' + let result = resolver.resolveInputs(function2Block, parallelContext) + expect(result.code).toBe('return 1 * 2') + + parallelContext.currentVirtualBlockId = 'function2-block_parallel_parallel-block_iteration_2' + result = resolver.resolveInputs(function2Block, parallelContext) + expect(result.code).toBe('return 2 * 2') + }) + + it('should fall back to regular resolution for blocks outside parallel', () => { + const function2Block: SerializedBlock = { + ...parallelWorkflow.blocks[3], + config: { + tool: BlockType.FUNCTION, + params: { code: 'return ' }, + }, + } + + parallelContext.blockStates.set('start-block', { + output: { input: 'external-value' }, + executed: true, + executionTime: 0, + }) + parallelContext.currentVirtualBlockId = 'function2-block_parallel_parallel-block_iteration_0' + + const result = resolver.resolveInputs(function2Block, parallelContext) + + expect(result.code).toBe('return "external-value"') + }) + + it('should handle missing virtual block mapping gracefully', () => { + const function2Block = parallelWorkflow.blocks[3] // function2-block + + parallelContext.parallelBlockMapping = new Map() + parallelContext.currentVirtualBlockId = 'function2-block_parallel_parallel-block_iteration_0' + + const result = resolver.resolveInputs(function2Block, parallelContext) + expect(result.code).toBe('return "should-not-use-this" * 2') // Uses regular block state + }) + + it('should handle missing virtual block state gracefully', () => { + const function2Block = parallelWorkflow.blocks[3] // function2-block + + parallelContext.blockStates.delete('function1-block_parallel_parallel-block_iteration_0') + parallelContext.currentVirtualBlockId = 'function2-block_parallel_parallel-block_iteration_0' + + expect(() => { + resolver.resolveInputs(function2Block, parallelContext) + }).toThrow(/No state found for block/) + }) + + it('should not use virtual resolution when not in parallel execution', () => { + const function2Block = parallelWorkflow.blocks[3] // function2-block + + parallelContext.currentVirtualBlockId = undefined + parallelContext.parallelBlockMapping = undefined + + parallelContext.blockStates.set('function1-block', { + output: { result: 'regular-result' }, + executed: true, + executionTime: 0, + }) + + const result = resolver.resolveInputs(function2Block, parallelContext) + + expect(result.code).toBe('return "regular-result" * 2') + }) + + it('should handle complex references within parallel iterations', () => { + const function3Block: SerializedBlock = { + id: 'function3-block', + metadata: { id: BlockType.FUNCTION, name: 'Function 3' }, + position: { x: 500, y: 100 }, + config: { + tool: BlockType.FUNCTION, + params: { code: 'return + ' }, + }, + inputs: {}, + outputs: {}, + enabled: true, + } + + const updatedWorkflow = { + ...parallelWorkflow, + blocks: [...parallelWorkflow.blocks, function3Block], + connections: [ + ...parallelWorkflow.connections, + { source: 'function1-block', target: 'function3-block', sourceHandle: 'source' }, + { source: 'function2-block', target: 'function3-block', sourceHandle: 'source' }, + ], + parallels: { + 'parallel-block': { + id: 'parallel-block', + nodes: ['function1-block', 'function2-block', 'function3-block'], + count: 3, + }, + }, + } + + parallelContext.workflow = updatedWorkflow + + parallelContext.parallelBlockMapping?.set( + 'function3-block_parallel_parallel-block_iteration_1', + { + originalBlockId: 'function3-block', + parallelId: 'parallel-block', + iterationIndex: 1, + } + ) + + parallelContext.currentVirtualBlockId = 'function3-block_parallel_parallel-block_iteration_1' + + const updatedResolver = new InputResolver(updatedWorkflow, {}) + const result = updatedResolver.resolveInputs(function3Block, parallelContext) + + expect(result.code).toBe('return 1 + 2') + }) + + it('should validate that source block is in same parallel before using virtual resolution', () => { + const function2Block = parallelWorkflow.blocks[3] // function2-block + + const modifiedWorkflow = { + ...parallelWorkflow, + parallels: { + 'parallel-block': { + id: 'parallel-block', + nodes: ['function2-block'], + count: 3, + }, + }, + } + + const modifiedResolver = new InputResolver(modifiedWorkflow, {}) + parallelContext.workflow = modifiedWorkflow + parallelContext.currentVirtualBlockId = 'function2-block_parallel_parallel-block_iteration_0' + + const result = modifiedResolver.resolveInputs(function2Block, parallelContext) + expect(result.code).toBe('return "should-not-use-this" * 2') + }) + }) }) diff --git a/apps/sim/executor/resolver/resolver.ts b/apps/sim/executor/resolver/resolver.ts index f03d9a0c43..addcbaa96c 100644 --- a/apps/sim/executor/resolver/resolver.ts +++ b/apps/sim/executor/resolver/resolver.ts @@ -727,7 +727,26 @@ export class InputResolver { continue } - const blockState = context.blockStates.get(sourceBlock.id) + // For parallel execution, check if we need to use the virtual block ID + let blockState = context.blockStates.get(sourceBlock.id) + + // If we're in parallel execution and the source block is also in the same parallel, + // try to get the virtual block state for the same iteration + if ( + context.currentVirtualBlockId && + context.parallelBlockMapping?.has(context.currentVirtualBlockId) + ) { + const currentParallelInfo = context.parallelBlockMapping.get(context.currentVirtualBlockId) + if (currentParallelInfo) { + // Check if the source block is in the same parallel + const parallel = context.workflow?.parallels?.[currentParallelInfo.parallelId] + if (parallel?.nodes.includes(sourceBlock.id)) { + // Try to get the virtual block state for the same iteration + const virtualSourceBlockId = `${sourceBlock.id}_parallel_${currentParallelInfo.parallelId}_iteration_${currentParallelInfo.iterationIndex}` + blockState = context.blockStates.get(virtualSourceBlockId) + } + } + } if (!blockState) { // If the block is in a loop, return empty string diff --git a/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts b/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts index f32e725ba0..d2013dd3e8 100644 --- a/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts +++ b/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts @@ -333,16 +333,13 @@ export function buildTraceSpans(result: ExecutionResult): { }) } - // Calculate total duration as the sum of root spans - const totalDuration = rootSpans.reduce((sum, span) => sum + span.duration, 0) + const groupedRootSpans = groupIterationBlocks(rootSpans) - // Create a synthetic workflow span that represents the entire execution - // This ensures we have a consistent top-level representation - if (rootSpans.length > 0 && result.metadata) { - // Get all spans to calculate accurate timings + const totalDuration = groupedRootSpans.reduce((sum, span) => sum + span.duration, 0) + + if (groupedRootSpans.length > 0 && result.metadata) { const allSpansList = Array.from(spanMap.values()) - // Find the earliest start time and latest end time across all spans const earliestStart = allSpansList.reduce((earliest, span) => { const startTime = new Date(span.startTime).getTime() return startTime < earliest ? startTime : earliest @@ -353,14 +350,10 @@ export function buildTraceSpans(result: ExecutionResult): { return endTime > latest ? endTime : latest }, 0) - // Calculate actual workflow duration from earliest start to latest end - // This correctly accounts for parallel execution const actualWorkflowDuration = latestEnd - earliestStart - // Check if any spans have errors to determine overall workflow status - const hasErrors = rootSpans.some((span) => { + const hasErrors = groupedRootSpans.some((span) => { if (span.status === 'error') return true - // Recursively check children for errors const checkChildren = (children: TraceSpan[] = []): boolean => { return children.some( (child) => child.status === 'error' || (child.children && checkChildren(child.children)) @@ -369,7 +362,6 @@ export function buildTraceSpans(result: ExecutionResult): { return span.children && checkChildren(span.children) }) - // Create the workflow span const workflowSpan: TraceSpan = { id: 'workflow-execution', name: 'Workflow Execution', @@ -378,22 +370,232 @@ export function buildTraceSpans(result: ExecutionResult): { startTime: new Date(earliestStart).toISOString(), endTime: new Date(latestEnd).toISOString(), status: hasErrors ? 'error' : 'success', - children: rootSpans, + children: groupedRootSpans, } - // Return this as the only root span, using the actual duration for total return { traceSpans: [workflowSpan], totalDuration: actualWorkflowDuration } } - return { traceSpans: rootSpans, totalDuration } + return { traceSpans: groupedRootSpans, totalDuration } +} + +/** + * Groups iteration-based blocks (parallel and loop) by organizing their iteration spans + * into a hierarchical structure with proper parent-child relationships. + * + * @param spans - Array of root spans to process + * @returns Array of spans with iteration blocks properly grouped + */ +function groupIterationBlocks(spans: TraceSpan[]): TraceSpan[] { + const result: TraceSpan[] = [] + const iterationSpans: TraceSpan[] = [] + const normalSpans: TraceSpan[] = [] + + spans.forEach((span) => { + const iterationMatch = span.name.match(/^(.+) \(iteration (\d+)\)$/) + if (iterationMatch) { + iterationSpans.push(span) + } else { + normalSpans.push(span) + } + }) + + const nonIterationContainerSpans = normalSpans.filter( + (span) => span.type !== 'parallel' && span.type !== 'loop' + ) + + if (iterationSpans.length > 0) { + const containerGroups = new Map< + string, + { + type: 'parallel' | 'loop' + containerId: string + containerName: string + spans: TraceSpan[] + } + >() + + iterationSpans.forEach((span) => { + const iterationMatch = span.name.match(/^(.+) \(iteration (\d+)\)$/) + if (iterationMatch) { + let containerType: 'parallel' | 'loop' = 'loop' + let containerId = 'unknown' + let containerName = 'Unknown' + + if (span.blockId?.includes('_parallel_')) { + const parallelMatch = span.blockId.match(/_parallel_([^_]+)_iteration_/) + if (parallelMatch) { + containerType = 'parallel' + containerId = parallelMatch[1] + + const parallelBlock = normalSpans.find( + (s) => s.blockId === containerId && s.type === 'parallel' + ) + containerName = parallelBlock?.name || `Parallel ${containerId}` + } + } else { + containerType = 'loop' + + const loopBlock = normalSpans.find((s) => s.type === 'loop') + if (loopBlock) { + containerId = loopBlock.blockId || 'loop-1' + containerName = loopBlock.name || `Loop ${loopBlock.blockId || '1'}` + } else { + containerId = 'loop-1' + containerName = 'Loop 1' + } + } + + const groupKey = `${containerType}_${containerId}` + + if (!containerGroups.has(groupKey)) { + containerGroups.set(groupKey, { + type: containerType, + containerId, + containerName, + spans: [], + }) + } + + containerGroups.get(groupKey)!.spans.push(span) + } + }) + + containerGroups.forEach((group, groupKey) => { + const { type, containerId, containerName, spans } = group + + const iterationGroups = new Map() + + spans.forEach((span) => { + const iterationMatch = span.name.match(/^(.+) \(iteration (\d+)\)$/) + if (iterationMatch) { + const iterationIndex = Number.parseInt(iterationMatch[2]) + + if (!iterationGroups.has(iterationIndex)) { + iterationGroups.set(iterationIndex, []) + } + iterationGroups.get(iterationIndex)!.push(span) + } + }) + + if (type === 'parallel') { + const allIterationSpans = spans + + const startTimes = allIterationSpans.map((span) => new Date(span.startTime).getTime()) + const endTimes = allIterationSpans.map((span) => new Date(span.endTime).getTime()) + const earliestStart = Math.min(...startTimes) + const latestEnd = Math.max(...endTimes) + const totalDuration = latestEnd - earliestStart + + const iterationChildren: TraceSpan[] = [] + + const sortedIterations = Array.from(iterationGroups.entries()).sort(([a], [b]) => a - b) + + sortedIterations.forEach(([iterationIndex, spans]) => { + const iterStartTimes = spans.map((span) => new Date(span.startTime).getTime()) + const iterEndTimes = spans.map((span) => new Date(span.endTime).getTime()) + const iterEarliestStart = Math.min(...iterStartTimes) + const iterLatestEnd = Math.max(...iterEndTimes) + const iterDuration = iterLatestEnd - iterEarliestStart + + const hasErrors = spans.some((span) => span.status === 'error') + + const iterationSpan: TraceSpan = { + id: `${containerId}-iteration-${iterationIndex}`, + name: `Iteration ${iterationIndex}`, + type: 'parallel-iteration', + duration: iterDuration, + startTime: new Date(iterEarliestStart).toISOString(), + endTime: new Date(iterLatestEnd).toISOString(), + status: hasErrors ? 'error' : 'success', + children: spans.map((span) => ({ + ...span, + name: span.name.replace(/ \(iteration \d+\)$/, ''), + })), + } + + iterationChildren.push(iterationSpan) + }) + + const hasErrors = allIterationSpans.some((span) => span.status === 'error') + const parallelContainer: TraceSpan = { + id: `parallel-execution-${containerId}`, + name: containerName, + type: 'parallel', + duration: totalDuration, + startTime: new Date(earliestStart).toISOString(), + endTime: new Date(latestEnd).toISOString(), + status: hasErrors ? 'error' : 'success', + children: iterationChildren, + } + + result.push(parallelContainer) + } else { + const allIterationSpans = spans + + const startTimes = allIterationSpans.map((span) => new Date(span.startTime).getTime()) + const endTimes = allIterationSpans.map((span) => new Date(span.endTime).getTime()) + const earliestStart = Math.min(...startTimes) + const latestEnd = Math.max(...endTimes) + const totalDuration = latestEnd - earliestStart + + const iterationChildren: TraceSpan[] = [] + + const sortedIterations = Array.from(iterationGroups.entries()).sort(([a], [b]) => a - b) + + sortedIterations.forEach(([iterationIndex, spans]) => { + const iterStartTimes = spans.map((span) => new Date(span.startTime).getTime()) + const iterEndTimes = spans.map((span) => new Date(span.endTime).getTime()) + const iterEarliestStart = Math.min(...iterStartTimes) + const iterLatestEnd = Math.max(...iterEndTimes) + const iterDuration = iterLatestEnd - iterEarliestStart + + const hasErrors = spans.some((span) => span.status === 'error') + + const iterationSpan: TraceSpan = { + id: `${containerId}-iteration-${iterationIndex}`, + name: `Iteration ${iterationIndex}`, + type: 'loop-iteration', + duration: iterDuration, + startTime: new Date(iterEarliestStart).toISOString(), + endTime: new Date(iterLatestEnd).toISOString(), + status: hasErrors ? 'error' : 'success', + children: spans.map((span) => ({ + ...span, + name: span.name.replace(/ \(iteration \d+\)$/, ''), + })), + } + + iterationChildren.push(iterationSpan) + }) + + const hasErrors = allIterationSpans.some((span) => span.status === 'error') + const loopContainer: TraceSpan = { + id: `loop-execution-${containerId}`, + name: containerName, + type: 'loop', + duration: totalDuration, + startTime: new Date(earliestStart).toISOString(), + endTime: new Date(latestEnd).toISOString(), + status: hasErrors ? 'error' : 'success', + children: iterationChildren, + } + + result.push(loopContainer) + } + }) + } + + result.push(...nonIterationContainerSpans) + + result.sort((a, b) => new Date(a.startTime).getTime() - new Date(b.startTime).getTime()) + + return result } -// Helper function to recursively process nested workflow blocks in trace spans function ensureNestedWorkflowsProcessed(span: TraceSpan): TraceSpan { - // Create a copy to avoid mutating the original const processedSpan = { ...span } - // If this is a workflow block and it has childTraceSpans in its output, process them if ( span.type === 'workflow' && span.output?.childTraceSpans && @@ -403,27 +605,22 @@ function ensureNestedWorkflowsProcessed(span: TraceSpan): TraceSpan { const nestedChildren: TraceSpan[] = [] childTraceSpans.forEach((childSpan) => { - // Skip synthetic workflow wrappers and get the actual blocks if ( childSpan.type === 'workflow' && (childSpan.name === 'Workflow Execution' || childSpan.name.endsWith(' workflow')) ) { if (childSpan.children && Array.isArray(childSpan.children)) { - // Recursively process each child to handle deeper nesting childSpan.children.forEach((grandchildSpan) => { nestedChildren.push(ensureNestedWorkflowsProcessed(grandchildSpan)) }) } } else { - // Regular span, recursively process it for potential deeper nesting nestedChildren.push(ensureNestedWorkflowsProcessed(childSpan)) } }) - // Set the processed children on this workflow block processedSpan.children = nestedChildren } else if (span.children && Array.isArray(span.children)) { - // Recursively process regular children too processedSpan.children = span.children.map((child) => ensureNestedWorkflowsProcessed(child)) } diff --git a/apps/sim/package.json b/apps/sim/package.json index 9dc616bf9e..3a61bd658c 100644 --- a/apps/sim/package.json +++ b/apps/sim/package.json @@ -168,7 +168,7 @@ "overrides": { "next": "15.4.1", "@next/env": "15.4.1", - "drizzle-orm": "^0.41.0", + "drizzle-orm": "^0.44.5", "postgres": "^3.4.5" } }