From a51abcd6a996140a5fda5f31a3efce614e10c3c4 Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Wed, 12 Nov 2025 21:02:24 -0800
Subject: [PATCH 1/2] Provider changes

---
 apps/sim/executor/execution/block-executor.ts | 138 +++++++++++++++++-
 apps/sim/providers/anthropic/index.ts         |   6 +-
 apps/sim/providers/azure-openai/index.ts      |   6 +-
 apps/sim/providers/cerebras/index.ts          |   4 +-
 apps/sim/providers/deepseek/index.ts          |   4 +-
 apps/sim/providers/groq/index.ts              |   4 +-
 apps/sim/providers/mistral/index.ts           |   4 +-
 apps/sim/providers/ollama/index.ts            |   4 +-
 apps/sim/providers/openai/index.ts            |   6 +-
 apps/sim/providers/openrouter/index.ts        |   2 +-
 apps/sim/providers/xai/index.ts               |   4 +-
 11 files changed, 155 insertions(+), 27 deletions(-)

diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts
index d39387168b..07ea3ccb27 100644
--- a/apps/sim/executor/execution/block-executor.ts
+++ b/apps/sim/executor/execution/block-executor.ts
@@ -22,6 +22,7 @@ import type {
   NormalizedBlockOutput,
 } from '@/executor/types'
 import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
+import { streamingResponseFormatProcessor } from '@/executor/utils'
 import type { VariableResolver } from '@/executor/variables/resolver'
 import type { SerializedBlock } from '@/serializer/types'
 import type { SubflowType } from '@/stores/workflows/workflow/types'
@@ -100,11 +101,14 @@ export class BlockExecutor {
     const streamingExec = output as { stream: ReadableStream; execution: any }
 
     if (ctx.onStream) {
-      try {
-        await ctx.onStream(streamingExec)
-      } catch (error) {
-        logger.error('Error in onStream callback', { blockId: node.id, error })
-      }
+      await this.handleStreamingExecution(
+        ctx,
+        node,
+        block,
+        streamingExec,
+        resolvedInputs,
+        ctx.selectedOutputs ?? []
+      )
     }
 
     normalizedOutput = this.normalizeOutput(
@@ -446,4 +450,128 @@
       }
     }
   }
+
+  private async handleStreamingExecution(
+    ctx: ExecutionContext,
+    node: DAGNode,
+    block: SerializedBlock,
+    streamingExec: { stream: ReadableStream; execution: any },
+    resolvedInputs: Record<string, any>,
+    selectedOutputs: string[]
+  ): Promise<void> {
+    const blockId = node.id
+
+    const responseFormat =
+      resolvedInputs?.responseFormat ??
+      (block.config?.params as Record<string, any> | undefined)?.responseFormat ??
+      (block.config as Record<string, any> | undefined)?.responseFormat
+
+    const stream = streamingExec.stream
+    if (typeof stream.tee !== 'function') {
+      await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
+      return
+    }
+
+    const [clientStream, executorStream] = stream.tee()
+
+    const processedClientStream = streamingResponseFormatProcessor.processStream(
+      clientStream,
+      blockId,
+      selectedOutputs,
+      responseFormat
+    )
+
+    const clientStreamingExec = {
+      ...streamingExec,
+      stream: processedClientStream,
+    }
+
+    const executorConsumption = this.consumeExecutorStream(
+      executorStream,
+      streamingExec,
+      blockId,
+      responseFormat
+    )
+
+    const clientConsumption = (async () => {
+      try {
+        await ctx.onStream?.(clientStreamingExec)
+      } catch (error) {
+        logger.error('Error in onStream callback', { blockId, error })
+      }
+    })()
+
+    await Promise.all([clientConsumption, executorConsumption])
+  }
+
+  private async forwardStream(
+    ctx: ExecutionContext,
+    blockId: string,
+    streamingExec: { stream: ReadableStream; execution: any },
+    stream: ReadableStream,
+    responseFormat: any,
+    selectedOutputs: string[]
+  ): Promise<void> {
+    const processedStream = streamingResponseFormatProcessor.processStream(
+      stream,
+      blockId,
+      selectedOutputs,
+      responseFormat
+    )
+
+    try {
+      await ctx.onStream?.({
+        ...streamingExec,
+        stream: processedStream,
+      })
+    } catch (error) {
+      logger.error('Error in onStream callback', { blockId, error })
+    }
+  }
+
+  private async consumeExecutorStream(
+    stream: ReadableStream,
+    streamingExec: { execution: any },
+    blockId: string,
+    responseFormat: any
+  ): Promise<void> {
+    const reader = stream.getReader()
+    const decoder = new TextDecoder()
+    let fullContent = ''
+
+    try {
+      while (true) {
+        const { done, value } = await reader.read()
+        if (done) break
+        fullContent += decoder.decode(value, { stream: true })
+      }
+    } catch (error) {
+      logger.error('Error reading executor stream for block', { blockId, error })
+    } finally {
+      try {
+        reader.releaseLock()
+      } catch {}
+    }
+
+    if (!fullContent) {
+      return
+    }
+
+    const executionOutput = streamingExec.execution?.output
+    if (!executionOutput || typeof executionOutput !== 'object') {
+      return
+    }
+
+    if (responseFormat) {
+      try {
+        const parsed = JSON.parse(fullContent.trim())
+        Object.assign(executionOutput, parsed)
+        return
+      } catch (error) {
+        logger.warn('Failed to parse streamed content for response format', { blockId, error })
+      }
+    }
+
+    executionOutput.content = fullContent
+  }
 }
diff --git a/apps/sim/providers/anthropic/index.ts b/apps/sim/providers/anthropic/index.ts
index 80eb1344da..8afa26446d 100644
--- a/apps/sim/providers/anthropic/index.ts
+++ b/apps/sim/providers/anthropic/index.ts
@@ -985,9 +985,9 @@ ${fieldDescriptions}
     const providerEndTimeISO = new Date(providerEndTime).toISOString()
     const totalDuration = providerEndTime - providerStartTime
 
-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Anthropic response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final Anthropic response after tool processing')
 
       // When streaming after tool calls with forced tools, make sure tool_choice is removed
       // This prevents the API from trying to force tool usage again in the final streaming response
diff --git a/apps/sim/providers/azure-openai/index.ts b/apps/sim/providers/azure-openai/index.ts
index 419c86dcac..b4af62f63a 100644
--- a/apps/sim/providers/azure-openai/index.ts
+++ b/apps/sim/providers/azure-openai/index.ts
@@ -523,9 +523,9 @@ export const azureOpenAIProvider: ProviderConfig = {
       iterationCount++
     }
 
-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')
 
       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents Azure OpenAI API from trying to force tool usage again in the final streaming response
diff --git a/apps/sim/providers/cerebras/index.ts b/apps/sim/providers/cerebras/index.ts
index f332a6e1df..717d0babc1 100644
--- a/apps/sim/providers/cerebras/index.ts
+++ b/apps/sim/providers/cerebras/index.ts
@@ -455,8 +455,8 @@ export const cerebrasProvider: ProviderConfig = {
     const totalDuration = providerEndTime - providerStartTime
 
     // POST-TOOL-STREAMING: stream after tool calls if requested
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Cerebras response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final Cerebras response after tool processing')
 
       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents the API from trying to force tool usage again in the final streaming response
diff --git a/apps/sim/providers/deepseek/index.ts b/apps/sim/providers/deepseek/index.ts
index 1fc9f2af30..a303b70b65 100644
--- a/apps/sim/providers/deepseek/index.ts
+++ b/apps/sim/providers/deepseek/index.ts
@@ -457,8 +457,8 @@ export const deepseekProvider: ProviderConfig = {
     const totalDuration = providerEndTime - providerStartTime
 
     // POST-TOOL STREAMING: stream final response after tool calls if requested
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final DeepSeek response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final DeepSeek response after tool processing')
 
       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents the API from trying to force tool usage again in the final streaming response
diff --git a/apps/sim/providers/groq/index.ts b/apps/sim/providers/groq/index.ts
index a986f5da20..d9ac569d21 100644
--- a/apps/sim/providers/groq/index.ts
+++ b/apps/sim/providers/groq/index.ts
@@ -374,8 +374,8 @@ export const groqProvider: ProviderConfig = {
     }
 
     // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Groq response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final Groq response after tool processing')
 
       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents the API from trying to force tool usage again in the final streaming response
diff --git a/apps/sim/providers/mistral/index.ts b/apps/sim/providers/mistral/index.ts
index 6a9cb6dbe0..e2a194962f 100644
--- a/apps/sim/providers/mistral/index.ts
+++ b/apps/sim/providers/mistral/index.ts
@@ -447,8 +447,8 @@ export const mistralProvider: ProviderConfig = {
       iterationCount++
     }
 
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')
 
       const streamingPayload = {
         ...payload,
diff --git a/apps/sim/providers/ollama/index.ts b/apps/sim/providers/ollama/index.ts
index c529ce0420..21a50efac4 100644
--- a/apps/sim/providers/ollama/index.ts
+++ b/apps/sim/providers/ollama/index.ts
@@ -529,8 +529,8 @@ export const ollamaProvider: ProviderConfig = {
     }
 
     // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')
 
       const streamingPayload = {
         ...payload,
diff --git a/apps/sim/providers/openai/index.ts b/apps/sim/providers/openai/index.ts
index 5d33812b44..b925dc7d1f 100644
--- a/apps/sim/providers/openai/index.ts
+++ b/apps/sim/providers/openai/index.ts
@@ -504,9 +504,9 @@ export const openaiProvider: ProviderConfig = {
       iterationCount++
     }
 
-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')
 
       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents OpenAI API from trying to force tool usage again in the final streaming response
diff --git a/apps/sim/providers/openrouter/index.ts b/apps/sim/providers/openrouter/index.ts
index 87a08fdb95..979b5783ac 100644
--- a/apps/sim/providers/openrouter/index.ts
+++ b/apps/sim/providers/openrouter/index.ts
@@ -381,7 +381,7 @@ export const openRouterProvider: ProviderConfig = {
       iterationCount++
     }
 
-    if (request.stream && iterationCount > 0) {
+    if (request.stream) {
       const streamingPayload = {
         ...payload,
         messages: currentMessages,
diff --git a/apps/sim/providers/xai/index.ts b/apps/sim/providers/xai/index.ts
index a8d5c3a3f4..cfa73baf27 100644
--- a/apps/sim/providers/xai/index.ts
+++ b/apps/sim/providers/xai/index.ts
@@ -501,8 +501,8 @@ export const xAIProvider: ProviderConfig = {
       })
     }
 
-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
 
       // For final streaming response, choose between tools (auto) or response_format (never both)
       let finalStreamingPayload: any

From 59922960a8ed19d746dbf60ed991c0b3f35faa5a Mon Sep 17 00:00:00 2001
From: Siddharth Ganesan
Date: Thu, 13 Nov 2025 10:40:35 -0800
Subject: [PATCH 2/2] Fix lint

---
 apps/sim/executor/execution/block-executor.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts
index 07ea3ccb27..fd93d11261 100644
--- a/apps/sim/executor/execution/block-executor.ts
+++ b/apps/sim/executor/execution/block-executor.ts
@@ -21,8 +21,8 @@ import type {
   ExecutionContext,
   NormalizedBlockOutput,
 } from '@/executor/types'
-import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
 import { streamingResponseFormatProcessor } from '@/executor/utils'
+import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
 import type { VariableResolver } from '@/executor/variables/resolver'
 import type { SerializedBlock } from '@/serializer/types'
 import type { SubflowType } from '@/stores/workflows/workflow/types'
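
Note for reviewers (not part of the patches above): the heart of PATCH 1/2 is the ReadableStream.tee() split in handleStreamingExecution — one branch feeds ctx.onStream for the client while the other is drained so the executor can backfill execution.output after streaming ends, JSON-parsing the aggregate when a responseFormat is set. Below is a minimal standalone TypeScript sketch of that pattern under those assumptions; makeSourceStream, drainToString, and demo are hypothetical helpers for illustration, not code from this PR.

// Sketch: tee-and-consume pattern from handleStreamingExecution, reduced to
// web-streams primitives. makeSourceStream stands in for a provider stream.
function makeSourceStream(chunks: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder()
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(encoder.encode(chunk))
      controller.close()
    },
  })
}

// Drain one branch to a string, as consumeExecutorStream does in the patch.
async function drainToString(stream: ReadableStream<Uint8Array>): Promise<string> {
  const reader = stream.getReader()
  const decoder = new TextDecoder()
  let content = ''
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    content += decoder.decode(value, { stream: true })
  }
  return content
}

async function demo(): Promise<void> {
  const source = makeSourceStream(['{"answer":', ' 42}'])

  // tee() yields two independent branches over the same chunks: one for the
  // client-facing callback, one for the executor to aggregate.
  const [clientStream, executorStream] = source.tee()

  // Both branches are consumed concurrently; stalling one branch can block
  // the other once internal queues fill, hence the Promise.all in the patch.
  const [forClient, aggregated] = await Promise.all([
    drainToString(clientStream),
    drainToString(executorStream),
  ])

  // With a responseFormat configured, the patch JSON-parses the aggregate and
  // merges it into execution.output; otherwise it stores the raw content.
  const executionOutput: Record<string, any> = {}
  try {
    Object.assign(executionOutput, JSON.parse(aggregated.trim()))
  } catch {
    executionOutput.content = aggregated
  }

  console.log(forClient, executionOutput)
}

demo().catch(console.error)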