diff --git a/apps/sim/app/api/schedules/execute/route.test.ts b/apps/sim/app/api/schedules/execute/route.test.ts index 1274d1aa11..0cb26a4bd1 100644 --- a/apps/sim/app/api/schedules/execute/route.test.ts +++ b/apps/sim/app/api/schedules/execute/route.test.ts @@ -3,81 +3,50 @@ * * @vitest-environment node */ +import type { NextRequest } from 'next/server' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import { - mockExecutionDependencies, - mockScheduleExecuteDb, - sampleWorkflowState, -} from '@/app/api/__test-utils__/utils' + +function createMockRequest(): NextRequest { + const mockHeaders = new Map([ + ['authorization', 'Bearer test-cron-secret'], + ['content-type', 'application/json'], + ]) + + return { + headers: { + get: (key: string) => mockHeaders.get(key.toLowerCase()) || null, + }, + url: 'http://localhost:3000/api/schedules/execute', + } as NextRequest +} describe('Scheduled Workflow Execution API Route', () => { beforeEach(() => { vi.clearAllMocks() + vi.resetModules() + }) - mockExecutionDependencies() - - // Mock all dependencies - vi.doMock('@/services/queue', () => ({ - RateLimiter: vi.fn().mockImplementation(() => ({ - checkRateLimitWithSubscription: vi.fn().mockResolvedValue({ - allowed: true, - remaining: 100, - resetAt: new Date(Date.now() + 60000), - }), - })), - })) - - vi.doMock('@/lib/billing', () => ({ - checkServerSideUsageLimits: vi.fn().mockResolvedValue({ isExceeded: false }), - })) - - vi.doMock('@/lib/billing/core/subscription', () => ({ - getHighestPrioritySubscription: vi.fn().mockResolvedValue({ - plan: 'pro', - status: 'active', - }), - })) - - vi.doMock('@/lib/environment/utils', () => ({ - getPersonalAndWorkspaceEnv: vi.fn().mockResolvedValue({ - personalEncrypted: {}, - workspaceEncrypted: {}, - }), - })) + afterEach(() => { + vi.clearAllMocks() + vi.resetModules() + }) - vi.doMock('@/lib/logs/execution/logging-session', () => ({ - LoggingSession: vi.fn().mockImplementation(() => ({ - safeStart: vi.fn().mockResolvedValue(undefined), - safeComplete: vi.fn().mockResolvedValue(undefined), - safeCompleteWithError: vi.fn().mockResolvedValue(undefined), - setupExecutor: vi.fn(), - })), - })) + it('should execute scheduled workflows with Trigger.dev disabled', async () => { + const mockExecuteScheduleJob = vi.fn().mockResolvedValue(undefined) - vi.doMock('@/lib/workflows/db-helpers', () => ({ - loadDeployedWorkflowState: vi.fn().mockResolvedValue({ - blocks: sampleWorkflowState.blocks, - edges: sampleWorkflowState.edges || [], - loops: sampleWorkflowState.loops || {}, - parallels: sampleWorkflowState.parallels || {}, - }), - loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({ - blocks: sampleWorkflowState.blocks, - edges: sampleWorkflowState.edges || [], - loops: sampleWorkflowState.loops || {}, - parallels: {}, - isFromNormalizedTables: true, - }), + vi.doMock('@/lib/auth/internal', () => ({ + verifyCronAuth: vi.fn().mockReturnValue(null), })) - vi.doMock('@/stores/workflows/server-utils', () => ({ - mergeSubblockState: vi.fn().mockReturnValue(sampleWorkflowState.blocks), + vi.doMock('@/background/schedule-execution', () => ({ + executeScheduleJob: mockExecuteScheduleJob, })) - vi.doMock('@/lib/schedules/utils', () => ({ - calculateNextRunTime: vi.fn().mockReturnValue(new Date(Date.now() + 60000)), - getScheduleTimeValues: vi.fn().mockReturnValue({}), - getSubBlockValue: vi.fn().mockReturnValue('manual'), + vi.doMock('@/lib/env', () => ({ + env: { + TRIGGER_DEV_ENABLED: false, + }, + isTruthy: vi.fn(() => false), })) vi.doMock('drizzle-orm', () => ({ @@ -85,198 +54,209 @@ describe('Scheduled Workflow Execution API Route', () => { eq: vi.fn((field, value) => ({ field, value, type: 'eq' })), lte: vi.fn((field, value) => ({ field, value, type: 'lte' })), not: vi.fn((condition) => ({ type: 'not', condition })), - sql: vi.fn((strings, ...values) => ({ strings, values, type: 'sql' })), - })) - - vi.doMock('croner', () => ({ - Cron: vi.fn().mockImplementation(() => ({ - nextRun: vi.fn().mockReturnValue(new Date(Date.now() + 60000)), // Next run in 1 minute - })), })) vi.doMock('@sim/db', () => { const mockDb = { select: vi.fn().mockImplementation(() => ({ - from: vi.fn().mockImplementation((_table: any) => ({ - where: vi.fn().mockImplementation((_cond: any) => ({ - limit: vi.fn().mockImplementation((n?: number) => { - // Always return empty array - no due schedules - return [] - }), - })), - })), - })), - update: vi.fn().mockImplementation(() => ({ - set: vi.fn().mockImplementation(() => ({ - where: vi.fn().mockResolvedValue([]), + from: vi.fn().mockImplementation(() => ({ + where: vi.fn().mockImplementation(() => [ + { + id: 'schedule-1', + workflowId: 'workflow-1', + blockId: null, + cronExpression: null, + lastRanAt: null, + failedCount: 0, + }, + ]), })), })), } return { db: mockDb, - userStats: { - userId: 'userId', - totalScheduledExecutions: 'totalScheduledExecutions', - lastActive: 'lastActive', - }, - workflow: { id: 'id', userId: 'userId', state: 'state' }, - workflowSchedule: { - id: 'id', - workflowId: 'workflowId', - nextRunAt: 'nextRunAt', - status: 'status', - }, + workflowSchedule: {}, } }) - }) - - afterEach(() => { - vi.clearAllMocks() - }) - - it('should execute scheduled workflows successfully', async () => { - const executeMock = vi.fn().mockResolvedValue({ - success: true, - output: { response: 'Scheduled execution completed' }, - logs: [], - metadata: { - duration: 100, - startTime: new Date().toISOString(), - endTime: new Date().toISOString(), - }, - }) - - vi.doMock('@/executor', () => ({ - Executor: vi.fn().mockImplementation(() => ({ - execute: executeMock, - })), - })) const { GET } = await import('@/app/api/schedules/execute/route') - const response = await GET() - expect(response).toBeDefined() + const response = await GET(createMockRequest()) + expect(response).toBeDefined() + expect(response.status).toBe(200) const data = await response.json() expect(data).toHaveProperty('message') - expect(data).toHaveProperty('executedCount') + expect(data).toHaveProperty('executedCount', 1) }) - it('should handle errors during scheduled execution gracefully', async () => { - vi.doMock('@/executor', () => ({ - Executor: vi.fn().mockImplementation(() => ({ - execute: vi.fn().mockRejectedValue(new Error('Execution failed')), - })), + it('should queue schedules to Trigger.dev when enabled', async () => { + const mockTrigger = vi.fn().mockResolvedValue({ id: 'task-id-123' }) + + vi.doMock('@/lib/auth/internal', () => ({ + verifyCronAuth: vi.fn().mockReturnValue(null), })) - const { GET } = await import('@/app/api/schedules/execute/route') - const response = await GET() + vi.doMock('@trigger.dev/sdk', () => ({ + tasks: { + trigger: mockTrigger, + }, + })) - expect(response).toBeDefined() + vi.doMock('@/lib/env', () => ({ + env: { + TRIGGER_DEV_ENABLED: true, + }, + isTruthy: vi.fn(() => true), + })) - const data = await response.json() - expect(data).toHaveProperty('message') - }) + vi.doMock('drizzle-orm', () => ({ + and: vi.fn((...conditions) => ({ type: 'and', conditions })), + eq: vi.fn((field, value) => ({ field, value, type: 'eq' })), + lte: vi.fn((field, value) => ({ field, value, type: 'lte' })), + not: vi.fn((condition) => ({ type: 'not', condition })), + })) - it('should handle case with no due schedules', async () => { vi.doMock('@sim/db', () => { const mockDb = { select: vi.fn().mockImplementation(() => ({ from: vi.fn().mockImplementation(() => ({ - where: vi.fn().mockImplementation(() => ({ - limit: vi.fn().mockImplementation(() => []), - })), - })), - })), - update: vi.fn().mockImplementation(() => ({ - set: vi.fn().mockImplementation(() => ({ - where: vi.fn().mockResolvedValue([]), + where: vi.fn().mockImplementation(() => [ + { + id: 'schedule-1', + workflowId: 'workflow-1', + blockId: null, + cronExpression: null, + lastRanAt: null, + failedCount: 0, + }, + ]), })), })), } - return { db: mockDb } + return { + db: mockDb, + workflowSchedule: {}, + } }) const { GET } = await import('@/app/api/schedules/execute/route') - const response = await GET() + const response = await GET(createMockRequest()) + + expect(response).toBeDefined() expect(response.status).toBe(200) const data = await response.json() - expect(data).toHaveProperty('executedCount', 0) + expect(data).toHaveProperty('executedCount', 1) + }) - const executeMock = vi.fn() - vi.doMock('@/executor', () => ({ - Executor: vi.fn().mockImplementation(() => ({ - execute: executeMock, - })), + it('should handle case with no due schedules', async () => { + vi.doMock('@/lib/auth/internal', () => ({ + verifyCronAuth: vi.fn().mockReturnValue(null), })) - expect(executeMock).not.toHaveBeenCalled() - }) - - // Removed: Test isolation issues with mocks make this unreliable + vi.doMock('@/background/schedule-execution', () => ({ + executeScheduleJob: vi.fn().mockResolvedValue(undefined), + })) - it('should execute schedules that are explicitly marked as active', async () => { - const executeMock = vi.fn().mockResolvedValue({ success: true, metadata: {} }) + vi.doMock('@/lib/env', () => ({ + env: { + TRIGGER_DEV_ENABLED: false, + }, + isTruthy: vi.fn(() => false), + })) - vi.doMock('@/executor', () => ({ - Executor: vi.fn().mockImplementation(() => ({ - execute: executeMock, - })), + vi.doMock('drizzle-orm', () => ({ + and: vi.fn((...conditions) => ({ type: 'and', conditions })), + eq: vi.fn((field, value) => ({ field, value, type: 'eq' })), + lte: vi.fn((field, value) => ({ field, value, type: 'lte' })), + not: vi.fn((condition) => ({ type: 'not', condition })), })) - mockScheduleExecuteDb({ - schedules: [ - { - id: 'schedule-active', - workflowId: 'workflow-id', - userId: 'user-id', - status: 'active', - nextRunAt: new Date(Date.now() - 60_000), - lastRanAt: null, - cronExpression: null, - failedCount: 0, - }, - ], + vi.doMock('@sim/db', () => { + const mockDb = { + select: vi.fn().mockImplementation(() => ({ + from: vi.fn().mockImplementation(() => ({ + where: vi.fn().mockImplementation(() => []), + })), + })), + } + + return { + db: mockDb, + workflowSchedule: {}, + } }) const { GET } = await import('@/app/api/schedules/execute/route') - const response = await GET() + const response = await GET(createMockRequest()) expect(response.status).toBe(200) + const data = await response.json() + expect(data).toHaveProperty('message') + expect(data).toHaveProperty('executedCount', 0) }) - it('should not execute schedules that are disabled', async () => { - const executeMock = vi.fn() + it('should execute multiple schedules in parallel', async () => { + vi.doMock('@/lib/auth/internal', () => ({ + verifyCronAuth: vi.fn().mockReturnValue(null), + })) + + vi.doMock('@/background/schedule-execution', () => ({ + executeScheduleJob: vi.fn().mockResolvedValue(undefined), + })) + + vi.doMock('@/lib/env', () => ({ + env: { + TRIGGER_DEV_ENABLED: false, + }, + isTruthy: vi.fn(() => false), + })) - vi.doMock('@/executor', () => ({ - Executor: vi.fn().mockImplementation(() => ({ - execute: executeMock, - })), + vi.doMock('drizzle-orm', () => ({ + and: vi.fn((...conditions) => ({ type: 'and', conditions })), + eq: vi.fn((field, value) => ({ field, value, type: 'eq' })), + lte: vi.fn((field, value) => ({ field, value, type: 'lte' })), + not: vi.fn((condition) => ({ type: 'not', condition })), })) - mockScheduleExecuteDb({ - schedules: [ - { - id: 'schedule-disabled', - workflowId: 'workflow-id', - userId: 'user-id', - status: 'disabled', - nextRunAt: new Date(Date.now() - 60_000), - lastRanAt: null, - cronExpression: null, - failedCount: 0, - }, - ], + vi.doMock('@sim/db', () => { + const mockDb = { + select: vi.fn().mockImplementation(() => ({ + from: vi.fn().mockImplementation(() => ({ + where: vi.fn().mockImplementation(() => [ + { + id: 'schedule-1', + workflowId: 'workflow-1', + blockId: null, + cronExpression: null, + lastRanAt: null, + failedCount: 0, + }, + { + id: 'schedule-2', + workflowId: 'workflow-2', + blockId: null, + cronExpression: null, + lastRanAt: null, + failedCount: 0, + }, + ]), + })), + })), + } + + return { + db: mockDb, + workflowSchedule: {}, + } }) const { GET } = await import('@/app/api/schedules/execute/route') - const response = await GET() + const response = await GET(createMockRequest()) expect(response.status).toBe(200) const data = await response.json() - expect(data).toHaveProperty('executedCount', 0) - - expect(executeMock).not.toHaveBeenCalled() + expect(data).toHaveProperty('executedCount', 2) }) }) diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 81bb6a6445..0d94269dbf 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -1,673 +1,108 @@ -import { db, userStats, workflow, workflowSchedule } from '@sim/db' -import { Cron } from 'croner' -import { and, eq, lte, not, sql } from 'drizzle-orm' -import { NextResponse } from 'next/server' -import { v4 as uuidv4 } from 'uuid' -import { z } from 'zod' -import { getApiKeyOwnerUserId } from '@/lib/api-key/service' -import { checkServerSideUsageLimits } from '@/lib/billing' -import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' -import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' +import { db, workflowSchedule } from '@sim/db' +import { tasks } from '@trigger.dev/sdk' +import { and, eq, lte, not } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { verifyCronAuth } from '@/lib/auth/internal' +import { env, isTruthy } from '@/lib/env' import { createLogger } from '@/lib/logs/console/logger' -import { LoggingSession } from '@/lib/logs/execution/logging-session' -import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' -import { - type BlockState, - calculateNextRunTime as calculateNextTime, - getScheduleTimeValues, - getSubBlockValue, -} from '@/lib/schedules/utils' -import { decryptSecret, generateRequestId } from '@/lib/utils' -import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers' -import { updateWorkflowRunCounts } from '@/lib/workflows/utils' -import { Executor } from '@/executor' -import { Serializer } from '@/serializer' -import { RateLimiter } from '@/services/queue' -import { mergeSubblockState } from '@/stores/workflows/server-utils' +import { generateRequestId } from '@/lib/utils' +import { executeScheduleJob } from '@/background/schedule-execution' export const dynamic = 'force-dynamic' const logger = createLogger('ScheduledExecuteAPI') -// Maximum number of consecutive failures before disabling a schedule -const MAX_CONSECUTIVE_FAILURES = 3 - -/** - * Calculate the next run time for a schedule - * This is a wrapper around the utility function in schedule-utils.ts - */ -function calculateNextRunTime( - schedule: typeof workflowSchedule.$inferSelect, - blocks: Record -): Date { - // Look for either starter block or schedule trigger block - const scheduleBlock = Object.values(blocks).find( - (block) => block.type === 'starter' || block.type === 'schedule' - ) - if (!scheduleBlock) throw new Error('No starter or schedule block found') - const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType') - const scheduleValues = getScheduleTimeValues(scheduleBlock) +export async function GET(request: NextRequest) { + const requestId = generateRequestId() + logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`) - if (schedule.cronExpression) { - const cron = new Cron(schedule.cronExpression) - const nextDate = cron.nextRun() - if (!nextDate) throw new Error('Invalid cron expression or no future occurrences') - return nextDate + const authError = verifyCronAuth(request, 'Schedule execution') + if (authError) { + return authError } - const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null - return calculateNextTime(scheduleType, scheduleValues, lastRanAt) -} - -const EnvVarsSchema = z.record(z.string()) - -const runningExecutions = new Set() - -export async function GET() { - logger.info(`Scheduled execution triggered at ${new Date().toISOString()}`) - const requestId = generateRequestId() const now = new Date() - let dueSchedules: (typeof workflowSchedule.$inferSelect)[] = [] - try { - dueSchedules = await db + const dueSchedules = await db .select() .from(workflowSchedule) .where( and(lte(workflowSchedule.nextRunAt, now), not(eq(workflowSchedule.status, 'disabled'))) ) - .limit(10) logger.debug(`[${requestId}] Successfully queried schedules: ${dueSchedules.length} found`) - logger.info(`[${requestId}] Processing ${dueSchedules.length} due scheduled workflows`) - for (const schedule of dueSchedules) { - const executionId = uuidv4() - - try { - if (runningExecutions.has(schedule.workflowId)) { - logger.debug(`[${requestId}] Skipping workflow ${schedule.workflowId} - already running`) - continue - } - - runningExecutions.add(schedule.workflowId) - logger.debug(`[${requestId}] Starting execution of workflow ${schedule.workflowId}`) + const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED) - const [workflowRecord] = await db - .select() - .from(workflow) - .where(eq(workflow.id, schedule.workflowId)) - .limit(1) - - if (!workflowRecord) { - logger.warn(`[${requestId}] Workflow ${schedule.workflowId} not found`) - runningExecutions.delete(schedule.workflowId) - continue - } - - const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId) + if (useTrigger) { + const triggerPromises = dueSchedules.map(async (schedule) => { + try { + const payload = { + scheduleId: schedule.id, + workflowId: schedule.workflowId, + blockId: schedule.blockId || undefined, + cronExpression: schedule.cronExpression || undefined, + lastRanAt: schedule.lastRanAt?.toISOString(), + failedCount: schedule.failedCount || 0, + now: now.toISOString(), + } - if (!actorUserId) { - logger.warn( - `[${requestId}] Skipping schedule ${schedule.id}: pinned API key required to attribute usage.` + const handle = await tasks.trigger('schedule-execution', payload) + logger.info( + `[${requestId}] Queued schedule execution task ${handle.id} for workflow ${schedule.workflowId}` ) - runningExecutions.delete(schedule.workflowId) - continue - } - - // Check rate limits for scheduled execution (checks both personal and org subscriptions) - const userSubscription = await getHighestPrioritySubscription(actorUserId) - - const rateLimiter = new RateLimiter() - const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription( - actorUserId, - userSubscription, - 'schedule', - false // schedules are always sync - ) - - if (!rateLimitCheck.allowed) { - logger.warn( - `[${requestId}] Rate limit exceeded for scheduled workflow ${schedule.workflowId}`, - { - userId: workflowRecord.userId, - remaining: rateLimitCheck.remaining, - resetAt: rateLimitCheck.resetAt, - } + return handle + } catch (error) { + logger.error( + `[${requestId}] Failed to trigger schedule execution for workflow ${schedule.workflowId}`, + error ) - - // Retry in 5 minutes for rate limit - const retryDelay = 5 * 60 * 1000 // 5 minutes - const nextRetryAt = new Date(now.getTime() + retryDelay) - - try { - await db - .update(workflowSchedule) - .set({ - updatedAt: now, - nextRunAt: nextRetryAt, - }) - .where(eq(workflowSchedule.id, schedule.id)) - - logger.debug(`[${requestId}] Updated next retry time due to rate limit`) - } catch (updateError) { - logger.error(`[${requestId}] Error updating schedule for rate limit:`, updateError) - } - - runningExecutions.delete(schedule.workflowId) - continue + return null } - - const usageCheck = await checkServerSideUsageLimits(actorUserId) - if (usageCheck.isExceeded) { - logger.warn( - `[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId: schedule.workflowId, - } - ) - try { - const deployedData = await loadDeployedWorkflowState(schedule.workflowId) - const nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any) - await db - .update(workflowSchedule) - .set({ updatedAt: now, nextRunAt }) - .where(eq(workflowSchedule.id, schedule.id)) - } catch (calcErr) { - logger.warn( - `[${requestId}] Unable to calculate nextRunAt while skipping schedule ${schedule.id}`, - calcErr - ) - } - runningExecutions.delete(schedule.workflowId) - continue + }) + + await Promise.allSettled(triggerPromises) + + logger.info(`[${requestId}] Queued ${dueSchedules.length} schedule executions to Trigger.dev`) + } else { + const directExecutionPromises = dueSchedules.map(async (schedule) => { + const payload = { + scheduleId: schedule.id, + workflowId: schedule.workflowId, + blockId: schedule.blockId || undefined, + cronExpression: schedule.cronExpression || undefined, + lastRanAt: schedule.lastRanAt?.toISOString(), + failedCount: schedule.failedCount || 0, + now: now.toISOString(), } - // Execute scheduled workflow immediately (no queuing) - logger.info(`[${requestId}] Executing scheduled workflow ${schedule.workflowId}`) - - try { - const executionSuccess = await (async () => { - // Create logging session inside the execution callback - const loggingSession = new LoggingSession( - schedule.workflowId, - executionId, - 'schedule', - requestId - ) - - try { - logger.debug(`[${requestId}] Loading deployed workflow ${schedule.workflowId}`) - const deployedData = await loadDeployedWorkflowState(schedule.workflowId) - - const blocks = deployedData.blocks - const edges = deployedData.edges - const loops = deployedData.loops - const parallels = deployedData.parallels - logger.info(`[${requestId}] Loaded deployed workflow ${schedule.workflowId}`) - - // Validate that the schedule's trigger block exists in the deployed state - if (schedule.blockId) { - const blockExists = await blockExistsInDeployment( - schedule.workflowId, - schedule.blockId - ) - if (!blockExists) { - logger.warn( - `[${requestId}] Schedule trigger block ${schedule.blockId} not found in deployed workflow ${schedule.workflowId}. Skipping execution.` - ) - return { skip: true, blocks: {} as Record } - } - } - - const mergedStates = mergeSubblockState(blocks) - - // Retrieve environment variables with workspace precedence - const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv( - actorUserId, - workflowRecord.workspaceId || undefined - ) - const variables = EnvVarsSchema.parse({ - ...personalEncrypted, - ...workspaceEncrypted, - }) - - const currentBlockStates = await Object.entries(mergedStates).reduce( - async (accPromise, [id, block]) => { - const acc = await accPromise - acc[id] = await Object.entries(block.subBlocks).reduce( - async (subAccPromise, [key, subBlock]) => { - const subAcc = await subAccPromise - let value = subBlock.value - - if ( - typeof value === 'string' && - value.includes('{{') && - value.includes('}}') - ) { - const matches = value.match(/{{([^}]+)}}/g) - if (matches) { - for (const match of matches) { - const varName = match.slice(2, -2) - const encryptedValue = variables[varName] - if (!encryptedValue) { - throw new Error(`Environment variable "${varName}" was not found`) - } - - try { - const { decrypted } = await decryptSecret(encryptedValue) - value = (value as string).replace(match, decrypted) - } catch (error: any) { - logger.error( - `[${requestId}] Error decrypting value for variable "${varName}"`, - error - ) - throw new Error( - `Failed to decrypt environment variable "${varName}": ${error.message}` - ) - } - } - } - } - - subAcc[key] = value - return subAcc - }, - Promise.resolve({} as Record) - ) - return acc - }, - Promise.resolve({} as Record>) - ) - - const decryptedEnvVars: Record = {} - for (const [key, encryptedValue] of Object.entries(variables)) { - try { - const { decrypted } = await decryptSecret(encryptedValue) - decryptedEnvVars[key] = decrypted - } catch (error: any) { - logger.error( - `[${requestId}] Failed to decrypt environment variable "${key}"`, - error - ) - throw new Error( - `Failed to decrypt environment variable "${key}": ${error.message}` - ) - } - } - - // Process the block states to ensure response formats are properly parsed - const processedBlockStates = Object.entries(currentBlockStates).reduce( - (acc, [blockId, blockState]) => { - // Check if this block has a responseFormat that needs to be parsed - if (blockState.responseFormat && typeof blockState.responseFormat === 'string') { - const responseFormatValue = blockState.responseFormat.trim() - - // Check for variable references like - if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) { - logger.debug( - `[${requestId}] Response format contains variable reference for block ${blockId}` - ) - // Keep variable references as-is - they will be resolved during execution - acc[blockId] = blockState - } else if (responseFormatValue === '') { - // Empty string - remove response format - acc[blockId] = { - ...blockState, - responseFormat: undefined, - } - } else { - try { - logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`) - // Attempt to parse the responseFormat if it's a string - const parsedResponseFormat = JSON.parse(responseFormatValue) - - acc[blockId] = { - ...blockState, - responseFormat: parsedResponseFormat, - } - } catch (error) { - logger.warn( - `[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`, - error - ) - // Set to undefined instead of keeping malformed JSON - this allows execution to continue - acc[blockId] = { - ...blockState, - responseFormat: undefined, - } - } - } - } else { - acc[blockId] = blockState - } - return acc - }, - {} as Record> - ) - - // Get workflow variables - let workflowVariables = {} - if (workflowRecord.variables) { - try { - if (typeof workflowRecord.variables === 'string') { - workflowVariables = JSON.parse(workflowRecord.variables) - } else { - workflowVariables = workflowRecord.variables - } - } catch (error) { - logger.error(`Failed to parse workflow variables: ${schedule.workflowId}`, error) - } - } - - const serializedWorkflow = new Serializer().serializeWorkflow( - mergedStates, - edges, - loops, - parallels, - true // Enable validation during execution - ) - - const input = { - _context: { - workflowId: schedule.workflowId, - }, - } - - // Start logging with environment variables - await loggingSession.safeStart({ - userId: actorUserId, - workspaceId: workflowRecord.workspaceId || '', - variables: variables || {}, - }) - - const executor = new Executor({ - workflow: serializedWorkflow, - currentBlockStates: processedBlockStates, - envVarValues: decryptedEnvVars, - workflowInput: input, - workflowVariables, - contextExtensions: { - executionId, - workspaceId: workflowRecord.workspaceId || '', - isDeployedContext: true, - }, - }) - - // Set up logging on the executor - loggingSession.setupExecutor(executor) - - const result = await executor.execute( - schedule.workflowId, - schedule.blockId || undefined - ) - - const executionResult = - 'stream' in result && 'execution' in result ? result.execution : result - - logger.info(`[${requestId}] Workflow execution completed: ${schedule.workflowId}`, { - success: executionResult.success, - executionTime: executionResult.metadata?.duration, - }) - - if (executionResult.success) { - await updateWorkflowRunCounts(schedule.workflowId) - - try { - await db - .update(userStats) - .set({ - totalScheduledExecutions: sql`total_scheduled_executions + 1`, - lastActive: now, - }) - .where(eq(userStats.userId, actorUserId)) - - logger.debug(`[${requestId}] Updated user stats for scheduled execution`) - } catch (statsError) { - logger.error(`[${requestId}] Error updating user stats:`, statsError) - } - } - - const { traceSpans, totalDuration } = buildTraceSpans(executionResult) - - // Complete logging - await loggingSession.safeComplete({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - finalOutput: executionResult.output || {}, - traceSpans: (traceSpans || []) as any, - }) - - return { success: executionResult.success, blocks, executionResult } - } catch (earlyError: any) { - // Handle errors that occur before workflow execution (e.g., missing data, env vars, etc.) - logger.error( - `[${requestId}] Early failure in scheduled workflow ${schedule.workflowId}`, - earlyError - ) - - // Create a minimal log entry for early failures - try { - await loggingSession.safeStart({ - userId: workflowRecord.userId, - workspaceId: workflowRecord.workspaceId || '', - variables: {}, - }) - - await loggingSession.safeCompleteWithError({ - error: { - message: `Schedule execution failed before workflow started: ${earlyError.message}`, - stackTrace: earlyError.stack, - }, - traceSpans: [], - }) - } catch (loggingError) { - logger.error( - `[${requestId}] Failed to create log entry for early schedule failure`, - loggingError - ) - } - - // Re-throw the error to be handled by the outer catch block - throw earlyError - } - })() - - // Check if execution was skipped (e.g., trigger block not found) - if ('skip' in executionSuccess && executionSuccess.skip) { - runningExecutions.delete(schedule.workflowId) - continue - } - - if (executionSuccess.success) { - logger.info(`[${requestId}] Workflow ${schedule.workflowId} executed successfully`) - - const nextRunAt = calculateNextRunTime(schedule, executionSuccess.blocks) - - logger.debug( - `[${requestId}] Calculated next run time: ${nextRunAt.toISOString()} for workflow ${schedule.workflowId}` - ) - - try { - await db - .update(workflowSchedule) - .set({ - lastRanAt: now, - updatedAt: now, - nextRunAt, - failedCount: 0, // Reset failure count on success - }) - .where(eq(workflowSchedule.id, schedule.id)) - - logger.debug( - `[${requestId}] Updated next run time for workflow ${schedule.workflowId} to ${nextRunAt.toISOString()}` - ) - } catch (updateError) { - logger.error(`[${requestId}] Error updating schedule after success:`, updateError) - } - } else { - logger.warn(`[${requestId}] Workflow ${schedule.workflowId} execution failed`) - - const newFailedCount = (schedule.failedCount || 0) + 1 - const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES - const nextRunAt = calculateNextRunTime(schedule, executionSuccess.blocks) - - if (shouldDisable) { - logger.warn( - `[${requestId}] Disabling schedule for workflow ${schedule.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` - ) - } - - try { - await db - .update(workflowSchedule) - .set({ - updatedAt: now, - nextRunAt, - failedCount: newFailedCount, - lastFailedAt: now, - status: shouldDisable ? 'disabled' : 'active', - }) - .where(eq(workflowSchedule.id, schedule.id)) - - logger.debug(`[${requestId}] Updated schedule after failure`) - } catch (updateError) { - logger.error(`[${requestId}] Error updating schedule after failure:`, updateError) - } - } - } catch (error: any) { - // Handle sync queue overload - if (error.message?.includes('Service overloaded')) { - logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`) - - const retryDelay = 5 * 60 * 1000 // 5 minutes - const nextRetryAt = new Date(now.getTime() + retryDelay) - - try { - await db - .update(workflowSchedule) - .set({ - updatedAt: now, - nextRunAt: nextRetryAt, - }) - .where(eq(workflowSchedule.id, schedule.id)) - - logger.debug(`[${requestId}] Updated schedule retry time due to service overload`) - } catch (updateError) { - logger.error( - `[${requestId}] Error updating schedule for service overload:`, - updateError - ) - } - } else { - logger.error( - `[${requestId}] Error executing scheduled workflow ${schedule.workflowId}`, - error - ) - - // Ensure we create a log entry for this failed execution - try { - const failureLoggingSession = new LoggingSession( - schedule.workflowId, - executionId, - 'schedule', - requestId - ) - - await failureLoggingSession.safeStart({ - userId: workflowRecord.userId, - workspaceId: workflowRecord.workspaceId || '', - variables: {}, - }) - - await failureLoggingSession.safeCompleteWithError({ - error: { - message: `Schedule execution failed: ${error.message}`, - stackTrace: error.stack, - }, - traceSpans: [], - }) - } catch (loggingError) { - logger.error( - `[${requestId}] Failed to create log entry for failed schedule execution`, - loggingError - ) - } - - let nextRunAt: Date - try { - const [workflowRecord] = await db - .select() - .from(workflow) - .where(eq(workflow.id, schedule.workflowId)) - .limit(1) - - if (workflowRecord?.isDeployed) { - try { - const deployedData = await loadDeployedWorkflowState(schedule.workflowId) - nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any) - } catch { - nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) - } - } else { - nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) - } - } catch (workflowError) { - logger.error( - `[${requestId}] Error retrieving workflow for next run calculation`, - workflowError - ) - nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) // 24 hours as a fallback - } - - const newFailedCount = (schedule.failedCount || 0) + 1 - const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES + void executeScheduleJob(payload).catch((error) => { + logger.error( + `[${requestId}] Direct schedule execution failed for workflow ${schedule.workflowId}`, + error + ) + }) - if (shouldDisable) { - logger.warn( - `[${requestId}] Disabling schedule for workflow ${schedule.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` - ) - } + logger.info( + `[${requestId}] Queued direct schedule execution for workflow ${schedule.workflowId} (Trigger.dev disabled)` + ) + }) - try { - await db - .update(workflowSchedule) - .set({ - updatedAt: now, - nextRunAt, - failedCount: newFailedCount, - lastFailedAt: now, - status: shouldDisable ? 'disabled' : 'active', - }) - .where(eq(workflowSchedule.id, schedule.id)) + await Promise.allSettled(directExecutionPromises) - logger.debug(`[${requestId}] Updated schedule after execution error`) - } catch (updateError) { - logger.error( - `[${requestId}] Error updating schedule after execution error:`, - updateError - ) - } - } - } finally { - runningExecutions.delete(schedule.workflowId) - } - } catch (error: any) { - logger.error(`[${requestId}] Error in scheduled execution handler`, error) - return NextResponse.json({ error: error.message }, { status: 500 }) - } + logger.info( + `[${requestId}] Queued ${dueSchedules.length} direct schedule executions (Trigger.dev disabled)` + ) } + + return NextResponse.json({ + message: 'Scheduled workflow executions processed', + executedCount: dueSchedules.length, + }) } catch (error: any) { logger.error(`[${requestId}] Error in scheduled execution handler`, error) return NextResponse.json({ error: error.message }, { status: 500 }) } - - return NextResponse.json({ - message: 'Scheduled workflow executions processed', - executedCount: dueSchedules.length, - }) } diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts new file mode 100644 index 0000000000..691f8255ae --- /dev/null +++ b/apps/sim/background/schedule-execution.ts @@ -0,0 +1,598 @@ +import { db, userStats, workflow, workflowSchedule } from '@sim/db' +import { task } from '@trigger.dev/sdk' +import { Cron } from 'croner' +import { eq, sql } from 'drizzle-orm' +import { v4 as uuidv4 } from 'uuid' +import { getApiKeyOwnerUserId } from '@/lib/api-key/service' +import { checkServerSideUsageLimits } from '@/lib/billing' +import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' +import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' +import { createLogger } from '@/lib/logs/console/logger' +import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' +import { + type BlockState, + calculateNextRunTime as calculateNextTime, + getScheduleTimeValues, + getSubBlockValue, +} from '@/lib/schedules/utils' +import { decryptSecret } from '@/lib/utils' +import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers' +import { updateWorkflowRunCounts } from '@/lib/workflows/utils' +import { Executor } from '@/executor' +import { Serializer } from '@/serializer' +import { RateLimiter } from '@/services/queue' +import { mergeSubblockState } from '@/stores/workflows/server-utils' + +const logger = createLogger('TriggerScheduleExecution') + +const MAX_CONSECUTIVE_FAILURES = 3 + +export type ScheduleExecutionPayload = { + scheduleId: string + workflowId: string + blockId?: string + cronExpression?: string + lastRanAt?: string + failedCount?: number + now: string +} + +function calculateNextRunTime( + schedule: { cronExpression?: string; lastRanAt?: string }, + blocks: Record +): Date { + const scheduleBlock = Object.values(blocks).find( + (block) => block.type === 'starter' || block.type === 'schedule' + ) + if (!scheduleBlock) throw new Error('No starter or schedule block found') + const scheduleType = getSubBlockValue(scheduleBlock, 'scheduleType') + const scheduleValues = getScheduleTimeValues(scheduleBlock) + + if (schedule.cronExpression) { + const cron = new Cron(schedule.cronExpression) + const nextDate = cron.nextRun() + if (!nextDate) throw new Error('Invalid cron expression or no future occurrences') + return nextDate + } + + const lastRanAt = schedule.lastRanAt ? new Date(schedule.lastRanAt) : null + return calculateNextTime(scheduleType, scheduleValues, lastRanAt) +} + +export async function executeScheduleJob(payload: ScheduleExecutionPayload) { + const executionId = uuidv4() + const requestId = executionId.slice(0, 8) + const now = new Date(payload.now) + + logger.info(`[${requestId}] Starting schedule execution`, { + scheduleId: payload.scheduleId, + workflowId: payload.workflowId, + executionId, + }) + + const EnvVarsSchema = (await import('zod')).z.record((await import('zod')).z.string()) + + try { + const [workflowRecord] = await db + .select() + .from(workflow) + .where(eq(workflow.id, payload.workflowId)) + .limit(1) + + if (!workflowRecord) { + logger.warn(`[${requestId}] Workflow ${payload.workflowId} not found`) + return + } + + const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId) + + if (!actorUserId) { + logger.warn( + `[${requestId}] Skipping schedule ${payload.scheduleId}: pinned API key required to attribute usage.` + ) + return + } + + const userSubscription = await getHighestPrioritySubscription(actorUserId) + + const rateLimiter = new RateLimiter() + const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription( + actorUserId, + userSubscription, + 'schedule', + false + ) + + if (!rateLimitCheck.allowed) { + logger.warn( + `[${requestId}] Rate limit exceeded for scheduled workflow ${payload.workflowId}`, + { + userId: workflowRecord.userId, + remaining: rateLimitCheck.remaining, + resetAt: rateLimitCheck.resetAt, + } + ) + + const retryDelay = 5 * 60 * 1000 + const nextRetryAt = new Date(now.getTime() + retryDelay) + + try { + await db + .update(workflowSchedule) + .set({ + updatedAt: now, + nextRunAt: nextRetryAt, + }) + .where(eq(workflowSchedule.id, payload.scheduleId)) + + logger.debug(`[${requestId}] Updated next retry time due to rate limit`) + } catch (updateError) { + logger.error(`[${requestId}] Error updating schedule for rate limit:`, updateError) + } + + return + } + + const usageCheck = await checkServerSideUsageLimits(actorUserId) + if (usageCheck.isExceeded) { + logger.warn( + `[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`, + { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + workflowId: payload.workflowId, + } + ) + try { + const deployedData = await loadDeployedWorkflowState(payload.workflowId) + const nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any) + await db + .update(workflowSchedule) + .set({ updatedAt: now, nextRunAt }) + .where(eq(workflowSchedule.id, payload.scheduleId)) + } catch (calcErr) { + logger.warn( + `[${requestId}] Unable to calculate nextRunAt while skipping schedule ${payload.scheduleId}`, + calcErr + ) + } + return + } + + logger.info(`[${requestId}] Executing scheduled workflow ${payload.workflowId}`) + + const loggingSession = new LoggingSession( + payload.workflowId, + executionId, + 'schedule', + requestId + ) + + try { + const executionSuccess = await (async () => { + try { + logger.debug(`[${requestId}] Loading deployed workflow ${payload.workflowId}`) + const deployedData = await loadDeployedWorkflowState(payload.workflowId) + + const blocks = deployedData.blocks + const edges = deployedData.edges + const loops = deployedData.loops + const parallels = deployedData.parallels + logger.info(`[${requestId}] Loaded deployed workflow ${payload.workflowId}`) + + if (payload.blockId) { + const blockExists = await blockExistsInDeployment(payload.workflowId, payload.blockId) + if (!blockExists) { + logger.warn( + `[${requestId}] Schedule trigger block ${payload.blockId} not found in deployed workflow ${payload.workflowId}. Skipping execution.` + ) + return { skip: true, blocks: {} as Record } + } + } + + const mergedStates = mergeSubblockState(blocks) + + const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv( + actorUserId, + workflowRecord.workspaceId || undefined + ) + const variables = EnvVarsSchema.parse({ + ...personalEncrypted, + ...workspaceEncrypted, + }) + + const currentBlockStates = await Object.entries(mergedStates).reduce( + async (accPromise, [id, block]) => { + const acc = await accPromise + acc[id] = await Object.entries(block.subBlocks).reduce( + async (subAccPromise, [key, subBlock]) => { + const subAcc = await subAccPromise + let value = subBlock.value + + if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) { + const matches = value.match(/{{([^}]+)}}/g) + if (matches) { + for (const match of matches) { + const varName = match.slice(2, -2) + const encryptedValue = variables[varName] + if (!encryptedValue) { + throw new Error(`Environment variable "${varName}" was not found`) + } + + try { + const { decrypted } = await decryptSecret(encryptedValue) + value = (value as string).replace(match, decrypted) + } catch (error: any) { + logger.error( + `[${requestId}] Error decrypting value for variable "${varName}"`, + error + ) + throw new Error( + `Failed to decrypt environment variable "${varName}": ${error.message}` + ) + } + } + } + } + + subAcc[key] = value + return subAcc + }, + Promise.resolve({} as Record) + ) + return acc + }, + Promise.resolve({} as Record>) + ) + + const decryptedEnvVars: Record = {} + for (const [key, encryptedValue] of Object.entries(variables)) { + try { + const { decrypted } = await decryptSecret(encryptedValue) + decryptedEnvVars[key] = decrypted + } catch (error: any) { + logger.error(`[${requestId}] Failed to decrypt environment variable "${key}"`, error) + throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) + } + } + + const processedBlockStates = Object.entries(currentBlockStates).reduce( + (acc, [blockId, blockState]) => { + if (blockState.responseFormat && typeof blockState.responseFormat === 'string') { + const responseFormatValue = blockState.responseFormat.trim() + + if (responseFormatValue.startsWith('<') && responseFormatValue.includes('>')) { + logger.debug( + `[${requestId}] Response format contains variable reference for block ${blockId}` + ) + acc[blockId] = blockState + } else if (responseFormatValue === '') { + acc[blockId] = { + ...blockState, + responseFormat: undefined, + } + } else { + try { + logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`) + const parsedResponseFormat = JSON.parse(responseFormatValue) + + acc[blockId] = { + ...blockState, + responseFormat: parsedResponseFormat, + } + } catch (error) { + logger.warn( + `[${requestId}] Failed to parse responseFormat for block ${blockId}, using undefined`, + error + ) + acc[blockId] = { + ...blockState, + responseFormat: undefined, + } + } + } + } else { + acc[blockId] = blockState + } + return acc + }, + {} as Record> + ) + + let workflowVariables = {} + if (workflowRecord.variables) { + try { + if (typeof workflowRecord.variables === 'string') { + workflowVariables = JSON.parse(workflowRecord.variables) + } else { + workflowVariables = workflowRecord.variables + } + } catch (error) { + logger.error(`Failed to parse workflow variables: ${payload.workflowId}`, error) + } + } + + const serializedWorkflow = new Serializer().serializeWorkflow( + mergedStates, + edges, + loops, + parallels, + true + ) + + const input = { + _context: { + workflowId: payload.workflowId, + }, + } + + await loggingSession.safeStart({ + userId: actorUserId, + workspaceId: workflowRecord.workspaceId || '', + variables: variables || {}, + }) + + const executor = new Executor({ + workflow: serializedWorkflow, + currentBlockStates: processedBlockStates, + envVarValues: decryptedEnvVars, + workflowInput: input, + workflowVariables, + contextExtensions: { + executionId, + workspaceId: workflowRecord.workspaceId || '', + isDeployedContext: true, + }, + }) + + loggingSession.setupExecutor(executor) + + const result = await executor.execute(payload.workflowId, payload.blockId || undefined) + + const executionResult = + 'stream' in result && 'execution' in result ? result.execution : result + + logger.info(`[${requestId}] Workflow execution completed: ${payload.workflowId}`, { + success: executionResult.success, + executionTime: executionResult.metadata?.duration, + }) + + if (executionResult.success) { + await updateWorkflowRunCounts(payload.workflowId) + + try { + await db + .update(userStats) + .set({ + totalScheduledExecutions: sql`total_scheduled_executions + 1`, + lastActive: now, + }) + .where(eq(userStats.userId, actorUserId)) + + logger.debug(`[${requestId}] Updated user stats for scheduled execution`) + } catch (statsError) { + logger.error(`[${requestId}] Error updating user stats:`, statsError) + } + } + + const { traceSpans, totalDuration } = buildTraceSpans(executionResult) + + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + finalOutput: executionResult.output || {}, + traceSpans: (traceSpans || []) as any, + }) + + return { success: executionResult.success, blocks, executionResult } + } catch (earlyError: any) { + logger.error( + `[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, + earlyError + ) + + try { + await loggingSession.safeStart({ + userId: workflowRecord.userId, + workspaceId: workflowRecord.workspaceId || '', + variables: {}, + }) + + await loggingSession.safeCompleteWithError({ + error: { + message: `Schedule execution failed before workflow started: ${earlyError.message}`, + stackTrace: earlyError.stack, + }, + traceSpans: [], + }) + } catch (loggingError) { + logger.error( + `[${requestId}] Failed to create log entry for early schedule failure`, + loggingError + ) + } + + throw earlyError + } + })() + + if ('skip' in executionSuccess && executionSuccess.skip) { + return + } + + if (executionSuccess.success) { + logger.info(`[${requestId}] Workflow ${payload.workflowId} executed successfully`) + + const nextRunAt = calculateNextRunTime(payload, executionSuccess.blocks) + + logger.debug( + `[${requestId}] Calculated next run time: ${nextRunAt.toISOString()} for workflow ${payload.workflowId}` + ) + + try { + await db + .update(workflowSchedule) + .set({ + lastRanAt: now, + updatedAt: now, + nextRunAt, + failedCount: 0, + }) + .where(eq(workflowSchedule.id, payload.scheduleId)) + + logger.debug( + `[${requestId}] Updated next run time for workflow ${payload.workflowId} to ${nextRunAt.toISOString()}` + ) + } catch (updateError) { + logger.error(`[${requestId}] Error updating schedule after success:`, updateError) + } + } else { + logger.warn(`[${requestId}] Workflow ${payload.workflowId} execution failed`) + + const newFailedCount = (payload.failedCount || 0) + 1 + const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES + const nextRunAt = calculateNextRunTime(payload, executionSuccess.blocks) + + if (shouldDisable) { + logger.warn( + `[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` + ) + } + + try { + await db + .update(workflowSchedule) + .set({ + updatedAt: now, + nextRunAt, + failedCount: newFailedCount, + lastFailedAt: now, + status: shouldDisable ? 'disabled' : 'active', + }) + .where(eq(workflowSchedule.id, payload.scheduleId)) + + logger.debug(`[${requestId}] Updated schedule after failure`) + } catch (updateError) { + logger.error(`[${requestId}] Error updating schedule after failure:`, updateError) + } + } + } catch (error: any) { + if (error.message?.includes('Service overloaded')) { + logger.warn(`[${requestId}] Service overloaded, retrying schedule in 5 minutes`) + + const retryDelay = 5 * 60 * 1000 + const nextRetryAt = new Date(now.getTime() + retryDelay) + + try { + await db + .update(workflowSchedule) + .set({ + updatedAt: now, + nextRunAt: nextRetryAt, + }) + .where(eq(workflowSchedule.id, payload.scheduleId)) + + logger.debug(`[${requestId}] Updated schedule retry time due to service overload`) + } catch (updateError) { + logger.error(`[${requestId}] Error updating schedule for service overload:`, updateError) + } + } else { + logger.error( + `[${requestId}] Error executing scheduled workflow ${payload.workflowId}`, + error + ) + + try { + const failureLoggingSession = new LoggingSession( + payload.workflowId, + executionId, + 'schedule', + requestId + ) + + await failureLoggingSession.safeStart({ + userId: workflowRecord.userId, + workspaceId: workflowRecord.workspaceId || '', + variables: {}, + }) + + await failureLoggingSession.safeCompleteWithError({ + error: { + message: `Schedule execution failed: ${error.message}`, + stackTrace: error.stack, + }, + traceSpans: [], + }) + } catch (loggingError) { + logger.error( + `[${requestId}] Failed to create log entry for failed schedule execution`, + loggingError + ) + } + + let nextRunAt: Date + try { + const [workflowRecord] = await db + .select() + .from(workflow) + .where(eq(workflow.id, payload.workflowId)) + .limit(1) + + if (workflowRecord?.isDeployed) { + try { + const deployedData = await loadDeployedWorkflowState(payload.workflowId) + nextRunAt = calculateNextRunTime(payload, deployedData.blocks as any) + } catch { + nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) + } + } else { + nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) + } + } catch (workflowError) { + logger.error( + `[${requestId}] Error retrieving workflow for next run calculation`, + workflowError + ) + nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000) + } + + const newFailedCount = (payload.failedCount || 0) + 1 + const shouldDisable = newFailedCount >= MAX_CONSECUTIVE_FAILURES + + if (shouldDisable) { + logger.warn( + `[${requestId}] Disabling schedule for workflow ${payload.workflowId} after ${MAX_CONSECUTIVE_FAILURES} consecutive failures` + ) + } + + try { + await db + .update(workflowSchedule) + .set({ + updatedAt: now, + nextRunAt, + failedCount: newFailedCount, + lastFailedAt: now, + status: shouldDisable ? 'disabled' : 'active', + }) + .where(eq(workflowSchedule.id, payload.scheduleId)) + + logger.debug(`[${requestId}] Updated schedule after execution error`) + } catch (updateError) { + logger.error(`[${requestId}] Error updating schedule after execution error:`, updateError) + } + } + } + } catch (error: any) { + logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error) + } +} + +export const scheduleExecution = task({ + id: 'schedule-execution', + retry: { + maxAttempts: 1, + }, + run: async (payload: ScheduleExecutionPayload) => executeScheduleJob(payload), +}) diff --git a/helm/sim/templates/cronjobs.yaml b/helm/sim/templates/cronjobs.yaml index 95a54f74fe..aa0232486d 100644 --- a/helm/sim/templates/cronjobs.yaml +++ b/helm/sim/templates/cronjobs.yaml @@ -41,6 +41,9 @@ spec: securityContext: {{- toYaml . | nindent 14 }} {{- end }} + env: + - name: CRON_SECRET + value: {{ $.Values.app.env.CRON_SECRET | quote }} command: - /bin/sh - -c @@ -58,6 +61,7 @@ spec: if curl -f -s -S --max-time 60 --retry 2 --retry-delay 5 \ -H "Content-Type: application/json" \ -H "User-Agent: Kubernetes-CronJob/{{ $jobConfig.name }}" \ + -H "Authorization: Bearer ${CRON_SECRET}" \ "$SERVICE_URL{{ $jobConfig.path }}"; then echo "Success: HTTP request completed" exit 0