Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions apps/sim/app/api/chat/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,12 @@ export async function executeWorkflowForChat(

logger.debug(`[${requestId}] Using ${outputBlockIds.length} output blocks for extraction`)

// Find the workflow
// Find the workflow (deployedState is NOT deprecated - needed for chat execution)
const workflowResult = await db
.select({
state: workflow.state,
deployedState: workflow.deployedState,
isDeployed: workflow.isDeployed,
deployedState: workflow.deployedState,
variables: workflow.variables,
})
.from(workflow)
.where(eq(workflow.id, workflowId))
Expand All @@ -308,9 +308,14 @@ export async function executeWorkflowForChat(
throw new Error('Workflow not available')
}

// Use deployed state for execution
const state = workflowResult[0].deployedState || workflowResult[0].state
const { blocks, edges, loops, parallels } = state as WorkflowState
// For chat execution, use ONLY the deployed state (no fallback)
if (!workflowResult[0].deployedState) {
throw new Error(`Workflow must be deployed to be available for chat`)
}

// Use deployed state for chat execution (this is the stable, deployed version)
const deployedState = workflowResult[0].deployedState as WorkflowState
const { blocks, edges, loops, parallels } = deployedState

// Prepare for execution, similar to use-workflow-execution.ts
const mergedStates = mergeSubblockState(blocks)
Expand Down Expand Up @@ -344,16 +349,13 @@ export async function executeWorkflowForChat(
logger.warn(`[${requestId}] Could not fetch environment variables:`, error)
}

// Get workflow variables
let workflowVariables = {}
try {
// The workflow state may contain variables
const workflowState = state as any
if (workflowState.variables) {
if (workflowResult[0].variables) {
workflowVariables =
typeof workflowState.variables === 'string'
? JSON.parse(workflowState.variables)
: workflowState.variables
typeof workflowResult[0].variables === 'string'
? JSON.parse(workflowResult[0].variables)
: workflowResult[0].variables
}
} catch (error) {
logger.warn(`[${requestId}] Could not parse workflow variables:`, error)
Expand Down
11 changes: 11 additions & 0 deletions apps/sim/app/api/schedules/execute/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ describe('Scheduled Workflow Execution API Route', () => {

mockExecutionDependencies()

// Mock the normalized tables helper
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: sampleWorkflowState.blocks,
edges: sampleWorkflowState.edges || [],
loops: sampleWorkflowState.loops || {},
parallels: sampleWorkflowState.parallels || {},
isFromNormalizedTables: true,
}),
}))

vi.doMock('croner', () => ({
Cron: vi.fn().mockImplementation(() => ({
nextRun: vi.fn().mockReturnValue(new Date(Date.now() + 60000)), // Next run in 1 minute
Expand Down
35 changes: 29 additions & 6 deletions apps/sim/app/api/schedules/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import {
} from '@/lib/schedules/utils'
import { checkServerSideUsageLimits } from '@/lib/usage-monitor'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { db } from '@/db'
import { environment, userStats, workflow, workflowSchedule } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import type { WorkflowState } from '@/stores/workflows/workflow/types'

// Add dynamic export to prevent caching
export const dynamic = 'force-dynamic'
Expand Down Expand Up @@ -149,8 +149,27 @@ export async function GET(req: NextRequest) {
continue
}

const state = workflowRecord.state as WorkflowState
const { blocks, edges, loops, parallels } = state
// Load workflow data from normalized tables (no fallback to deprecated state column)
logger.debug(
`[${requestId}] Loading workflow ${schedule.workflowId} from normalized tables`
)
const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId)

if (!normalizedData) {
logger.error(
`[${requestId}] No normalized data found for scheduled workflow ${schedule.workflowId}`
)
throw new Error(`Workflow data not found in normalized tables for ${schedule.workflowId}`)
}

// Use normalized data only
const blocks = normalizedData.blocks
const edges = normalizedData.edges
const loops = normalizedData.loops
const parallels = normalizedData.parallels
logger.info(
`[${requestId}] Loaded scheduled workflow ${schedule.workflowId} from normalized tables`
)

const mergedStates = mergeSubblockState(blocks)

Expand Down Expand Up @@ -405,9 +424,13 @@ export async function GET(req: NextRequest) {
.limit(1)

if (workflowRecord) {
const state = workflowRecord.state as WorkflowState
const { blocks } = state
nextRunAt = calculateNextRunTime(schedule, blocks)
const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId)

if (!normalizedData) {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
} else {
nextRunAt = calculateNextRunTime(schedule, normalizedData.blocks)
}
} else {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
Expand Down
25 changes: 11 additions & 14 deletions apps/sim/app/api/webhooks/trigger/[path]/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ import { NextRequest } from 'next/server'
* @vitest-environment node
*/
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import {
createMockRequest,
mockExecutionDependencies,
sampleWorkflowState,
} from '@/app/api/__test-utils__/utils'
import { createMockRequest, mockExecutionDependencies } from '@/app/api/__test-utils__/utils'

// Define mock functions at the top level to be used in mocks
const hasProcessedMessageMock = vi.fn().mockResolvedValue(false)
Expand Down Expand Up @@ -148,23 +144,29 @@ describe('Webhook Trigger API Route', () => {
vi.resetAllMocks()
vi.clearAllTimers()

// Mock all dependencies
mockExecutionDependencies()

// Reset mock behaviors to default for each test
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: {},
edges: [],
loops: {},
parallels: {},
isFromNormalizedTables: true,
}),
}))

hasProcessedMessageMock.mockResolvedValue(false)
markMessageAsProcessedMock.mockResolvedValue(true)
acquireLockMock.mockResolvedValue(true)
handleWhatsAppVerificationMock.mockResolvedValue(null)
processGenericDeduplicationMock.mockResolvedValue(null)
processWebhookMock.mockResolvedValue(new Response('Webhook processed', { status: 200 }))

// Restore original crypto.randomUUID if it was mocked
if ((global as any).crypto?.randomUUID) {
vi.spyOn(crypto, 'randomUUID').mockRestore()
}

// Mock crypto.randomUUID to return predictable values
vi.spyOn(crypto, 'randomUUID').mockReturnValue('mock-uuid-12345')
})

Expand Down Expand Up @@ -263,7 +265,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
Expand Down Expand Up @@ -355,7 +356,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
Expand Down Expand Up @@ -409,7 +409,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
Expand Down Expand Up @@ -482,7 +481,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
Expand Down Expand Up @@ -553,7 +551,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
Expand Down
19 changes: 19 additions & 0 deletions apps/sim/app/api/webhooks/trigger/[path]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
processWebhook,
processWhatsAppDeduplication,
} from '@/lib/webhooks/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { db } from '@/db'
import { webhook, workflow } from '@/db/schema'

