Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'

// Module-scoped logger; entries are tagged with the 'CleanupStaleExecutions' name.
const logger = createLogger('CleanupStaleExecutions')

// Age, in minutes, after which an execution still marked 'running' is treated as stale.
// The GET handler converts this to milliseconds to compute the `startedAt` cutoff.
const STALE_THRESHOLD_MINUTES = 30

/**
 * Cron endpoint that marks stale workflow executions as failed.
 *
 * An execution is stale when its log row still has status 'running' and its
 * `startedAt` is older than STALE_THRESHOLD_MINUTES. At most 100 rows are
 * processed per invocation; remaining rows are picked up by later runs.
 *
 * For each stale row the handler sets status to 'failed', stamps `endedAt` and
 * `totalDurationMs`, and writes an `error` message into the `execution_data`
 * JSONB column. Rows are updated one at a time so a failure on one row does not
 * abort the rest of the batch.
 *
 * @param request - Incoming request; checked with `verifyCronAuth` before any work is done.
 * @returns The auth error response when `verifyCronAuth` returns one; otherwise a JSON
 *   summary `{ success, found, cleaned, failed, skipped, thresholdMinutes }`, or a 500
 *   JSON error if the job itself throws.
 */
export async function GET(request: NextRequest) {
  try {
    const authError = verifyCronAuth(request, 'Stale execution cleanup')
    if (authError) {
      return authError
    }

    logger.info('Starting stale execution cleanup job')

    const staleThreshold = new Date(Date.now() - STALE_THRESHOLD_MINUTES * 60 * 1000)

    const staleExecutions = await db
      .select({
        id: workflowExecutionLogs.id,
        executionId: workflowExecutionLogs.executionId,
        workflowId: workflowExecutionLogs.workflowId,
        startedAt: workflowExecutionLogs.startedAt,
      })
      .from(workflowExecutionLogs)
      .where(
        and(
          eq(workflowExecutionLogs.status, 'running'),
          lt(workflowExecutionLogs.startedAt, staleThreshold)
        )
      )
      // Bound the work done per cron invocation.
      .limit(100)

    logger.info(`Found ${staleExecutions.length} stale executions to clean up`)

    let cleaned = 0
    let failed = 0
    // Rows that left the 'running' state between the SELECT above and their UPDATE.
    let skipped = 0

    for (const execution of staleExecutions) {
      try {
        // Use a single timestamp so endedAt and totalDurationMs agree with each other.
        const endedAt = new Date()
        const staleDurationMs = endedAt.getTime() - new Date(execution.startedAt).getTime()
        const staleDurationMinutes = Math.round(staleDurationMs / 60000)

        const updatedRows = await db
          .update(workflowExecutionLogs)
          .set({
            status: 'failed',
            endedAt,
            totalDurationMs: staleDurationMs,
            // Merge the error message into any existing execution_data instead of replacing it.
            executionData: sql`jsonb_set(
              COALESCE(execution_data, '{}'::jsonb),
              ARRAY['error'],
              to_jsonb(${`Execution terminated: worker timeout or crash after ${staleDurationMinutes} minutes`}::text)
            )`,
          })
          .where(
            and(
              eq(workflowExecutionLogs.id, execution.id),
              // Re-check the status: the execution may have completed (or been cleaned by a
              // concurrent run) since it was selected, and must not be overwritten as failed.
              eq(workflowExecutionLogs.status, 'running')
            )
          )
          .returning({ id: workflowExecutionLogs.id })

        if (updatedRows.length === 0) {
          logger.info(`Skipped execution ${execution.executionId}: no longer running`, {
            workflowId: execution.workflowId,
          })
          skipped++
          continue
        }

        logger.info(`Cleaned up stale execution ${execution.executionId}`, {
          workflowId: execution.workflowId,
          staleDurationMinutes,
        })

        cleaned++
      } catch (error) {
        // Best-effort per row: record the failure and keep processing the batch.
        logger.error(`Failed to clean up execution ${execution.executionId}:`, {
          error: error instanceof Error ? error.message : String(error),
        })
        failed++
      }
    }

    logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`, {
      skipped,
    })

    return NextResponse.json({
      success: true,
      found: staleExecutions.length,
      cleaned,
      failed,
      skipped,
      thresholdMinutes: STALE_THRESHOLD_MINUTES,
    })
  } catch (error) {
    logger.error('Error in stale execution cleanup job:', error)
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}
41 changes: 22 additions & 19 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
import { normalizeName } from '@/executor/constants'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
import type { StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import { CORE_TRIGGER_TYPES } from '@/stores/logs/filters/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'

const logger = createLogger('WorkflowExecuteAPI')

Expand Down Expand Up @@ -541,11 +541,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
blockId: string,
blockName: string,
blockType: string,
iterationContext?: {
iterationCurrent: number
iterationTotal: number
iterationType: SubflowType
}
iterationContext?: IterationContext
) => {
logger.info(`[${requestId}] 🔷 onBlockStart called:`, { blockId, blockName, blockType })
sendEvent({
Expand All @@ -571,11 +567,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
blockName: string,
blockType: string,
callbackData: any,
iterationContext?: {
iterationCurrent: number
iterationTotal: number
iterationType: SubflowType
}
iterationContext?: IterationContext
) => {
const hasError = callbackData.output?.error

Expand Down Expand Up @@ -713,14 +705,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
Expand Down
28 changes: 20 additions & 8 deletions apps/sim/background/schedule-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import {
getSubBlockValue,
} from '@/lib/workflows/schedules/utils'
import { REFERENCE } from '@/executor/constants'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { createEnvVarPattern } from '@/executor/utils/reference-validation'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
Expand Down Expand Up @@ -285,14 +286,25 @@ async function runWorkflowExecution({
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
Expand Down
53 changes: 38 additions & 15 deletions apps/sim/background/webhook-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import {
loadWorkflowFromNormalizedTables,
} from '@/lib/workflows/persistence/utils'
import { getWorkflowById } from '@/lib/workflows/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
Expand Down Expand Up @@ -268,14 +269,25 @@ async function executeWebhookJobInternal(
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
Expand Down Expand Up @@ -509,14 +521,25 @@ async function executeWebhookJobInternal(
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId: payload.workflowId,
executionId,
pausePoints: executionResult.pausePoints || [],
snapshotSeed: executionResult.snapshotSeed,
executorUserId: executionResult.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
Expand Down
28 changes: 20 additions & 8 deletions apps/sim/background/workflow-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import { getWorkflowById } from '@/lib/workflows/utils'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata } from '@/executor/execution/types'
import type { ExecutionResult } from '@/executor/types'

const logger = createLogger('TriggerWorkflowExecution')
Expand Down Expand Up @@ -112,14 +113,25 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
executionId,
})
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
} else {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
try {
await PauseResumeManager.persistPauseResult({
workflowId,
executionId,
pausePoints: result.pausePoints || [],
snapshotSeed: result.snapshotSeed,
executorUserId: result.metadata?.userId,
})
} catch (pauseError) {
logger.error(`[${requestId}] Failed to persist pause result`, {
executionId,
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
})
await loggingSession.markAsFailed(
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
)
}
}
} else {
await PauseResumeManager.processQueuedResumes(executionId)
Expand Down
7 changes: 2 additions & 5 deletions apps/sim/executor/execution/snapshot-serializer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import type { DAG } from '@/executor/dag/builder'
import {
type ExecutionMetadata,
ExecutionSnapshot,
type SerializableExecutionState,
} from '@/executor/execution/snapshot'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext, SerializedSnapshot } from '@/executor/types'

function mapFromEntries<T>(map?: Map<string, T>): Record<string, T> | undefined {
Expand Down
57 changes: 1 addition & 56 deletions apps/sim/executor/execution/snapshot.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,4 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState } from '@/executor/types'

/**
 * Metadata attached to a single workflow execution: identifiers, trigger
 * information, and optional resume/override settings.
 *
 * NOTE(review): per-field semantics below are inferred from field names and
 * types only — confirm against the executor before relying on them.
 */
export interface ExecutionMetadata {
requestId: string
executionId: string
workflowId: string
workspaceId: string
userId: string
sessionUserId?: string
workflowUserId?: string
triggerType: string
triggerBlockId?: string
useDraftState: boolean
// Typed as a string, not a Date — presumably a serialized timestamp; TODO confirm format.
startTime: string
isClientSession?: boolean
pendingBlocks?: string[]
resumeFromSnapshot?: boolean
// Optional inline workflow definition; presumably used in place of the stored state — verify against caller.
workflowStateOverride?: {
blocks: Record<string, any>
edges: Edge[]
loops?: Record<string, any>
parallels?: Record<string, any>
deploymentVersionId?: string // ID of deployment version if this is deployed state
}
}

/**
 * Optional asynchronous hooks for observing an execution. Every hook is
 * optional and resolves to `void`.
 *
 * NOTE(review): when each hook fires is inferred from its name — confirm
 * against the executor.
 */
export interface ExecutionCallbacks {
// Receives a streaming-execution value (typed `any` here).
onStream?: (streamingExec: any) => Promise<void>
// Receives the block's id, name and type.
onBlockStart?: (blockId: string, blockName: string, blockType: string) => Promise<void>
// Receives the block's id, name and type plus its output (typed `any` here).
onBlockComplete?: (
blockId: string,
blockName: string,
blockType: string,
output: any
) => Promise<void>
}

/**
 * Executor state expressed with plain records and arrays only (no Map/Set
 * fields are declared here).
 *
 * NOTE(review): field meanings are inferred from names and types — confirm
 * against the snapshot serializer before relying on them.
 */
export interface SerializableExecutionState {
// Block state keyed by a string — presumably the block id; TODO confirm.
blockStates: Record<string, BlockState>
executedBlocks: string[]
blockLogs: BlockLog[]
// String-to-string maps, one for router blocks and one for condition blocks.
decisions: {
router: Record<string, string>
condition: Record<string, string>
}
completedLoops: string[]
loopExecutions?: Record<string, any>
parallelExecutions?: Record<string, any>
parallelBlockMapping?: Record<string, any>
activeExecutionPath: string[]
// The remaining fields are optional.
pendingQueue?: string[]
remainingEdges?: Edge[]
dagIncomingEdges?: Record<string, string[]>
completedPauseContexts?: string[]
}
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'

export class ExecutionSnapshot {
constructor(
Expand Down
Loading