138 changes: 133 additions & 5 deletions apps/sim/executor/execution/block-executor.ts
@@ -21,6 +21,7 @@ import type {
   ExecutionContext,
   NormalizedBlockOutput,
 } from '@/executor/types'
+import { streamingResponseFormatProcessor } from '@/executor/utils'
 import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
 import type { VariableResolver } from '@/executor/variables/resolver'
 import type { SerializedBlock } from '@/serializer/types'
@@ -100,11 +101,14 @@ export class BlockExecutor {
       const streamingExec = output as { stream: ReadableStream; execution: any }

       if (ctx.onStream) {
-        try {
-          await ctx.onStream(streamingExec)
-        } catch (error) {
-          logger.error('Error in onStream callback', { blockId: node.id, error })
-        }
+        await this.handleStreamingExecution(
+          ctx,
+          node,
+          block,
+          streamingExec,
+          resolvedInputs,
+          ctx.selectedOutputs ?? []
+        )
       }

       normalizedOutput = this.normalizeOutput(
@@ -446,4 +450,128 @@ export class BlockExecutor {
       }
     }
   }
+
+  private async handleStreamingExecution(
+    ctx: ExecutionContext,
+    node: DAGNode,
+    block: SerializedBlock,
+    streamingExec: { stream: ReadableStream; execution: any },
+    resolvedInputs: Record<string, any>,
+    selectedOutputs: string[]
+  ): Promise<void> {
+    const blockId = node.id
+
+    const responseFormat =
+      resolvedInputs?.responseFormat ??
+      (block.config?.params as Record<string, any> | undefined)?.responseFormat ??
+      (block.config as Record<string, any> | undefined)?.responseFormat
+
+    const stream = streamingExec.stream
+    if (typeof stream.tee !== 'function') {
+      await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
+      return
+    }
+
+    const [clientStream, executorStream] = stream.tee()
+
+    const processedClientStream = streamingResponseFormatProcessor.processStream(
+      clientStream,
+      blockId,
+      selectedOutputs,
+      responseFormat
+    )
+
+    const clientStreamingExec = {
+      ...streamingExec,
+      stream: processedClientStream,
+    }
+
+    const executorConsumption = this.consumeExecutorStream(
+      executorStream,
+      streamingExec,
+      blockId,
+      responseFormat
+    )
+
+    const clientConsumption = (async () => {
+      try {
+        await ctx.onStream?.(clientStreamingExec)
+      } catch (error) {
+        logger.error('Error in onStream callback', { blockId, error })
+      }
+    })()
+
+    await Promise.all([clientConsumption, executorConsumption])
+  }
+
+  private async forwardStream(
+    ctx: ExecutionContext,
+    blockId: string,
+    streamingExec: { stream: ReadableStream; execution: any },
+    stream: ReadableStream,
+    responseFormat: any,
+    selectedOutputs: string[]
+  ): Promise<void> {
+    const processedStream = streamingResponseFormatProcessor.processStream(
+      stream,
+      blockId,
+      selectedOutputs,
+      responseFormat
+    )
+
+    try {
+      await ctx.onStream?.({
+        ...streamingExec,
+        stream: processedStream,
+      })
+    } catch (error) {
+      logger.error('Error in onStream callback', { blockId, error })
+    }
+  }
+
+  private async consumeExecutorStream(
+    stream: ReadableStream,
+    streamingExec: { execution: any },
+    blockId: string,
+    responseFormat: any
+  ): Promise<void> {
+    const reader = stream.getReader()
+    const decoder = new TextDecoder()
+    let fullContent = ''
+
+    try {
+      while (true) {
+        const { done, value } = await reader.read()
+        if (done) break
+        fullContent += decoder.decode(value, { stream: true })
+      }
+    } catch (error) {
+      logger.error('Error reading executor stream for block', { blockId, error })
+    } finally {
+      try {
+        reader.releaseLock()
+      } catch {}
+    }
+
+    if (!fullContent) {
+      return
+    }
+
+    const executionOutput = streamingExec.execution?.output
+    if (!executionOutput || typeof executionOutput !== 'object') {
+      return
+    }
+
+    if (responseFormat) {
+      try {
+        const parsed = JSON.parse(fullContent.trim())
+        Object.assign(executionOutput, parsed)
+        return
+      } catch (error) {
+        logger.warn('Failed to parse streamed content for response format', { blockId, error })
+      }
+    }
+
+    executionOutput.content = fullContent
+  }
 }
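The core of the executor change is the `stream.tee()` split: one branch is handed to the client through `ctx.onStream`, while the other is drained so the block's output can be populated once the stream ends. The standalone sketch below shows that tee-and-buffer pattern; `sendToClient` is an illustrative stand-in rather than a Sim API, and the trailing parameterless `decoder.decode()` is an extra flush guarding against a multi-byte character split across the final chunk.

async function teeAndBuffer(
  stream: ReadableStream<Uint8Array>,
  sendToClient: (branch: ReadableStream<Uint8Array>) => Promise<void>
): Promise<string> {
  // tee() yields two independent branches; the runtime buffers chunks for
  // whichever branch reads more slowly, so both must be consumed fully.
  const [clientBranch, executorBranch] = stream.tee()

  const drain = (async () => {
    const reader = executorBranch.getReader()
    const decoder = new TextDecoder()
    let content = ''
    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        // { stream: true } holds back partial multi-byte sequences
        content += decoder.decode(value, { stream: true })
      }
      content += decoder.decode() // flush any buffered trailing bytes
    } finally {
      reader.releaseLock()
    }
    return content
  })()

  // Consume both branches concurrently, mirroring the Promise.all in
  // handleStreamingExecution above.
  const [, content] = await Promise.all([sendToClient(clientBranch), drain])
  return content
}

With a responseFormat set, the buffered text is then JSON-parsed into the block's execution output, falling back to assigning the raw string to content when parsing fails, as consumeExecutorStream does above.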
6 changes: 3 additions & 3 deletions apps/sim/providers/anthropic/index.ts
@@ -985,9 +985,9 @@ ${fieldDescriptions}
     const providerEndTimeISO = new Date(providerEndTime).toISOString()
     const totalDuration = providerEndTime - providerStartTime

-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Anthropic response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final Anthropic response after tool processing')

     // When streaming after tool calls with forced tools, make sure tool_choice is removed
     // This prevents the API from trying to force tool usage again in the final streaming response
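The same one-line guard change repeats across the provider files below: dropping the `iterationCount > 0` condition means a request with `stream` set now streams its final response even when no tool-call iterations ran, where previously it would fall through to the non-streaming path. A minimal sketch of the before/after control flow, with `streamFinal` and `completeFinal` as hypothetical stand-ins for each provider's two return paths:

// Before: streaming only engaged after at least one tool-call iteration.
// if (request.stream && iterationCount > 0) {
//   return streamFinal(payload)
// }

// After: any streaming request streams its final response, tools or not.
if (request.stream) {
  return streamFinal(payload) // chunked/SSE response handed back to the executor
}
return completeFinal(payload) // buffered, non-streaming response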
6 changes: 3 additions & 3 deletions apps/sim/providers/azure-openai/index.ts
@@ -523,9 +523,9 @@ export const azureOpenAIProvider: ProviderConfig = {
       iterationCount++
     }

-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')

     // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
     // This prevents Azure OpenAI API from trying to force tool usage again in the final streaming response
4 changes: 2 additions & 2 deletions apps/sim/providers/cerebras/index.ts
@@ -455,8 +455,8 @@ export const cerebrasProvider: ProviderConfig = {
     const totalDuration = providerEndTime - providerStartTime

     // POST-TOOL-STREAMING: stream after tool calls if requested
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Cerebras response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final Cerebras response after tool processing')

     // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
     // This prevents the API from trying to force tool usage again in the final streaming response
4 changes: 2 additions & 2 deletions apps/sim/providers/deepseek/index.ts
@@ -457,8 +457,8 @@ export const deepseekProvider: ProviderConfig = {
     const totalDuration = providerEndTime - providerStartTime

     // POST-TOOL STREAMING: stream final response after tool calls if requested
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final DeepSeek response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final DeepSeek response after tool processing')

     // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
     // This prevents the API from trying to force tool usage again in the final streaming response
4 changes: 2 additions & 2 deletions apps/sim/providers/groq/index.ts
@@ -374,8 +374,8 @@ export const groqProvider: ProviderConfig = {
     }

     // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Groq response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final Groq response after tool processing')

     // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
     // This prevents the API from trying to force tool usage again in the final streaming response
4 changes: 2 additions & 2 deletions apps/sim/providers/mistral/index.ts
@@ -447,8 +447,8 @@ export const mistralProvider: ProviderConfig = {
       iterationCount++
     }

-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')

       const streamingPayload = {
         ...payload,
4 changes: 2 additions & 2 deletions apps/sim/providers/ollama/index.ts
@@ -529,8 +529,8 @@ export const ollamaProvider: ProviderConfig = {
     }

     // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')

       const streamingPayload = {
         ...payload,
6 changes: 3 additions & 3 deletions apps/sim/providers/openai/index.ts
@@ -504,9 +504,9 @@ export const openaiProvider: ProviderConfig = {
       iterationCount++
     }

-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')

     // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
     // This prevents OpenAI API from trying to force tool usage again in the final streaming response
2 changes: 1 addition & 1 deletion apps/sim/providers/openrouter/index.ts
@@ -381,7 +381,7 @@ export const openRouterProvider: ProviderConfig = {
       iterationCount++
     }

-    if (request.stream && iterationCount > 0) {
+    if (request.stream) {
       const streamingPayload = {
         ...payload,
         messages: currentMessages,
4 changes: 2 additions & 2 deletions apps/sim/providers/xai/index.ts
@@ -501,8 +501,8 @@ export const xAIProvider: ProviderConfig = {
       })
     }

-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
       // For final streaming response, choose between tools (auto) or response_format (never both)
       let finalStreamingPayload: any
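The xAI variant adds the wrinkle called out in its comment: the final streaming payload carries either tools (with `tool_choice: 'auto'`) or a `response_format`, never both. A rough sketch of that either/or construction, with `basePayload`, `tools`, and `responseFormat` as illustrative locals rather than the file's actual variable names:

let finalStreamingPayload: Record<string, any>
if (responseFormat) {
  // Structured output takes priority; tools are omitted entirely so the
  // model just produces the formatted answer.
  finalStreamingPayload = { ...basePayload, response_format: responseFormat }
} else {
  // Otherwise keep tools available but relax tool_choice so the model
  // isn't forced into another tool call on the final streamed turn.
  finalStreamingPayload = { ...basePayload, tools, tool_choice: 'auto' }
}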