diff --git a/apps/sim/app/api/chat/utils.ts b/apps/sim/app/api/chat/utils.ts index 99bad6f89f..038a3d7948 100644 --- a/apps/sim/app/api/chat/utils.ts +++ b/apps/sim/app/api/chat/utils.ts @@ -292,12 +292,12 @@ export async function executeWorkflowForChat( logger.debug(`[${requestId}] Using ${outputBlockIds.length} output blocks for extraction`) - // Find the workflow + // Find the workflow (deployedState is NOT deprecated - needed for chat execution) const workflowResult = await db .select({ - state: workflow.state, - deployedState: workflow.deployedState, isDeployed: workflow.isDeployed, + deployedState: workflow.deployedState, + variables: workflow.variables, }) .from(workflow) .where(eq(workflow.id, workflowId)) @@ -308,9 +308,14 @@ export async function executeWorkflowForChat( throw new Error('Workflow not available') } - // Use deployed state for execution - const state = workflowResult[0].deployedState || workflowResult[0].state - const { blocks, edges, loops, parallels } = state as WorkflowState + // For chat execution, use ONLY the deployed state (no fallback) + if (!workflowResult[0].deployedState) { + throw new Error(`Workflow must be deployed to be available for chat`) + } + + // Use deployed state for chat execution (this is the stable, deployed version) + const deployedState = workflowResult[0].deployedState as WorkflowState + const { blocks, edges, loops, parallels } = deployedState // Prepare for execution, similar to use-workflow-execution.ts const mergedStates = mergeSubblockState(blocks) @@ -344,16 +349,13 @@ export async function executeWorkflowForChat( logger.warn(`[${requestId}] Could not fetch environment variables:`, error) } - // Get workflow variables let workflowVariables = {} try { - // The workflow state may contain variables - const workflowState = state as any - if (workflowState.variables) { + if (workflowResult[0].variables) { workflowVariables = - typeof workflowState.variables === 'string' - ? JSON.parse(workflowState.variables) - : workflowState.variables + typeof workflowResult[0].variables === 'string' + ? JSON.parse(workflowResult[0].variables) + : workflowResult[0].variables } } catch (error) { logger.warn(`[${requestId}] Could not parse workflow variables:`, error) diff --git a/apps/sim/app/api/schedules/execute/route.test.ts b/apps/sim/app/api/schedules/execute/route.test.ts index e7b91a945a..e05cea9514 100644 --- a/apps/sim/app/api/schedules/execute/route.test.ts +++ b/apps/sim/app/api/schedules/execute/route.test.ts @@ -17,6 +17,17 @@ describe('Scheduled Workflow Execution API Route', () => { mockExecutionDependencies() + // Mock the normalized tables helper + vi.doMock('@/lib/workflows/db-helpers', () => ({ + loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({ + blocks: sampleWorkflowState.blocks, + edges: sampleWorkflowState.edges || [], + loops: sampleWorkflowState.loops || {}, + parallels: sampleWorkflowState.parallels || {}, + isFromNormalizedTables: true, + }), + })) + vi.doMock('croner', () => ({ Cron: vi.fn().mockImplementation(() => ({ nextRun: vi.fn().mockReturnValue(new Date(Date.now() + 60000)), // Next run in 1 minute diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index c63eba2057..84652ee3bd 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -14,13 +14,13 @@ import { } from '@/lib/schedules/utils' import { checkServerSideUsageLimits } from '@/lib/usage-monitor' import { decryptSecret } from '@/lib/utils' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' import { updateWorkflowRunCounts } from '@/lib/workflows/utils' import { db } from '@/db' import { environment, userStats, workflow, workflowSchedule } from '@/db/schema' import { Executor } from '@/executor' import { Serializer } from '@/serializer' import { mergeSubblockState } from '@/stores/workflows/server-utils' -import type { WorkflowState } from '@/stores/workflows/workflow/types' // Add dynamic export to prevent caching export const dynamic = 'force-dynamic' @@ -149,8 +149,27 @@ export async function GET(req: NextRequest) { continue } - const state = workflowRecord.state as WorkflowState - const { blocks, edges, loops, parallels } = state + // Load workflow data from normalized tables (no fallback to deprecated state column) + logger.debug( + `[${requestId}] Loading workflow ${schedule.workflowId} from normalized tables` + ) + const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId) + + if (!normalizedData) { + logger.error( + `[${requestId}] No normalized data found for scheduled workflow ${schedule.workflowId}` + ) + throw new Error(`Workflow data not found in normalized tables for ${schedule.workflowId}`) + } + + // Use normalized data only + const blocks = normalizedData.blocks + const edges = normalizedData.edges + const loops = normalizedData.loops + const parallels = normalizedData.parallels + logger.info( + `[${requestId}] Loaded scheduled workflow ${schedule.workflowId} from normalized tables` + ) const mergedStates = mergeSubblockState(blocks) @@ -405,9 +424,13 @@ export async function GET(req: NextRequest) { .limit(1) if (workflowRecord) { - const state = workflowRecord.state as WorkflowState - const { blocks } = state - nextRunAt = calculateNextRunTime(schedule, blocks) + const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId) + + if (!normalizedData) { + nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) + } else { + nextRunAt = calculateNextRunTime(schedule, normalizedData.blocks) + } } else { nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) } diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts index 3887b30d8a..8293dbed4f 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts @@ -5,11 +5,7 @@ import { NextRequest } from 'next/server' * @vitest-environment node */ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { - createMockRequest, - mockExecutionDependencies, - sampleWorkflowState, -} from '@/app/api/__test-utils__/utils' +import { createMockRequest, mockExecutionDependencies } from '@/app/api/__test-utils__/utils' // Define mock functions at the top level to be used in mocks const hasProcessedMessageMock = vi.fn().mockResolvedValue(false) @@ -148,10 +144,18 @@ describe('Webhook Trigger API Route', () => { vi.resetAllMocks() vi.clearAllTimers() - // Mock all dependencies mockExecutionDependencies() - // Reset mock behaviors to default for each test + vi.doMock('@/lib/workflows/db-helpers', () => ({ + loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({ + blocks: {}, + edges: [], + loops: {}, + parallels: {}, + isFromNormalizedTables: true, + }), + })) + hasProcessedMessageMock.mockResolvedValue(false) markMessageAsProcessedMock.mockResolvedValue(true) acquireLockMock.mockResolvedValue(true) @@ -159,12 +163,10 @@ describe('Webhook Trigger API Route', () => { processGenericDeduplicationMock.mockResolvedValue(null) processWebhookMock.mockResolvedValue(new Response('Webhook processed', { status: 200 })) - // Restore original crypto.randomUUID if it was mocked if ((global as any).crypto?.randomUUID) { vi.spyOn(crypto, 'randomUUID').mockRestore() } - // Mock crypto.randomUUID to return predictable values vi.spyOn(crypto, 'randomUUID').mockReturnValue('mock-uuid-12345') }) @@ -263,7 +265,6 @@ describe('Webhook Trigger API Route', () => { workflow: { id: 'workflow-id', userId: 'user-id', - state: sampleWorkflowState, }, }, ]) @@ -355,7 +356,6 @@ describe('Webhook Trigger API Route', () => { workflow: { id: 'workflow-id', userId: 'user-id', - state: sampleWorkflowState, }, }, ]) @@ -409,7 +409,6 @@ describe('Webhook Trigger API Route', () => { workflow: { id: 'workflow-id', userId: 'user-id', - state: sampleWorkflowState, }, }, ]) @@ -482,7 +481,6 @@ describe('Webhook Trigger API Route', () => { workflow: { id: 'workflow-id', userId: 'user-id', - state: sampleWorkflowState, }, }, ]) @@ -553,7 +551,6 @@ describe('Webhook Trigger API Route', () => { workflow: { id: 'workflow-id', userId: 'user-id', - state: sampleWorkflowState, }, }, ]) diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.ts index 04abd34b08..76d06b483c 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.ts @@ -12,6 +12,7 @@ import { processWebhook, processWhatsAppDeduplication, } from '@/lib/webhooks/utils' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' import { db } from '@/db' import { webhook, workflow } from '@/db/schema' @@ -187,6 +188,24 @@ export async function POST( foundWebhook = webhooks[0].webhook foundWorkflow = webhooks[0].workflow + const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id) + + if (!normalizedData) { + logger.error(`[${requestId}] No normalized data found for webhook workflow ${foundWorkflow.id}`) + return new NextResponse('Workflow data not found in normalized tables', { status: 500 }) + } + + // Construct state from normalized data only (execution-focused, no frontend state fields) + foundWorkflow.state = { + blocks: normalizedData.blocks, + edges: normalizedData.edges, + loops: normalizedData.loops, + parallels: normalizedData.parallels, + lastSaved: Date.now(), + isDeployed: foundWorkflow.isDeployed || false, + deployedAt: foundWorkflow.deployedAt, + } + // Special handling for Telegram webhooks to work around middleware User-Agent checks if (foundWebhook.provider === 'telegram') { // Log detailed information about the request for debugging diff --git a/apps/sim/app/api/workflows/[id]/deploy/route.test.ts b/apps/sim/app/api/workflows/[id]/deploy/route.test.ts index 8fb1344998..a0a6fc6021 100644 --- a/apps/sim/app/api/workflows/[id]/deploy/route.test.ts +++ b/apps/sim/app/api/workflows/[id]/deploy/route.test.ts @@ -31,6 +31,27 @@ describe('Workflow Deployment API Route', () => { }), })) + vi.doMock('@/lib/workflows/db-helpers', () => ({ + loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({ + blocks: { + 'block-1': { + id: 'block-1', + type: 'starter', + name: 'Start', + position: { x: 100, y: 100 }, + enabled: true, + subBlocks: {}, + outputs: {}, + data: {}, + }, + }, + edges: [], + loops: {}, + parallels: {}, + isFromNormalizedTables: true, + }), + })) + vi.doMock('../../middleware', () => ({ validateWorkflowAccess: vi.fn().mockResolvedValue({ workflow: { @@ -74,6 +95,7 @@ describe('Workflow Deployment API Route', () => { isDeployed: false, deployedAt: null, userId: 'user-id', + deployedState: null, }, ]), }), @@ -129,7 +151,6 @@ describe('Workflow Deployment API Route', () => { }), }), }) - // Mock normalized table queries (blocks, edges, subflows) .mockReturnValueOnce({ from: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue([ @@ -216,7 +237,6 @@ describe('Workflow Deployment API Route', () => { }), }), }) - // Mock normalized table queries (blocks, edges, subflows) .mockReturnValueOnce({ from: vi.fn().mockReturnValue({ where: vi.fn().mockResolvedValue([ diff --git a/apps/sim/app/api/workflows/[id]/deploy/route.ts b/apps/sim/app/api/workflows/[id]/deploy/route.ts index afd03d5780..449e86b708 100644 --- a/apps/sim/app/api/workflows/[id]/deploy/route.ts +++ b/apps/sim/app/api/workflows/[id]/deploy/route.ts @@ -32,7 +32,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ isDeployed: workflow.isDeployed, deployedAt: workflow.deployedAt, userId: workflow.userId, - state: workflow.state, deployedState: workflow.deployedState, }) .from(workflow) @@ -93,11 +92,25 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ // Check if the workflow has meaningful changes that would require redeployment let needsRedeployment = false if (workflowData.deployedState) { - const { hasWorkflowChanged } = await import('@/lib/workflows/utils') - needsRedeployment = hasWorkflowChanged( - workflowData.state as any, - workflowData.deployedState as any - ) + // Load current state from normalized tables for comparison + const { loadWorkflowFromNormalizedTables } = await import('@/lib/workflows/db-helpers') + const normalizedData = await loadWorkflowFromNormalizedTables(id) + + if (normalizedData) { + // Convert normalized data to WorkflowState format for comparison + const currentState = { + blocks: normalizedData.blocks, + edges: normalizedData.edges, + loops: normalizedData.loops, + parallels: normalizedData.parallels, + } + + const { hasWorkflowChanged } = await import('@/lib/workflows/utils') + needsRedeployment = hasWorkflowChanged( + currentState as any, + workflowData.deployedState as any + ) + } } logger.info(`[${requestId}] Successfully retrieved deployment info: ${id}`) @@ -126,11 +139,10 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ return createErrorResponse(validation.error.message, validation.error.status) } - // Get the workflow to find the user + // Get the workflow to find the user (removed deprecated state column) const workflowData = await db .select({ userId: workflow.userId, - state: workflow.state, }) .from(workflow) .where(eq(workflow.id, id)) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.test.ts b/apps/sim/app/api/workflows/[id]/execute/route.test.ts index 7b673364af..07805d36b5 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.test.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.test.ts @@ -24,45 +24,54 @@ describe('Workflow Execution API Route', () => { beforeEach(() => { vi.resetModules() - // Mock workflow middleware vi.doMock('@/app/api/workflows/middleware', () => ({ validateWorkflowAccess: vi.fn().mockResolvedValue({ workflow: { id: 'workflow-id', userId: 'user-id', - state: { - blocks: { - 'starter-id': { - id: 'starter-id', - type: 'starter', - name: 'Start', - position: { x: 100, y: 100 }, - enabled: true, - }, - 'agent-id': { - id: 'agent-id', - type: 'agent', - name: 'Agent', - position: { x: 300, y: 100 }, - enabled: true, - }, - }, - edges: [ - { - id: 'edge-1', - source: 'starter-id', - target: 'agent-id', - sourceHandle: 'source', - targetHandle: 'target', - }, - ], - loops: {}, + }, + }), + })) + + vi.doMock('@/lib/workflows/db-helpers', () => ({ + loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({ + blocks: { + 'starter-id': { + id: 'starter-id', + type: 'starter', + name: 'Start', + position: { x: 100, y: 100 }, + enabled: true, + subBlocks: {}, + outputs: {}, + data: {}, + }, + 'agent-id': { + id: 'agent-id', + type: 'agent', + name: 'Agent', + position: { x: 300, y: 100 }, + enabled: true, + subBlocks: {}, + outputs: {}, + data: {}, }, }, + edges: [ + { + id: 'edge-1', + source: 'starter-id', + target: 'agent-id', + sourceHandle: 'source', + targetHandle: 'target', + }, + ], + loops: {}, + parallels: {}, + isFromNormalizedTables: true, }), })) - // Reset execute mock to track calls executeMock = vi.fn().mockResolvedValue({ success: true, output: { @@ -76,14 +85,12 @@ describe('Workflow Execution API Route', () => { }, }) - // Mock executor vi.doMock('@/executor', () => ({ Executor: vi.fn().mockImplementation(() => ({ execute: executeMock, })), })) - // Mock environment variables vi.doMock('@/lib/utils', () => ({ decryptSecret: vi.fn().mockResolvedValue({ decrypted: 'decrypted-secret-value', @@ -92,13 +99,11 @@ describe('Workflow Execution API Route', () => { getRotatingApiKey: vi.fn().mockReturnValue('rotated-api-key'), })) - // Mock logger vi.doMock('@/lib/logs/execution-logger', () => ({ persistExecutionLogs: vi.fn().mockResolvedValue(undefined), persistExecutionError: vi.fn().mockResolvedValue(undefined), })) - // Mock trace spans vi.doMock('@/lib/logs/trace-spans', () => ({ buildTraceSpans: vi.fn().mockReturnValue({ traceSpans: [], @@ -106,13 +111,11 @@ describe('Workflow Execution API Route', () => { }), })) - // Mock workflow run counts vi.doMock('@/lib/workflows/utils', () => ({ updateWorkflowRunCounts: vi.fn().mockResolvedValue(undefined), workflowHasResponseBlock: vi.fn().mockReturnValue(false), })) - // Mock database vi.doMock('@/db', () => { const mockDb = { select: vi.fn().mockImplementation(() => ({ @@ -140,7 +143,6 @@ describe('Workflow Execution API Route', () => { return { db: mockDb } }) - // Mock Serializer vi.doMock('@/serializer', () => ({ Serializer: vi.fn().mockImplementation(() => ({ serializeWorkflow: vi.fn().mockReturnValue({ @@ -162,49 +164,37 @@ describe('Workflow Execution API Route', () => { * Simulates direct execution with URL-based parameters */ it('should execute workflow with GET request successfully', async () => { - // Create a mock request with query parameters const req = createMockRequest('GET') - // Create params similar to what Next.js would provide const params = Promise.resolve({ id: 'workflow-id' }) - // Import the handler after mocks are set up const { GET } = await import('./route') - // Call the handler const response = await GET(req, { params }) - // Get the actual status code - in some implementations this might not be 200 - // Based on the current implementation, validate the response exists expect(response).toBeDefined() - // Try to parse the response body let data try { data = await response.json() } catch (e) { - // If we can't parse JSON, the response may not be what we expect console.error('Response could not be parsed as JSON:', await response.text()) throw e } - // If status is 200, verify success structure if (response.status === 200) { expect(data).toHaveProperty('success', true) expect(data).toHaveProperty('output') expect(data.output).toHaveProperty('response') } - // Verify middleware was called const validateWorkflowAccess = (await import('@/app/api/workflows/middleware')) .validateWorkflowAccess expect(validateWorkflowAccess).toHaveBeenCalledWith(expect.any(Object), 'workflow-id') - // Verify executor was initialized const Executor = (await import('@/executor')).Executor expect(Executor).toHaveBeenCalled() - // Verify execute was called with undefined input (GET requests don't have body) expect(executeMock).toHaveBeenCalledWith('workflow-id') }) @@ -213,59 +203,45 @@ describe('Workflow Execution API Route', () => { * Simulates execution with a JSON body containing parameters */ it('should execute workflow with POST request successfully', async () => { - // Create request body with custom inputs const requestBody = { inputs: { message: 'Test input message', }, } - // Create a mock request with the request body const req = createMockRequest('POST', requestBody) - // Create params similar to what Next.js would provide const params = Promise.resolve({ id: 'workflow-id' }) - // Import the handler after mocks are set up const { POST } = await import('./route') - // Call the handler const response = await POST(req, { params }) - // Ensure response exists expect(response).toBeDefined() - // Try to parse the response body let data try { data = await response.json() } catch (e) { - // If we can't parse JSON, the response may not be what we expect console.error('Response could not be parsed as JSON:', await response.text()) throw e } - // If status is 200, verify success structure if (response.status === 200) { expect(data).toHaveProperty('success', true) expect(data).toHaveProperty('output') expect(data.output).toHaveProperty('response') } - // Verify middleware was called const validateWorkflowAccess = (await import('@/app/api/workflows/middleware')) .validateWorkflowAccess expect(validateWorkflowAccess).toHaveBeenCalledWith(expect.any(Object), 'workflow-id') - // Verify executor was constructed const Executor = (await import('@/executor')).Executor expect(Executor).toHaveBeenCalled() - // Verify execute was called with the input body expect(executeMock).toHaveBeenCalledWith('workflow-id') - // Updated expectations to match actual implementation - // The structure should match: serializedWorkflow, processedBlockStates, decryptedEnvVars, processedInput, workflowVariables expect(Executor).toHaveBeenCalledWith( expect.anything(), // serializedWorkflow expect.anything(), // processedBlockStates @@ -282,7 +258,6 @@ describe('Workflow Execution API Route', () => { * Test POST execution with structured input matching the input format */ it('should execute workflow with structured input matching the input format', async () => { - // Create structured input matching the expected input format const structuredInput = { firstName: 'John', age: 30, @@ -291,27 +266,20 @@ describe('Workflow Execution API Route', () => { tags: ['test', 'api'], } - // Create a mock request with the structured input const req = createMockRequest('POST', structuredInput) - // Create params similar to what Next.js would provide const params = Promise.resolve({ id: 'workflow-id' }) - // Import the handler after mocks are set up const { POST } = await import('./route') - // Call the handler const response = await POST(req, { params }) - // Ensure response exists and is successful expect(response).toBeDefined() expect(response.status).toBe(200) - // Parse the response body const data = await response.json() expect(data).toHaveProperty('success', true) - // Verify the executor was constructed with the structured input - updated to match implementation const Executor = (await import('@/executor')).Executor expect(Executor).toHaveBeenCalledWith( expect.anything(), // serializedWorkflow @@ -478,39 +446,51 @@ describe('Workflow Execution API Route', () => { workflow: { id: 'workflow-with-vars-id', userId: 'user-id', - state: { - blocks: { - 'starter-id': { - id: 'starter-id', - type: 'starter', - name: 'Start', - position: { x: 100, y: 100 }, - enabled: true, - }, - 'agent-id': { - id: 'agent-id', - type: 'agent', - name: 'Agent', - position: { x: 300, y: 100 }, - enabled: true, - }, - }, - edges: [ - { - id: 'edge-1', - source: 'starter-id', - target: 'agent-id', - sourceHandle: 'source', - targetHandle: 'target', - }, - ], - loops: {}, - }, variables: workflowVariables, }, }), })) + // Mock normalized tables helper for this specific test + vi.doMock('@/lib/workflows/db-helpers', () => ({ + loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({ + blocks: { + 'starter-id': { + id: 'starter-id', + type: 'starter', + name: 'Start', + position: { x: 100, y: 100 }, + enabled: true, + subBlocks: {}, + outputs: {}, + data: {}, + }, + 'agent-id': { + id: 'agent-id', + type: 'agent', + name: 'Agent', + position: { x: 300, y: 100 }, + enabled: true, + subBlocks: {}, + outputs: {}, + data: {}, + }, + }, + edges: [ + { + id: 'edge-1', + source: 'starter-id', + target: 'agent-id', + sourceHandle: 'source', + targetHandle: 'target', + }, + ], + loops: {}, + parallels: {}, + isFromNormalizedTables: true, + }), + })) + // Create a constructor mock to capture the arguments const executorConstructorMock = vi.fn().mockImplementation(() => ({ execute: vi.fn().mockResolvedValue({ diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index daab404755..5c8879a07e 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -7,6 +7,7 @@ import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/executio import { buildTraceSpans } from '@/lib/logs/trace-spans' import { checkServerSideUsageLimits } from '@/lib/usage-monitor' import { decryptSecret } from '@/lib/utils' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' import { createHttpResponseFromBlock, updateWorkflowRunCounts, @@ -94,19 +95,34 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any) { runningExecutions.add(executionKey) logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`) - // Use the deployed state if available, otherwise fall back to current state - const workflowState = workflow.deployedState || workflow.state + // Load workflow data from normalized tables + logger.debug(`[${requestId}] Loading workflow ${workflowId} from normalized tables`) + const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) - if (!workflow.deployedState) { + let blocks: Record + let edges: any[] + let loops: Record + let parallels: Record + + if (normalizedData) { + // Use normalized data as primary source + ;({ blocks, edges, loops, parallels } = normalizedData) + logger.info(`[${requestId}] Using normalized tables for workflow execution: ${workflowId}`) + } else { + // Fallback to deployed state if available (for legacy workflows) logger.warn( - `[${requestId}] No deployed state found for workflow: ${workflowId}, using current state` + `[${requestId}] No normalized data found, falling back to deployed state for workflow: ${workflowId}` ) - } else { - logger.info(`[${requestId}] Using deployed state for workflow execution: ${workflowId}`) - } - const state = workflowState as WorkflowState - const { blocks, edges, loops, parallels } = state + if (!workflow.deployedState) { + throw new Error( + `Workflow ${workflowId} has no deployed state and no normalized data available` + ) + } + + const deployedState = workflow.deployedState as WorkflowState + ;({ blocks, edges, loops, parallels } = deployedState) + } // Use the same execution flow as in scheduled executions const mergedStates = mergeSubblockState(blocks) diff --git a/apps/sim/db/schema.ts b/apps/sim/db/schema.ts index a3040709e7..1545c92580 100644 --- a/apps/sim/db/schema.ts +++ b/apps/sim/db/schema.ts @@ -116,6 +116,7 @@ export const workflow = pgTable('workflow', { folderId: text('folder_id').references(() => workflowFolder.id, { onDelete: 'set null' }), name: text('name').notNull(), description: text('description'), + // DEPRECATED: Use normalized tables (workflow_blocks, workflow_edges, workflow_subflows) instead state: json('state').notNull(), color: text('color').notNull().default('#3972F6'), lastSynced: timestamp('last_synced').notNull(), diff --git a/apps/sim/executor/handlers/workflow/workflow-handler.ts b/apps/sim/executor/handlers/workflow/workflow-handler.ts index 3aa0cca268..3c3f024145 100644 --- a/apps/sim/executor/handlers/workflow/workflow-handler.ts +++ b/apps/sim/executor/handlers/workflow/workflow-handler.ts @@ -145,7 +145,7 @@ export class WorkflowBlockHandler implements BlockHandler { logger.info(`Loaded child workflow: ${workflowData.name} (${workflowId})`) - // Extract the workflow state + // Extract the workflow state (API returns normalized data in state field) const workflowState = workflowData.state if (!workflowState || !workflowState.blocks) { @@ -153,7 +153,7 @@ export class WorkflowBlockHandler implements BlockHandler { return null } - // Use blocks directly since DB format should match UI format + // Use blocks directly since API returns data from normalized tables const serializedWorkflow = this.serializer.serializeWorkflow( workflowState.blocks, workflowState.edges || [], diff --git a/apps/sim/lib/webhooks/utils.ts b/apps/sim/lib/webhooks/utils.ts index 8d1c0338ab..8606d662b0 100644 --- a/apps/sim/lib/webhooks/utils.ts +++ b/apps/sim/lib/webhooks/utils.ts @@ -6,6 +6,7 @@ import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/executio import { buildTraceSpans } from '@/lib/logs/trace-spans' import { hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis' import { decryptSecret } from '@/lib/utils' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' import { updateWorkflowRunCounts } from '@/lib/workflows/utils' import { getOAuthToken } from '@/app/api/auth/oauth/utils' import { db } from '@/db' @@ -13,7 +14,6 @@ import { environment, userStats, webhook } from '@/db/schema' import { Executor } from '@/executor' import { Serializer } from '@/serializer' import { mergeSubblockStateAsync } from '@/stores/workflows/server-utils' -import type { WorkflowState } from '@/stores/workflows/workflow/types' const logger = createLogger('WebhookUtils') @@ -475,23 +475,28 @@ export async function executeWorkflowFromPayload( // Returns void as errors are handled internally try { - // Get the workflow state - if (!foundWorkflow.state) { - logger.error(`[${requestId}] TRACE: Missing workflow state`, { + // Load workflow data from normalized tables + logger.debug(`[${requestId}] Loading workflow ${foundWorkflow.id} from normalized tables`) + const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id) + + if (!normalizedData) { + logger.error(`[${requestId}] TRACE: No normalized data found for workflow`, { workflowId: foundWorkflow.id, - hasState: false, + hasNormalizedData: false, }) - throw new Error(`Workflow ${foundWorkflow.id} has no state`) + throw new Error(`Workflow ${foundWorkflow.id} data not found in normalized tables`) } - const state = foundWorkflow.state as WorkflowState - const { blocks, edges, loops, parallels } = state + + // Use normalized data for execution + const { blocks, edges, loops, parallels } = normalizedData + logger.info(`[${requestId}] Loaded workflow ${foundWorkflow.id} from normalized tables`) // DEBUG: Log state information - logger.debug(`[${requestId}] TRACE: Retrieved workflow state`, { + logger.debug(`[${requestId}] TRACE: Retrieved workflow state from normalized tables`, { workflowId: foundWorkflow.id, blockCount: Object.keys(blocks || {}).length, edgeCount: (edges || []).length, - loopCount: (loops || []).length, + loopCount: Object.keys(loops || {}).length, }) logger.debug( diff --git a/apps/sim/stores/workflows/registry/store.ts b/apps/sim/stores/workflows/registry/store.ts index 9237eee58d..5ffd0dd0ee 100644 --- a/apps/sim/stores/workflows/registry/store.ts +++ b/apps/sim/stores/workflows/registry/store.ts @@ -432,7 +432,7 @@ export const useWorkflowRegistry = create()( let workflowState: any if (workflowData?.state) { - // Use the state from the database + // API returns normalized data in state workflowState = { blocks: workflowData.state.blocks || {}, edges: workflowData.state.edges || [], @@ -448,9 +448,18 @@ export const useWorkflowRegistry = create()( history: { past: [], present: { - state: workflowData.state, + state: { + blocks: workflowData.state.blocks || {}, + edges: workflowData.state.edges || [], + loops: workflowData.state.loops || {}, + parallels: workflowData.state.parallels || {}, + isDeployed: workflowData.isDeployed || false, + deployedAt: workflowData.deployedAt + ? new Date(workflowData.deployedAt) + : undefined, + }, timestamp: Date.now(), - action: 'Loaded from database', + action: 'Loaded from database (normalized tables)', subblockValues: {}, }, future: [],