Expand Down Expand Up @@ -187,6 +188,24 @@ export async function POST(
foundWebhook = webhooks[0].webhook
foundWorkflow = webhooks[0].workflow

const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id)

if (!normalizedData) {
logger.error(`[${requestId}] No normalized data found for webhook workflow ${foundWorkflow.id}`)
return new NextResponse('Workflow data not found in normalized tables', { status: 500 })
}

// Construct state from normalized data only (execution-focused, no frontend state fields)
foundWorkflow.state = {
blocks: normalizedData.blocks,
edges: normalizedData.edges,
loops: normalizedData.loops,
parallels: normalizedData.parallels,
lastSaved: Date.now(),
isDeployed: foundWorkflow.isDeployed || false,
deployedAt: foundWorkflow.deployedAt,
}

// Special handling for Telegram webhooks to work around middleware User-Agent checks
if (foundWebhook.provider === 'telegram') {
// Log detailed information about the request for debugging
Expand Down
24 changes: 22 additions & 2 deletions apps/sim/app/api/workflows/[id]/deploy/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,27 @@ describe('Workflow Deployment API Route', () => {
}),
}))

vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: {
'block-1': {
id: 'block-1',
type: 'starter',
name: 'Start',
position: { x: 100, y: 100 },
enabled: true,
subBlocks: {},
outputs: {},
data: {},
},
},
edges: [],
loops: {},
parallels: {},
isFromNormalizedTables: true,
}),
}))

vi.doMock('../../middleware', () => ({
validateWorkflowAccess: vi.fn().mockResolvedValue({
workflow: {
Expand Down Expand Up @@ -74,6 +95,7 @@ describe('Workflow Deployment API Route', () => {
isDeployed: false,
deployedAt: null,
userId: 'user-id',
deployedState: null,
},
]),
}),
Expand Down Expand Up @@ -129,7 +151,6 @@ describe('Workflow Deployment API Route', () => {
}),
}),
})
// Mock normalized table queries (blocks, edges, subflows)
.mockReturnValueOnce({
from: vi.fn().mockReturnValue({
where: vi.fn().mockResolvedValue([
Expand Down Expand Up @@ -216,7 +237,6 @@ describe('Workflow Deployment API Route', () => {
}),
}),
})
// Mock normalized table queries (blocks, edges, subflows)
.mockReturnValueOnce({
from: vi.fn().mockReturnValue({
where: vi.fn().mockResolvedValue([
Expand Down
28 changes: 20 additions & 8 deletions apps/sim/app/api/workflows/[id]/deploy/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
isDeployed: workflow.isDeployed,
deployedAt: workflow.deployedAt,
userId: workflow.userId,
state: workflow.state,
deployedState: workflow.deployedState,
})
.from(workflow)
Expand Down Expand Up @@ -93,11 +92,25 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
// Check if the workflow has meaningful changes that would require redeployment
let needsRedeployment = false
if (workflowData.deployedState) {
const { hasWorkflowChanged } = await import('@/lib/workflows/utils')
needsRedeployment = hasWorkflowChanged(
workflowData.state as any,
workflowData.deployedState as any
)
// Load current state from normalized tables for comparison
const { loadWorkflowFromNormalizedTables } = await import('@/lib/workflows/db-helpers')
const normalizedData = await loadWorkflowFromNormalizedTables(id)

if (normalizedData) {
// Convert normalized data to WorkflowState format for comparison
const currentState = {
blocks: normalizedData.blocks,
edges: normalizedData.edges,
loops: normalizedData.loops,
parallels: normalizedData.parallels,
}

const { hasWorkflowChanged } = await import('@/lib/workflows/utils')
needsRedeployment = hasWorkflowChanged(
currentState as any,
workflowData.deployedState as any
)
}
}

logger.info(`[${requestId}] Successfully retrieved deployment info: ${id}`)
Expand Down Expand Up @@ -126,11 +139,10 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return createErrorResponse(validation.error.message, validation.error.status)
}

// Get the workflow to find the user
// Get the workflow to find the user (removed deprecated state column)
const workflowData = await db
.select({
userId: workflow.userId,
state: workflow.state,
})
.from(workflow)
.where(eq(workflow.id, id))
Expand Down
Loading
Loading