71 changes: 57 additions & 14 deletions apps/sim/app/api/chat/utils.ts
@@ -3,8 +3,9 @@ import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { env } from '@/lib/env'
import { createLogger } from '@/lib/logs/console-logger'
import { persistExecutionLogs } from '@/lib/logs/execution-logger'
import { EnhancedLoggingSession } from '@/lib/logs/enhanced-logging-session'
import { buildTraceSpans } from '@/lib/logs/trace-spans'
import { processStreamingBlockLogs } from '@/lib/tokenization'
import { decryptSecret } from '@/lib/utils'
import { db } from '@/db'
import { chat, environment as envTable, userStats, workflow } from '@/db/schema'
@@ -252,11 +253,14 @@ export async function executeWorkflowForChat(

const deployment = deploymentResult[0]
const workflowId = deployment.workflowId
const executionId = uuidv4()

// Set up enhanced logging for chat execution
const loggingSession = new EnhancedLoggingSession(workflowId, executionId, 'chat', requestId)

// Check for multi-output configuration in customizations
const customizations = (deployment.customizations || {}) as Record<string, any>
let outputBlockIds: string[] = []
let outputPaths: string[] = []

// Extract output configs from the new schema format
if (deployment.outputConfigs && Array.isArray(deployment.outputConfigs)) {
@@ -271,13 +275,11 @@
})

outputBlockIds = deployment.outputConfigs.map((config) => config.blockId)
outputPaths = deployment.outputConfigs.map((config) => config.path || '')
} else {
// Use customizations as fallback
outputBlockIds = Array.isArray(customizations.outputBlockIds)
? customizations.outputBlockIds
: []
outputPaths = Array.isArray(customizations.outputPaths) ? customizations.outputPaths : []
}

// Fall back to customizations if we still have no outputs
@@ -287,7 +289,6 @@
customizations.outputBlockIds.length > 0
) {
outputBlockIds = customizations.outputBlockIds
outputPaths = customizations.outputPaths || new Array(outputBlockIds.length).fill('')
}

logger.debug(`[${requestId}] Using ${outputBlockIds.length} output blocks for extraction`)
@@ -407,6 +408,13 @@
{} as Record<string, Record<string, any>>
)

// Start enhanced logging session
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: '', // TODO: Get from workflow
variables: workflowVariables,
})

const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
@@ -458,16 +466,41 @@
},
})

const result = await executor.execute(workflowId)
// Set up enhanced logging on the executor
loggingSession.setupExecutor(executor)

let result
try {
result = await executor.execute(workflowId)
} catch (error: any) {
logger.error(`[${requestId}] Chat workflow execution failed:`, error)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Chat workflow execution failed',
stackTrace: error.stack,
},
})
throw error
}

if (result && 'success' in result) {
result.logs?.forEach((log: BlockLog) => {
if (streamedContent.has(log.blockId)) {
if (log.output) {
log.output.content = streamedContent.get(log.blockId)
// Update streamed content and apply tokenization
if (result.logs) {
result.logs.forEach((log: BlockLog) => {
if (streamedContent.has(log.blockId)) {
const content = streamedContent.get(log.blockId)
if (log.output) {
log.output.content = content
}
}
}
})
})

// Process all logs for streaming tokenization
const processedCount = processStreamingBlockLogs(result.logs, streamedContent)
logger.info(`[CHAT-API] Processed ${processedCount} blocks for streaming tokenization`)
}

const { traceSpans, totalDuration } = buildTraceSpans(result)
const enrichedResult = { ...result, traceSpans, totalDuration }
@@ -481,8 +514,7 @@
;(enrichedResult.metadata as any).conversationId = conversationId
}
const executionId = uuidv4()
await persistExecutionLogs(workflowId, executionId, enrichedResult, 'chat')
logger.debug(`Persisted logs for deployed chat: ${executionId}`)
logger.debug(`Generated execution ID for deployed chat: ${executionId}`)

if (result.success) {
try {
@@ -506,6 +538,17 @@
)
}

// Complete enhanced logging session (for both success and failure)
if (result && 'success' in result) {
const { traceSpans } = buildTraceSpans(result)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,
finalOutput: result.output,
traceSpans,
})
}

controller.close()
},
})
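
For reviewers following the new logging flow in `apps/sim/app/api/chat/utils.ts`: the diff above swaps the old `persistExecutionLogs` call for an `EnhancedLoggingSession` that wraps the whole chat execution. Below is a condensed sketch of the call sequence only; the calls and their arguments are taken from the diff, while the standalone function, its parameters, and the `any`-typed executor are illustrative simplifications rather than code from this PR.

```ts
import { EnhancedLoggingSession } from '@/lib/logs/enhanced-logging-session'
import { buildTraceSpans } from '@/lib/logs/trace-spans'

// Condensed from the diff above; `executor`, `deployment`, and
// `workflowVariables` are assumed to be set up as in the existing route code.
async function runChatExecutionWithLogging(
  workflowId: string,
  executionId: string,
  requestId: string,
  executor: any, // Executor instance created by the route (simplified here)
  deployment: { userId: string },
  workflowVariables: Record<string, Record<string, any>>
) {
  const loggingSession = new EnhancedLoggingSession(workflowId, executionId, 'chat', requestId)

  // 1. Open the session before streaming begins
  await loggingSession.safeStart({
    userId: deployment.userId,
    workspaceId: '', // still a TODO in the PR
    variables: workflowVariables,
  })

  // 2. Attach the session to the executor so block-level logs are captured
  loggingSession.setupExecutor(executor)

  try {
    // 3. Run the workflow
    const result = await executor.execute(workflowId)

    // 4. Close the session with trace spans and the final output
    const { traceSpans } = buildTraceSpans(result)
    await loggingSession.safeComplete({
      endedAt: new Date().toISOString(),
      totalDurationMs: result.metadata?.duration || 0,
      finalOutput: result.output,
      traceSpans,
    })
    return result
  } catch (error: any) {
    // 5. On a thrown error, record it against the session and re-throw
    await loggingSession.safeCompleteWithError({
      endedAt: new Date().toISOString(),
      totalDurationMs: 0,
      error: {
        message: error.message || 'Chat workflow execution failed',
        stackTrace: error.stack,
      },
    })
    throw error
  }
}
```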
76 changes: 76 additions & 0 deletions apps/sim/app/api/logs/[executionId]/frozen-canvas/route.ts
@@ -0,0 +1,76 @@
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { workflowExecutionLogs, workflowExecutionSnapshots } from '@/db/schema'

const logger = createLogger('FrozenCanvasAPI')

export async function GET(
_request: NextRequest,
{ params }: { params: Promise<{ executionId: string }> }
) {
try {
const { executionId } = await params

logger.debug(`Fetching frozen canvas data for execution: ${executionId}`)

// Get the workflow execution log to find the snapshot
const [workflowLog] = await db
.select()
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)

if (!workflowLog) {
return NextResponse.json({ error: 'Workflow execution not found' }, { status: 404 })
}

// Get the workflow state snapshot
const [snapshot] = await db
.select()
.from(workflowExecutionSnapshots)
.where(eq(workflowExecutionSnapshots.id, workflowLog.stateSnapshotId))
.limit(1)

if (!snapshot) {
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
}

const response = {
executionId,
workflowId: workflowLog.workflowId,
workflowState: snapshot.stateData,
executionMetadata: {
trigger: workflowLog.trigger,
startedAt: workflowLog.startedAt.toISOString(),
endedAt: workflowLog.endedAt?.toISOString(),
totalDurationMs: workflowLog.totalDurationMs,
blockStats: {
total: workflowLog.blockCount,
success: workflowLog.successCount,
error: workflowLog.errorCount,
skipped: workflowLog.skippedCount,
},
cost: {
total: workflowLog.totalCost ? Number.parseFloat(workflowLog.totalCost) : null,
input: workflowLog.totalInputCost ? Number.parseFloat(workflowLog.totalInputCost) : null,
output: workflowLog.totalOutputCost
? Number.parseFloat(workflowLog.totalOutputCost)
: null,
},
totalTokens: workflowLog.totalTokens,
},
}

logger.debug(`Successfully fetched frozen canvas data for execution: ${executionId}`)
logger.debug(
`Workflow state contains ${Object.keys((snapshot.stateData as any)?.blocks || {}).length} blocks`
)

return NextResponse.json(response)
} catch (error) {
logger.error('Error fetching frozen canvas data:', error)
return NextResponse.json({ error: 'Failed to fetch frozen canvas data' }, { status: 500 })
}
}
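
A minimal sketch of how a client might consume the new endpoint. The URL shape follows the route's file path and the response fields mirror the `response` object built above; the fetch helper, interface name, and exact nullability of the numeric fields are assumptions for illustration, not part of this PR.

```ts
// Hypothetical client helper for GET /api/logs/[executionId]/frozen-canvas.
interface FrozenCanvasResponse {
  executionId: string
  workflowId: string
  workflowState: unknown
  executionMetadata: {
    trigger: string
    startedAt: string
    endedAt?: string
    totalDurationMs: number | null
    blockStats: { total: number; success: number; error: number; skipped: number }
    cost: { total: number | null; input: number | null; output: number | null }
    totalTokens: number | null
  }
}

async function fetchFrozenCanvas(executionId: string): Promise<FrozenCanvasResponse> {
  const res = await fetch(`/api/logs/${executionId}/frozen-canvas`)
  if (!res.ok) {
    // 404: execution log or snapshot missing; 500: server-side failure
    throw new Error(`Failed to load frozen canvas (${res.status})`)
  }
  return res.json()
}
```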