Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion apps/sim/app/api/chat/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { hasAdminPermission } from '@/lib/permissions/utils'
import { processStreamingBlockLogs } from '@/lib/tokenization'
import { getEmailDomain } from '@/lib/urls/utils'
import { decryptSecret, generateRequestId } from '@/lib/utils'
import { TriggerUtils } from '@/lib/workflows/triggers'
import { getBlock } from '@/blocks'
import { db } from '@/db'
import { chat, userStats, workflow } from '@/db/schema'
Expand Down Expand Up @@ -613,9 +614,29 @@ export async function executeWorkflowForChat(
// Set up logging on the executor
loggingSession.setupExecutor(executor)

// Determine the start block for chat execution
const startBlock = TriggerUtils.findStartBlock(mergedStates, 'chat')

if (!startBlock) {
const errorMessage =
'No Chat trigger configured for this workflow. Add a Chat Trigger block to enable chat execution.'
logger.error(`[${requestId}] ${errorMessage}`)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: errorMessage,
stackTrace: undefined,
},
})
throw new Error(errorMessage)
}

const startBlockId = startBlock.blockId

let result
try {
result = await executor.execute(workflowId)
result = await executor.execute(workflowId, startBlockId)
} catch (error: any) {
logger.error(`[${requestId}] Chat workflow execution failed:`, error)
await loggingSession.safeCompleteWithError({
Expand Down
4 changes: 2 additions & 2 deletions apps/sim/app/api/workflows/[id]/execute/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ describe('Workflow Execution API Route', () => {
const Executor = (await import('@/executor')).Executor
expect(Executor).toHaveBeenCalled()

expect(executeMock).toHaveBeenCalledWith('workflow-id')
expect(executeMock).toHaveBeenCalledWith('workflow-id', 'starter-id')
})

/**
Expand Down Expand Up @@ -337,7 +337,7 @@ describe('Workflow Execution API Route', () => {
const Executor = (await import('@/executor')).Executor
expect(Executor).toHaveBeenCalled()

expect(executeMock).toHaveBeenCalledWith('workflow-id')
expect(executeMock).toHaveBeenCalledWith('workflow-id', 'starter-id')

expect(Executor).toHaveBeenCalledWith(
expect.objectContaining({
Expand Down
29 changes: 28 additions & 1 deletion apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { decryptSecret, generateRequestId } from '@/lib/utils'
import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { TriggerUtils } from '@/lib/workflows/triggers'
import {
createHttpResponseFromBlock,
updateWorkflowRunCounts,
Expand Down Expand Up @@ -272,6 +273,32 @@ async function executeWorkflow(
true // Enable validation during execution
)

// Determine API trigger start block
// Direct API execution ONLY works with API trigger blocks (or legacy starter in api/run mode)
const startBlock = TriggerUtils.findStartBlock(mergedStates, 'api', false) // isChildWorkflow = false

if (!startBlock) {
logger.error(`[${requestId}] No API trigger configured for this workflow`)
throw new Error(
'No API trigger configured for this workflow. Add an API Trigger block or use a Start block in API mode.'
)
}

const startBlockId = startBlock.blockId
const triggerBlock = startBlock.block

// Check if the API trigger has any outgoing connections (except for legacy starter blocks)
// Legacy starter blocks have their own validation in the executor
if (triggerBlock.type !== 'starter') {
const outgoingConnections = serializedWorkflow.connections.filter(
(conn) => conn.source === startBlockId
)
if (outgoingConnections.length === 0) {
logger.error(`[${requestId}] API trigger has no outgoing connections`)
throw new Error('API Trigger block must be connected to other blocks to execute')
}
}

const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
Expand All @@ -287,7 +314,7 @@ async function executeWorkflow(
// Set up logging on the executor
loggingSession.setupExecutor(executor)

const result = await executor.execute(workflowId)
const result = await executor.execute(workflowId, startBlockId)

// Check if we got a StreamingExecution result (with stream + execution properties)
// For API routes, we only care about the ExecutionResult part, not the stream
Expand Down
57 changes: 57 additions & 0 deletions apps/sim/app/api/workflows/[id]/yaml/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,58 @@ async function upsertCustomToolsFromBlocks(
}
}

/**
* Convert blocks with 'inputs' field to standard 'subBlocks' structure
* This handles trigger blocks that may come from YAML/copilot with legacy format
*/
function normalizeBlockStructure(blocks: Record<string, any>): Record<string, any> {
const normalizedBlocks: Record<string, any> = {}

for (const [blockId, block] of Object.entries(blocks)) {
const normalizedBlock = { ...block }

// Check if this is a trigger block with 'inputs' field
if (block.inputs && (
block.type === 'api_trigger' ||
block.type === 'input_trigger' ||
block.type === 'starter' ||
block.type === 'chat_trigger' ||
block.type === 'generic_webhook'
)) {
// Convert inputs.inputFormat to subBlocks.inputFormat
if (block.inputs.inputFormat) {
if (!normalizedBlock.subBlocks) {
normalizedBlock.subBlocks = {}
}

normalizedBlock.subBlocks.inputFormat = {
id: 'inputFormat',
type: 'input-format',
value: block.inputs.inputFormat
}
}

// Copy any other inputs fields to subBlocks
for (const [inputKey, inputValue] of Object.entries(block.inputs)) {
if (inputKey !== 'inputFormat' && !normalizedBlock.subBlocks[inputKey]) {
normalizedBlock.subBlocks[inputKey] = {
id: inputKey,
type: 'short-input', // Default type, may need adjustment based on actual field
value: inputValue
}
}
}

// Remove the inputs field after conversion
delete normalizedBlock.inputs
}

normalizedBlocks[blockId] = normalizedBlock
}

return normalizedBlocks
}

/**
* PUT /api/workflows/[id]/yaml
* Consolidated YAML workflow saving endpoint
Expand Down Expand Up @@ -344,6 +396,11 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
})
}

// Normalize blocks that use 'inputs' field to standard 'subBlocks' structure
if (workflowState.blocks) {
workflowState.blocks = normalizeBlockStructure(workflowState.blocks)
}

// Ensure all blocks have required fields
Object.values(workflowState.blocks).forEach((block: any) => {
if (block.enabled === undefined) {
Expand Down
141 changes: 20 additions & 121 deletions apps/sim/app/api/workflows/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
import { generateRequestId } from '@/lib/utils'
import { db } from '@/db'
import { workflow, workflowBlocks, workspace } from '@/db/schema'
import { workflow, workspace } from '@/db/schema'
import { verifyWorkspaceMembership } from './utils'

const logger = createLogger('WorkflowAPI')
Expand Down Expand Up @@ -95,132 +95,31 @@ export async function POST(req: NextRequest) {
const { name, description, color, workspaceId, folderId } = CreateWorkflowSchema.parse(body)

const workflowId = crypto.randomUUID()
const starterId = crypto.randomUUID()
const now = new Date()

logger.info(`[${requestId}] Creating workflow ${workflowId} for user ${session.user.id}`)

await db.transaction(async (tx) => {
await tx.insert(workflow).values({
id: workflowId,
userId: session.user.id,
workspaceId: workspaceId || null,
folderId: folderId || null,
name,
description,
color,
lastSynced: now,
createdAt: now,
updatedAt: now,
isDeployed: false,
collaborators: [],
runCount: 0,
variables: {},
isPublished: false,
marketplaceData: null,
})

await tx.insert(workflowBlocks).values({
id: starterId,
workflowId: workflowId,
type: 'starter',
name: 'Start',
positionX: '100',
positionY: '100',
enabled: true,
horizontalHandles: true,
isWide: false,
advancedMode: false,
triggerMode: false,
height: '95',
subBlocks: {
startWorkflow: {
id: 'startWorkflow',
type: 'dropdown',
value: 'manual',
},
webhookPath: {
id: 'webhookPath',
type: 'short-input',
value: '',
},
webhookSecret: {
id: 'webhookSecret',
type: 'short-input',
value: '',
},
scheduleType: {
id: 'scheduleType',
type: 'dropdown',
value: 'daily',
},
minutesInterval: {
id: 'minutesInterval',
type: 'short-input',
value: '',
},
minutesStartingAt: {
id: 'minutesStartingAt',
type: 'short-input',
value: '',
},
hourlyMinute: {
id: 'hourlyMinute',
type: 'short-input',
value: '',
},
dailyTime: {
id: 'dailyTime',
type: 'short-input',
value: '',
},
weeklyDay: {
id: 'weeklyDay',
type: 'dropdown',
value: 'MON',
},
weeklyDayTime: {
id: 'weeklyDayTime',
type: 'short-input',
value: '',
},
monthlyDay: {
id: 'monthlyDay',
type: 'short-input',
value: '',
},
monthlyTime: {
id: 'monthlyTime',
type: 'short-input',
value: '',
},
cronExpression: {
id: 'cronExpression',
type: 'short-input',
value: '',
},
timezone: {
id: 'timezone',
type: 'dropdown',
value: 'UTC',
},
},
outputs: {
response: {
type: {
input: 'any',
},
},
},
createdAt: now,
updatedAt: now,
})

logger.info(
`[${requestId}] Successfully created workflow ${workflowId} with start block in workflow_blocks table`
)
await db.insert(workflow).values({
id: workflowId,
userId: session.user.id,
workspaceId: workspaceId || null,
folderId: folderId || null,
name,
description,
color,
lastSynced: now,
createdAt: now,
updatedAt: now,
isDeployed: false,
collaborators: [],
runCount: 0,
variables: {},
isPublished: false,
marketplaceData: null,
})

logger.info(`[${requestId}] Successfully created empty workflow ${workflowId}`)

return NextResponse.json({
id: workflowId,
name,
Expand Down
Loading
Loading