Implement example of streamText fallback using an AI data stream #2068
Conversation
Walkthrough

A new streaming chat implementation has been introduced, supporting both Anthropic and OpenAI models with provider fallback, chunked streaming, error handling, and detailed logging. Several schema tasks and helper functions were added to manage streaming chat responses, process streaming data, and handle message saving and logging through a consistent interface.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant chatStreamCaller
    participant chatStream
    participant createStreamWithProvider
    participant Anthropic
    participant OpenAI

    Caller->>chatStreamCaller: Trigger chat stream
    chatStreamCaller->>chatStream: Start streaming
    chatStream->>createStreamWithProvider: Try Anthropic
    createStreamWithProvider->>Anthropic: Stream response
    Anthropic-->>createStreamWithProvider: Success/Failure
    alt Anthropic fails
        chatStream->>createStreamWithProvider: Try OpenAI
        createStreamWithProvider->>OpenAI: Stream response
        OpenAI-->>createStreamWithProvider: Success/Failure
    end
    createStreamWithProvider-->>chatStream: Return streamed text
    chatStream-->>chatStreamCaller: Stream chunks/logs
    chatStreamCaller-->>Caller: Receive streamed chunks
```

```mermaid
sequenceDiagram
    participant Caller
    participant chatStreamCaller2
    participant chatStream2
    participant streamTextWithModel
    participant PrimaryModel
    participant FallbackModel

    Caller->>chatStreamCaller2: Trigger chat stream 2
    chatStreamCaller2->>chatStream2: Start streaming 2
    chatStream2->>streamTextWithModel: Stream with primary model
    streamTextWithModel->>PrimaryModel: Stream response
    PrimaryModel-->>streamTextWithModel: Success/Failure
    alt PrimaryModel fails and fallback exists
        streamTextWithModel->>FallbackModel: Stream response
        FallbackModel-->>streamTextWithModel: Success/Failure
    end
    streamTextWithModel-->>chatStream2: Return streamed text
    chatStream2-->>chatStreamCaller2: Stream chunks/logs
    chatStreamCaller2-->>Caller: Receive streamed chunks
```
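To make the diagrams concrete, here is a minimal sketch of the outer fallback flow, assuming the `createStreamWithProvider` helper reviewed further down; the call-site shape and the hard-coded `gpt-4` fallback mirror what the review discusses, but this is an illustration, not the PR's exact code.

```ts
import { anthropic } from "@ai-sdk/anthropic";
import { openai } from "@ai-sdk/openai";
import type { CoreMessage } from "ai";

// createStreamWithProvider is the helper defined later in chat.ts (and quoted
// in the review below); declared here only so the sketch stands alone.
declare function createStreamWithProvider(params: {
  model: unknown;
  messages: CoreMessage[];
  message_request_id: string;
  chat_id: string;
  userId?: string;
}): Promise<string>;

async function streamWithFallback(params: {
  messages: CoreMessage[];
  message_request_id: string;
  chat_id: string;
  userId?: string;
}): Promise<string> {
  try {
    // First attempt: Anthropic
    return await createStreamWithProvider({
      model: anthropic("claude-3-7-sonnet-20250219"),
      ...params,
    });
  } catch (primaryError) {
    // Anthropic failed: retry the same messages against OpenAI
    console.error("Anthropic stream failed, falling back to OpenAI", primaryError);
    return await createStreamWithProvider({
      model: openai("gpt-4"),
      ...params,
    });
  }
}
```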
Actionable comments posted: 3
🧹 Nitpick comments (1)
references/d3-chat/src/trigger/chat.ts (1)

Lines 418-424: Parameter `model` is ignored on the OpenAI fallback

When Anthropic fails, the fallback hard-codes `openai("gpt-4")`, disregarding the `model` argument supplied by the caller (which defaults to `"claude-3-7-sonnet-20250219"` but might be overridden). If the intention is to always fall back to a specific OpenAI model, document this; otherwise forward the requested model:

```diff
- model: openai("gpt-4"),
+ model: openai(model ?? "gpt-4"),
```

(and validate that the string provided is compatible with OpenAI first).
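A hedged sketch of that forwarding and validation, assuming an illustrative allow-list of OpenAI model IDs; the helper name and the list are hypothetical, not part of the PR:

```ts
import { openai } from "@ai-sdk/openai";

// Illustrative allow-list; extend with whichever OpenAI models the project supports.
const OPENAI_MODEL_IDS = new Set(["gpt-4", "gpt-4o", "gpt-4o-mini"]);

// Hypothetical helper: forward the caller's model only if OpenAI can serve it,
// otherwise use the documented default.
function resolveFallbackModel(requested?: string) {
  const id = requested && OPENAI_MODEL_IDS.has(requested) ? requested : "gpt-4";
  return openai(id);
}

// Inside the fallback branch:
// model: resolveFallbackModel(model),
```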
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (1)
references/d3-chat/src/trigger/chat.ts (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
references/d3-chat/src/trigger/chat.ts (4)

- packages/core/src/v3/workers/taskExecutor.ts (1): result (1262-1309)
- references/d3-chat/src/app/layout.tsx (1): metadata (15-18)
- packages/trigger-sdk/src/v3/tasks.ts (1): schemaTask (83-83)
- packages/core/src/v3/taskContext/index.ts (1): ctx (26-28)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
```ts
onError: (error) => {
  logger.error("Error in chatStream task (streamText)", {
    error: error instanceof Error ? error.message : "Unknown error",
    stack: error instanceof Error ? error.stack : undefined,
    provider: model.provider,
  });

  if (fallbackModel) {
    streamTextWithModel(dataStream, fallbackModel, messages, chat_id, undefined, userId);
  }
},
onChunk: async (chunk) => {
```
🛠️ Refactor suggestion
Fallback recursion can leak streams & duplicate output

`streamTextWithModel` recursively calls itself in `onError`, but it doesn't cancel/abort the failing stream or gate against multiple merges into the same `DataStreamWriter`. This can lead to:

- Two active model streams writing concurrently to the same `dataStream`.
- Memory/file-descriptor leakage if the first stream keeps running.
- Duplicate chunks in the consumer.

Introduce an `AbortController` (or equivalent) to cancel the first stream before invoking the fallback, and ensure you only merge once:

```ts
const controller = new AbortController();
...
onError: (err) => {
  controller.abort(); // cancel first stream
  if (fallbackModel) {
    streamTextWithModel(dataStream, fallbackModel, messages, chat_id, undefined, userId);
  }
},
...
const result = streamText({ ..., abortSignal: controller.signal, ... });
```

This guarantees a clean hand-over to the fallback provider.
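A fuller sketch of that hand-over, assuming the `streamText` options and `logger` already used in chat.ts; the function signature, parameter order, and gating flag here are illustrative rather than the PR's actual code:

```ts
import { streamText, type CoreMessage, type DataStreamWriter, type LanguageModel } from "ai";
import { logger } from "@trigger.dev/sdk/v3";

// Sketch of the abort-and-fall-back pattern; assumes the AI SDK's abortSignal option.
function streamTextWithModel(
  dataStream: DataStreamWriter,
  model: LanguageModel,
  messages: CoreMessage[],
  chat_id: string,
  fallbackModel?: LanguageModel,
  userId?: string
) {
  const controller = new AbortController();
  let fallbackStarted = false; // merge the fallback at most once

  const result = streamText({
    model,
    messages,
    abortSignal: controller.signal,
    onError: (error) => {
      logger.error("Primary model stream failed", { error, chat_id, userId });
      controller.abort(); // stop the failing stream before handing over
      if (fallbackModel && !fallbackStarted) {
        fallbackStarted = true;
        streamTextWithModel(dataStream, fallbackModel, messages, chat_id, undefined, userId);
      }
    },
  });

  // Merge this attempt's output into the shared data stream.
  result.mergeIntoDataStream(dataStream, { sendReasoning: true });
}
```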
🤖 Prompt for AI Agents
In references/d3-chat/src/trigger/chat.ts around lines 540 to 551, the onError
handler calls streamTextWithModel recursively without aborting the current
stream, causing potential concurrent writes, resource leaks, and duplicate
output. Fix this by creating an AbortController before starting the stream,
passing its signal to the streamText call, and calling controller.abort() inside
onError before invoking the fallbackModel streamTextWithModel call. Also, ensure
that fallback merging happens only once to prevent multiple concurrent streams
writing to the same dataStream.
```ts
chat_id: z.string().default("chat"),
messages: z.array(z.unknown()).describe("Array of chat messages"),
message_request_id: z.string().describe("Unique identifier for the message request"),
model: z.string().default("claude-3-7-sonnet-20250219"),
userId: z.string().optional().describe("User ID for authentication"),
existingProject: z.boolean().default(false).describe("Whether the project already exists"),
}),
```
🛠️ Refactor suggestion
Strengthen the Zod schema for messages

`messages` is currently declared as `z.array(z.unknown())` but immediately cast to `CoreMessage[]` downstream. You lose compile-time safety and runtime validation (e.g. that `role` and `content` are present). Consider validating the expected shape:

```diff
- messages: z.array(z.unknown()).describe("Array of chat messages"),
+ messages: z.array(
+   z.object({
+     role: z.enum(["system", "assistant", "user", "tool"]),
+     content: z.string(),
+   })
+ ).describe("Array of chat messages"),
```

This prevents malformed inputs from reaching the model and removes the need for `as CoreMessage[]` casts.
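A standalone sketch of the suggested schema with its inferred type; note that `CoreMessage` content can also be structured parts, so the plain-string `content` here is the reviewer's simplification, not the full type:

```ts
import { z } from "zod";

// Simplified message shape: role plus plain-string content.
const messageSchema = z.object({
  role: z.enum(["system", "assistant", "user", "tool"]),
  content: z.string(),
});

const chatPayloadSchema = z.object({
  chat_id: z.string().default("chat"),
  messages: z.array(messageSchema).describe("Array of chat messages"),
  message_request_id: z.string().describe("Unique identifier for the message request"),
  model: z.string().default("claude-3-7-sonnet-20250219"),
  userId: z.string().optional().describe("User ID for authentication"),
});

// The inferred type now carries role/content, so the downstream
// `as CoreMessage[]` cast is no longer needed for these fields.
type ChatPayload = z.infer<typeof chatPayloadSchema>;
```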
🤖 Prompt for AI Agents
In references/d3-chat/src/trigger/chat.ts around lines 381 to 387, the Zod
schema for the `messages` field is currently defined as an array of unknowns,
which lacks proper validation and type safety. To fix this, define a more
specific Zod schema for each message object that includes required properties
like `role` and `content` with appropriate types, then use this schema in the
array definition. This will enforce runtime validation and eliminate the need
for unsafe type casting to `CoreMessage[]`.
```ts
async function createStreamWithProvider(params: {
  model: ReturnType<typeof anthropic> | ReturnType<typeof openai>;
  messages: CoreMessage[];
  message_request_id: string;
  chat_id: string;
  userId?: string;
}) {
  const { model, messages, message_request_id, chat_id, userId } = params;

  return new Promise<string>((resolve, reject) => {
    const dataStreamResponse = createDataStream({
      execute: async (dataStream) => {
        const result = streamText({
          model,
          system: "This is the system prompt, please be nice.",
          messages,
          maxSteps: 20,
          toolCallStreaming: true,
          onError: (error) => {
            logger.error("Error in chatStream task (streamText)", {
              error: error instanceof Error ? error.message : "Unknown error",
              stack: error instanceof Error ? error.stack : undefined,
              provider: model.provider,
            });
            reject(error);
          },
          onChunk: async (chunk) => {
            console.log("Chunk:", chunk);
          },
          onFinish: async ({ response, reasoning }) => {
            metadata.flush();
            logger.info("AI stream finished", {
              chat_id,
              userId,
              messageCount: response.messages.length,
              provider: model.provider,
            });

            if (userId) {
              try {
                // Pretend to save messages
                await new Promise((resolve) => setTimeout(resolve, 1000));

                logger.info("Successfully saved AI response messages", {
                  chat_id,
                  userId,
                  messageCount: response.messages.length,
                  message: JSON.stringify(response.messages, null, 2),
                  provider: model.provider,
                });
              } catch (error) {
                logger.error("Failed to save AI response messages", {
                  error: error instanceof Error ? error.message : "Unknown error",
                  stack: error instanceof Error ? error.stack : undefined,
                  chat_id,
                  userId,
                  provider: model.provider,
                });
              }
            }
          },
        });

        result.consumeStream();

        result.mergeIntoDataStream(dataStream, {
          sendReasoning: true,
        });
      },
      onError: (error) => {
        logger.error("Error in chatStream task (createDataStream)", {
          error: error instanceof Error ? error.message : "Unknown error",
          stack: error instanceof Error ? error.stack : undefined,
          provider: model.provider,
        });
        reject(error);
        return error instanceof Error ? error.message : String(error);
      },
    });

    // Process the stream
    (async () => {
      try {
        const stream = await metadata.stream("dataStream", dataStreamResponse);
        let fullResponse = "";

        for await (const chunk of stream) {
          fullResponse += chunk;
        }

        // Only resolve if we haven't rejected due to an error
        resolve(fullResponse);
      } catch (error) {
        reject(error);
      }
    })();
  });
}
```
Guard against multiple `resolve`/`reject` executions in `createStreamWithProvider`

`streamText` and `createDataStream` can emit both `onError` and `onFinish`; additionally, the outer IIFE that reads the stream can complete even after `onError` fired. Because the promise returned by `createStreamWithProvider` calls `reject()` inside `onError` and later calls `resolve()` when the reader loop ends, the same promise may attempt to settle twice, which is illegal (the second call is silently ignored) and makes debugging harder.
```diff
- return new Promise<string>((resolve, reject) => {
+ return new Promise<string>((resolve, reject) => {
+   // Ensure the promise is settled exactly once
+   let settled = false;
  ...
- reject(error);
+ if (!settled) {
+   settled = true;
+   reject(error);
+ }
  ...
- resolve(fullResponse);
+ if (!settled) {
+   settled = true;
+   resolve(fullResponse);
+ }
  ...
- reject(error);
+ if (!settled) {
+   settled = true;
+   reject(error);
+ }
```

This single-settle guard ensures only the first outcome wins, preventing confusing "handled after rejection" warnings and potential memory leaks.
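If the inline guard feels repetitive, the same idea can be factored into a small helper; a hypothetical sketch, not part of the PR:

```ts
// Wrap resolve/reject so whichever fires first wins; later calls are no-ops.
function settleOnce<T>(
  resolve: (value: T) => void,
  reject: (reason?: unknown) => void
) {
  let settled = false;
  return {
    resolve(value: T) {
      if (!settled) {
        settled = true;
        resolve(value);
      }
    },
    reject(reason?: unknown) {
      if (!settled) {
        settled = true;
        reject(reason);
      }
    },
  };
}

// Usage inside createStreamWithProvider:
// return new Promise<string>((res, rej) => {
//   const settle = settleOnce(res, rej);
//   // settle.reject(error) in the onError handlers,
//   // settle.resolve(fullResponse) when the reader loop finishes.
// });
```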
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
async function createStreamWithProvider(params: {
  model: ReturnType<typeof anthropic> | ReturnType<typeof openai>;
  messages: CoreMessage[];
  message_request_id: string;
  chat_id: string;
  userId?: string;
}) {
  const { model, messages, message_request_id, chat_id, userId } = params;

  return new Promise<string>((resolve, reject) => {
    // Ensure the promise is settled exactly once
    let settled = false;

    const dataStreamResponse = createDataStream({
      execute: async (dataStream) => {
        const result = streamText({
          model,
          system: "This is the system prompt, please be nice.",
          messages,
          maxSteps: 20,
          toolCallStreaming: true,
          onError: (error) => {
            logger.error("Error in chatStream task (streamText)", {
              error: error instanceof Error ? error.message : "Unknown error",
              stack: error instanceof Error ? error.stack : undefined,
              provider: model.provider,
            });
            if (!settled) {
              settled = true;
              reject(error);
            }
          },
          onChunk: async (chunk) => {
            console.log("Chunk:", chunk);
          },
          onFinish: async ({ response, reasoning }) => {
            metadata.flush();
            logger.info("AI stream finished", {
              chat_id,
              userId,
              messageCount: response.messages.length,
              provider: model.provider,
            });

            if (userId) {
              try {
                // Pretend to save messages
                await new Promise((resolve) => setTimeout(resolve, 1000));

                logger.info("Successfully saved AI response messages", {
                  chat_id,
                  userId,
                  messageCount: response.messages.length,
                  message: JSON.stringify(response.messages, null, 2),
                  provider: model.provider,
                });
              } catch (error) {
                logger.error("Failed to save AI response messages", {
                  error: error instanceof Error ? error.message : "Unknown error",
                  stack: error instanceof Error ? error.stack : undefined,
                  chat_id,
                  userId,
                  provider: model.provider,
                });
              }
            }
          },
        });

        result.consumeStream();

        result.mergeIntoDataStream(dataStream, {
          sendReasoning: true,
        });
      },
      onError: (error) => {
        logger.error("Error in chatStream task (createDataStream)", {
          error: error instanceof Error ? error.message : "Unknown error",
          stack: error instanceof Error ? error.stack : undefined,
          provider: model.provider,
        });
        reject(error);
        return error instanceof Error ? error.message : String(error);
      },
    });

    // Process the stream
    (async () => {
      try {
        const stream = await metadata.stream("dataStream", dataStreamResponse);
        let fullResponse = "";

        for await (const chunk of stream) {
          fullResponse += chunk;
        }

        // Only resolve if we haven't rejected due to an error
        if (!settled) {
          settled = true;
          resolve(fullResponse);
        }
      } catch (error) {
        if (!settled) {
          settled = true;
          reject(error);
        }
      }
    })();
  });
}
```
🤖 Prompt for AI Agents
In references/d3-chat/src/trigger/chat.ts between lines 278 and 375, the promise
in createStreamWithProvider can call resolve or reject multiple times due to
overlapping error and finish handlers. To fix this, introduce a boolean flag to
track if the promise has already settled, and check this flag before calling
resolve or reject anywhere in the function. This ensures the promise settles
only once, preventing illegal multiple settlements and related debugging issues.
Summary by CodeRabbit