Implement example of streamText fallback using an AI data stream #2068


Open
wants to merge 1 commit into base: main
355 changes: 353 additions & 2 deletions references/d3-chat/src/trigger/chat.ts
@@ -1,9 +1,16 @@
import { anthropic } from "@ai-sdk/anthropic";
import { openai } from "@ai-sdk/openai";
import { ai } from "@trigger.dev/sdk/ai";
-import { logger, metadata, schemaTask, tasks, wait } from "@trigger.dev/sdk/v3";
+import { logger, metadata, runs, schemaTask, tasks, wait } from "@trigger.dev/sdk/v3";
import { sql } from "@vercel/postgres";
-import { streamText, TextStreamPart, tool } from "ai";
+import {
+  CoreMessage,
+  createDataStream,
+  DataStreamWriter,
+  streamText,
+  TextStreamPart,
+  tool,
+} from "ai";
import { nanoid } from "nanoid";
import { z } from "zod";
import { sendSQLApprovalMessage } from "../lib/slack";
@@ -267,3 +274,347 @@ export const interruptibleChat = schemaTask({
    }
  },
});

async function createStreamWithProvider(params: {
  model: ReturnType<typeof anthropic> | ReturnType<typeof openai>;
  messages: CoreMessage[];
  message_request_id: string;
  chat_id: string;
  userId?: string;
}) {
  const { model, messages, message_request_id, chat_id, userId } = params;

  return new Promise<string>((resolve, reject) => {
    const dataStreamResponse = createDataStream({
      execute: async (dataStream) => {
        const result = streamText({
          model,
          system: "This is the system prompt, please be nice.",
          messages,
          maxSteps: 20,
          toolCallStreaming: true,
          onError: (error) => {
            logger.error("Error in chatStream task (streamText)", {
              error: error instanceof Error ? error.message : "Unknown error",
              stack: error instanceof Error ? error.stack : undefined,
              provider: model.provider,
            });
            reject(error);
          },
          onChunk: async (chunk) => {
            console.log("Chunk:", chunk);
          },
          onFinish: async ({ response, reasoning }) => {
            metadata.flush();
            logger.info("AI stream finished", {
              chat_id,
              userId,
              messageCount: response.messages.length,
              provider: model.provider,
            });

            if (userId) {
              try {
                // Pretend to save messages
                await new Promise((resolve) => setTimeout(resolve, 1000));

                logger.info("Successfully saved AI response messages", {
                  chat_id,
                  userId,
                  messageCount: response.messages.length,
                  message: JSON.stringify(response.messages, null, 2),
                  provider: model.provider,
                });
              } catch (error) {
                logger.error("Failed to save AI response messages", {
                  error: error instanceof Error ? error.message : "Unknown error",
                  stack: error instanceof Error ? error.stack : undefined,
                  chat_id,
                  userId,
                  provider: model.provider,
                });
              }
            }
          },
        });

        result.consumeStream();

        result.mergeIntoDataStream(dataStream, {
          sendReasoning: true,
        });
      },
      onError: (error) => {
        logger.error("Error in chatStream task (createDataStream)", {
          error: error instanceof Error ? error.message : "Unknown error",
          stack: error instanceof Error ? error.stack : undefined,
          provider: model.provider,
        });
        reject(error);
        return error instanceof Error ? error.message : String(error);
      },
    });

    // Process the stream
    (async () => {
      try {
        const stream = await metadata.stream("dataStream", dataStreamResponse);
        let fullResponse = "";

        for await (const chunk of stream) {
          fullResponse += chunk;
        }

        // Only resolve if we haven't rejected due to an error
        resolve(fullResponse);
      } catch (error) {
        reject(error);
      }
    })();
  });
}
Comment on lines +278 to +375
⚠️ Potential issue

Guard against multiple resolve / reject executions in createStreamWithProvider

streamText and createDataStream can emit both onError and onFinish; additionally, the outer IIFE that reads the stream can complete even after onError has fired.
Because the promise returned by createStreamWithProvider calls reject() inside onError and later calls resolve() when the reader loop ends, the same promise may attempt to settle twice; the second settlement is silently ignored, which hides the real outcome and makes debugging harder.
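
As a quick illustration of why this matters (a minimal standalone sketch, not part of the diff): once a promise has settled, every later resolve or reject call is a silent no-op, so the outcome that reaches the caller depends on which handler fires first.

// Minimal illustration: only the first settlement of a promise wins.
const p = new Promise<string>((resolve, reject) => {
  reject(new Error("provider stream failed")); // settles the promise as rejected
  resolve("full response text");               // silently ignored, no warning
});

p.then(
  (value) => console.log("resolved:", value),
  (err) => console.error("rejected:", err.message) // logs "provider stream failed"
);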

-  return new Promise<string>((resolve, reject) => {
+  return new Promise<string>((resolve, reject) => {
+    // Ensure the promise is settled exactly once
+    let settled = false;
...
-            reject(error);
+            if (!settled) {
+              settled = true;
+              reject(error);
+            }
...
-        resolve(fullResponse);
+        if (!settled) {
+          settled = true;
+          resolve(fullResponse);
+        }
...
-        reject(error);
+        if (!settled) {
+          settled = true;
+          reject(error);
+        }

This single-settle guard ensures only the first outcome wins, preventing confusing “handled after rejection” warnings and potential memory leaks.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async function createStreamWithProvider(params: {
  model: ReturnType<typeof anthropic> | ReturnType<typeof openai>;
  messages: CoreMessage[];
  message_request_id: string;
  chat_id: string;
  userId?: string;
}) {
  const { model, messages, message_request_id, chat_id, userId } = params;
  return new Promise<string>((resolve, reject) => {
    // Ensure the promise is settled exactly once
    let settled = false;
    const dataStreamResponse = createDataStream({
      execute: async (dataStream) => {
        const result = streamText({
          model,
          system: "This is the system prompt, please be nice.",
          messages,
          maxSteps: 20,
          toolCallStreaming: true,
          onError: (error) => {
            logger.error("Error in chatStream task (streamText)", {
              error: error instanceof Error ? error.message : "Unknown error",
              stack: error instanceof Error ? error.stack : undefined,
              provider: model.provider,
            });
            if (!settled) {
              settled = true;
              reject(error);
            }
          },
          onChunk: async (chunk) => {
            console.log("Chunk:", chunk);
          },
          onFinish: async ({ response, reasoning }) => {
            metadata.flush();
            logger.info("AI stream finished", {
              chat_id,
              userId,
              messageCount: response.messages.length,
              provider: model.provider,
            });
            if (userId) {
              try {
                // Pretend to save messages
                await new Promise((resolve) => setTimeout(resolve, 1000));
                logger.info("Successfully saved AI response messages", {
                  chat_id,
                  userId,
                  messageCount: response.messages.length,
                  message: JSON.stringify(response.messages, null, 2),
                  provider: model.provider,
                });
              } catch (error) {
                logger.error("Failed to save AI response messages", {
                  error: error instanceof Error ? error.message : "Unknown error",
                  stack: error instanceof Error ? error.stack : undefined,
                  chat_id,
                  userId,
                  provider: model.provider,
                });
              }
            }
          },
        });
        result.consumeStream();
        result.mergeIntoDataStream(dataStream, {
          sendReasoning: true,
        });
      },
      onError: (error) => {
        logger.error("Error in chatStream task (createDataStream)", {
          error: error instanceof Error ? error.message : "Unknown error",
          stack: error instanceof Error ? error.stack : undefined,
          provider: model.provider,
        });
        reject(error);
        return error instanceof Error ? error.message : String(error);
      },
    });
    // Process the stream
    (async () => {
      try {
        const stream = await metadata.stream("dataStream", dataStreamResponse);
        let fullResponse = "";
        for await (const chunk of stream) {
          fullResponse += chunk;
        }
        // Only resolve if we haven't rejected due to an error
        if (!settled) {
          settled = true;
          resolve(fullResponse);
        }
      } catch (error) {
        if (!settled) {
          settled = true;
          reject(error);
        }
      }
    })();
  });
}
🤖 Prompt for AI Agents
In references/d3-chat/src/trigger/chat.ts between lines 278 and 375, the promise
in createStreamWithProvider can call resolve or reject multiple times due to
overlapping error and finish handlers. To fix this, introduce a boolean flag to
track if the promise has already settled, and check this flag before calling
resolve or reject anywhere in the function. This ensures the promise settles
only once, preventing illegal multiple settlements and related debugging issues.


export const chatStream = schemaTask({
  id: "chat-stream",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    chat_id: z.string().default("chat"),
    messages: z.array(z.unknown()).describe("Array of chat messages"),
    message_request_id: z.string().describe("Unique identifier for the message request"),
    model: z.string().default("claude-3-7-sonnet-20250219"),
    userId: z.string().optional().describe("User ID for authentication"),
    existingProject: z.boolean().default(false).describe("Whether the project already exists"),
  }),
Comment on lines +381 to +387
🛠️ Refactor suggestion

Strengthen the Zod schema for messages

messages is currently declared as z.array(z.unknown()) but immediately cast to CoreMessage[] downstream.
You lose compile-time safety and runtime validation (e.g. that role & content are present).

Consider validating the expected shape:

-  messages: z.array(z.unknown()).describe("Array of chat messages"),
+  messages: z.array(
+    z.object({
+      role: z.enum(["system", "assistant", "user", "tool"]),
+      content: z.string(),
+    })
+  ).describe("Array of chat messages"),

This prevents malformed inputs from reaching the model and removes the need for as CoreMessage[] casts.
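
A rough sketch of what the stricter schema buys (hedged: the schema and names below are illustrative, and plain-string content is a simplification, since CoreMessage also allows arrays of content parts):

import { z } from "zod";
import type { CoreMessage } from "ai";

// Illustrative schema: a discriminated union whose inferred type is assignable to CoreMessage.
const chatMessageSchema = z.discriminatedUnion("role", [
  z.object({ role: z.literal("system"), content: z.string() }),
  z.object({ role: z.literal("user"), content: z.string() }),
  z.object({ role: z.literal("assistant"), content: z.string() }),
]);

const chatMessagesSchema = z.array(chatMessageSchema).describe("Array of chat messages");

// parse() validates at runtime and yields a typed value, so no `as CoreMessage[]` cast is needed.
const input: unknown = [{ role: "user", content: "Hello" }];
const messages: CoreMessage[] = chatMessagesSchema.parse(input);
console.log(messages.length);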

🤖 Prompt for AI Agents
In references/d3-chat/src/trigger/chat.ts around lines 381 to 387, the Zod
schema for the `messages` field is currently defined as an array of unknowns,
which lacks proper validation and type safety. To fix this, define a more
specific Zod schema for each message object that includes required properties
like `role` and `content` with appropriate types, then use this schema in the
array definition. This will enforce runtime validation and eliminate the need
for unsafe type casting to `CoreMessage[]`.

  machine: "large-2x",
  run: async ({ chat_id, messages, model, userId, message_request_id }) => {
    logger.info("Running chat stream", {
      chat_id,
      messages,
      model,
      userId,
      message_request_id,
    });

    try {
      // First try with Anthropic
      return await createStreamWithProvider({
        model: anthropic(model),
        messages: messages as CoreMessage[],
        message_request_id,
        chat_id,
        userId,
      });
    } catch (error) {
      logger.info("Anthropic stream failed, falling back to OpenAI", {
        error: error instanceof Error ? error.message : "Unknown error",
        stack: error instanceof Error ? error.stack : undefined,
        chat_id,
        userId,
        message_request_id,
      });

      try {
        // Fallback to OpenAI
        return await createStreamWithProvider({
          model: openai("gpt-4"),
          messages: messages as CoreMessage[],
          message_request_id,
          chat_id,
          userId,
        });
      } catch (fallbackError) {
        logger.error("Both Anthropic and OpenAI streams failed", {
          error: fallbackError instanceof Error ? fallbackError.message : "Unknown error",
          stack: fallbackError instanceof Error ? fallbackError.stack : undefined,
          chat_id,
          userId,
          message_request_id,
        });
        throw fallbackError;
      }
    }
  },
});

export const chatStreamCaller = schemaTask({
  id: "chat-stream-caller",
  description: "Call the chat stream",
  schema: z.object({
    prompt: z.string().describe("The prompt to chat with the AI"),
  }),
  run: async ({ prompt }, { ctx }) => {
    const result = await chatStream.trigger({
      messages: [
        {
          role: "user",
          content: prompt,
        },
      ],
      message_request_id: ctx.run.id,
    });

    const stream = await runs.fetchStream(result.id, "dataStream");

    for await (const chunk of stream) {
      console.log("Chunk:", chunk);
    }

    return result;
  },
});

export const streamFetcher = schemaTask({
  id: "stream-fetcher",
  description: "Fetch a stream",
  schema: z.object({
    runId: z.string().describe("The run ID to fetch the stream for"),
    streamId: z.string().describe("The stream ID to fetch"),
  }),
  run: async ({ runId, streamId }) => {
    const result = await runs.fetchStream(runId, streamId);

    for await (const chunk of result) {
      console.log("Chunk:", chunk);
    }

    return result;
  },
});

export const chatStream2 = schemaTask({
  id: "chat-stream-2",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    chat_id: z.string().default("chat"),
    messages: z.array(z.unknown()).describe("Array of chat messages"),
    message_request_id: z.string().describe("Unique identifier for the message request"),
    model: z.string().default("claude-3-7-sonnet-20250219"),
    userId: z.string().optional().describe("User ID for authentication"),
    existingProject: z.boolean().default(false).describe("Whether the project already exists"),
  }),
  machine: "large-2x",
  run: async ({ chat_id, messages, model, userId, message_request_id }) => {
    logger.info("Running chat stream", {
      chat_id,
      messages,
      model,
      userId,
      message_request_id,
    });

    const dataStreamResponse = createDataStream({
      execute: async (dataStream) => {
        streamTextWithModel(
          dataStream,
          anthropic(model),
          messages as CoreMessage[],
          chat_id,
          openai("gpt-4"),
          userId
        );
      },
    });

    const stream = await metadata.stream("dataStream", dataStreamResponse);

    for await (const chunk of stream) {
      console.log("Chunk:", chunk);
    }
  },
});

function streamTextWithModel(
  dataStream: DataStreamWriter,
  model: ReturnType<typeof anthropic> | ReturnType<typeof openai>,
  messages: CoreMessage[],
  chat_id: string,
  fallbackModel?: ReturnType<typeof anthropic> | ReturnType<typeof openai>,
  userId?: string
) {
  const result = streamText({
    model,
    system: "This is the system prompt, please be nice.",
    messages,
    maxSteps: 20,
    toolCallStreaming: true,
    onError: (error) => {
      logger.error("Error in chatStream task (streamText)", {
        error: error instanceof Error ? error.message : "Unknown error",
        stack: error instanceof Error ? error.stack : undefined,
        provider: model.provider,
      });

      if (fallbackModel) {
        streamTextWithModel(dataStream, fallbackModel, messages, chat_id, undefined, userId);
      }
    },
    onChunk: async (chunk) => {
Comment on lines +540 to +551
🛠️ Refactor suggestion

Fallback recursion can leak streams & duplicate output

streamTextWithModel recursively calls itself on onError, but it doesn’t cancel/abort the failing stream or gate against multiple merges into the same DataStreamWriter.
This can lead to:

  1. Two active model streams writing concurrently to the same dataStream.
  2. Memory/file-descriptor leakage if the first stream keeps running.
  3. Duplicate chunks in the consumer.

Introduce an AbortController (or equivalent) to cancel the first stream before invoking the fallback, and ensure you only merge once:

const controller = new AbortController();
...
onError: (err) => {
  controller.abort();          // cancel first stream
  if (fallbackModel) {
    streamTextWithModel(dataStream, fallbackModel, messages, chat_id, undefined, userId);
  }
},
...
const result = streamText({ ..., abortSignal: controller.signal, ... });

This guarantees a clean hand-over to the fallback provider.
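
A fuller sketch of that hand-over (hedged: the function name and the handedOver guard are illustrative, it reuses the file's existing signature rather than any built-in retry helper, and it assumes streamText's onError callback receives an { error } event object, as in recent AI SDK versions):

import { anthropic } from "@ai-sdk/anthropic";
import { openai } from "@ai-sdk/openai";
import { logger } from "@trigger.dev/sdk/v3";
import { CoreMessage, DataStreamWriter, streamText } from "ai";

// Illustrative variant: abort the failing stream before starting the fallback,
// and start the fallback at most once even if onError fires repeatedly.
function streamTextWithModelSafe(
  dataStream: DataStreamWriter,
  model: ReturnType<typeof anthropic> | ReturnType<typeof openai>,
  messages: CoreMessage[],
  chat_id: string,
  fallbackModel?: ReturnType<typeof anthropic> | ReturnType<typeof openai>,
  userId?: string
) {
  const controller = new AbortController();
  let handedOver = false;

  const result = streamText({
    model,
    abortSignal: controller.signal, // lets the primary stream be cancelled on error
    system: "This is the system prompt, please be nice.",
    messages,
    maxSteps: 20,
    toolCallStreaming: true,
    onError: ({ error }) => {
      logger.error("Primary model failed, aborting before fallback", {
        error: error instanceof Error ? error.message : String(error),
        provider: model.provider,
      });
      controller.abort(); // cancel the failing stream so it stops writing
      if (fallbackModel && !handedOver) {
        handedOver = true;
        streamTextWithModelSafe(dataStream, fallbackModel, messages, chat_id, undefined, userId);
      }
    },
  });

  result.consumeStream();
  result.mergeIntoDataStream(dataStream, { sendReasoning: true });
}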

🤖 Prompt for AI Agents
In references/d3-chat/src/trigger/chat.ts around lines 540 to 551, the onError
handler calls streamTextWithModel recursively without aborting the current
stream, causing potential concurrent writes, resource leaks, and duplicate
output. Fix this by creating an AbortController before starting the stream,
passing its signal to the streamText call, and calling controller.abort() inside
onError before invoking the fallbackModel streamTextWithModel call. Also, ensure
that fallback merging happens only once to prevent multiple concurrent streams
writing to the same dataStream.

      console.log("Chunk:", chunk);
    },
    onFinish: async ({ response, reasoning }) => {
      metadata.flush();
      logger.info("AI stream finished", {
        chat_id,
        userId,
        messageCount: response.messages.length,
        provider: model.provider,
      });

      if (userId) {
        try {
          // Pretend to save messages
          await new Promise((resolve) => setTimeout(resolve, 1000));

          logger.info("Successfully saved AI response messages", {
            chat_id,
            userId,
            messageCount: response.messages.length,
            message: JSON.stringify(response.messages, null, 2),
            provider: model.provider,
          });
        } catch (error) {
          logger.error("Failed to save AI response messages", {
            error: error instanceof Error ? error.message : "Unknown error",
            stack: error instanceof Error ? error.stack : undefined,
            chat_id,
            userId,
            provider: model.provider,
          });
        }
      }
    },
  });

  result.consumeStream();

  result.mergeIntoDataStream(dataStream, {
    sendReasoning: true,
  });
}

export const chatStreamCaller2 = schemaTask({
  id: "chat-stream-caller-2",
  description: "Call the chat stream",
  schema: z.object({
    prompt: z.string().describe("The prompt to chat with the AI"),
  }),
  run: async ({ prompt }, { ctx }) => {
    const result = await chatStream2.trigger({
      messages: [
        {
          role: "user",
          content: prompt,
        },
      ],
      message_request_id: ctx.run.id,
    });

    const stream = await runs.fetchStream(result.id, "dataStream");

    for await (const chunk of stream) {
      console.log("Chunk:", chunk);
    }

    return result;
  },
});