Skip to content

Commit c61c57a

Browse files
Sg312waleedlatif1icecrasher321
authored andcommitted
fix(streaming-api): fix streaming api (#1846)
* fix(billing): should allow restoring subscription (#1728) * fix(already-cancelled-sub): UI should allow restoring subscription * restore functionality fixed * fix * Fix streaming api * Fix uuid stuff * Lint * Stripe docs fix * Fix docs build error * Fix uuid check * Fix deployed chat streaming for non agent blocks * Fix lint --------- Co-authored-by: Waleed <walif6@gmail.com> Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
1 parent fd2129d commit c61c57a

File tree

8 files changed

+195
-16
lines changed

8 files changed

+195
-16
lines changed

apps/docs/content/docs/de/tools/stripe.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ Ein neues Abonnement für einen Kunden erstellen
359359
| --------- | ---- | -------- | ----------- |
360360
| `apiKey` | string | Ja | Stripe API-Schlüssel (geheimer Schlüssel) |
361361
| `customer` | string | Ja | Kunden-ID für das Abonnement |
362-
| `items` | json | Ja | Array von Artikeln mit Preis-IDs (z.B. [{"price": "price_xxx", "quantity": 1}]) |
362+
| `items` | json | Ja | Array von Artikeln mit Preis-IDs \(z.B. \[\{"price": "price_xxx", "quantity": 1\}\]\) |
363363
| `trial_period_days` | number | Nein | Anzahl der Testtage |
364364
| `default_payment_method` | string | Nein | Zahlungsmethoden-ID |
365365
| `cancel_at_period_end` | boolean | Nein | Abonnement am Ende der Periode kündigen |
@@ -774,7 +774,7 @@ Alle Zahlungen auflisten
774774
| `apiKey` | string | Ja | Stripe API-Schlüssel (geheimer Schlüssel) |
775775
| `limit` | number | Nein | Anzahl der zurückzugebenden Ergebnisse (Standard 10, max. 100) |
776776
| `customer` | string | Nein | Nach Kunden-ID filtern |
777-
| `created` | json | Nein | Nach Erstellungsdatum filtern (z.B. {"gt": 1633024800}) |
777+
| `created` | json | Nein | Nach Erstellungsdatum filtern \(z.B. \{"gt": 1633024800\}\) |
778778

779779
#### Output
780780

@@ -1051,7 +1051,7 @@ Alle Events auflisten
10511051
| `apiKey` | string | Ja | Stripe API-Schlüssel (Secret-Key) |
10521052
| `limit` | number | Nein | Anzahl der zurückzugebenden Ergebnisse (Standard 10, max. 100) |
10531053
| `type` | string | Nein | Nach Event-Typ filtern (z.B. payment_intent.created) |
1054-
| `created` | json | Nein | Nach Erstellungsdatum filtern (z.B. {"gt": 1633024800}) |
1054+
| `created` | json | Nein | Nach Erstellungsdatum filtern \(z.B. \{"gt": 1633024800\}\) |
10551055

10561056
#### Output
10571057

apps/docs/content/docs/es/tools/stripe.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ Crear un nuevo Payment Intent para procesar un pago
7979
| `description` | string | No | Descripción del pago |
8080
| `receipt_email` | string | No | Dirección de correo electrónico para enviar el recibo |
8181
| `metadata` | json | No | Conjunto de pares clave-valor para almacenar información adicional |
82-
| `automatic_payment_methods` | json | No | Habilitar métodos de pago automáticos (p. ej., {"enabled": true}) |
82+
| `automatic_payment_methods` | json | No | Habilitar métodos de pago automáticos \(p. ej., \{"enabled": true\}\) |
8383

8484
#### Salida
8585

@@ -359,7 +359,7 @@ Crear una nueva suscripción para un cliente
359359
| --------- | ---- | -------- | ----------- |
360360
| `apiKey` | string || Clave API de Stripe (clave secreta) |
361361
| `customer` | string || ID del cliente a suscribir |
362-
| `items` | json || Array de elementos con IDs de precio (p. ej., [{"price": "price_xxx", "quantity": 1}]) |
362+
| `items` | json || Array de elementos con IDs de precio \(p. ej., \[\{"price": "price_xxx", "quantity": 1\}\]\) |
363363
| `trial_period_days` | number | No | Número de días de prueba |
364364
| `default_payment_method` | string | No | ID del método de pago |
365365
| `cancel_at_period_end` | boolean | No | Cancelar suscripción al final del período |
@@ -1051,7 +1051,7 @@ Listar todos los eventos
10511051
| `apiKey` | string || Clave API de Stripe (clave secreta) |
10521052
| `limit` | number | No | Número de resultados a devolver (predeterminado 10, máximo 100) |
10531053
| `type` | string | No | Filtrar por tipo de evento (p. ej., payment_intent.created) |
1054-
| `created` | json | No | Filtrar por fecha de creación (p. ej., {"gt": 1633024800}) |
1054+
| `created` | json | No | Filtrar por fecha de creación \(p. ej., \{"gt": 1633024800\}\) |
10551055

10561056
#### Salida
10571057

apps/docs/content/docs/fr/tools/stripe.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ Créer une nouvelle intention de paiement pour traiter un paiement
7979
| `description` | string | Non | Description du paiement |
8080
| `receipt_email` | string | Non | Adresse e-mail pour l'envoi du reçu |
8181
| `metadata` | json | Non | Ensemble de paires clé-valeur pour stocker des informations supplémentaires |
82-
| `automatic_payment_methods` | json | Non | Activer les méthodes de paiement automatiques (par ex., {"enabled": true}) |
82+
| `automatic_payment_methods` | json | Non | Activer les méthodes de paiement automatiques \(par ex., \{"enabled": true\}\) |
8383

8484
#### Sortie
8585

@@ -359,7 +359,7 @@ Créer un nouvel abonnement pour un client
359359
| --------- | ---- | -------- | ----------- |
360360
| `apiKey` | string | Oui | Clé API Stripe (clé secrète) |
361361
| `customer` | string | Oui | ID du client à abonner |
362-
| `items` | json | Oui | Tableau d'articles avec IDs de prix (ex., \[\{"price": "price_xxx", "quantity": 1\}\]) |
362+
| `items` | json | Oui | Tableau d'articles avec IDs de prix \(ex., \[\{"price": "price_xxx", "quantity": 1\}\]\) |
363363
| `trial_period_days` | number | Non | Nombre de jours d'essai |
364364
| `default_payment_method` | string | Non | ID de la méthode de paiement |
365365
| `cancel_at_period_end` | boolean | Non | Annuler l'abonnement à la fin de la période |
@@ -1051,7 +1051,7 @@ Lister tous les événements
10511051
| `apiKey` | string | Oui | Clé API Stripe (clé secrète) |
10521052
| `limit` | number | Non | Nombre de résultats à retourner (par défaut 10, max 100) |
10531053
| `type` | string | Non | Filtrer par type d'événement (ex. : payment_intent.created) |
1054-
| `created` | json | Non | Filtrer par date de création (ex. : {"gt": 1633024800}) |
1054+
| `created` | json | Non | Filtrer par date de création \(ex. : \{"gt": 1633024800\}\) |
10551055

10561056
#### Sortie
10571057

apps/docs/content/docs/ja/tools/stripe.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ IDで既存の顧客を取得する
359359
| --------- | ---- | -------- | ----------- |
360360
| `apiKey` | string | はい | Stripe APIキー(シークレットキー) |
361361
| `customer` | string | はい | サブスクライブする顧客ID |
362-
| `items` | json | はい | 価格IDを含むアイテムの配列(例:[{"price": "price_xxx", "quantity": 1}]|
362+
| `items` | json | はい | 価格IDを含むアイテムの配列(例:\[\{"price": "price_xxx", "quantity": 1\}\]|
363363
| `trial_period_days` | number | いいえ | トライアル日数 |
364364
| `default_payment_method` | string | いいえ | 支払い方法ID |
365365
| `cancel_at_period_end` | boolean | いいえ | 期間終了時にサブスクリプションをキャンセルする |

apps/docs/content/docs/zh/tools/stripe.mdx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
7979
| `description` | string || 支付描述 |
8080
| `receipt_email` | string || 用于发送收据的电子邮件地址 |
8181
| `metadata` | json || 用于存储附加信息的键值对集合 |
82-
| `automatic_payment_methods` | json || 启用自动支付方式(例如,{"enabled": true}|
82+
| `automatic_payment_methods` | json || 启用自动支付方式(例如,\{"enabled": true\}|
8383

8484
#### 输出
8585

@@ -197,7 +197,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
197197
| `apiKey` | string || Stripe API 密钥(secret key) |
198198
| `limit` | number || 返回结果数量(默认 10,最大 100) |
199199
| `customer` | string || 按客户 ID 过滤 |
200-
| `created` | json || 按创建日期过滤(例如,{"gt": 1633024800}|
200+
| `created` | json || 按创建日期过滤(例如,\{"gt": 1633024800\}|
201201

202202
#### 输出
203203

@@ -774,7 +774,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
774774
| `apiKey` | string || Stripe API 密钥(密钥) |
775775
| `limit` | number || 返回结果的数量(默认 10,最大 100) |
776776
| `customer` | string || 按客户 ID 过滤 |
777-
| `created` | json || 按创建日期过滤(例如,{"gt": 1633024800}|
777+
| `created` | json || 按创建日期过滤(例如,\{"gt": 1633024800\}|
778778

779779
#### 输出
780780

@@ -1051,7 +1051,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
10511051
| `apiKey` | string || Stripe API 密钥(密钥) |
10521052
| `limit` | number || 返回结果数量(默认 10,最大 100) |
10531053
| `type` | string || 按事件类型过滤(例如,payment_intent.created) |
1054-
| `created` | json || 按创建日期过滤(例如,{"gt": 1633024800}|
1054+
| `created` | json || 按创建日期过滤(例如,\{"gt": 1633024800\}|
10551055

10561056
#### 输出
10571057

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { type NextRequest, NextResponse } from 'next/server'
2-
import { v4 as uuidv4 } from 'uuid'
2+
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
33
import { checkHybridAuth } from '@/lib/auth/hybrid'
44
import { checkServerSideUsageLimits } from '@/lib/billing'
55
import { processInputFileFields } from '@/lib/execution/files'
@@ -13,6 +13,7 @@ import {
1313
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
1414
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
1515
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
16+
import { createStreamingResponse } from '@/lib/workflows/streaming'
1617
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
1718
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
1819
import type { StreamingExecution } from '@/executor/types'
@@ -184,6 +185,60 @@ export function createFilteredResult(result: any) {
184185
}
185186
}
186187

188+
function resolveOutputIds(
189+
selectedOutputs: string[] | undefined,
190+
blocks: Record<string, any>
191+
): string[] | undefined {
192+
if (!selectedOutputs || selectedOutputs.length === 0) {
193+
return selectedOutputs
194+
}
195+
196+
return selectedOutputs.map((outputId) => {
197+
const underscoreIndex = outputId.indexOf('_')
198+
const dotIndex = outputId.indexOf('.')
199+
if (underscoreIndex > 0) {
200+
const maybeUuid = outputId.substring(0, underscoreIndex)
201+
if (uuidValidate(maybeUuid)) {
202+
return outputId
203+
}
204+
}
205+
206+
if (dotIndex > 0) {
207+
const maybeUuid = outputId.substring(0, dotIndex)
208+
if (uuidValidate(maybeUuid)) {
209+
return `${outputId.substring(0, dotIndex)}_${outputId.substring(dotIndex + 1)}`
210+
}
211+
}
212+
213+
if (uuidValidate(outputId)) {
214+
return outputId
215+
}
216+
217+
if (dotIndex === -1) {
218+
logger.warn(`Invalid output ID format (missing dot): ${outputId}`)
219+
return outputId
220+
}
221+
222+
const blockName = outputId.substring(0, dotIndex)
223+
const path = outputId.substring(dotIndex + 1)
224+
225+
const normalizedBlockName = blockName.toLowerCase().replace(/\s+/g, '')
226+
const block = Object.values(blocks).find((b: any) => {
227+
const normalized = (b.name || '').toLowerCase().replace(/\s+/g, '')
228+
return normalized === normalizedBlockName
229+
})
230+
231+
if (!block) {
232+
logger.warn(`Block not found for name: ${blockName} (from output ID: ${outputId})`)
233+
return outputId
234+
}
235+
236+
const resolvedId = `${block.id}_${path}`
237+
logger.debug(`Resolved output ID: ${outputId} -> ${resolvedId}`)
238+
return resolvedId
239+
})
240+
}
241+
187242
/**
188243
* POST /api/workflows/[id]/execute
189244
*
@@ -425,7 +480,32 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
425480
}
426481
}
427482

428-
logger.info(`[${requestId}] Using SSE execution (streaming response)`)
483+
if (shouldUseDraftState) {
484+
logger.info(`[${requestId}] Using SSE console log streaming (manual execution)`)
485+
} else {
486+
logger.info(`[${requestId}] Using streaming API response`)
487+
const deployedData = await loadDeployedWorkflowState(workflowId)
488+
const resolvedSelectedOutputs = resolveOutputIds(selectedOutputs, deployedData?.blocks || {})
489+
const stream = await createStreamingResponse({
490+
requestId,
491+
workflow,
492+
input: processedInput,
493+
executingUserId: userId,
494+
streamConfig: {
495+
selectedOutputs: resolvedSelectedOutputs,
496+
isSecureMode: false,
497+
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
498+
},
499+
createFilteredResult,
500+
executionId,
501+
})
502+
503+
return new NextResponse(stream, {
504+
status: 200,
505+
headers: SSE_HEADERS,
506+
})
507+
}
508+
429509
const encoder = new TextEncoder()
430510
let executorInstance: any = null
431511
let isStreamClosed = false

apps/sim/app/chat/[identifier]/chat.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ export default function ChatClient({ identifier }: { identifier: string }) {
409409
autoPlayResponses: shouldPlayAudio,
410410
},
411411
audioStreamHandler: audioHandler,
412+
outputConfigs: chatConfig?.outputConfigs,
412413
}
413414
)
414415
} catch (error: any) {

apps/sim/app/chat/hooks/use-chat-streaming.ts

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export interface StreamingOptions {
2121
onAudioStart?: () => void
2222
onAudioEnd?: () => void
2323
audioStreamHandler?: (text: string) => Promise<void>
24+
outputConfigs?: Array<{ blockId: string; path?: string }>
2425
}
2526

2627
export function useChatStreaming() {
@@ -176,17 +177,114 @@ export function useChatStreaming() {
176177
}
177178

178179
if (eventType === 'final' && json.data) {
180+
const finalData = json.data as {
181+
success: boolean
182+
error?: string | { message?: string }
183+
output?: Record<string, Record<string, any>>
184+
}
185+
186+
const outputConfigs = streamingOptions?.outputConfigs
187+
const formattedOutputs: string[] = []
188+
189+
const formatValue = (value: any): string | null => {
190+
if (value === null || value === undefined) {
191+
return null
192+
}
193+
194+
if (typeof value === 'string') {
195+
return value
196+
}
197+
198+
if (typeof value === 'object') {
199+
try {
200+
return `\`\`\`json\n${JSON.stringify(value, null, 2)}\n\`\`\``
201+
} catch {
202+
return String(value)
203+
}
204+
}
205+
206+
return String(value)
207+
}
208+
209+
const getOutputValue = (blockOutputs: Record<string, any>, path?: string) => {
210+
if (!path || path === 'content') {
211+
if (blockOutputs.content !== undefined) return blockOutputs.content
212+
if (blockOutputs.result !== undefined) return blockOutputs.result
213+
return blockOutputs
214+
}
215+
216+
if (blockOutputs[path] !== undefined) {
217+
return blockOutputs[path]
218+
}
219+
220+
if (path.includes('.')) {
221+
return path.split('.').reduce<any>((current, segment) => {
222+
if (current && typeof current === 'object' && segment in current) {
223+
return current[segment]
224+
}
225+
return undefined
226+
}, blockOutputs)
227+
}
228+
229+
return undefined
230+
}
231+
232+
if (outputConfigs?.length && finalData.output) {
233+
for (const config of outputConfigs) {
234+
const blockOutputs = finalData.output[config.blockId]
235+
if (!blockOutputs) continue
236+
237+
const value = getOutputValue(blockOutputs, config.path)
238+
const formatted = formatValue(value)
239+
if (formatted) {
240+
formattedOutputs.push(formatted)
241+
}
242+
}
243+
}
244+
245+
let finalContent = accumulatedText
246+
247+
if (formattedOutputs.length > 0) {
248+
const combinedOutputs = formattedOutputs.join('\n\n')
249+
finalContent = finalContent
250+
? `${finalContent.trim()}\n\n${combinedOutputs}`
251+
: combinedOutputs
252+
}
253+
254+
if (!finalContent) {
255+
if (finalData.error) {
256+
if (typeof finalData.error === 'string') {
257+
finalContent = finalData.error
258+
} else if (typeof finalData.error?.message === 'string') {
259+
finalContent = finalData.error.message
260+
}
261+
} else if (finalData.success && finalData.output) {
262+
const fallbackOutput = Object.values(finalData.output)
263+
.map((block) => formatValue(block)?.trim())
264+
.filter(Boolean)[0]
265+
if (fallbackOutput) {
266+
finalContent = fallbackOutput
267+
}
268+
}
269+
}
270+
179271
setMessages((prev) =>
180272
prev.map((msg) =>
181273
msg.id === messageId
182274
? {
183275
...msg,
184276
isStreaming: false,
277+
content: finalContent ?? msg.content,
185278
}
186279
: msg
187280
)
188281
)
189282

283+
accumulatedTextRef.current = ''
284+
lastStreamedPositionRef.current = 0
285+
lastDisplayedPositionRef.current = 0
286+
audioStreamingActiveRef.current = false
287+
190288
return
191289
}
192290

0 commit comments

Comments
 (0)