diff --git a/apps/sim/app/api/__test-utils__/utils.ts b/apps/sim/app/api/__test-utils__/utils.ts index 5b8be3b565..ce02592a9b 100644 --- a/apps/sim/app/api/__test-utils__/utils.ts +++ b/apps/sim/app/api/__test-utils__/utils.ts @@ -403,7 +403,10 @@ export function mockExecutionDependencies() { provider: 'provider', providerConfig: 'providerConfig', }, - workflow: { id: 'id', userId: 'userId' }, + workflow: { + id: 'id', + userId: 'userId', + }, workflowSchedule: { id: 'id', workflowId: 'workflowId', diff --git a/apps/sim/app/api/chat/utils.ts b/apps/sim/app/api/chat/utils.ts index 5fada4cc98..8406395369 100644 --- a/apps/sim/app/api/chat/utils.ts +++ b/apps/sim/app/api/chat/utils.ts @@ -1,6 +1,6 @@ import { db } from '@sim/db' -import { chat, userStats, workflow } from '@sim/db/schema' -import { eq, sql } from 'drizzle-orm' +import { chat, workflow } from '@sim/db/schema' +import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { checkServerSideUsageLimits } from '@/lib/billing' @@ -16,7 +16,7 @@ import { TriggerUtils } from '@/lib/workflows/triggers' import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants' import { getBlock } from '@/blocks' import { Executor } from '@/executor' -import type { BlockLog, ExecutionResult } from '@/executor/types' +import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types' import { Serializer } from '@/serializer' import { mergeSubblockState } from '@/stores/workflows/server-utils' import type { WorkflowState } from '@/stores/workflows/workflow/types' @@ -548,6 +548,7 @@ export async function executeWorkflowForChat( const stream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder() + let executionResultForLogging: ExecutionResult | null = null try { const streamedContent = new Map() @@ -603,6 +604,7 @@ export async function executeWorkflowForChat( endedAt: new Date().toISOString(), totalDurationMs: 0, error: { message: errorMessage }, + traceSpans: [], }) sessionCompleted = true } @@ -644,16 +646,24 @@ export async function executeWorkflowForChat( // Set up logging on the executor loggingSession.setupExecutor(executor) - let result + let result: ExecutionResult | StreamingExecution | undefined try { result = await executor.execute(workflowId, startBlockId) } catch (error: any) { logger.error(`[${requestId}] Chat workflow execution failed:`, error) if (!sessionCompleted) { + const executionResult = error?.executionResult || { + success: false, + output: {}, + logs: [], + } + const { traceSpans } = buildTraceSpans(executionResult) + await loggingSession.safeCompleteWithError({ endedAt: new Date().toISOString(), totalDurationMs: 0, error: { message: error.message || 'Chat workflow execution failed' }, + traceSpans, }) sessionCompleted = true } @@ -677,17 +687,14 @@ export async function executeWorkflowForChat( ? (result.execution as ExecutionResult) : (result as ExecutionResult) - if (executionResult?.logs) { - // Update streamed content and apply tokenization - process regardless of overall success - // This ensures partial successes (some agents succeed, some fail) still return results + executionResultForLogging = executionResult - // Add newlines between different agent outputs for better readability + if (executionResult?.logs) { const processedOutputs = new Set() executionResult.logs.forEach((log: BlockLog) => { if (streamedContent.has(log.blockId)) { const content = streamedContent.get(log.blockId) if (log.output && content) { - // Add newline separation between different outputs (but not before the first one) const separator = processedOutputs.size > 0 ? '\n\n' : '' log.output.content = separator + content processedOutputs.add(log.blockId) @@ -695,13 +702,10 @@ export async function executeWorkflowForChat( } }) - // Also process non-streamed outputs from selected blocks (like function blocks) - // This uses the same logic as the chat panel to ensure identical behavior const nonStreamingLogs = executionResult.logs.filter( (log: BlockLog) => !streamedContent.has(log.blockId) ) - // Extract the exact same functions used by the chat panel const extractBlockIdFromOutputId = (outputId: string): string => { return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0] } @@ -719,7 +723,6 @@ export async function executeWorkflowForChat( try { return JSON.parse(output.content) } catch (e) { - // Fallback to original structure if parsing fails return output } } @@ -727,13 +730,11 @@ export async function executeWorkflowForChat( return output } - // Filter outputs that have matching logs (exactly like chat panel) const outputsToRender = selectedOutputIds.filter((outputId) => { const blockIdForOutput = extractBlockIdFromOutputId(outputId) return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput) }) - // Process each selected output (exactly like chat panel) for (const outputId of outputsToRender) { const blockIdForOutput = extractBlockIdFromOutputId(outputId) const path = extractPathFromOutputId(outputId, blockIdForOutput) @@ -743,7 +744,6 @@ export async function executeWorkflowForChat( let outputValue: any = log.output if (path) { - // Parse JSON content safely (exactly like chat panel) outputValue = parseOutputContentSafely(outputValue) const pathParts = path.split('.') @@ -758,16 +758,13 @@ export async function executeWorkflowForChat( } if (outputValue !== undefined) { - // Add newline separation between different outputs const separator = processedOutputs.size > 0 ? '\n\n' : '' - // Format the output exactly like the chat panel const formattedOutput = typeof outputValue === 'string' ? outputValue : JSON.stringify(outputValue, null, 2) - // Update the log content if (!log.output.content) { log.output.content = separator + formattedOutput } else { @@ -778,7 +775,6 @@ export async function executeWorkflowForChat( } } - // Process all logs for streaming tokenization const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent) logger.info(`Processed ${processedCount} blocks for streaming tokenization`) @@ -793,23 +789,6 @@ export async function executeWorkflowForChat( } ;(enrichedResult.metadata as any).conversationId = conversationId } - // Use the executionId created at the beginning of this function - logger.debug(`Using execution ID for deployed chat: ${executionId}`) - - if (executionResult.success) { - try { - await db - .update(userStats) - .set({ - totalChatExecutions: sql`total_chat_executions + 1`, - lastActive: new Date(), - }) - .where(eq(userStats.userId, deployment.userId)) - logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`) - } catch (error) { - logger.error(`Failed to update user stats for deployed chat:`, error) - } - } } if (!(result && typeof result === 'object' && 'stream' in result)) { @@ -833,30 +812,35 @@ export async function executeWorkflowForChat( controller.close() } catch (error: any) { - // Handle any errors that occur in the stream - logger.error(`[${requestId}] Stream error:`, error) + logger.error(`[${requestId}] Chat execution streaming error:`, error) - // Send error event to client - const encoder = new TextEncoder() - controller.enqueue( - encoder.encode( - `data: ${JSON.stringify({ - event: 'error', - error: error.message || 'An unexpected error occurred', - })}\n\n` - ) - ) - - // Try to complete the logging session with error if not already completed if (!sessionCompleted && loggingSession) { + const executionResult = executionResultForLogging || + (error?.executionResult as ExecutionResult | undefined) || { + success: false, + output: {}, + logs: [], + } + const { traceSpans } = buildTraceSpans(executionResult) + await loggingSession.safeCompleteWithError({ endedAt: new Date().toISOString(), totalDurationMs: 0, error: { message: error.message || 'Stream processing error' }, + traceSpans, }) sessionCompleted = true } + controller.enqueue( + encoder.encode( + `data: ${JSON.stringify({ + event: 'error', + error: error.message || 'Stream processing error', + })}\n\n` + ) + ) + controller.close() } }, diff --git a/apps/sim/app/api/schedules/execute/route.ts b/apps/sim/app/api/schedules/execute/route.ts index 6d682465c6..81bb6a6445 100644 --- a/apps/sim/app/api/schedules/execute/route.ts +++ b/apps/sim/app/api/schedules/execute/route.ts @@ -4,6 +4,7 @@ import { and, eq, lte, not, sql } from 'drizzle-orm' import { NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { z } from 'zod' +import { getApiKeyOwnerUserId } from '@/lib/api-key/service' import { checkServerSideUsageLimits } from '@/lib/billing' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' @@ -17,7 +18,7 @@ import { getSubBlockValue, } from '@/lib/schedules/utils' import { decryptSecret, generateRequestId } from '@/lib/utils' -import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers' +import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers' import { updateWorkflowRunCounts } from '@/lib/workflows/utils' import { Executor } from '@/executor' import { Serializer } from '@/serializer' @@ -106,12 +107,22 @@ export async function GET() { continue } + const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId) + + if (!actorUserId) { + logger.warn( + `[${requestId}] Skipping schedule ${schedule.id}: pinned API key required to attribute usage.` + ) + runningExecutions.delete(schedule.workflowId) + continue + } + // Check rate limits for scheduled execution (checks both personal and org subscriptions) - const userSubscription = await getHighestPrioritySubscription(workflowRecord.userId) + const userSubscription = await getHighestPrioritySubscription(actorUserId) const rateLimiter = new RateLimiter() const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription( - workflowRecord.userId, + actorUserId, userSubscription, 'schedule', false // schedules are always sync @@ -149,7 +160,7 @@ export async function GET() { continue } - const usageCheck = await checkServerSideUsageLimits(workflowRecord.userId) + const usageCheck = await checkServerSideUsageLimits(actorUserId) if (usageCheck.isExceeded) { logger.warn( `[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`, @@ -159,26 +170,19 @@ export async function GET() { workflowId: schedule.workflowId, } ) - - // Error logging handled by logging session - - const retryDelay = 24 * 60 * 60 * 1000 // 24 hour delay for exceeded limits - const nextRetryAt = new Date(now.getTime() + retryDelay) - try { + const deployedData = await loadDeployedWorkflowState(schedule.workflowId) + const nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any) await db .update(workflowSchedule) - .set({ - updatedAt: now, - nextRunAt: nextRetryAt, - }) + .set({ updatedAt: now, nextRunAt }) .where(eq(workflowSchedule.id, schedule.id)) - - logger.debug(`[${requestId}] Updated next retry time due to usage limits`) - } catch (updateError) { - logger.error(`[${requestId}] Error updating schedule for usage limits:`, updateError) + } catch (calcErr) { + logger.warn( + `[${requestId}] Unable to calculate nextRunAt while skipping schedule ${schedule.id}`, + calcErr + ) } - runningExecutions.delete(schedule.workflowId) continue } @@ -206,11 +210,25 @@ export async function GET() { const parallels = deployedData.parallels logger.info(`[${requestId}] Loaded deployed workflow ${schedule.workflowId}`) + // Validate that the schedule's trigger block exists in the deployed state + if (schedule.blockId) { + const blockExists = await blockExistsInDeployment( + schedule.workflowId, + schedule.blockId + ) + if (!blockExists) { + logger.warn( + `[${requestId}] Schedule trigger block ${schedule.blockId} not found in deployed workflow ${schedule.workflowId}. Skipping execution.` + ) + return { skip: true, blocks: {} as Record } + } + } + const mergedStates = mergeSubblockState(blocks) // Retrieve environment variables with workspace precedence const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv( - workflowRecord.userId, + actorUserId, workflowRecord.workspaceId || undefined ) const variables = EnvVarsSchema.parse({ @@ -355,7 +373,6 @@ export async function GET() { ) const input = { - workflowId: schedule.workflowId, _context: { workflowId: schedule.workflowId, }, @@ -363,7 +380,7 @@ export async function GET() { // Start logging with environment variables await loggingSession.safeStart({ - userId: workflowRecord.userId, + userId: actorUserId, workspaceId: workflowRecord.workspaceId || '', variables: variables || {}, }) @@ -407,7 +424,7 @@ export async function GET() { totalScheduledExecutions: sql`total_scheduled_executions + 1`, lastActive: now, }) - .where(eq(userStats.userId, workflowRecord.userId)) + .where(eq(userStats.userId, actorUserId)) logger.debug(`[${requestId}] Updated user stats for scheduled execution`) } catch (statsError) { @@ -446,6 +463,7 @@ export async function GET() { message: `Schedule execution failed before workflow started: ${earlyError.message}`, stackTrace: earlyError.stack, }, + traceSpans: [], }) } catch (loggingError) { logger.error( @@ -459,6 +477,12 @@ export async function GET() { } })() + // Check if execution was skipped (e.g., trigger block not found) + if ('skip' in executionSuccess && executionSuccess.skip) { + runningExecutions.delete(schedule.workflowId) + continue + } + if (executionSuccess.success) { logger.info(`[${requestId}] Workflow ${schedule.workflowId} executed successfully`) @@ -565,6 +589,7 @@ export async function GET() { message: `Schedule execution failed: ${error.message}`, stackTrace: error.stack, }, + traceSpans: [], }) } catch (loggingError) { logger.error( diff --git a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts index 9d5b9268a2..9a827c9ece 100644 --- a/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts +++ b/apps/sim/app/api/webhooks/trigger/[path]/route.test.ts @@ -106,6 +106,24 @@ describe('Webhook Trigger API Route', () => { mockExecutionDependencies() mockTriggerDevSdk() + globalMockData.workflows.push({ + id: 'test-workflow-id', + userId: 'test-user-id', + pinnedApiKeyId: 'test-pinned-api-key-id', + }) + + vi.doMock('@/lib/api-key/service', async () => { + const actual = await vi.importActual('@/lib/api-key/service') + return { + ...(actual as Record), + getApiKeyOwnerUserId: vi + .fn() + .mockImplementation(async (pinnedApiKeyId: string | null | undefined) => + pinnedApiKeyId ? 'test-user-id' : null + ), + } + }) + vi.doMock('@/services/queue', () => ({ RateLimiter: vi.fn().mockImplementation(() => ({ checkRateLimit: vi.fn().mockResolvedValue({ @@ -222,6 +240,7 @@ describe('Webhook Trigger API Route', () => { globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id', + pinnedApiKeyId: 'test-pinned-api-key-id', }) const req = createMockRequest('POST', { event: 'test', id: 'test-123' }) @@ -250,7 +269,11 @@ describe('Webhook Trigger API Route', () => { providerConfig: { requireAuth: true, token: 'test-token-123' }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) + globalMockData.workflows.push({ + id: 'test-workflow-id', + userId: 'test-user-id', + pinnedApiKeyId: 'test-pinned-api-key-id', + }) const headers = { 'Content-Type': 'application/json', @@ -281,7 +304,11 @@ describe('Webhook Trigger API Route', () => { }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) + globalMockData.workflows.push({ + id: 'test-workflow-id', + userId: 'test-user-id', + pinnedApiKeyId: 'test-pinned-api-key-id', + }) const headers = { 'Content-Type': 'application/json', @@ -308,7 +335,11 @@ describe('Webhook Trigger API Route', () => { providerConfig: { requireAuth: true, token: 'case-test-token' }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) + globalMockData.workflows.push({ + id: 'test-workflow-id', + userId: 'test-user-id', + pinnedApiKeyId: 'test-pinned-api-key-id', + }) vi.doMock('@trigger.dev/sdk', () => ({ tasks: { @@ -354,7 +385,11 @@ describe('Webhook Trigger API Route', () => { }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) + globalMockData.workflows.push({ + id: 'test-workflow-id', + userId: 'test-user-id', + pinnedApiKeyId: 'test-pinned-api-key-id', + }) vi.doMock('@trigger.dev/sdk', () => ({ tasks: { @@ -391,7 +426,6 @@ describe('Webhook Trigger API Route', () => { providerConfig: { requireAuth: true, token: 'correct-token' }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) const headers = { 'Content-Type': 'application/json', @@ -424,7 +458,6 @@ describe('Webhook Trigger API Route', () => { }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) const headers = { 'Content-Type': 'application/json', @@ -453,7 +486,6 @@ describe('Webhook Trigger API Route', () => { providerConfig: { requireAuth: true, token: 'required-token' }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) const req = createMockRequest('POST', { event: 'no.auth.test' }) const params = Promise.resolve({ path: 'test-path' }) @@ -482,7 +514,6 @@ describe('Webhook Trigger API Route', () => { }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) const headers = { 'Content-Type': 'application/json', @@ -515,7 +546,6 @@ describe('Webhook Trigger API Route', () => { }, workflowId: 'test-workflow-id', }) - globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' }) const headers = { 'Content-Type': 'application/json', diff --git a/apps/sim/app/api/workflows/[id]/deploy/route.ts b/apps/sim/app/api/workflows/[id]/deploy/route.ts index a2688eacc7..b153aa4fc0 100644 --- a/apps/sim/app/api/workflows/[id]/deploy/route.ts +++ b/apps/sim/app/api/workflows/[id]/deploy/route.ts @@ -293,6 +293,13 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ } } + // Attribution: this route is UI-only; require session user as actor + const actorUserId: string | null = session?.user?.id ?? null + if (!actorUserId) { + logger.warn(`[${requestId}] Unable to resolve actor user for workflow deployment: ${id}`) + return createErrorResponse('Unable to determine deploying user', 400) + } + await db.transaction(async (tx) => { const [{ maxVersion }] = await tx .select({ maxVersion: sql`COALESCE(MAX("version"), 0)` }) @@ -318,7 +325,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ state: currentState, isActive: true, createdAt: deployedAt, - createdBy: userId, + createdBy: actorUserId, }) const updateData: Record = { diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index fd4239e9d5..ea4bd5a3f7 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -5,6 +5,7 @@ import { eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { v4 as uuidv4 } from 'uuid' import { z } from 'zod' +import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service' import { getSession } from '@/lib/auth' import { checkServerSideUsageLimits } from '@/lib/billing' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' @@ -23,6 +24,7 @@ import { import { validateWorkflowAccess } from '@/app/api/workflows/middleware' import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils' import { Executor } from '@/executor' +import type { ExecutionResult } from '@/executor/types' import { Serializer } from '@/serializer' import { RateLimitError, RateLimiter, type TriggerType } from '@/services/queue' import { mergeSubblockState } from '@/stores/workflows/server-utils' @@ -65,8 +67,8 @@ class UsageLimitError extends Error { async function executeWorkflow( workflow: any, requestId: string, - input?: any, - executingUserId?: string + input: any | undefined, + actorUserId: string ): Promise { const workflowId = workflow.id const executionId = uuidv4() @@ -85,8 +87,8 @@ async function executeWorkflow( // Rate limiting is now handled before entering the sync queue - // Check if the user has exceeded their usage limits - const usageCheck = await checkServerSideUsageLimits(workflow.userId) + // Check if the actor has exceeded their usage limits + const usageCheck = await checkServerSideUsageLimits(actorUserId) if (usageCheck.isExceeded) { logger.warn(`[${requestId}] User ${workflow.userId} has exceeded usage limits`, { currentUsage: usageCheck.currentUsage, @@ -132,13 +134,13 @@ async function executeWorkflow( // Load personal (for the executing user) and workspace env (workspace overrides personal) const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv( - executingUserId || workflow.userId, + actorUserId, workflow.workspaceId || undefined ) const variables = EnvVarsSchema.parse({ ...personalEncrypted, ...workspaceEncrypted }) await loggingSession.safeStart({ - userId: executingUserId || workflow.userId, + userId: actorUserId, workspaceId: workflow.workspaceId, variables, }) @@ -340,7 +342,7 @@ async function executeWorkflow( totalApiCalls: sql`total_api_calls + 1`, lastActive: sql`now()`, }) - .where(eq(userStats.userId, workflow.userId)) + .where(eq(userStats.userId, actorUserId)) } await loggingSession.safeComplete({ @@ -354,6 +356,13 @@ async function executeWorkflow( } catch (error: any) { logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, error) + const executionResultForError = (error?.executionResult as ExecutionResult | undefined) || { + success: false, + output: {}, + logs: [], + } + const { traceSpans } = buildTraceSpans(executionResultForError) + await loggingSession.safeCompleteWithError({ endedAt: new Date().toISOString(), totalDurationMs: 0, @@ -361,6 +370,7 @@ async function executeWorkflow( message: error.message || 'Workflow execution failed', stackTrace: error.stack, }, + traceSpans, }) throw error @@ -396,19 +406,30 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ // Synchronous execution try { - // Check rate limits BEFORE entering queue for GET requests - if (triggerType === 'api') { - // Get user subscription (checks both personal and org subscriptions) - const userSubscription = await getHighestPrioritySubscription(validation.workflow.userId) + // Resolve actor user id + let actorUserId: string | null = null + if (triggerType === 'manual') { + actorUserId = session!.user!.id + } else { + const apiKeyHeader = request.headers.get('X-API-Key') + const auth = apiKeyHeader ? await authenticateApiKeyFromHeader(apiKeyHeader) : null + if (!auth?.success || !auth.userId) { + return createErrorResponse('Unauthorized', 401) + } + actorUserId = auth.userId + if (auth.keyId) { + void updateApiKeyLastUsed(auth.keyId).catch(() => {}) + } + // Check rate limits BEFORE entering execution for API requests + const userSubscription = await getHighestPrioritySubscription(actorUserId) const rateLimiter = new RateLimiter() const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription( - validation.workflow.userId, + actorUserId, userSubscription, - triggerType, - false // isAsync = false for sync calls + 'api', + false ) - if (!rateLimitCheck.allowed) { throw new RateLimitError( `Rate limit exceeded. You have ${rateLimitCheck.remaining} requests remaining. Resets at ${rateLimitCheck.resetAt.toISOString()}` @@ -420,8 +441,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{ validation.workflow, requestId, undefined, - // Executing user (manual run): if session present, use that user for fallback - (await getSession())?.user?.id || undefined + actorUserId as string ) // Check if the workflow execution contains a response block output @@ -508,14 +528,19 @@ export async function POST( let triggerType: TriggerType = 'manual' const session = await getSession() - if (session?.user?.id) { + const apiKeyHeader = request.headers.get('X-API-Key') + if (session?.user?.id && !apiKeyHeader) { authenticatedUserId = session.user.id - triggerType = 'manual' // UI session (not rate limited) - } else { - const apiKeyHeader = request.headers.get('X-API-Key') - if (apiKeyHeader) { - authenticatedUserId = validation.workflow.userId - triggerType = 'api' + triggerType = 'manual' + } else if (apiKeyHeader) { + const auth = await authenticateApiKeyFromHeader(apiKeyHeader) + if (!auth.success || !auth.userId) { + return createErrorResponse('Unauthorized', 401) + } + authenticatedUserId = auth.userId + triggerType = 'api' + if (auth.keyId) { + void updateApiKeyLastUsed(auth.keyId).catch(() => {}) } } diff --git a/apps/sim/app/api/workflows/[id]/log/route.ts b/apps/sim/app/api/workflows/[id]/log/route.ts index 5042f77b1f..75bec17ce2 100644 --- a/apps/sim/app/api/workflows/[id]/log/route.ts +++ b/apps/sim/app/api/workflows/[id]/log/route.ts @@ -44,15 +44,17 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{ variables: {}, }) + const { traceSpans } = buildTraceSpans(result) + if (result.success === false) { const message = result.error || 'Workflow execution failed' await loggingSession.safeCompleteWithError({ endedAt: new Date().toISOString(), totalDurationMs: result.metadata?.duration || 0, error: { message }, + traceSpans, }) } else { - const { traceSpans } = buildTraceSpans(result) await loggingSession.safeComplete({ endedAt: new Date().toISOString(), totalDurationMs: result.metadata?.duration || 0, diff --git a/apps/sim/app/workspace/[workspaceId]/logs/components/trace-spans/trace-spans-display.tsx b/apps/sim/app/workspace/[workspaceId]/logs/components/trace-spans/trace-spans-display.tsx index 0d3361816b..7a3657f5d5 100644 --- a/apps/sim/app/workspace/[workspaceId]/logs/components/trace-spans/trace-spans-display.tsx +++ b/apps/sim/app/workspace/[workspaceId]/logs/components/trace-spans/trace-spans-display.tsx @@ -13,6 +13,67 @@ import { import { cn, redactApiKeys } from '@/lib/utils' import type { TraceSpan } from '@/stores/logs/filters/types' +function getSpanKey(span: TraceSpan): string { + if (span.id) { + return span.id + } + + const name = span.name || 'span' + const start = span.startTime || 'unknown-start' + const end = span.endTime || 'unknown-end' + + return `${name}|${start}|${end}` +} + +function mergeTraceSpanChildren(...groups: TraceSpan[][]): TraceSpan[] { + const merged: TraceSpan[] = [] + const seen = new Set() + + groups.forEach((group) => { + group.forEach((child) => { + const key = getSpanKey(child) + if (seen.has(key)) { + return + } + seen.add(key) + merged.push(child) + }) + }) + + return merged +} + +function normalizeChildWorkflowSpan(span: TraceSpan): TraceSpan { + const enrichedSpan: TraceSpan = { ...span } + + if (enrichedSpan.output && typeof enrichedSpan.output === 'object') { + enrichedSpan.output = { ...enrichedSpan.output } + } + + const normalizedChildren = Array.isArray(span.children) + ? span.children.map((childSpan) => normalizeChildWorkflowSpan(childSpan)) + : [] + + const outputChildSpans = Array.isArray(span.output?.childTraceSpans) + ? (span.output!.childTraceSpans as TraceSpan[]).map((childSpan) => + normalizeChildWorkflowSpan(childSpan) + ) + : [] + + const mergedChildren = mergeTraceSpanChildren(normalizedChildren, outputChildSpans) + + if (enrichedSpan.output && 'childTraceSpans' in enrichedSpan.output) { + const { childTraceSpans, ...cleanOutput } = enrichedSpan.output as { + childTraceSpans?: TraceSpan[] + } & Record + enrichedSpan.output = cleanOutput + } + + enrichedSpan.children = mergedChildren.length > 0 ? mergedChildren : undefined + + return enrichedSpan +} + interface TraceSpansDisplayProps { traceSpans?: TraceSpan[] totalDuration?: number @@ -310,22 +371,23 @@ export function TraceSpansDisplay({
{traceSpans.map((span, index) => { + const normalizedSpan = normalizeChildWorkflowSpan(span) const hasSubItems = Boolean( - (span.children && span.children.length > 0) || - (span.toolCalls && span.toolCalls.length > 0) || - span.input || - span.output + (normalizedSpan.children && normalizedSpan.children.length > 0) || + (normalizedSpan.toolCalls && normalizedSpan.toolCalls.length > 0) || + normalizedSpan.input || + normalizedSpan.output ) return ( {span.children?.map((childSpan, index) => { + const enrichedChildSpan = normalizeChildWorkflowSpan(childSpan) + const childHasSubItems = Boolean( - (childSpan.children && childSpan.children.length > 0) || - (childSpan.toolCalls && childSpan.toolCalls.length > 0) || - childSpan.input || - childSpan.output + (enrichedChildSpan.children && enrichedChildSpan.children.length > 0) || + (enrichedChildSpan.toolCalls && enrichedChildSpan.toolCalls.length > 0) || + enrichedChildSpan.input || + enrichedChildSpan.output ) return ( { + return typeof value === 'object' && value !== null +} + +function sanitizeMessage(value: unknown): string | undefined { + if (typeof value !== 'string') return undefined + const trimmed = value.trim() + if (!trimmed || trimmed === 'undefined (undefined)') return undefined + return trimmed +} + +function normalizeErrorMessage(error: unknown): string { + if (error instanceof Error) { + const message = sanitizeMessage(error.message) + if (message) return message + } else if (typeof error === 'string') { + const message = sanitizeMessage(error) + if (message) return message + } + + if (isRecord(error)) { + const directMessage = sanitizeMessage(error.message) + if (directMessage) return directMessage + + const nestedError = error.error + if (isRecord(nestedError)) { + const nestedMessage = sanitizeMessage(nestedError.message) + if (nestedMessage) return nestedMessage + } else { + const nestedMessage = sanitizeMessage(nestedError) + if (nestedMessage) return nestedMessage + } + } + + return WORKFLOW_EXECUTION_FAILURE_MESSAGE +} + +function isExecutionResult(value: unknown): value is ExecutionResult { + if (!isRecord(value)) return false + return typeof value.success === 'boolean' && isRecord(value.output) +} + +function extractExecutionResult(error: unknown): ExecutionResult | null { + if (!isRecord(error)) return null + const candidate = error.executionResult + return isExecutionResult(candidate) ? candidate : null +} + export function useWorkflowExecution() { const currentWorkflow = useCurrentWorkflow() const { activeWorkflowId, workflows } = useWorkflowRegistry() @@ -862,74 +912,56 @@ export function useWorkflowExecution() { return newExecutor.execute(activeWorkflowId || '', startBlockId) } - const handleExecutionError = (error: any, options?: { executionId?: string }) => { - let errorMessage = 'Unknown error' - if (error instanceof Error) { - errorMessage = error.message || `Error: ${String(error)}` - } else if (typeof error === 'string') { - errorMessage = error - } else if (error && typeof error === 'object') { - if ( - error.message === 'undefined (undefined)' || - (error.error && - typeof error.error === 'object' && - error.error.message === 'undefined (undefined)') - ) { - errorMessage = 'API request failed - no specific error details available' - } else if (error.message) { - errorMessage = error.message - } else if (error.error && typeof error.error === 'string') { - errorMessage = error.error - } else if (error.error && typeof error.error === 'object' && error.error.message) { - errorMessage = error.error.message - } else { - try { - errorMessage = `Error details: ${JSON.stringify(error)}` - } catch { - errorMessage = 'Error occurred but details could not be displayed' - } - } - } + const handleExecutionError = (error: unknown, options?: { executionId?: string }) => { + const normalizedMessage = normalizeErrorMessage(error) + const executionResultFromError = extractExecutionResult(error) - if (errorMessage === 'undefined (undefined)') { - errorMessage = 'API request failed - no specific error details available' - } + let errorResult: ExecutionResult - // If we failed before creating an executor (e.g., serializer validation), add a console entry - if (!executor) { - try { - // Prefer attributing to specific subflow if we have a structured error - let blockId = 'serialization' - let blockName = 'Workflow' - let blockType = 'serializer' - if (error instanceof WorkflowValidationError) { - blockId = error.blockId || blockId - blockName = error.blockName || blockName - blockType = error.blockType || blockType - } + if (executionResultFromError) { + const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : [] - useConsoleStore.getState().addConsole({ - input: {}, - output: {}, - success: false, - error: errorMessage, - durationMs: 0, - startedAt: new Date().toISOString(), - endedAt: new Date().toISOString(), - workflowId: activeWorkflowId || '', - blockId, - executionId: options?.executionId, - blockName, - blockType, - }) - } catch {} - } + errorResult = { + ...executionResultFromError, + success: false, + error: executionResultFromError.error ?? normalizedMessage, + logs, + } + } else { + if (!executor) { + try { + let blockId = 'serialization' + let blockName = 'Workflow' + let blockType = 'serializer' + if (error instanceof WorkflowValidationError) { + blockId = error.blockId || blockId + blockName = error.blockName || blockName + blockType = error.blockType || blockType + } + + useConsoleStore.getState().addConsole({ + input: {}, + output: {}, + success: false, + error: normalizedMessage, + durationMs: 0, + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + workflowId: activeWorkflowId || '', + blockId, + executionId: options?.executionId, + blockName, + blockType, + }) + } catch {} + } - const errorResult: ExecutionResult = { - success: false, - output: {}, - error: errorMessage, - logs: [], + errorResult = { + success: false, + output: {}, + error: normalizedMessage, + logs: [], + } } setExecutionResult(errorResult) @@ -937,16 +969,14 @@ export function useWorkflowExecution() { setIsDebugging(false) setActiveBlocks(new Set()) - let notificationMessage = 'Workflow execution failed' - if (error?.request?.url) { - if (error.request.url && error.request.url.trim() !== '') { - notificationMessage += `: Request to ${error.request.url} failed` - if (error.status) { - notificationMessage += ` (Status: ${error.status})` - } + let notificationMessage = WORKFLOW_EXECUTION_FAILURE_MESSAGE + if (isRecord(error) && isRecord(error.request) && sanitizeMessage(error.request.url)) { + notificationMessage += `: Request to ${(error.request.url as string).trim()} failed` + if ('status' in error && typeof error.status === 'number') { + notificationMessage += ` (Status: ${error.status})` } - } else { - notificationMessage += `: ${errorMessage}` + } else if (sanitizeMessage(errorResult.error)) { + notificationMessage += `: ${errorResult.error}` } return errorResult diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts index c130485be4..b60b6740b3 100644 --- a/apps/sim/background/webhook-execution.ts +++ b/apps/sim/background/webhook-execution.ts @@ -17,6 +17,7 @@ import { } from '@/lib/workflows/db-helpers' import { updateWorkflowRunCounts } from '@/lib/workflows/utils' import { Executor } from '@/executor' +import type { ExecutionResult } from '@/executor/types' import { Serializer } from '@/serializer' import { mergeSubblockState } from '@/stores/workflows/server-utils' @@ -386,6 +387,13 @@ async function executeWebhookJobInternal( // Complete logging session with error (matching workflow-execution pattern) try { + const executionResult = (error?.executionResult as ExecutionResult | undefined) || { + success: false, + output: {}, + logs: [], + } + const { traceSpans } = buildTraceSpans(executionResult) + await loggingSession.safeCompleteWithError({ endedAt: new Date().toISOString(), totalDurationMs: 0, @@ -393,6 +401,7 @@ async function executeWebhookJobInternal( message: error.message || 'Webhook execution failed', stackTrace: error.stack, }, + traceSpans, }) } catch (loggingError) { logger.error(`[${requestId}] Failed to complete logging session`, loggingError) diff --git a/apps/sim/background/workflow-execution.ts b/apps/sim/background/workflow-execution.ts index 9cb01a8370..04b2b2c844 100644 --- a/apps/sim/background/workflow-execution.ts +++ b/apps/sim/background/workflow-execution.ts @@ -192,6 +192,9 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { stack: error.stack, }) + const executionResult = error?.executionResult || { success: false, output: {}, logs: [] } + const { traceSpans } = buildTraceSpans(executionResult) + await loggingSession.safeCompleteWithError({ endedAt: new Date().toISOString(), totalDurationMs: 0, @@ -199,6 +202,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) { message: error.message || 'Workflow execution failed', stackTrace: error.stack, }, + traceSpans, }) throw error diff --git a/apps/sim/executor/consts.ts b/apps/sim/executor/consts.ts index b5ebea7154..d580f8205f 100644 --- a/apps/sim/executor/consts.ts +++ b/apps/sim/executor/consts.ts @@ -12,7 +12,8 @@ export enum BlockType { API = 'api', EVALUATOR = 'evaluator', RESPONSE = 'response', - WORKFLOW = 'workflow', + WORKFLOW = 'workflow', // Deprecated - kept for backwards compatibility + WORKFLOW_INPUT = 'workflow_input', // Current workflow block type STARTER = 'starter', } @@ -27,3 +28,10 @@ export const ALL_BLOCK_TYPES = Object.values(BlockType) as string[] export function isValidBlockType(type: string): type is BlockType { return ALL_BLOCK_TYPES.includes(type) } + +/** + * Helper to check if a block type is a workflow block (current or deprecated) + */ +export function isWorkflowBlockType(blockType: string | undefined): boolean { + return blockType === BlockType.WORKFLOW || blockType === BlockType.WORKFLOW_INPUT +} diff --git a/apps/sim/executor/handlers/workflow/workflow-handler.ts b/apps/sim/executor/handlers/workflow/workflow-handler.ts index db4113e4b2..7e0f950c20 100644 --- a/apps/sim/executor/handlers/workflow/workflow-handler.ts +++ b/apps/sim/executor/handlers/workflow/workflow-handler.ts @@ -1,17 +1,29 @@ import { generateInternalToken } from '@/lib/auth/internal' import { createLogger } from '@/lib/logs/console/logger' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' +import type { TraceSpan } from '@/lib/logs/types' import { getBaseUrl } from '@/lib/urls/utils' import type { BlockOutput } from '@/blocks/types' import { Executor } from '@/executor' import { BlockType } from '@/executor/consts' -import type { BlockHandler, ExecutionContext, StreamingExecution } from '@/executor/types' +import type { + BlockHandler, + ExecutionContext, + ExecutionResult, + StreamingExecution, +} from '@/executor/types' import { Serializer } from '@/serializer' import type { SerializedBlock } from '@/serializer/types' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' const logger = createLogger('WorkflowBlockHandler') +type WorkflowTraceSpan = TraceSpan & { + metadata?: Record + children?: WorkflowTraceSpan[] + output?: (Record & { childTraceSpans?: WorkflowTraceSpan[] }) | null +} + // Maximum allowed depth for nested workflow executions const MAX_WORKFLOW_DEPTH = 10 @@ -125,13 +137,19 @@ export class WorkflowBlockHandler implements BlockHandler { // Use the actual child workflow ID for authentication, not the execution ID // This ensures knowledge base and other API calls can properly authenticate const result = await subExecutor.execute(workflowId) + const executionResult = this.toExecutionResult(result) const duration = performance.now() - startTime logger.info(`Child workflow ${childWorkflowName} completed in ${Math.round(duration)}ms`) - const childTraceSpans = this.captureChildWorkflowLogs(result, childWorkflowName, context) + const childTraceSpans = this.captureChildWorkflowLogs( + executionResult, + childWorkflowName, + context + ) + const mappedResult = this.mapChildOutputToParent( - result, + executionResult, workflowId, childWorkflowName, duration, @@ -146,6 +164,7 @@ export class WorkflowBlockHandler implements BlockHandler { // Attach trace spans and name for higher-level logging to consume errorWithSpans.childTraceSpans = childTraceSpans errorWithSpans.childWorkflowName = childWorkflowName + errorWithSpans.executionResult = executionResult throw errorWithSpans } @@ -162,7 +181,19 @@ export class WorkflowBlockHandler implements BlockHandler { throw error // Re-throw as-is to avoid duplication } - throw new Error(`Error in child workflow "${childWorkflowName}": ${originalError}`) + const wrappedError = new Error( + `Error in child workflow "${childWorkflowName}": ${originalError}` + ) as any + if (error.childTraceSpans) { + wrappedError.childTraceSpans = error.childTraceSpans + } + if (error.childWorkflowName) { + wrappedError.childWorkflowName = error.childWorkflowName + } + if (error.executionResult) { + wrappedError.executionResult = error.executionResult + } + throw wrappedError } } @@ -318,10 +349,10 @@ export class WorkflowBlockHandler implements BlockHandler { * Captures and transforms child workflow logs into trace spans */ private captureChildWorkflowLogs( - childResult: any, + childResult: ExecutionResult, childWorkflowName: string, parentContext: ExecutionContext - ): any[] { + ): WorkflowTraceSpan[] { try { if (!childResult.logs || !Array.isArray(childResult.logs)) { return [] @@ -333,9 +364,15 @@ export class WorkflowBlockHandler implements BlockHandler { return [] } - const transformedSpans = traceSpans.map((span: any) => { - return this.transformSpanForChildWorkflow(span, childWorkflowName) - }) + const processedSpans = this.processChildWorkflowSpans(traceSpans) + + if (processedSpans.length === 0) { + return [] + } + + const transformedSpans = processedSpans.map((span) => + this.transformSpanForChildWorkflow(span, childWorkflowName) + ) return transformedSpans } catch (error) { @@ -347,67 +384,111 @@ export class WorkflowBlockHandler implements BlockHandler { /** * Transforms trace span for child workflow context */ - private transformSpanForChildWorkflow(span: any, childWorkflowName: string): any { - const transformedSpan = { - ...span, - name: this.cleanChildSpanName(span.name, childWorkflowName), - metadata: { - ...span.metadata, - isFromChildWorkflow: true, - childWorkflowName, - }, + private transformSpanForChildWorkflow( + span: WorkflowTraceSpan, + childWorkflowName: string + ): WorkflowTraceSpan { + const metadata: Record = { + ...(span.metadata ?? {}), + isFromChildWorkflow: true, + childWorkflowName, } - if (span.children && Array.isArray(span.children)) { - transformedSpan.children = span.children.map((childSpan: any) => - this.transformSpanForChildWorkflow(childSpan, childWorkflowName) - ) + const transformedChildren = Array.isArray(span.children) + ? span.children.map((childSpan) => + this.transformSpanForChildWorkflow(childSpan, childWorkflowName) + ) + : undefined + + return { + ...span, + metadata, + ...(transformedChildren ? { children: transformedChildren } : {}), } + } + + private processChildWorkflowSpans(spans: TraceSpan[]): WorkflowTraceSpan[] { + const processed: WorkflowTraceSpan[] = [] - if (span.output?.childTraceSpans) { - transformedSpan.output = { - ...transformedSpan.output, - childTraceSpans: span.output.childTraceSpans, + spans.forEach((span) => { + if (this.isSyntheticWorkflowWrapper(span)) { + if (span.children && Array.isArray(span.children)) { + processed.push(...this.processChildWorkflowSpans(span.children)) + } + return } - } - return transformedSpan + const workflowSpan: WorkflowTraceSpan = { + ...span, + } + + if (Array.isArray(workflowSpan.children)) { + workflowSpan.children = this.processChildWorkflowSpans(workflowSpan.children as TraceSpan[]) + } + + processed.push(workflowSpan) + }) + + return processed } - /** - * Cleans up child span names for readability - */ - private cleanChildSpanName(spanName: string, childWorkflowName: string): string { - if (spanName.includes(`${childWorkflowName}:`)) { - const cleanName = spanName.replace(`${childWorkflowName}:`, '').trim() + private flattenChildWorkflowSpans(spans: TraceSpan[]): WorkflowTraceSpan[] { + const flattened: WorkflowTraceSpan[] = [] - if (cleanName === 'Workflow Execution') { - return `${childWorkflowName} workflow` + spans.forEach((span) => { + if (this.isSyntheticWorkflowWrapper(span)) { + if (span.children && Array.isArray(span.children)) { + flattened.push(...this.flattenChildWorkflowSpans(span.children)) + } + return } - if (cleanName.startsWith('Agent ')) { - return `${cleanName}` + const workflowSpan: WorkflowTraceSpan = { + ...span, } - return `${cleanName}` - } + if (Array.isArray(workflowSpan.children)) { + const childSpans = workflowSpan.children as TraceSpan[] + workflowSpan.children = this.flattenChildWorkflowSpans(childSpans) + } - if (spanName === 'Workflow Execution') { - return `${childWorkflowName} workflow` - } + if (workflowSpan.output && typeof workflowSpan.output === 'object') { + const { childTraceSpans: nestedChildSpans, ...outputRest } = workflowSpan.output as { + childTraceSpans?: TraceSpan[] + } & Record + + if (Array.isArray(nestedChildSpans) && nestedChildSpans.length > 0) { + const flattenedNestedChildren = this.flattenChildWorkflowSpans(nestedChildSpans) + workflowSpan.children = [...(workflowSpan.children || []), ...flattenedNestedChildren] + } + + workflowSpan.output = outputRest + } + + flattened.push(workflowSpan) + }) + + return flattened + } + + private toExecutionResult(result: ExecutionResult | StreamingExecution): ExecutionResult { + return 'execution' in result ? result.execution : result + } - return `${spanName}` + private isSyntheticWorkflowWrapper(span: TraceSpan | undefined): boolean { + if (!span || span.type !== 'workflow') return false + return !span.blockId } /** * Maps child workflow output to parent block output */ private mapChildOutputToParent( - childResult: any, + childResult: ExecutionResult, childWorkflowId: string, childWorkflowName: string, duration: number, - childTraceSpans?: any[] + childTraceSpans?: WorkflowTraceSpan[] ): BlockOutput { const success = childResult.success !== false if (!success) { diff --git a/apps/sim/executor/index.ts b/apps/sim/executor/index.ts index 9b92b8765c..4404f01f1d 100644 --- a/apps/sim/executor/index.ts +++ b/apps/sim/executor/index.ts @@ -3,7 +3,7 @@ import { createLogger } from '@/lib/logs/console/logger' import type { TraceSpan } from '@/lib/logs/types' import { getBlock } from '@/blocks' import type { BlockOutput } from '@/blocks/types' -import { BlockType } from '@/executor/consts' +import { BlockType, isWorkflowBlockType } from '@/executor/consts' import { AgentBlockHandler, ApiBlockHandler, @@ -2182,7 +2182,7 @@ export class Executor { new Date(blockLog.endedAt).getTime() - new Date(blockLog.startedAt).getTime() // If this error came from a child workflow execution, persist its trace spans on the log - if (block.metadata?.id === BlockType.WORKFLOW) { + if (isWorkflowBlockType(block.metadata?.id)) { this.attachChildWorkflowSpansToLog(blockLog, error) } @@ -2272,7 +2272,7 @@ export class Executor { } // Preserve child workflow spans on the block state so downstream logging can render them - if (block.metadata?.id === BlockType.WORKFLOW) { + if (isWorkflowBlockType(block.metadata?.id)) { this.attachChildWorkflowSpansToOutput(errorOutput, error) } @@ -2283,7 +2283,42 @@ export class Executor { executionTime: blockLog.durationMs, }) - // If there are error paths to follow, return error output instead of throwing + const failureEndTime = context.metadata.endTime ?? new Date().toISOString() + if (!context.metadata.endTime) { + context.metadata.endTime = failureEndTime + } + const failureDuration = context.metadata.startTime + ? Math.max( + 0, + new Date(failureEndTime).getTime() - new Date(context.metadata.startTime).getTime() + ) + : (context.metadata.duration ?? 0) + context.metadata.duration = failureDuration + + const failureMetadata = { + ...context.metadata, + endTime: failureEndTime, + duration: failureDuration, + workflowConnections: this.actualWorkflow.connections.map((conn) => ({ + source: conn.source, + target: conn.target, + })), + } + + const upstreamExecutionResult = (error as { executionResult?: ExecutionResult } | null) + ?.executionResult + const executionResultPayload: ExecutionResult = { + success: false, + output: upstreamExecutionResult?.output ?? errorOutput, + error: upstreamExecutionResult?.error ?? this.extractErrorMessage(error), + logs: [...context.blockLogs], + metadata: { + ...failureMetadata, + ...(upstreamExecutionResult?.metadata ?? {}), + workflowConnections: failureMetadata.workflowConnections, + }, + } + if (hasErrorPath) { // Return the error output to allow execution to continue along error path return errorOutput @@ -2316,7 +2351,17 @@ export class Executor { errorMessage: this.extractErrorMessage(error), }) - throw new Error(errorMessage) + const executionError = new Error(errorMessage) + ;(executionError as any).executionResult = executionResultPayload + if (Array.isArray((error as { childTraceSpans?: TraceSpan[] } | null)?.childTraceSpans)) { + ;(executionError as any).childTraceSpans = ( + error as { childTraceSpans?: TraceSpan[] } + ).childTraceSpans + ;(executionError as any).childWorkflowName = ( + error as { childWorkflowName?: string } + ).childWorkflowName + } + throw executionError } } @@ -2329,11 +2374,12 @@ export class Executor { error as { childTraceSpans?: TraceSpan[]; childWorkflowName?: string } | null | undefined )?.childTraceSpans if (Array.isArray(spans) && spans.length > 0) { + const childWorkflowName = (error as { childWorkflowName?: string } | null | undefined) + ?.childWorkflowName blockLog.output = { ...(blockLog.output || {}), childTraceSpans: spans, - childWorkflowName: (error as { childWorkflowName?: string } | null | undefined) - ?.childWorkflowName, + childWorkflowName, } } } @@ -2516,7 +2562,7 @@ export class Executor { * Preserves child workflow trace spans for proper nesting */ private integrateChildWorkflowLogs(block: SerializedBlock, output: NormalizedBlockOutput): void { - if (block.metadata?.id !== BlockType.WORKFLOW) { + if (!isWorkflowBlockType(block.metadata?.id)) { return } diff --git a/apps/sim/lib/api-key/service.ts b/apps/sim/lib/api-key/service.ts index 9f13e23695..2869b19b86 100644 --- a/apps/sim/lib/api-key/service.ts +++ b/apps/sim/lib/api-key/service.ts @@ -125,6 +125,27 @@ export async function updateApiKeyLastUsed(keyId: string): Promise { } } +/** + * Given a pinned API key ID, resolve the owning userId (actor). + * Returns null if not found. + */ +export async function getApiKeyOwnerUserId( + pinnedApiKeyId: string | null | undefined +): Promise { + if (!pinnedApiKeyId) return null + try { + const rows = await db + .select({ userId: apiKeyTable.userId }) + .from(apiKeyTable) + .where(eq(apiKeyTable.id, pinnedApiKeyId)) + .limit(1) + return rows[0]?.userId ?? null + } catch (error) { + logger.error('Error resolving API key owner', { error, pinnedApiKeyId }) + return null + } +} + /** * Get the API encryption key from the environment * @returns The API encryption key diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index fef8d6962a..700c8a164f 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -37,6 +37,7 @@ export interface SessionErrorCompleteParams { message?: string stackTrace?: string } + traceSpans?: TraceSpan[] } export class LoggingSession { @@ -131,7 +132,7 @@ export class LoggingSession { async completeWithError(params: SessionErrorCompleteParams = {}): Promise { try { - const { endedAt, totalDurationMs, error } = params + const { endedAt, totalDurationMs, error, traceSpans } = params const endTime = endedAt ? new Date(endedAt) : new Date() const durationMs = typeof totalDurationMs === 'number' ? totalDurationMs : 0 @@ -151,19 +152,19 @@ export class LoggingSession { const message = error?.message || 'Execution failed before starting blocks' - const syntheticErrorSpan: TraceSpan[] = [ - { - id: 'pre-execution-validation', - name: 'Workflow Error', - type: 'validation', - duration: Math.max(1, durationMs), - startTime: startTime.toISOString(), - endTime: endTime.toISOString(), - status: 'error', - children: [], - output: { error: message }, - }, - ] + const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0 + + const errorSpan: TraceSpan = { + id: 'workflow-error-root', + name: 'Workflow Error', + type: 'workflow', + duration: Math.max(1, durationMs), + startTime: startTime.toISOString(), + endTime: endTime.toISOString(), + status: 'error', + ...(hasProvidedSpans ? {} : { children: [] }), + output: { error: message }, + } await executionLogger.completeWorkflowExecution({ executionId: this.executionId, @@ -171,7 +172,7 @@ export class LoggingSession { totalDurationMs: Math.max(1, durationMs), costSummary, finalOutput: { error: message }, - traceSpans: syntheticErrorSpan, + traceSpans: hasProvidedSpans ? traceSpans : [errorSpan], }) if (this.requestId) { diff --git a/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts b/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts index b88e5ba54a..379fe4eacf 100644 --- a/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts +++ b/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts @@ -582,6 +582,195 @@ describe('buildTraceSpans', () => { // Verify no toolCalls property exists (since we're using children instead) expect(agentSpan.toolCalls).toBeUndefined() }) + + test('should flatten nested child workflow trace spans recursively', () => { + const nestedChildSpan = { + id: 'nested-workflow-span', + name: 'Nested Workflow Block', + type: 'workflow', + blockId: 'nested-workflow-block-id', + duration: 3000, + startTime: '2024-01-01T10:00:01.000Z', + endTime: '2024-01-01T10:00:04.000Z', + status: 'success' as const, + output: { + childTraceSpans: [ + { + id: 'grand-wrapper', + name: 'Workflow Execution', + type: 'workflow', + duration: 3000, + startTime: '2024-01-01T10:00:01.000Z', + endTime: '2024-01-01T10:00:04.000Z', + status: 'success' as const, + children: [ + { + id: 'grand-child-block', + name: 'Deep API Call', + type: 'api', + duration: 1500, + startTime: '2024-01-01T10:00:01.500Z', + endTime: '2024-01-01T10:00:03.000Z', + status: 'success' as const, + input: { path: '/v1/test' }, + output: { result: 'ok' }, + }, + ], + }, + ], + }, + } + + const toolSpan = { + id: 'child-tool-span', + name: 'Helper Tool', + type: 'tool', + duration: 1000, + startTime: '2024-01-01T10:00:04.000Z', + endTime: '2024-01-01T10:00:05.000Z', + status: 'success' as const, + } + + const mockExecutionResult: ExecutionResult = { + success: true, + output: { result: 'parent output' }, + logs: [ + { + blockId: 'workflow-1', + blockName: 'Child Workflow', + blockType: 'workflow', + startedAt: '2024-01-01T10:00:00.000Z', + endedAt: '2024-01-01T10:00:05.000Z', + durationMs: 5000, + success: true, + output: { + childWorkflowName: 'Child Workflow', + childTraceSpans: [ + { + id: 'child-wrapper', + name: 'Workflow Execution', + type: 'workflow', + duration: 5000, + startTime: '2024-01-01T10:00:00.000Z', + endTime: '2024-01-01T10:00:05.000Z', + status: 'success' as const, + children: [nestedChildSpan, toolSpan], + }, + ], + }, + }, + ], + } + + const { traceSpans } = buildTraceSpans(mockExecutionResult) + + expect(traceSpans).toHaveLength(1) + const workflowSpan = traceSpans[0] + expect(workflowSpan.type).toBe('workflow') + expect(workflowSpan.children).toBeDefined() + expect(workflowSpan.children).toHaveLength(2) + + const nestedWorkflowSpan = workflowSpan.children?.find((span) => span.type === 'workflow') + expect(nestedWorkflowSpan).toBeDefined() + expect(nestedWorkflowSpan?.name).toBe('Nested Workflow Block') + expect(nestedWorkflowSpan?.children).toBeDefined() + expect(nestedWorkflowSpan?.children).toHaveLength(1) + expect(nestedWorkflowSpan?.children?.[0].name).toBe('Deep API Call') + expect(nestedWorkflowSpan?.children?.[0].type).toBe('api') + + const helperToolSpan = workflowSpan.children?.find((span) => span.id === 'child-tool-span') + expect(helperToolSpan?.type).toBe('tool') + + const syntheticWrappers = workflowSpan.children?.filter( + (span) => span.name === 'Workflow Execution' + ) + expect(syntheticWrappers).toHaveLength(0) + }) + + test('should handle nested child workflow errors with proper hierarchy', () => { + const functionErrorSpan = { + id: 'function-error-span', + name: 'Function 1', + type: 'function', + duration: 200, + startTime: '2024-01-01T10:01:02.000Z', + endTime: '2024-01-01T10:01:02.200Z', + status: 'error' as const, + blockId: 'function-1', + output: { + error: 'Syntax Error: Line 1: `retur "HELLO"` - Unexpected string', + }, + } + + const rainbowCupcakeSpan = { + id: 'rainbow-workflow-span', + name: 'Rainbow Cupcake', + type: 'workflow', + duration: 300, + startTime: '2024-01-01T10:01:02.000Z', + endTime: '2024-01-01T10:01:02.300Z', + status: 'error' as const, + blockId: 'workflow-rainbow', + output: { + childWorkflowName: 'rainbow-cupcake', + error: 'Syntax Error: Line 1: `retur "HELLO"` - Unexpected string', + childTraceSpans: [functionErrorSpan], + }, + } + + const mockExecutionResult: ExecutionResult = { + success: false, + output: { result: null }, + metadata: { + duration: 3000, + startTime: '2024-01-01T10:01:00.000Z', + }, + logs: [ + { + blockId: 'workflow-silk', + blockName: 'Silk Pond', + blockType: 'workflow', + startedAt: '2024-01-01T10:01:00.000Z', + endedAt: '2024-01-01T10:01:03.000Z', + durationMs: 3000, + success: false, + error: + 'Error in child workflow "silk-pond": Error in child workflow "rainbow-cupcake": Syntax Error', + output: { + childWorkflowName: 'silk-pond', + childTraceSpans: [rainbowCupcakeSpan], + }, + }, + ], + } + + const { traceSpans } = buildTraceSpans(mockExecutionResult) + + expect(traceSpans).toHaveLength(1) + const workflowExecutionSpan = traceSpans[0] + expect(workflowExecutionSpan.name).toBe('Workflow Execution') + expect(workflowExecutionSpan.status).toBe('error') + expect(workflowExecutionSpan.children).toBeDefined() + expect(workflowExecutionSpan.children).toHaveLength(1) + + const silkPondSpan = workflowExecutionSpan.children?.[0] + expect(silkPondSpan?.name).toBe('Silk Pond') + expect(silkPondSpan?.status).toBe('error') + expect(silkPondSpan?.children).toBeDefined() + expect(silkPondSpan?.children).toHaveLength(1) + + const rainbowSpan = silkPondSpan?.children?.[0] + expect(rainbowSpan?.name).toBe('Rainbow Cupcake') + expect(rainbowSpan?.status).toBe('error') + expect(rainbowSpan?.type).toBe('workflow') + expect(rainbowSpan?.children).toBeDefined() + expect(rainbowSpan?.children).toHaveLength(1) + + const functionSpan = rainbowSpan?.children?.[0] + expect(functionSpan?.name).toBe('Function 1') + expect(functionSpan?.status).toBe('error') + expect((functionSpan?.output as { error?: string })?.error).toContain('Syntax Error') + }) }) describe('stripCustomToolPrefix', () => { diff --git a/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts b/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts index d2013dd3e8..31020d3165 100644 --- a/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts +++ b/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts @@ -1,9 +1,63 @@ import { createLogger } from '@/lib/logs/console/logger' import type { TraceSpan } from '@/lib/logs/types' +import { isWorkflowBlockType } from '@/executor/consts' import type { ExecutionResult } from '@/executor/types' const logger = createLogger('TraceSpans') +function isSyntheticWorkflowWrapper(span: TraceSpan | undefined): boolean { + if (!span || span.type !== 'workflow') return false + return !span.blockId +} + +function flattenWorkflowChildren(spans: TraceSpan[]): TraceSpan[] { + const flattened: TraceSpan[] = [] + + spans.forEach((span) => { + if (isSyntheticWorkflowWrapper(span)) { + if (span.children && Array.isArray(span.children)) { + flattened.push(...flattenWorkflowChildren(span.children)) + } + return + } + + const processedSpan = ensureNestedWorkflowsProcessed(span) + flattened.push(processedSpan) + }) + + return flattened +} + +function getTraceSpanKey(span: TraceSpan): string { + if (span.id) { + return span.id + } + + const name = span.name || 'span' + const start = span.startTime || 'unknown-start' + const end = span.endTime || 'unknown-end' + + return `${name}|${start}|${end}` +} + +function mergeTraceSpanChildren(...childGroups: TraceSpan[][]): TraceSpan[] { + const merged: TraceSpan[] = [] + const seen = new Set() + + childGroups.forEach((group) => { + group.forEach((child) => { + const key = getTraceSpanKey(child) + if (seen.has(key)) { + return + } + seen.add(key) + merged.push(child) + }) + }) + + return merged +} + // Helper function to build a tree of trace spans from execution logs export function buildTraceSpans(result: ExecutionResult): { traceSpans: TraceSpan[] @@ -56,11 +110,8 @@ export function buildTraceSpans(result: ExecutionResult): { } } - // Prefer human-friendly workflow block naming if provided by child execution mapping - const displayName = - log.blockType === 'workflow' && log.output?.childWorkflowName - ? `${log.output.childWorkflowName} workflow` - : log.blockName || log.blockId + // Use block name consistently for all block types + const displayName = log.blockName || log.blockId const span: TraceSpan = { id: spanId, @@ -106,42 +157,11 @@ export function buildTraceSpans(result: ExecutionResult): { ;(span as any).model = log.output.model } - // Handle child workflow spans for workflow blocks - if ( - log.blockType === 'workflow' && - log.output?.childTraceSpans && - Array.isArray(log.output.childTraceSpans) - ) { - // Convert child trace spans to be direct children of this workflow block span - const childTraceSpans = log.output.childTraceSpans as TraceSpan[] - - // Process child workflow spans and add them as children - const flatChildSpans: TraceSpan[] = [] - childTraceSpans.forEach((childSpan) => { - // Skip the synthetic workflow span wrapper - we only want the actual block executions - if ( - childSpan.type === 'workflow' && - (childSpan.name === 'Workflow Execution' || childSpan.name.endsWith(' workflow')) - ) { - // Add its children directly, skipping the synthetic wrapper - if (childSpan.children && Array.isArray(childSpan.children)) { - flatChildSpans.push(...childSpan.children) - } - } else { - // This is a regular span, add it directly - // But first, ensure nested workflow blocks in this span are also processed - const processedSpan = ensureNestedWorkflowsProcessed(childSpan) - flatChildSpans.push(processedSpan) - } - }) - - // Add the child spans as children of this workflow block - span.children = flatChildSpans - } - // Enhanced approach: Use timeSegments for sequential flow if available // This provides the actual model→tool→model execution sequence + // Skip for workflow blocks since they will be processed via output.childTraceSpans at the end if ( + !isWorkflowBlockType(log.blockType) && log.output?.providerTiming?.timeSegments && Array.isArray(log.output.providerTiming.timeSegments) ) { @@ -250,6 +270,17 @@ export function buildTraceSpans(result: ExecutionResult): { } } + // Handle child workflow spans for workflow blocks - process at the end to avoid being overwritten + if ( + isWorkflowBlockType(log.blockType) && + log.output?.childTraceSpans && + Array.isArray(log.output.childTraceSpans) + ) { + const childTraceSpans = log.output.childTraceSpans as TraceSpan[] + const flattenedChildren = flattenWorkflowChildren(childTraceSpans) + span.children = mergeTraceSpanChildren(span.children || [], flattenedChildren) + } + // Store in map spanMap.set(spanId, span) }) @@ -327,7 +358,7 @@ export function buildTraceSpans(result: ExecutionResult): { } // Check if this span could be a parent to future spans - if (log.blockType === 'agent' || log.blockType === 'workflow') { + if (log.blockType === 'agent' || isWorkflowBlockType(log.blockType)) { spanStack.push(span) } }) @@ -594,36 +625,41 @@ function groupIterationBlocks(spans: TraceSpan[]): TraceSpan[] { } function ensureNestedWorkflowsProcessed(span: TraceSpan): TraceSpan { - const processedSpan = { ...span } - - if ( - span.type === 'workflow' && - span.output?.childTraceSpans && - Array.isArray(span.output.childTraceSpans) - ) { - const childTraceSpans = span.output.childTraceSpans as TraceSpan[] - const nestedChildren: TraceSpan[] = [] - - childTraceSpans.forEach((childSpan) => { - if ( - childSpan.type === 'workflow' && - (childSpan.name === 'Workflow Execution' || childSpan.name.endsWith(' workflow')) - ) { - if (childSpan.children && Array.isArray(childSpan.children)) { - childSpan.children.forEach((grandchildSpan) => { - nestedChildren.push(ensureNestedWorkflowsProcessed(grandchildSpan)) - }) - } - } else { - nestedChildren.push(ensureNestedWorkflowsProcessed(childSpan)) - } - }) + const processedSpan: TraceSpan = { ...span } + + if (processedSpan.output && typeof processedSpan.output === 'object') { + processedSpan.output = { ...processedSpan.output } + } + + const normalizedChildren = Array.isArray(span.children) + ? span.children.map((child) => ensureNestedWorkflowsProcessed(child)) + : [] - processedSpan.children = nestedChildren - } else if (span.children && Array.isArray(span.children)) { - processedSpan.children = span.children.map((child) => ensureNestedWorkflowsProcessed(child)) + const outputChildSpans = (() => { + if (!processedSpan.output || typeof processedSpan.output !== 'object') { + return [] as TraceSpan[] + } + + const maybeChildSpans = (processedSpan.output as { childTraceSpans?: TraceSpan[] }) + .childTraceSpans + if (!Array.isArray(maybeChildSpans) || maybeChildSpans.length === 0) { + return [] as TraceSpan[] + } + + return flattenWorkflowChildren(maybeChildSpans) + })() + + const mergedChildren = mergeTraceSpanChildren(normalizedChildren, outputChildSpans) + + if (processedSpan.output && 'childTraceSpans' in processedSpan.output) { + const { childTraceSpans, ...cleanOutput } = processedSpan.output as { + childTraceSpans?: TraceSpan[] + } & Record + processedSpan.output = cleanOutput } + processedSpan.children = mergedChildren.length > 0 ? mergedChildren : undefined + return processedSpan } diff --git a/apps/sim/lib/webhooks/processor.ts b/apps/sim/lib/webhooks/processor.ts index 38bf2ccb31..73730df2d9 100644 --- a/apps/sim/lib/webhooks/processor.ts +++ b/apps/sim/lib/webhooks/processor.ts @@ -2,6 +2,7 @@ import { db, webhook, workflow } from '@sim/db' import { tasks } from '@trigger.dev/sdk' import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' +import { getApiKeyOwnerUserId } from '@/lib/api-key/service' import { checkServerSideUsageLimits } from '@/lib/billing' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' import { env, isTruthy } from '@/lib/env' @@ -268,18 +269,25 @@ export async function checkRateLimits( requestId: string ): Promise { try { - const userSubscription = await getHighestPrioritySubscription(foundWorkflow.userId) + const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId) + + if (!actorUserId) { + logger.warn(`[${requestId}] Webhook requires pinned API key to attribute usage`) + return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 }) + } + + const userSubscription = await getHighestPrioritySubscription(actorUserId) const rateLimiter = new RateLimiter() const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription( - foundWorkflow.userId, + actorUserId, userSubscription, 'webhook', true ) if (!rateLimitCheck.allowed) { - logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${foundWorkflow.userId}`, { + logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${actorUserId}`, { provider: foundWebhook.provider, remaining: rateLimitCheck.remaining, resetAt: rateLimitCheck.resetAt, @@ -319,10 +327,17 @@ export async function checkUsageLimits( } try { - const usageCheck = await checkServerSideUsageLimits(foundWorkflow.userId) + const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId) + + if (!actorUserId) { + logger.warn(`[${requestId}] Webhook requires pinned API key to attribute usage`) + return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 }) + } + + const usageCheck = await checkServerSideUsageLimits(actorUserId) if (usageCheck.isExceeded) { logger.warn( - `[${requestId}] User ${foundWorkflow.userId} has exceeded usage limits. Skipping webhook execution.`, + `[${requestId}] User ${actorUserId} has exceeded usage limits. Skipping webhook execution.`, { currentUsage: usageCheck.currentUsage, limit: usageCheck.limit, @@ -361,10 +376,16 @@ export async function queueWebhookExecution( options: WebhookProcessorOptions ): Promise { try { + const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId) + if (!actorUserId) { + logger.warn(`[${options.requestId}] Webhook requires pinned API key to attribute usage`) + return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 }) + } + const payload = { webhookId: foundWebhook.id, workflowId: foundWorkflow.id, - userId: foundWorkflow.userId, + userId: actorUserId, provider: foundWebhook.provider, body, headers: Object.fromEntries(request.headers.entries()),