diff --git a/apps/sim/app/api/chat/utils.ts b/apps/sim/app/api/chat/utils.ts index 2bab643cfd..6d66bf749d 100644 --- a/apps/sim/app/api/chat/utils.ts +++ b/apps/sim/app/api/chat/utils.ts @@ -1,6 +1,7 @@ import { eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' +import { checkServerSideUsageLimits } from '@/lib/billing' import { isDev } from '@/lib/environment' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' @@ -330,6 +331,22 @@ export async function executeWorkflowForChat( const workflowId = deployment.workflowId const executionId = uuidv4() + const usageCheck = await checkServerSideUsageLimits(deployment.userId) + if (usageCheck.isExceeded) { + logger.warn( + `[${requestId}] User ${deployment.userId} has exceeded usage limits. Skipping chat execution.`, + { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + workflowId: deployment.workflowId, + chatId, + } + ) + throw new Error( + usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue using chat.' + ) + } + // Set up logging for chat execution const loggingSession = new LoggingSession(workflowId, executionId, 'chat', requestId) diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts index cf6ab0a837..93191243d8 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts @@ -284,219 +284,10 @@ describe('Webhook Trigger API Route', () => { expect(text).toMatch(/not found/i) // Response should contain "not found" message }) - /** - * Test duplicate webhook request handling - * Verifies that duplicate requests are detected and not processed multiple times - */ - it('should handle duplicate webhook requests', async () => { - // Set up duplicate detection - hasProcessedMessageMock.mockResolvedValue(true) // Simulate duplicate - processGenericDeduplicationMock.mockResolvedValue( - new Response('Duplicate request', { status: 200 }) - ) - - // Configure DB mock to return a webhook and workflow - const { db } = await import('@/db') - const limitMock = vi.fn().mockReturnValue([ - { - webhook: { - id: 'webhook-id', - path: 'test-path', - isActive: true, - provider: 'generic', // Not Airtable to test standard path - workflowId: 'workflow-id', - providerConfig: {}, - }, - workflow: { - id: 'workflow-id', - userId: 'user-id', - }, - }, - ]) - - const whereMock = vi.fn().mockReturnValue({ limit: limitMock }) - const innerJoinMock = vi.fn().mockReturnValue({ where: whereMock }) - const fromMock = vi.fn().mockReturnValue({ innerJoin: innerJoinMock }) - - // @ts-ignore - mocking the query chain - db.select.mockReturnValue({ from: fromMock }) - - // Create a mock request - const req = createMockRequest('POST', { event: 'test' }) - - // Mock the path param - const params = Promise.resolve({ path: 'test-path' }) - - // Import the handler after mocks are set up - const { POST } = await import('@/app/api/webhooks/trigger/[path]/route') - - // Call the handler - const response = await POST(req, { params }) - - // Expect 200 response for duplicate - expect(response.status).toBe(200) - - // Verify response text indicates duplication - const text = await response.text() - expect(text).toMatch(/duplicate|received/i) // Response might be "Duplicate message" or "Request received" - }) - /** * Test Slack-specific webhook handling * Verifies that Slack signature verification is performed */ // TODO: Fix failing test - returns 500 instead of 200 // it('should handle Slack webhooks with signature verification', async () => { ... }) - - /** - * Test error handling during webhook execution - */ - it('should handle errors during workflow execution', async () => { - // Mock the setTimeout to be faster for testing - // @ts-ignore - Replace global setTimeout for this test - global.setTimeout = vi.fn((callback) => { - callback() // Execute immediately - return 123 // Return a timer ID - }) - - // Set up error handling mocks - processWebhookMock.mockImplementation(() => { - throw new Error('Webhook execution failed') - }) - executeMock.mockRejectedValue(new Error('Webhook execution failed')) - - // Configure DB mock to return a webhook and workflow - const { db } = await import('@/db') - const limitMock = vi.fn().mockReturnValue([ - { - webhook: { - id: 'webhook-id', - path: 'test-path', - isActive: true, - provider: 'generic', // Not Airtable to ensure we use the timeout path - workflowId: 'workflow-id', - providerConfig: {}, - }, - workflow: { - id: 'workflow-id', - userId: 'user-id', - }, - }, - ]) - - const whereMock = vi.fn().mockReturnValue({ limit: limitMock }) - const innerJoinMock = vi.fn().mockReturnValue({ where: whereMock }) - const fromMock = vi.fn().mockReturnValue({ innerJoin: innerJoinMock }) - - // @ts-ignore - mocking the query chain - db.select.mockReturnValue({ from: fromMock }) - - // Create a mock request - const req = createMockRequest('POST', { event: 'test' }) - - // Mock the path param - const params = Promise.resolve({ path: 'test-path' }) - - // Import the handler after mocks are set up - const { POST } = await import('@/app/api/webhooks/trigger/[path]/route') - - // Call the handler - const response = await POST(req, { params }) - - // Verify response exists and check status code - // For non-Airtable webhooks, we expect 200 from the timeout response - expect(response).toBeDefined() - expect(response.status).toBe(200) - - // Verify response text - const text = await response.text() - expect(text).toMatch(/received|processing/i) - }) - - /** - * Test Airtable webhook specific handling - * Verifies that Airtable webhooks use the synchronous processing path - */ - it('should handle Airtable webhooks synchronously', async () => { - // Create webhook payload for Airtable - const airtablePayload = { - base: { - id: 'appn9RltLQQMsquyL', - }, - webhook: { - id: 'achpbXeBqNLsRFAnD', - }, - timestamp: new Date().toISOString(), - } - - // Reset fetch and process mock - fetchAndProcessAirtablePayloadsMock.mockResolvedValue(undefined) - - // Configure DB mock to return an Airtable webhook - const { db } = await import('@/db') - const limitMock = vi.fn().mockReturnValue([ - { - webhook: { - id: 'airtable-webhook-id', - path: 'airtable-path', - isActive: true, - provider: 'airtable', // Set provider to airtable to test that path - workflowId: 'workflow-id', - providerConfig: { - baseId: 'appn9RltLQQMsquyL', - externalId: 'achpbXeBqNLsRFAnD', - }, - }, - workflow: { - id: 'workflow-id', - userId: 'user-id', - }, - }, - ]) - - const whereMock = vi.fn().mockReturnValue({ limit: limitMock }) - const innerJoinMock = vi.fn().mockReturnValue({ where: whereMock }) - const fromMock = vi.fn().mockReturnValue({ innerJoin: innerJoinMock }) - - // Configure db.select to return the appropriate mock for this test - // @ts-ignore - Ignore TypeScript errors for test mocks - db.select = vi.fn().mockReturnValue({ from: fromMock }) - - // Also mock the DB for the Airtable notification check - const whereMock2 = vi.fn().mockReturnValue({ limit: vi.fn().mockReturnValue([]) }) - const fromMock2 = vi.fn().mockReturnValue({ where: whereMock2 }) - - // We need to handle multiple calls to db.select - let callCount = 0 - // @ts-ignore - Ignore TypeScript errors for test mocks - db.select = vi.fn().mockImplementation(() => { - callCount++ - if (callCount === 1) { - return { from: fromMock } - } - return { from: fromMock2 } - }) - - // Create a mock request with Airtable payload - const req = createMockRequest('POST', airtablePayload) - - // Mock the path param - const params = Promise.resolve({ path: 'airtable-path' }) - - // Import the handler after mocks are set up - const { POST } = await import('@/app/api/webhooks/trigger/[path]/route') - - // Call the handler - const response = await POST(req, { params }) - - // For Airtable we expect 200 after synchronous processing - expect(response.status).toBe(200) - - // Verify that the Airtable-specific function was called - expect(fetchAndProcessAirtablePayloadsMock).toHaveBeenCalledTimes(1) - - // The response should indicate success - const text = await response.text() - expect(text).toMatch(/success|processed/i) - }) }) diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.ts index 0b0052fc55..5f8de8d01a 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.ts @@ -1,19 +1,12 @@ -import { and, eq, sql } from 'drizzle-orm' +import { tasks } from '@trigger.dev/sdk/v3' +import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' -import { v4 as uuidv4 } from 'uuid' -import { checkServerSideUsageLimits } from '@/lib/billing' import { createLogger } from '@/lib/logs/console/logger' -import { acquireLock, hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis' import { - fetchAndProcessAirtablePayloads, handleSlackChallenge, handleWhatsAppVerification, - processGenericDeduplication, - processWebhook, - processWhatsAppDeduplication, validateMicrosoftTeamsSignature, } from '@/lib/webhooks/utils' -import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' import { db } from '@/db' import { subscription, webhook, workflow } from '@/db/schema' import { RateLimiter } from '@/services/queue' @@ -24,8 +17,6 @@ const logger = createLogger('WebhookTriggerAPI') export const dynamic = 'force-dynamic' export const maxDuration = 300 -const _activeProcessingTasks = new Map>() - /** * Webhook Verification Handler (GET) * @@ -125,50 +116,13 @@ export async function POST( return new NextResponse('Invalid JSON payload', { status: 400 }) } - // --- PHASE 2: Early Slack deduplication handling --- - const messageId = body?.event_id - - if (body?.type === 'event_callback') { - const dedupeKey = messageId - ? `slack:msg:${messageId}` - : `slack:${body?.team_id || ''}:${body?.event?.ts || body?.event?.event_ts || Date.now()}` - - try { - const isDuplicate = await hasProcessedMessage(dedupeKey) - if (isDuplicate) { - logger.info(`[${requestId}] Duplicate Slack message detected: ${dedupeKey}`) - return new NextResponse('Duplicate message', { status: 200 }) - } - - await markMessageAsProcessed(dedupeKey, 60 * 60 * 24) // 24 hour TTL - } catch (error) { - logger.error(`[${requestId}] Error in Slack deduplication`, error) - // Continue processing - better to risk a duplicate than fail - } - } - - // --- PHASE 3: Distributed lock acquisition --- - let hasExecutionLock = false - let executionLockKey: string - - if (body?.type === 'event_callback') { - // For Slack events, use message-specific lock key - executionLockKey = messageId - ? `execution:lock:slack:${messageId}` - : `execution:lock:slack:${body?.team_id || ''}:${body?.event?.ts || body?.event?.event_ts || Date.now()}` - } else { - // Default fallback for other providers - executionLockKey = `execution:lock:${requestId}:${crypto.randomUUID()}` + // Handle Slack challenge + const slackResponse = handleSlackChallenge(body) + if (slackResponse) { + return slackResponse } - try { - hasExecutionLock = await acquireLock(executionLockKey, requestId, 30) // 30 second TTL - } catch (lockError) { - logger.error(`[${requestId}] Error acquiring execution lock`, lockError) - // Proceed without lock in case of Redis failure (fallback to best-effort) - } - - // --- PHASE 4: Webhook identification --- + // --- PHASE 2: Webhook identification --- const path = (await params).path logger.info(`[${requestId}] Processing webhook request for path: ${path}`) @@ -191,60 +145,7 @@ export async function POST( foundWebhook = webhooks[0].webhook foundWorkflow = webhooks[0].workflow - const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id) - - if (!normalizedData) { - logger.error(`[${requestId}] No normalized data found for webhook workflow ${foundWorkflow.id}`) - return new NextResponse('Workflow data not found in normalized tables', { status: 500 }) - } - - // Construct state from normalized data only (execution-focused, no frontend state fields) - foundWorkflow.state = { - blocks: normalizedData.blocks, - edges: normalizedData.edges, - loops: normalizedData.loops, - parallels: normalizedData.parallels, - lastSaved: Date.now(), - isDeployed: foundWorkflow.isDeployed || false, - deployedAt: foundWorkflow.deployedAt, - } - - // Special handling for Telegram webhooks to work around middleware User-Agent checks - if (foundWebhook.provider === 'telegram') { - // Log detailed information about the request for debugging - const userAgent = request.headers.get('user-agent') || 'empty' - logger.info(`[${requestId}] Received Telegram webhook request:`, { - userAgent, - path, - clientIp: - request.headers.get('x-forwarded-for') || request.headers.get('x-real-ip') || 'unknown', - method: request.method, - contentType: request.headers.get('content-type'), - hasUpdate: !!body?.update_id, - }) - - // Ensure User-Agent headers for Telegram in future requests from the bot - // We can't modify the incoming request, but we can recommend adding it for future setup - if (!userAgent || userAgent === 'empty') { - logger.warn( - `[${requestId}] Telegram webhook request missing User-Agent header. Recommend reconfiguring webhook with 'TelegramBot/1.0' User-Agent.` - ) - } - } - - // Detect provider type - const isAirtableWebhook = foundWebhook.provider === 'airtable' - const isGmailWebhook = foundWebhook.provider === 'gmail' - - // Handle Slack challenge verification (must be done before timeout) - const slackChallengeResponse = - body?.type === 'url_verification' ? handleSlackChallenge(body) : null - if (slackChallengeResponse) { - logger.info(`[${requestId}] Responding to Slack URL verification challenge`) - return slackChallengeResponse - } - - // Handle Microsoft Teams outgoing webhook signature verification (must be done before timeout) + // Handle Microsoft Teams signature verification if needed if (foundWebhook.provider === 'microsoftteams') { const providerConfig = (foundWebhook.providerConfig as Record) || {} @@ -258,9 +159,6 @@ export async function POST( return new NextResponse('Unauthorized - Missing HMAC signature', { status: 401 }) } - // Get the raw body for HMAC verification - const rawBody = await request.text() - const isValidSignature = validateMicrosoftTeamsSignature( providerConfig.hmacSecret, authHeader, @@ -273,247 +171,99 @@ export async function POST( } logger.debug(`[${requestId}] Microsoft Teams HMAC signature verified successfully`) - - // Parse the body again since we consumed it for verification - try { - body = JSON.parse(rawBody) - } catch (parseError) { - logger.error( - `[${requestId}] Failed to parse Microsoft Teams webhook body after verification`, - { - error: parseError instanceof Error ? parseError.message : String(parseError), - } - ) - return new NextResponse('Invalid JSON payload', { status: 400 }) - } } } - // Skip processing if another instance is already handling this request - if (!hasExecutionLock) { - logger.info(`[${requestId}] Skipping execution as lock was not acquired`) - return new NextResponse('Request is being processed by another instance', { status: 200 }) - } + // --- PHASE 3: Rate limiting for webhook execution --- + try { + // Get user subscription for rate limiting + const [subscriptionRecord] = await db + .select({ plan: subscription.plan }) + .from(subscription) + .where(eq(subscription.referenceId, foundWorkflow.userId)) + .limit(1) - // --- PHASE 5: Provider-specific processing --- - - // For Airtable: Process synchronously without timeouts - if (isAirtableWebhook) { - try { - logger.info(`[${requestId}] Airtable webhook ping received for webhook: ${foundWebhook.id}`) - - // Handle Airtable deduplication - const notificationId = body.notificationId || null - if (notificationId) { - try { - const processedKey = `airtable-webhook-${foundWebhook.id}-${notificationId}` - - // Check if notification was already processed - const alreadyProcessed = await db - .select({ id: webhook.id }) - .from(webhook) - .where( - and( - eq(webhook.id, foundWebhook.id), - sql`(webhook.provider_config->>'processedNotifications')::jsonb ? ${processedKey}` - ) - ) - .limit(1) - - if (alreadyProcessed.length > 0) { - logger.info( - `[${requestId}] Duplicate Airtable notification detected: ${notificationId}` - ) - return new NextResponse('Notification already processed', { status: 200 }) - } - - // Store notification ID for deduplication - const providerConfig = foundWebhook.providerConfig || {} - const processedNotifications = providerConfig.processedNotifications || [] - processedNotifications.push(processedKey) - - // Keep only the last 100 notifications to prevent unlimited growth - const limitedNotifications = processedNotifications.slice(-100) - - // Update the webhook record - await db - .update(webhook) - .set({ - providerConfig: { - ...providerConfig, - processedNotifications: limitedNotifications, - }, - updatedAt: new Date(), - }) - .where(eq(webhook.id, foundWebhook.id)) - } catch (error) { - logger.warn(`[${requestId}] Airtable deduplication check failed, continuing`, { - error: error instanceof Error ? error.message : String(error), - }) - } - } + const subscriptionPlan = (subscriptionRecord?.plan || 'free') as SubscriptionPlan - // Process Airtable payloads synchronously - try { - logger.info(`[${requestId}] Starting Airtable payload processing`) - await fetchAndProcessAirtablePayloads(foundWebhook, foundWorkflow, requestId) - return new NextResponse('Airtable ping processed successfully', { status: 200 }) - } catch (error: any) { - logger.error(`[${requestId}] Error during Airtable processing`, { - error: error.message, - }) - return new NextResponse(`Error processing Airtable webhook: ${error.message}`, { - status: 500, - }) - } - } catch (error: any) { - logger.error(`[${requestId}] Error in Airtable processing`, error) - return new NextResponse(`Internal server error: ${error.message}`, { status: 500 }) - } - } + // Check async rate limits (webhooks are processed asynchronously) + const rateLimiter = new RateLimiter() + const rateLimitCheck = await rateLimiter.checkRateLimit( + foundWorkflow.userId, + subscriptionPlan, + 'webhook', + true // isAsync = true for webhook execution + ) - // --- For all other webhook types: Use async processing with timeout --- - - // Create timeout promise for fast initial response (2.5 seconds) - const timeoutDuration = 25000 - const timeoutPromise = new Promise((resolve) => { - setTimeout(() => { - logger.info(`[${requestId}] Fast response timeout activated`) - resolve(new NextResponse('Request received', { status: 200 })) - }, timeoutDuration) - }) - - // Create the processing promise for asynchronous execution - const processingPromise = (async () => { - try { - // Provider-specific deduplication - if (foundWebhook.provider === 'whatsapp') { - const data = body?.entry?.[0]?.changes?.[0]?.value - const messages = data?.messages || [] - - const whatsappDuplicateResponse = await processWhatsAppDeduplication(requestId, messages) - if (whatsappDuplicateResponse) { - return whatsappDuplicateResponse - } - } else if (foundWebhook.provider === 'gmail') { - // Gmail-specific validation and logging - logger.info(`[${requestId}] Gmail webhook request received for webhook: ${foundWebhook.id}`) - - const webhookSecret = foundWebhook.secret - if (webhookSecret) { - const secretHeader = request.headers.get('X-Webhook-Secret') - if (secretHeader !== webhookSecret) { - logger.warn(`[${requestId}] Invalid webhook secret`) - return new NextResponse('Unauthorized', { status: 401 }) - } - } - - if (!body.email) { - logger.warn(`[${requestId}] Invalid Gmail webhook payload format`) - return new NextResponse('Invalid payload format', { status: 400 }) - } - - logger.info(`[${requestId}] Processing Gmail email`, { - emailId: body.email.id, - subject: - body.email?.payload?.headers?.find((h: any) => h.name === 'Subject')?.value || - 'No subject', - }) + if (!rateLimitCheck.allowed) { + logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${foundWorkflow.userId}`, { + provider: foundWebhook.provider, + remaining: rateLimitCheck.remaining, + resetAt: rateLimitCheck.resetAt, + }) - // Gmail deduplication using generic method - const genericDuplicateResponse = await processGenericDeduplication(requestId, path, body) - if (genericDuplicateResponse) { - return genericDuplicateResponse - } - } else if (foundWebhook.provider !== 'slack') { - // Generic deduplication for all other providers - const genericDuplicateResponse = await processGenericDeduplication(requestId, path, body) - if (genericDuplicateResponse) { - return genericDuplicateResponse - } + // Return 200 to prevent webhook provider retries, but indicate rate limit + if (foundWebhook.provider === 'microsoftteams') { + // Microsoft Teams requires specific response format + return NextResponse.json({ + type: 'message', + text: 'Rate limit exceeded. Please try again later.', + }) } - // Check rate limits for webhook execution - const [subscriptionRecord] = await db - .select({ plan: subscription.plan }) - .from(subscription) - .where(eq(subscription.referenceId, foundWorkflow.userId)) - .limit(1) - - const subscriptionPlan = (subscriptionRecord?.plan || 'free') as SubscriptionPlan - - const rateLimiter = new RateLimiter() - const rateLimitCheck = await rateLimiter.checkRateLimit( - foundWorkflow.userId, - subscriptionPlan, - 'webhook', - false // webhooks are always sync - ) + // Simple error response for other providers (return 200 to prevent retries) + return NextResponse.json({ message: 'Rate limit exceeded' }, { status: 200 }) + } - if (!rateLimitCheck.allowed) { - logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${foundWorkflow.userId}`, { - remaining: rateLimitCheck.remaining, - resetAt: rateLimitCheck.resetAt, - }) + logger.debug(`[${requestId}] Rate limit check passed for webhook`, { + provider: foundWebhook.provider, + remaining: rateLimitCheck.remaining, + resetAt: rateLimitCheck.resetAt, + }) + } catch (rateLimitError) { + logger.error(`[${requestId}] Error checking webhook rate limits:`, rateLimitError) + // Continue processing - better to risk rate limit bypass than fail webhook + } - // Return 200 to prevent webhook retries but indicate rate limit in response - return new NextResponse( - JSON.stringify({ - status: 'error', - message: `Rate limit exceeded. You have ${rateLimitCheck.remaining} requests remaining. Resets at ${rateLimitCheck.resetAt.toISOString()}`, - }), - { - status: 200, // Use 200 to prevent webhook provider retries - headers: { 'Content-Type': 'application/json' }, - } - ) - } + // --- PHASE 4: Queue webhook execution via trigger.dev --- + try { + // Queue the webhook execution task + const handle = await tasks.trigger('webhook-execution', { + webhookId: foundWebhook.id, + workflowId: foundWorkflow.id, + userId: foundWorkflow.userId, + provider: foundWebhook.provider, + body, + headers: Object.fromEntries(request.headers.entries()), + path, + blockId: foundWebhook.blockId, + }) - // Check if the user has exceeded their usage limits - const usageCheck = await checkServerSideUsageLimits(foundWorkflow.userId) - if (usageCheck.isExceeded) { - logger.warn( - `[${requestId}] User ${foundWorkflow.userId} has exceeded usage limits. Skipping webhook execution.`, - { - currentUsage: usageCheck.currentUsage, - limit: usageCheck.limit, - workflowId: foundWorkflow.id, - } - ) + logger.info( + `[${requestId}] Queued webhook execution task ${handle.id} for ${foundWebhook.provider} webhook` + ) - // Return a successful response to avoid webhook retries, but don't execute the workflow - return new NextResponse( - JSON.stringify({ - status: 'error', - message: - usageCheck.message || - 'Usage limit exceeded. Please upgrade your plan to continue using webhooks.', - }), - { - status: 200, // Use 200 to prevent webhook provider retries - headers: { 'Content-Type': 'application/json' }, - } - ) - } + // Return immediate acknowledgment with provider-specific format + if (foundWebhook.provider === 'microsoftteams') { + // Microsoft Teams requires specific response format + return NextResponse.json({ + type: 'message', + text: 'Sim Studio', + }) + } - // Execute workflow for the webhook event - logger.info(`[${requestId}] Executing workflow for ${foundWebhook.provider} webhook`) - - const executionId = uuidv4() - return await processWebhook( - foundWebhook, - foundWorkflow, - body, - request, - executionId, - requestId - ) - } catch (error: any) { - logger.error(`[${requestId}] Error processing webhook:`, error) - return new NextResponse(`Internal server error: ${error.message}`, { status: 500 }) + return NextResponse.json({ message: 'Webhook processed' }) + } catch (error: any) { + logger.error(`[${requestId}] Failed to queue webhook execution:`, error) + + // Still return 200 to prevent webhook provider retries + if (foundWebhook.provider === 'microsoftteams') { + // Microsoft Teams requires specific response format + return NextResponse.json({ + type: 'message', + text: 'Webhook processing failed', + }) } - })() - // Race processing against timeout to ensure fast response - return Promise.race([timeoutPromise, processingPromise]) + return NextResponse.json({ message: 'Internal server error' }, { status: 200 }) + } } diff --git a/apps/sim/lib/webhooks/utils.ts b/apps/sim/lib/webhooks/utils.ts index f978bb223b..e625c5e575 100644 --- a/apps/sim/lib/webhooks/utils.ts +++ b/apps/sim/lib/webhooks/utils.ts @@ -1,18 +1,9 @@ -import { and, eq, sql } from 'drizzle-orm' +import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' -import { v4 as uuidv4 } from 'uuid' import { createLogger } from '@/lib/logs/console/logger' -import { LoggingSession } from '@/lib/logs/execution/logging-session' -import { hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis' -import { decryptSecret } from '@/lib/utils' -import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' -import { updateWorkflowRunCounts } from '@/lib/workflows/utils' import { getOAuthToken } from '@/app/api/auth/oauth/utils' import { db } from '@/db' -import { environment as environmentTable, userStats, webhook } from '@/db/schema' -import { Executor } from '@/executor' -import { Serializer } from '@/serializer' -import { mergeSubblockStateAsync } from '@/stores/workflows/server-utils' +import { webhook } from '@/db/schema' const logger = createLogger('WebhookUtils') @@ -148,67 +139,6 @@ export async function validateSlackSignature( } } -/** - * Process WhatsApp message deduplication - */ -export async function processWhatsAppDeduplication( - requestId: string, - messages: any[] -): Promise { - if (messages.length > 0) { - const message = messages[0] - const messageId = message.id - - if (messageId) { - const whatsappMsgKey = `whatsapp:msg:${messageId}` - - try { - const isDuplicate = await hasProcessedMessage(whatsappMsgKey) - if (isDuplicate) { - logger.info(`[${requestId}] Duplicate WhatsApp message detected: ${messageId}`) - return new NextResponse('Duplicate message', { status: 200 }) - } - - // Mark as processed BEFORE processing - await markMessageAsProcessed(whatsappMsgKey, 60 * 60 * 24) - } catch (error) { - logger.error(`[${requestId}] Error in WhatsApp deduplication`, error) - // Continue processing - } - } - } - - return null -} - -/** - * Process generic deduplication using request hash - */ -export async function processGenericDeduplication( - requestId: string, - path: string, - body: any -): Promise { - try { - const requestHash = await generateRequestHash(path, body) - const genericMsgKey = `generic:${requestHash}` - - const isDuplicate = await hasProcessedMessage(genericMsgKey) - if (isDuplicate) { - logger.info(`[${requestId}] Duplicate request detected with hash: ${requestHash}`) - return new NextResponse('Duplicate request', { status: 200 }) - } - - // Mark as processed - await markMessageAsProcessed(genericMsgKey, 60 * 60 * 24) - } catch (error) { - logger.error(`[${requestId}] Error in generic deduplication`, error) - // Continue processing - } - - return null -} - /** * Format webhook input based on provider */ @@ -471,375 +401,6 @@ export function formatWebhookInput( } } -/** - * Execute workflow with the provided input - */ -export async function executeWorkflowFromPayload( - foundWorkflow: any, - input: any, - executionId: string, - requestId: string, - startBlockId?: string | null -): Promise { - // Add log at the beginning of this function for clarity - logger.info(`[${requestId}] Preparing to execute workflow`, { - workflowId: foundWorkflow.id, - executionId, - triggerSource: 'webhook-payload', - }) - - const loggingSession = new LoggingSession(foundWorkflow.id, executionId, 'webhook', requestId) - - try { - // Load workflow data from normalized tables - logger.debug(`[${requestId}] Loading workflow ${foundWorkflow.id} from normalized tables`) - const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id) - - if (!normalizedData) { - logger.error(`[${requestId}] TRACE: No normalized data found for workflow`, { - workflowId: foundWorkflow.id, - hasNormalizedData: false, - }) - throw new Error(`Workflow ${foundWorkflow.id} data not found in normalized tables`) - } - - // Use normalized data for execution - const { blocks, edges, loops, parallels } = normalizedData - logger.info(`[${requestId}] Loaded workflow ${foundWorkflow.id} from normalized tables`) - - // DEBUG: Log state information - logger.debug(`[${requestId}] TRACE: Retrieved workflow state from normalized tables`, { - workflowId: foundWorkflow.id, - blockCount: Object.keys(blocks || {}).length, - edgeCount: (edges || []).length, - loopCount: Object.keys(loops || {}).length, - }) - - logger.debug( - `[${requestId}] Merging subblock states for workflow ${foundWorkflow.id} (Execution: ${executionId})` - ) - - const mergeStartTime = Date.now() - const mergedStates = await mergeSubblockStateAsync(blocks, foundWorkflow.id) - logger.debug(`[${requestId}] TRACE: State merging complete`, { - duration: `${Date.now() - mergeStartTime}ms`, - mergedBlockCount: Object.keys(mergedStates).length, - }) - - // Retrieve and decrypt environment variables - const [userEnv] = await db - .select() - .from(environmentTable) - .where(eq(environmentTable.userId, foundWorkflow.userId)) - .limit(1) - let decryptedEnvVars: Record = {} - if (userEnv) { - // Decryption logic - const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map( - async ([key, encryptedValue]) => { - try { - const { decrypted } = await decryptSecret(encryptedValue as string) - return [key, decrypted] as const - } catch (error: any) { - logger.error( - `[${requestId}] Failed to decrypt environment variable "${key}" (Execution: ${executionId})`, - error - ) - throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) - } - } - ) - const decryptedEntries = await Promise.all(decryptionPromises) - decryptedEnvVars = Object.fromEntries(decryptedEntries) - } else { - logger.debug(`[${requestId}] TRACE: No environment variables found for user`, { - userId: foundWorkflow.userId, - }) - } - - await loggingSession.safeStart({ - userId: foundWorkflow.userId, - workspaceId: foundWorkflow.workspaceId, - variables: decryptedEnvVars, - }) - - // Process block states (extract subBlock values, parse responseFormat) - const blockStatesStartTime = Date.now() - const currentBlockStates = Object.entries(mergedStates).reduce( - (acc, [id, block]) => { - acc[id] = Object.entries(block.subBlocks).reduce( - (subAcc, [key, subBlock]) => { - subAcc[key] = subBlock.value - return subAcc - }, - {} as Record - ) - return acc - }, - {} as Record> - ) - - const processedBlockStates = Object.entries(currentBlockStates).reduce( - (acc, [blockId, blockState]) => { - const processedState = { ...blockState } - if (processedState.responseFormat) { - try { - if (typeof processedState.responseFormat === 'string') { - processedState.responseFormat = JSON.parse(processedState.responseFormat) - } - if ( - processedState.responseFormat && - typeof processedState.responseFormat === 'object' - ) { - if (!processedState.responseFormat.schema && !processedState.responseFormat.name) { - processedState.responseFormat = { - name: 'response_schema', - schema: processedState.responseFormat, - strict: true, - } - } - } - acc[blockId] = processedState - } catch (error) { - logger.warn( - `[${requestId}] Failed to parse responseFormat for block ${blockId} (Execution: ${executionId})`, - error - ) - acc[blockId] = blockState - } - } else { - acc[blockId] = blockState - } - return acc - }, - {} as Record> - ) - - // DEBUG: Log block state processing - logger.debug(`[${requestId}] TRACE: Block states processed`, { - duration: `${Date.now() - blockStatesStartTime}ms`, - blockCount: Object.keys(processedBlockStates).length, - }) - - // Serialize and get workflow variables - const serializeStartTime = Date.now() - const serializedWorkflow = new Serializer().serializeWorkflow( - mergedStates as any, - edges, - loops, - parallels - ) - let workflowVariables = {} - if (foundWorkflow.variables) { - try { - if (typeof foundWorkflow.variables === 'string') { - workflowVariables = JSON.parse(foundWorkflow.variables) - } else { - workflowVariables = foundWorkflow.variables - } - } catch (error) { - logger.error( - `[${requestId}] Failed to parse workflow variables: ${foundWorkflow.id} (Execution: ${executionId})`, - error - ) - } - } - - // DEBUG: Log serialization completion - logger.debug(`[${requestId}] TRACE: Workflow serialized`, { - duration: `${Date.now() - serializeStartTime}ms`, - hasWorkflowVars: Object.keys(workflowVariables).length > 0, - }) - - logger.debug(`[${requestId}] Starting workflow execution`, { - executionId, - blockCount: Object.keys(processedBlockStates).length, - }) - - // Log blocks for debugging (if any missing or invalid) - if (Object.keys(processedBlockStates).length === 0) { - logger.error(`[${requestId}] No blocks found in workflow state - this will likely fail`) - } else { - logger.debug(`[${requestId}] Block IDs for execution:`, { - blockIds: Object.keys(processedBlockStates).slice(0, 5), // Log just a few block IDs for debugging - totalBlocks: Object.keys(processedBlockStates).length, - }) - } - - // Ensure workflow variables exist - if (!workflowVariables || Object.keys(workflowVariables).length === 0) { - logger.debug(`[${requestId}] No workflow variables defined, using empty object`) - workflowVariables = {} - } - - // Validate input format for Airtable webhooks to prevent common errors - if ( - input?.airtableChanges && - (!Array.isArray(input.airtableChanges) || input.airtableChanges.length === 0) - ) { - logger.warn( - `[${requestId}] Invalid Airtable input format - airtableChanges should be a non-empty array` - ) - } - - // DEBUG: Log critical moment before executor creation - logger.info(`[${requestId}] TRACE: Creating workflow executor`, { - workflowId: foundWorkflow.id, - hasSerializedWorkflow: !!serializedWorkflow, - blockCount: Object.keys(processedBlockStates).length, - timestamp: new Date().toISOString(), - }) - - const executor = new Executor( - serializedWorkflow, - processedBlockStates, - decryptedEnvVars, - input, - workflowVariables - ) - - // Set up logging on the executor - loggingSession.setupExecutor(executor) - - // Log workflow execution start time for tracking - const executionStartTime = Date.now() - logger.info(`[${requestId}] TRACE: Executor instantiated, starting workflow execution now`, { - workflowId: foundWorkflow.id, - timestamp: new Date().toISOString(), - }) - - // Add direct detailed logging right before executing - logger.info( - `[${requestId}] EXECUTION_MONITOR: About to call executor.execute() - CRITICAL POINT`, - { - workflowId: foundWorkflow.id, - executionId: executionId, - timestamp: new Date().toISOString(), - } - ) - - // This is THE critical line where the workflow actually executes - const result = await executor.execute(foundWorkflow.id, startBlockId || undefined) - - // Check if we got a StreamingExecution result (with stream + execution properties) - // For webhook executions, we only care about the ExecutionResult part, not the stream - const executionResult = 'stream' in result && 'execution' in result ? result.execution : result - - // Add direct detailed logging right after executing - logger.info(`[${requestId}] EXECUTION_MONITOR: executor.execute() completed with result`, { - workflowId: foundWorkflow.id, - executionId: executionId, - success: executionResult.success, - resultType: result ? typeof result : 'undefined', - timestamp: new Date().toISOString(), - }) - - // Log completion and timing - const executionDuration = Date.now() - executionStartTime - logger.info(`[${requestId}] TRACE: Workflow execution completed`, { - workflowId: foundWorkflow.id, - success: executionResult.success, - duration: `${executionDuration}ms`, - actualDurationMs: executionDuration, - timestamp: new Date().toISOString(), - }) - - logger.info(`[${requestId}] Workflow execution finished`, { - executionId, - success: executionResult.success, - durationMs: executionResult.metadata?.duration || executionDuration, - actualDurationMs: executionDuration, - }) - - // Update counts and stats if successful - if (executionResult.success) { - await updateWorkflowRunCounts(foundWorkflow.id) - await db - .update(userStats) - .set({ - totalWebhookTriggers: sql`total_webhook_triggers + 1`, - lastActive: new Date(), - }) - .where(eq(userStats.userId, foundWorkflow.userId)) - } - - // Calculate total duration for logging - const totalDuration = executionResult.metadata?.duration || 0 - - const traceSpans = (executionResult.logs || []).map((blockLog: any, index: number) => { - let output = blockLog.output - if (!blockLog.success && blockLog.error) { - output = { - error: blockLog.error, - success: false, - ...(blockLog.output || {}), - } - } - - return { - id: blockLog.blockId, - name: `Block ${blockLog.blockName || blockLog.blockType} (${blockLog.blockType || 'unknown'})`, - type: blockLog.blockType || 'unknown', - duration: blockLog.durationMs || 0, - startTime: blockLog.startedAt, - endTime: blockLog.endedAt || blockLog.startedAt, - status: blockLog.success ? 'success' : 'error', - blockId: blockLog.blockId, - input: blockLog.input, - output: output, - tokens: blockLog.output?.tokens?.total || 0, - relativeStartMs: index * 100, - children: [], - toolCalls: (blockLog as any).toolCalls || [], - } - }) - - await loggingSession.safeComplete({ - endedAt: new Date().toISOString(), - totalDurationMs: totalDuration || 0, - finalOutput: executionResult.output || {}, - traceSpans: (traceSpans || []) as any, - }) - - // DEBUG: Final success log - logger.info(`[${requestId}] TRACE: Execution logs persisted successfully`, { - workflowId: foundWorkflow.id, - executionId, - timestamp: new Date().toISOString(), - }) - } catch (error: any) { - // DEBUG: Detailed error information - logger.error(`[${requestId}] TRACE: Error during workflow execution`, { - workflowId: foundWorkflow.id, - executionId, - errorType: error.constructor.name, - errorMessage: error.message, - stack: error.stack, - timestamp: new Date().toISOString(), - }) - - logger.error(`[${requestId}] Error executing workflow`, { - workflowId: foundWorkflow.id, - executionId, - error: error.message, - stack: error.stack, - }) - // Error logging handled by logging session - - await loggingSession.safeCompleteWithError({ - endedAt: new Date().toISOString(), - totalDurationMs: 0, - error: { - message: error.message || 'Webhook workflow execution failed', - stackTrace: error.stack, - }, - }) - - // Re-throw the error so the caller knows it failed - throw error - } -} - /** * Validates a Microsoft Teams outgoing webhook request signature using HMAC SHA-256 * @param hmacSecret - Microsoft Teams HMAC secret (base64 encoded) @@ -1378,26 +939,23 @@ export async function fetchAndProcessAirtablePayloads( } ) - await executeWorkflowFromPayload(workflowData, input, requestId, requestId, null) - - // COMPLETION LOG - This will only appear if execution succeeds - logger.info(`[${requestId}] CRITICAL_TRACE: Workflow execution completed successfully`, { + // Return the processed input for the trigger.dev task to handle + logger.info(`[${requestId}] CRITICAL_TRACE: Airtable changes processed, returning input`, { workflowId: workflowData.id, + recordCount: finalConsolidatedChanges.length, timestamp: new Date().toISOString(), }) - } catch (executionError: any) { - // Errors logged within executeWorkflowFromPayload - logger.error(`[${requestId}] CRITICAL_TRACE: Workflow execution failed with error`, { + + return input + } catch (processingError: any) { + logger.error(`[${requestId}] CRITICAL_TRACE: Error processing Airtable changes`, { workflowId: workflowData.id, - error: executionError.message, - stack: executionError.stack, + error: processingError.message, + stack: processingError.stack, timestamp: new Date().toISOString(), }) - logger.error( - `[${requestId}] Error during workflow execution triggered by Airtable polling`, - executionError - ) + throw processingError } } else { // DEBUG: Log when no changes are found @@ -1429,166 +987,6 @@ export async function fetchAndProcessAirtablePayloads( }) } -/** - * Process webhook verification and authorization - */ -/** - * Handle standard webhooks with synchronous execution - */ -async function processStandardWebhook( - foundWebhook: any, - foundWorkflow: any, - input: any, - executionId: string, - requestId: string -): Promise { - logger.info( - `[${requestId}] Executing workflow ${foundWorkflow.id} for webhook ${foundWebhook.id} (Execution: ${executionId})` - ) - - await executeWorkflowFromPayload( - foundWorkflow, - input, - executionId, - requestId, - foundWebhook.blockId - ) - - // Since executeWorkflowFromPayload handles logging and errors internally, - // we just need to return a success response for synchronous webhooks. - // Microsoft Teams requires a specific response format. - - if (foundWebhook.provider === 'microsoftteams') { - return NextResponse.json( - { - type: 'message', - text: 'Sim Studio', - }, - { status: 200 } - ) - } - - return NextResponse.json({ message: 'Webhook processed' }, { status: 200 }) -} - -/** - * Handle webhook processing errors with provider-specific responses - */ -function handleWebhookError( - error: any, - foundWebhook: any, - executionId: string, - requestId: string -): NextResponse { - logger.error( - `[${requestId}] Error in processWebhook for ${foundWebhook.id} (Execution: ${executionId})`, - error - ) - - // For Microsoft Teams outgoing webhooks, return the expected error format - if (foundWebhook.provider === 'microsoftteams') { - return NextResponse.json( - { - type: 'message', - text: 'Webhook processing failed', - }, - { status: 200 } - ) // Still return 200 to prevent Teams from showing additional error messages - } - - return new NextResponse(`Internal Server Error: ${error.message}`, { - status: 500, - }) -} - -export async function processWebhook( - foundWebhook: any, - foundWorkflow: any, - body: any, - request: NextRequest, - executionId: string, - requestId: string -): Promise { - try { - // --- Handle Airtable differently - it should always use fetchAndProcessAirtablePayloads --- - if (foundWebhook.provider === 'airtable') { - logger.info(`[${requestId}] Routing Airtable webhook through dedicated processor`) - await fetchAndProcessAirtablePayloads(foundWebhook, foundWorkflow, requestId) - return NextResponse.json({ message: 'Airtable webhook processed' }, { status: 200 }) - } - - // --- Provider-specific Auth/Verification (excluding Airtable/WhatsApp/Slack/MicrosoftTeams handled earlier) --- - if ( - foundWebhook.provider && - !['airtable', 'whatsapp', 'slack', 'microsoftteams'].includes(foundWebhook.provider) - ) { - const verificationResponse = verifyProviderWebhook(foundWebhook, request, requestId) - if (verificationResponse) { - return verificationResponse - } - } - - // --- Format Input based on provider (excluding Airtable) --- - const input = formatWebhookInput(foundWebhook, foundWorkflow, body, request) - - if (!input && foundWebhook.provider === 'whatsapp') { - return new NextResponse('No messages in WhatsApp payload', { status: 200 }) - } - - // --- Route to standard processor for all providers --- - return await processStandardWebhook(foundWebhook, foundWorkflow, input, executionId, requestId) - } catch (error: any) { - return handleWebhookError(error, foundWebhook, executionId, requestId) - } -} - -/** - * Generate a hash for request deduplication - */ -export async function generateRequestHash(path: string, body: any): Promise { - try { - const normalizedBody = normalizeBody(body) - const requestString = `${path}:${JSON.stringify(normalizedBody)}` - let hash = 0 - for (let i = 0; i < requestString.length; i++) { - const char = requestString.charCodeAt(i) - hash = (hash << 5) - hash + char - hash = hash & hash // Convert to 32bit integer - } - return `request:${path}:${hash}` - } catch (_error) { - return `request:${path}:${uuidv4()}` - } -} - -/** - * Normalize the body for consistent hashing - */ -export function normalizeBody(body: any): any { - if (!body || typeof body !== 'object') return body - const result = Array.isArray(body) ? [...body] : { ...body } - const fieldsToRemove = [ - 'timestamp', - 'random', - 'nonce', - 'requestId', - 'event_id', - 'event_time' /* Add other volatile fields */, - ] // Made case-insensitive check below - if (Array.isArray(result)) { - return result.map((item) => normalizeBody(item)) - } - for (const key in result) { - // Use lowercase check for broader matching - if (fieldsToRemove.includes(key.toLowerCase())) { - delete result[key] - } else if (typeof result[key] === 'object' && result[key] !== null) { - result[key] = normalizeBody(result[key]) - } - } - return result -} - // Define an interface for AirtableChange export interface AirtableChange { tableId: string diff --git a/apps/sim/trigger/webhook-execution.ts b/apps/sim/trigger/webhook-execution.ts new file mode 100644 index 0000000000..6d1e7ffa46 --- /dev/null +++ b/apps/sim/trigger/webhook-execution.ts @@ -0,0 +1,289 @@ +import { task } from '@trigger.dev/sdk/v3' +import { eq, sql } from 'drizzle-orm' +import { v4 as uuidv4 } from 'uuid' +import { checkServerSideUsageLimits } from '@/lib/billing' +import { createLogger } from '@/lib/logs/console/logger' +import { LoggingSession } from '@/lib/logs/execution/logging-session' +import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' +import { decryptSecret } from '@/lib/utils' +import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils' +import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers' +import { updateWorkflowRunCounts } from '@/lib/workflows/utils' +import { db } from '@/db' +import { environment as environmentTable, userStats } from '@/db/schema' +import { Executor } from '@/executor' +import { Serializer } from '@/serializer' +import { mergeSubblockState } from '@/stores/workflows/server-utils' + +const logger = createLogger('TriggerWebhookExecution') + +export const webhookExecution = task({ + id: 'webhook-execution', + retry: { + maxAttempts: 1, + }, + run: async (payload: { + webhookId: string + workflowId: string + userId: string + provider: string + body: any + headers: Record + path: string + blockId?: string + }) => { + const executionId = uuidv4() + const requestId = executionId.slice(0, 8) + + logger.info(`[${requestId}] Starting webhook execution via trigger.dev`, { + webhookId: payload.webhookId, + workflowId: payload.workflowId, + provider: payload.provider, + userId: payload.userId, + executionId, + }) + + // Initialize logging session outside try block so it's available in catch + const loggingSession = new LoggingSession(payload.workflowId, executionId, 'webhook', requestId) + + try { + // Check usage limits first + const usageCheck = await checkServerSideUsageLimits(payload.userId) + if (usageCheck.isExceeded) { + logger.warn( + `[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping webhook execution.`, + { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + workflowId: payload.workflowId, + } + ) + throw new Error( + usageCheck.message || + 'Usage limit exceeded. Please upgrade your plan to continue using webhooks.' + ) + } + + // Load workflow from normalized tables + const workflowData = await loadWorkflowFromNormalizedTables(payload.workflowId) + if (!workflowData) { + throw new Error(`Workflow not found: ${payload.workflowId}`) + } + + const { blocks, edges, loops, parallels } = workflowData + + // Get environment variables (matching workflow-execution pattern) + const [userEnv] = await db + .select() + .from(environmentTable) + .where(eq(environmentTable.userId, payload.userId)) + .limit(1) + + let decryptedEnvVars: Record = {} + if (userEnv) { + const decryptionPromises = Object.entries((userEnv.variables as any) || {}).map( + async ([key, encryptedValue]) => { + try { + const { decrypted } = await decryptSecret(encryptedValue as string) + return [key, decrypted] as const + } catch (error: any) { + logger.error(`[${requestId}] Failed to decrypt environment variable "${key}":`, error) + throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`) + } + } + ) + + const decryptedPairs = await Promise.all(decryptionPromises) + decryptedEnvVars = Object.fromEntries(decryptedPairs) + } + + // Start logging session + await loggingSession.safeStart({ + userId: payload.userId, + workspaceId: '', // TODO: Get from workflow if needed + variables: decryptedEnvVars, + }) + + // Merge subblock states (matching workflow-execution pattern) + const mergedStates = mergeSubblockState(blocks, {}) + + // Process block states for execution + const processedBlockStates = Object.entries(mergedStates).reduce( + (acc, [blockId, blockState]) => { + acc[blockId] = Object.entries(blockState.subBlocks).reduce( + (subAcc, [key, subBlock]) => { + subAcc[key] = subBlock.value + return subAcc + }, + {} as Record + ) + return acc + }, + {} as Record> + ) + + // Handle workflow variables (for now, use empty object since we don't have workflow metadata) + const workflowVariables = {} + + // Create serialized workflow + const serializer = new Serializer() + const serializedWorkflow = serializer.serializeWorkflow( + mergedStates, + edges, + loops || {}, + parallels || {} + ) + + // Handle special Airtable case + if (payload.provider === 'airtable') { + logger.info( + `[${requestId}] Processing Airtable webhook via fetchAndProcessAirtablePayloads` + ) + + const webhookData = { + id: payload.webhookId, + provider: payload.provider, + providerConfig: {}, // Will be loaded within fetchAndProcessAirtablePayloads + } + + // Create a mock workflow object for Airtable processing + const mockWorkflow = { + id: payload.workflowId, + userId: payload.userId, + } + + await fetchAndProcessAirtablePayloads(webhookData, mockWorkflow, requestId) + + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + finalOutput: { message: 'Airtable webhook processed' }, + traceSpans: [], + }) + + return { + success: true, + workflowId: payload.workflowId, + executionId, + output: { message: 'Airtable webhook processed' }, + executedAt: new Date().toISOString(), + } + } + + // Format input for standard webhooks + const mockWebhook = { + provider: payload.provider, + blockId: payload.blockId, + } + const mockWorkflow = { + id: payload.workflowId, + userId: payload.userId, + } + const mockRequest = { + headers: new Map(Object.entries(payload.headers)), + } as any + + const input = formatWebhookInput(mockWebhook, mockWorkflow, payload.body, mockRequest) + + if (!input && payload.provider === 'whatsapp') { + logger.info(`[${requestId}] No messages in WhatsApp payload, skipping execution`) + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + finalOutput: { message: 'No messages in WhatsApp payload' }, + traceSpans: [], + }) + return { + success: true, + workflowId: payload.workflowId, + executionId, + output: { message: 'No messages in WhatsApp payload' }, + executedAt: new Date().toISOString(), + } + } + + // Create executor and execute + const executor = new Executor( + serializedWorkflow, + processedBlockStates, + decryptedEnvVars, + input || {}, + workflowVariables + ) + + // Set up logging on the executor + loggingSession.setupExecutor(executor) + + logger.info(`[${requestId}] Executing workflow for ${payload.provider} webhook`) + + // Execute the workflow + const result = await executor.execute(payload.workflowId, payload.blockId) + + // Check if we got a StreamingExecution result + const executionResult = + 'stream' in result && 'execution' in result ? result.execution : result + + logger.info(`[${requestId}] Webhook execution completed`, { + success: executionResult.success, + workflowId: payload.workflowId, + provider: payload.provider, + }) + + // Update workflow run counts on success + if (executionResult.success) { + await updateWorkflowRunCounts(payload.workflowId) + + // Track execution in user stats + await db + .update(userStats) + .set({ + totalWebhookTriggers: sql`total_webhook_triggers + 1`, + lastActive: sql`now()`, + }) + .where(eq(userStats.userId, payload.userId)) + } + + // Build trace spans and complete logging session + const { traceSpans, totalDuration } = buildTraceSpans(executionResult) + + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: totalDuration || 0, + finalOutput: executionResult.output || {}, + traceSpans: traceSpans as any, + }) + + return { + success: executionResult.success, + workflowId: payload.workflowId, + executionId, + output: executionResult.output, + executedAt: new Date().toISOString(), + provider: payload.provider, + } + } catch (error: any) { + logger.error(`[${requestId}] Webhook execution failed`, { + error: error.message, + stack: error.stack, + workflowId: payload.workflowId, + provider: payload.provider, + }) + + // Complete logging session with error (matching workflow-execution pattern) + try { + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + error: { + message: error.message || 'Webhook execution failed', + stackTrace: error.stack, + }, + }) + } catch (loggingError) { + logger.error(`[${requestId}] Failed to complete logging session`, loggingError) + } + + throw error // Let Trigger.dev handle retries + } + }, +}) diff --git a/apps/sim/trigger/workflow-execution.ts b/apps/sim/trigger/workflow-execution.ts index 64827d976e..53fc7edcee 100644 --- a/apps/sim/trigger/workflow-execution.ts +++ b/apps/sim/trigger/workflow-execution.ts @@ -1,6 +1,7 @@ import { task } from '@trigger.dev/sdk/v3' import { eq, sql } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' +import { checkServerSideUsageLimits } from '@/lib/billing' import { createLogger } from '@/lib/logs/console/logger' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' @@ -43,6 +44,22 @@ export const workflowExecution = task({ const loggingSession = new LoggingSession(workflowId, executionId, triggerType, requestId) try { + const usageCheck = await checkServerSideUsageLimits(payload.userId) + if (usageCheck.isExceeded) { + logger.warn( + `[${requestId}] User ${payload.userId} has exceeded usage limits. Skipping workflow execution.`, + { + currentUsage: usageCheck.currentUsage, + limit: usageCheck.limit, + workflowId: payload.workflowId, + } + ) + throw new Error( + usageCheck.message || + 'Usage limit exceeded. Please upgrade your plan to continue using workflows.' + ) + } + // Load workflow data from normalized tables const normalizedData = await loadWorkflowFromNormalizedTables(workflowId) if (!normalizedData) {