5 changes: 4 additions & 1 deletion apps/sim/app/api/__test-utils__/utils.ts
@@ -403,7 +403,10 @@ export function mockExecutionDependencies() {
provider: 'provider',
providerConfig: 'providerConfig',
},
workflow: { id: 'id', userId: 'userId' },
workflow: {
id: 'id',
userId: 'userId',
},
workflowSchedule: {
id: 'id',
workflowId: 'workflowId',
86 changes: 35 additions & 51 deletions apps/sim/app/api/chat/utils.ts
@@ -1,6 +1,6 @@
import { db } from '@sim/db'
import { chat, userStats, workflow } from '@sim/db/schema'
import { eq, sql } from 'drizzle-orm'
import { chat, workflow } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
@@ -16,7 +16,7 @@ import { TriggerUtils } from '@/lib/workflows/triggers'
import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
import { getBlock } from '@/blocks'
import { Executor } from '@/executor'
import type { BlockLog, ExecutionResult } from '@/executor/types'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
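For context on the type change in this file (the executor's return is now typed as ExecutionResult | StreamingExecution), here is a minimal standalone sketch of how the two cases are told apart later in the diff. The interfaces are simplified stand-ins, not the real types from '@/executor/types'.

// Sketch only — simplified stand-ins for the real executor result types.
interface ExecutionResultSketch {
  success: boolean
  output: Record<string, unknown>
  logs: unknown[]
}

interface StreamingExecutionSketch {
  stream: ReadableStream<Uint8Array>
  execution: ExecutionResultSketch
}

// The diff discriminates with property checks ('stream' in result,
// 'execution' in result) rather than a tag field: a streaming result still
// carries the underlying execution, so logging can read an ExecutionResult
// in both cases.
function unwrapExecution(
  result: ExecutionResultSketch | StreamingExecutionSketch
): ExecutionResultSketch {
  return 'execution' in result ? result.execution : result
}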
@@ -548,6 +548,7 @@ export async function executeWorkflowForChat(
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
let executionResultForLogging: ExecutionResult | null = null

try {
const streamedContent = new Map<string, string>()
@@ -603,6 +604,7 @@ export async function executeWorkflowForChat(
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: errorMessage },
traceSpans: [],
})
sessionCompleted = true
}
@@ -644,16 +646,24 @@ export async function executeWorkflowForChat(
// Set up logging on the executor
loggingSession.setupExecutor(executor)

let result
let result: ExecutionResult | StreamingExecution | undefined
try {
result = await executor.execute(workflowId, startBlockId)
} catch (error: any) {
logger.error(`[${requestId}] Chat workflow execution failed:`, error)
if (!sessionCompleted) {
const executionResult = error?.executionResult || {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)

await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: error.message || 'Chat workflow execution failed' },
traceSpans,
})
sessionCompleted = true
}
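A minimal sketch of the fallback pattern this hunk introduces: when executor.execute throws, prefer any partial execution result attached to the error, otherwise synthesize an empty failed result, so trace spans can always be built for safeCompleteWithError. The result shape and the buildTraceSpans return value are simplified assumptions here, not the real implementations.

// Sketch only — not part of the diff.
interface MinimalExecutionResult {
  success: boolean
  output: Record<string, unknown>
  logs: unknown[]
}

// Assumption: errors thrown by the executor may carry a partial result under
// an `executionResult` property, as the catch block above reads it.
function resolveExecutionResultFromError(error: unknown): MinimalExecutionResult {
  const attached = (error as { executionResult?: MinimalExecutionResult } | null)?.executionResult
  return attached ?? { success: false, output: {}, logs: [] }
}

// Usage in the catch path (simplified): derive spans from whatever result we
// have, then complete the logging session with the error and those spans.
// const executionResult = resolveExecutionResultFromError(error)
// const { traceSpans } = buildTraceSpans(executionResult)
// await loggingSession.safeCompleteWithError({ ..., traceSpans })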
@@ -677,31 +687,25 @@ export async function executeWorkflowForChat(
? (result.execution as ExecutionResult)
: (result as ExecutionResult)

if (executionResult?.logs) {
// Update streamed content and apply tokenization - process regardless of overall success
// This ensures partial successes (some agents succeed, some fail) still return results
executionResultForLogging = executionResult

// Add newlines between different agent outputs for better readability
if (executionResult?.logs) {
const processedOutputs = new Set<string>()
executionResult.logs.forEach((log: BlockLog) => {
if (streamedContent.has(log.blockId)) {
const content = streamedContent.get(log.blockId)
if (log.output && content) {
// Add newline separation between different outputs (but not before the first one)
const separator = processedOutputs.size > 0 ? '\n\n' : ''
log.output.content = separator + content
processedOutputs.add(log.blockId)
}
}
})

// Also process non-streamed outputs from selected blocks (like function blocks)
// This uses the same logic as the chat panel to ensure identical behavior
const nonStreamingLogs = executionResult.logs.filter(
(log: BlockLog) => !streamedContent.has(log.blockId)
)

// Extract the exact same functions used by the chat panel
const extractBlockIdFromOutputId = (outputId: string): string => {
return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]
}
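Read on its own, the streamed-content stitching earlier in this hunk amounts to the following standalone sketch (BlockLogSketch is a simplified stand-in for the real BlockLog): each block's streamed text replaces its log output content, with a blank line inserted before every block after the first.

// Sketch only — mirrors the forEach over executionResult.logs above.
interface BlockLogSketch {
  blockId: string
  output: { content?: string }
}

function attachStreamedContent(logs: BlockLogSketch[], streamed: Map<string, string>): void {
  const processed = new Set<string>()
  for (const log of logs) {
    const content = streamed.get(log.blockId)
    if (content && log.output) {
      // '\n\n' separates consecutive agent outputs, but not before the first one.
      const separator = processed.size > 0 ? '\n\n' : ''
      log.output.content = separator + content
      processed.add(log.blockId)
    }
  }
}

With streamed content for two blocks, the first log keeps its text as-is and the second is prefixed with a blank line, which is what keeps multi-agent chat responses readable.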
@@ -719,21 +723,18 @@ export async function executeWorkflowForChat(
try {
return JSON.parse(output.content)
} catch (e) {
// Fallback to original structure if parsing fails
return output
}
}

return output
}

// Filter outputs that have matching logs (exactly like chat panel)
const outputsToRender = selectedOutputIds.filter((outputId) => {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput)
})

// Process each selected output (exactly like chat panel)
for (const outputId of outputsToRender) {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
const path = extractPathFromOutputId(outputId, blockIdForOutput)
@@ -743,7 +744,6 @@ export async function executeWorkflowForChat(
let outputValue: any = log.output

if (path) {
// Parse JSON content safely (exactly like chat panel)
outputValue = parseOutputContentSafely(outputValue)

const pathParts = path.split('.')
@@ -758,16 +758,13 @@ export async function executeWorkflowForChat(
}

if (outputValue !== undefined) {
// Add newline separation between different outputs
const separator = processedOutputs.size > 0 ? '\n\n' : ''

// Format the output exactly like the chat panel
const formattedOutput =
typeof outputValue === 'string'
? outputValue
: JSON.stringify(outputValue, null, 2)

// Update the log content
if (!log.output.content) {
log.output.content = separator + formattedOutput
} else {
@@ -778,7 +775,6 @@ export async function executeWorkflowForChat(
}
}

// Process all logs for streaming tokenization
const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent)
logger.info(`Processed ${processedCount} blocks for streaming tokenization`)
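The non-streamed branch above mirrors the chat panel's output selection. A compact sketch of that resolution flow follows; note that extractPathFromOutputId and the path walk are partly collapsed in this diff view, so their bodies below are assumptions.

// Sketch only — helper names mirror the diff; bodies are partly assumed.
const extractBlockId = (outputId: string): string =>
  outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]

// Assumed: the path is whatever follows the block id and its separator.
const extractPath = (outputId: string, blockId: string): string =>
  outputId.startsWith(`${blockId}_`) ? outputId.slice(blockId.length + 1) : ''

// Parse JSON content when present, falling back to the raw output.
const parseContentSafely = (output: any): any => {
  if (output && typeof output.content === 'string') {
    try {
      return JSON.parse(output.content)
    } catch {
      return output
    }
  }
  return output
}

const resolvePath = (value: any, path: string): any =>
  path.split('.').reduce((acc, key) => (acc == null ? undefined : acc[key]), value)

// Example: for outputId 'blockA_data.value' and a log whose output.content is
// '{"data":{"value":42}}', the block id is 'blockA', the path is 'data.value',
// and resolvePath(parseContentSafely(output), 'data.value') yields 42.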

@@ -793,23 +789,6 @@ export async function executeWorkflowForChat(
}
;(enrichedResult.metadata as any).conversationId = conversationId
}
// Use the executionId created at the beginning of this function
logger.debug(`Using execution ID for deployed chat: ${executionId}`)

if (executionResult.success) {
try {
await db
.update(userStats)
.set({
totalChatExecutions: sql`total_chat_executions + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, deployment.userId))
logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`)
} catch (error) {
logger.error(`Failed to update user stats for deployed chat:`, error)
}
}
}

if (!(result && typeof result === 'object' && 'stream' in result)) {
@@ -833,30 +812,35 @@ export async function executeWorkflowForChat(

controller.close()
} catch (error: any) {
// Handle any errors that occur in the stream
logger.error(`[${requestId}] Stream error:`, error)
logger.error(`[${requestId}] Chat execution streaming error:`, error)

// Send error event to client
const encoder = new TextEncoder()
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({
event: 'error',
error: error.message || 'An unexpected error occurred',
})}\n\n`
)
)

// Try to complete the logging session with error if not already completed
if (!sessionCompleted && loggingSession) {
const executionResult = executionResultForLogging ||
(error?.executionResult as ExecutionResult | undefined) || {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)

await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: error.message || 'Stream processing error' },
traceSpans,
})
sessionCompleted = true
}

controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({
event: 'error',
error: error.message || 'Stream processing error',
})}\n\n`
)
)

controller.close()
}
},
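For completeness, one way a client might consume the `data: …\n\n` frames this stream emits, including the 'error' events enqueued in the catch path above. This is a sketch under assumptions: the endpoint URL, request shape, and any non-error event names are illustrative only and not taken from this diff.

// Sketch only — a hypothetical consumer of the chat stream's SSE-style frames.
async function readChatStream(url: string, onEvent: (evt: any) => void): Promise<void> {
  const res = await fetch(url, { method: 'POST' }) // request shape is an assumption
  if (!res.body) throw new Error('Response has no body to stream')

  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    buffer += decoder.decode(value, { stream: true })

    // Frames are delimited by a blank line: "data: {...}\n\n"
    let idx = buffer.indexOf('\n\n')
    while (idx !== -1) {
      const frame = buffer.slice(0, idx).trim()
      buffer = buffer.slice(idx + 2)
      if (frame.startsWith('data: ')) {
        onEvent(JSON.parse(frame.slice('data: '.length)))
      }
      idx = buffer.indexOf('\n\n')
    }
  }
}

// A consumer would branch on evt.event, e.g. surfacing evt.error when
// evt.event === 'error', matching the payload enqueued in the catch block.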