diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index e9a9023162..0ac192b6a5 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -348,6 +348,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: workspaceId: workflow.workspaceId, }) + let cachedWorkflowData: { + blocks: Record + edges: any[] + loops: Record + parallels: Record + } | null = null + let processedInput = input try { const workflowData = shouldUseDraftState @@ -355,6 +362,13 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: : await loadDeployedWorkflowState(workflowId) if (workflowData) { + cachedWorkflowData = { + blocks: workflowData.blocks, + edges: workflowData.edges, + loops: workflowData.loops || {}, + parallels: workflowData.parallels || {}, + } + const serializedWorkflow = new Serializer().serializeWorkflow( workflowData.blocks, workflowData.edges, @@ -402,6 +416,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: ) } + const effectiveWorkflowStateOverride = workflowStateOverride || cachedWorkflowData || undefined + if (!enableSSE) { logger.info(`[${requestId}] Using non-SSE execution (direct JSON response)`) try { @@ -414,7 +430,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: triggerType, useDraftState: shouldUseDraftState, startTime: new Date().toISOString(), - workflowStateOverride, + workflowStateOverride: effectiveWorkflowStateOverride, } const snapshot = new ExecutionSnapshot( @@ -479,8 +495,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: logger.info(`[${requestId}] Using SSE console log streaming (manual execution)`) } else { logger.info(`[${requestId}] Using streaming API response`) - const deployedData = await loadDeployedWorkflowState(workflowId) - const resolvedSelectedOutputs = resolveOutputIds(selectedOutputs, deployedData?.blocks || {}) + + const resolvedSelectedOutputs = resolveOutputIds( + selectedOutputs, + cachedWorkflowData?.blocks || {} + ) const stream = await createStreamingResponse({ requestId, workflow: { @@ -677,7 +696,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: triggerType, useDraftState: shouldUseDraftState, startTime: new Date().toISOString(), - workflowStateOverride, + workflowStateOverride: effectiveWorkflowStateOverride, } const snapshot = new ExecutionSnapshot( diff --git a/apps/sim/lib/environment/utils.ts b/apps/sim/lib/environment/utils.ts index d2c1477e5e..74e25f59fd 100644 --- a/apps/sim/lib/environment/utils.ts +++ b/apps/sim/lib/environment/utils.ts @@ -67,16 +67,18 @@ export async function getPersonalAndWorkspaceEnv( const workspaceEncrypted: Record = (workspaceRows[0]?.variables as any) || {} const decryptAll = async (src: Record) => { - const out: Record = {} - for (const [k, v] of Object.entries(src)) { - try { - const { decrypted } = await decryptSecret(v) - out[k] = decrypted - } catch { - out[k] = '' - } - } - return out + const entries = Object.entries(src) + const results = await Promise.all( + entries.map(async ([k, v]) => { + try { + const { decrypted } = await decryptSecret(v) + return [k, decrypted] as const + } catch { + return [k, ''] as const + } + }) + ) + return Object.fromEntries(results) } const [personalDecrypted, workspaceDecrypted] = await Promise.all([ diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 6da2316490..e018b2d9bc 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -9,7 +9,6 @@ import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' import { createLogger } from '@/lib/logs/console/logger' import type { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' -import { decryptSecret } from '@/lib/utils' import { loadDeployedWorkflowState, loadWorkflowFromNormalizedTables, @@ -153,13 +152,15 @@ export async function executeWorkflowCore( // Merge block states const mergedStates = mergeSubblockState(blocks) - // Get and decrypt environment variables - const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv( - userId, - providedWorkspaceId - ) + const { personalEncrypted, workspaceEncrypted, personalDecrypted, workspaceDecrypted } = + await getPersonalAndWorkspaceEnv(userId, providedWorkspaceId) + + // Use encrypted values for logging (don't log decrypted secrets) const variables = EnvVarsSchema.parse({ ...personalEncrypted, ...workspaceEncrypted }) + // Use already-decrypted values for execution (no redundant decryption) + const decryptedEnvVars: Record = { ...personalDecrypted, ...workspaceDecrypted } + await loggingSession.safeStart({ userId, workspaceId: providedWorkspaceId, @@ -167,13 +168,11 @@ export async function executeWorkflowCore( skipLogCreation, // Skip if resuming an existing execution }) - // Process block states with env var substitution - const currentBlockStates = await Object.entries(mergedStates).reduce( - async (accPromise, [id, block]) => { - const acc = await accPromise - acc[id] = await Object.entries(block.subBlocks).reduce( - async (subAccPromise, [key, subBlock]) => { - const subAcc = await subAccPromise + // Process block states with env var substitution using pre-decrypted values + const currentBlockStates = Object.entries(mergedStates).reduce( + (acc, [id, block]) => { + acc[id] = Object.entries(block.subBlocks).reduce( + (subAcc, [key, subBlock]) => { let value = subBlock.value if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) { @@ -181,10 +180,9 @@ export async function executeWorkflowCore( if (matches) { for (const match of matches) { const varName = match.slice(2, -2) - const encryptedValue = variables[varName] - if (encryptedValue) { - const { decrypted } = await decryptSecret(encryptedValue) - value = (value as string).replace(match, decrypted) + const decryptedValue = decryptedEnvVars[varName] + if (decryptedValue !== undefined) { + value = (value as string).replace(match, decryptedValue) } } } @@ -193,20 +191,13 @@ export async function executeWorkflowCore( subAcc[key] = value return subAcc }, - Promise.resolve({} as Record) + {} as Record ) return acc }, - Promise.resolve({} as Record>) + {} as Record> ) - // Decrypt all env vars - const decryptedEnvVars: Record = {} - for (const [key, encryptedValue] of Object.entries(variables)) { - const { decrypted } = await decryptSecret(encryptedValue) - decryptedEnvVars[key] = decrypted - } - // Process response format const processedBlockStates = Object.entries(currentBlockStates).reduce( (acc, [blockId, blockState]) => {