Skip to content

Commit f34a538

Browse files
committed
refactor: import readable stream
1 parent bfe4773 commit f34a538

File tree

3 files changed

+16
-56
lines changed

3 files changed

+16
-56
lines changed

templates/components/multiagent/typescript/express/chat.controller.ts

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import {
55
retrieveMessageContent,
66
} from "./llamaindex/streaming/annotations";
77
import { createWorkflow } from "./workflow/factory";
8-
import {
9-
createStreamFromWorkflowContext,
10-
streamToAsyncIterable,
11-
} from "./workflow/stream";
8+
import { createStreamFromWorkflowContext } from "./workflow/stream";
129

1310
export const chat = async (req: Request, res: Response) => {
1411
try {
@@ -30,13 +27,10 @@ export const chat = async (req: Request, res: Response) => {
3027

3128
const { stream, dataStream } =
3229
await createStreamFromWorkflowContext(context);
33-
const streamIterable = streamToAsyncIterable(stream);
34-
const streamResponse = LlamaIndexAdapter.toDataStreamResponse(
35-
streamIterable,
36-
{
37-
data: dataStream,
38-
},
39-
);
30+
31+
const streamResponse = LlamaIndexAdapter.toDataStreamResponse(stream, {
32+
data: dataStream,
33+
});
4034
if (streamResponse.body) {
4135
const reader = streamResponse.body.getReader();
4236
while (true) {

templates/components/multiagent/typescript/nextjs/route.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ import {
88
retrieveMessageContent,
99
} from "./llamaindex/streaming/annotations";
1010
import { createWorkflow } from "./workflow/factory";
11-
import {
12-
createStreamFromWorkflowContext,
13-
streamToAsyncIterable,
14-
} from "./workflow/stream";
11+
import { createStreamFromWorkflowContext } from "./workflow/stream";
1512

1613
initObservability();
1714
initSettings();
@@ -44,8 +41,7 @@ export async function POST(request: NextRequest) {
4441
});
4542
const { stream, dataStream } =
4643
await createStreamFromWorkflowContext(context);
47-
const streamIterable = streamToAsyncIterable(stream);
48-
return LlamaIndexAdapter.toDataStreamResponse(streamIterable, {
44+
return LlamaIndexAdapter.toDataStreamResponse(stream, {
4945
data: dataStream,
5046
});
5147
} catch (error) {

templates/components/multiagent/typescript/workflow/stream.ts

Lines changed: 9 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,26 @@ import {
33
WorkflowContext,
44
WorkflowEvent,
55
} from "@llamaindex/workflow";
6-
import {
7-
StreamData,
8-
createStreamDataTransformer,
9-
parseDataStreamPart,
10-
} from "ai";
6+
import { StreamData } from "ai";
117
import { ChatResponseChunk, EngineResponse } from "llamaindex";
8+
import { ReadableStream } from "stream/web";
129
import { AgentRunEvent } from "./type";
1310

1411
export async function createStreamFromWorkflowContext<Input, Output, Context>(
1512
context: WorkflowContext<Input, Output, Context>,
16-
): Promise<{ stream: ReadableStream<string>; dataStream: StreamData }> {
13+
): Promise<{ stream: ReadableStream<EngineResponse>; dataStream: StreamData }> {
1714
const dataStream = new StreamData();
18-
const encoder = new TextEncoder();
1915
let generator: AsyncGenerator<ChatResponseChunk> | undefined;
2016

2117
const closeStreams = (controller: ReadableStreamDefaultController) => {
2218
controller.close();
2319
dataStream.close();
2420
};
2521

26-
const mainStream = new ReadableStream({
22+
const stream = new ReadableStream<EngineResponse>({
2723
async start(controller) {
2824
// Kickstart the stream by sending an empty string
29-
controller.enqueue(encoder.encode(""));
25+
controller.enqueue({ delta: "" } as EngineResponse);
3026
},
3127
async pull(controller) {
3228
while (!generator) {
@@ -45,18 +41,14 @@ export async function createStreamFromWorkflowContext<Input, Output, Context>(
4541
closeStreams(controller);
4642
return;
4743
}
48-
if (chunk.delta) {
49-
controller.enqueue(encoder.encode(chunk.delta));
44+
const delta = chunk.delta ?? "";
45+
if (delta) {
46+
controller.enqueue({ delta } as EngineResponse);
5047
}
5148
},
5249
});
5350

54-
return {
55-
stream: mainStream
56-
.pipeThrough(createStreamDataTransformer())
57-
.pipeThrough(new TextDecoderStream()),
58-
dataStream,
59-
};
51+
return { stream, dataStream };
6052
}
6153

6254
function handleEvent(
@@ -75,25 +67,3 @@ function handleEvent(
7567
});
7668
}
7769
}
78-
79-
export function streamToAsyncIterable(stream: ReadableStream<string>) {
80-
const streamIterable: AsyncIterable<EngineResponse> = {
81-
[Symbol.asyncIterator]() {
82-
const reader = stream.getReader();
83-
return {
84-
async next() {
85-
const { done, value } = await reader.read();
86-
if (done) {
87-
return { done: true, value: undefined };
88-
}
89-
const delta = parseDataStreamPart(value)?.value.toString() || "";
90-
return {
91-
done: false,
92-
value: { delta } as unknown as EngineResponse,
93-
};
94-
},
95-
};
96-
},
97-
};
98-
return streamIterable;
99-
}

0 commit comments

Comments
 (0)