Skip to content

Commit 112610c

Browse files
committed
refactor: move streamToAsyncIterable to private
1 parent 3381dee commit 112610c

File tree

2 files changed

+38
-32
lines changed

2 files changed

+38
-32
lines changed

templates/components/multiagent/typescript/nextjs/route.ts

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { initObservability } from "@/app/observability";
2-
import { LlamaIndexAdapter, parseDataStreamPart, type Message } from "ai";
3-
import { EngineResponse } from "llamaindex";
2+
import { LlamaIndexAdapter, type Message } from "ai";
43
import { NextRequest, NextResponse } from "next/server";
54
import { initSettings } from "./engine/settings";
65
import {
@@ -42,8 +41,7 @@ export async function POST(request: NextRequest) {
4241
});
4342
const { stream, dataStream: data } =
4443
await createStreamFromWorkflowContext(context);
45-
const streamIterable = streamToAsyncIterable(stream);
46-
return LlamaIndexAdapter.toDataStreamResponse(streamIterable, { data });
44+
return LlamaIndexAdapter.toDataStreamResponse(stream, { data });
4745
} catch (error) {
4846
console.error("[LlamaIndex]", error);
4947
return NextResponse.json(
@@ -56,25 +54,3 @@ export async function POST(request: NextRequest) {
5654
);
5755
}
5856
}
59-
60-
function streamToAsyncIterable(stream: ReadableStream<string>) {
61-
const streamIterable: AsyncIterable<EngineResponse> = {
62-
[Symbol.asyncIterator]() {
63-
const reader = stream.getReader();
64-
return {
65-
async next() {
66-
const { done, value } = await reader.read();
67-
if (done) {
68-
return { done: true, value: undefined };
69-
}
70-
const delta = parseDataStreamPart(value)?.value.toString() || "";
71-
return {
72-
done: false,
73-
value: { delta } as unknown as EngineResponse,
74-
};
75-
},
76-
};
77-
},
78-
};
79-
return streamIterable;
80-
}

templates/components/multiagent/typescript/workflow/stream.ts

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@ import {
33
WorkflowContext,
44
WorkflowEvent,
55
} from "@llamaindex/workflow";
6-
import { StreamData, createStreamDataTransformer } from "ai";
7-
import { ChatResponseChunk } from "llamaindex";
6+
import {
7+
StreamData,
8+
createStreamDataTransformer,
9+
parseDataStreamPart,
10+
} from "ai";
11+
import { ChatResponseChunk, EngineResponse } from "llamaindex";
812
import { AgentRunEvent } from "./type";
913

1014
export async function createStreamFromWorkflowContext<Input, Output, Context>(
1115
context: WorkflowContext<Input, Output, Context>,
12-
): Promise<{ stream: ReadableStream<string>; dataStream: StreamData }> {
16+
): Promise<{ stream: AsyncIterable<EngineResponse>; dataStream: StreamData }> {
1317
const dataStream = new StreamData();
1418
const encoder = new TextEncoder();
1519
let generator: AsyncGenerator<ChatResponseChunk> | undefined;
@@ -47,10 +51,14 @@ export async function createStreamFromWorkflowContext<Input, Output, Context>(
4751
},
4852
});
4953

54+
const stream = mainStream
55+
.pipeThrough(createStreamDataTransformer())
56+
.pipeThrough(new TextDecoderStream());
57+
58+
const streamIterable = streamToAsyncIterable(stream);
59+
5060
return {
51-
stream: mainStream
52-
.pipeThrough(createStreamDataTransformer())
53-
.pipeThrough(new TextDecoderStream()),
61+
stream: streamIterable,
5462
dataStream,
5563
};
5664
}
@@ -71,3 +79,25 @@ function handleEvent(
7179
});
7280
}
7381
}
82+
83+
function streamToAsyncIterable(stream: ReadableStream<string>) {
84+
const streamIterable: AsyncIterable<EngineResponse> = {
85+
[Symbol.asyncIterator]() {
86+
const reader = stream.getReader();
87+
return {
88+
async next() {
89+
const { done, value } = await reader.read();
90+
if (done) {
91+
return { done: true, value: undefined };
92+
}
93+
const delta = parseDataStreamPart(value)?.value.toString() || "";
94+
return {
95+
done: false,
96+
value: { delta } as unknown as EngineResponse,
97+
};
98+
},
99+
};
100+
},
101+
};
102+
return streamIterable;
103+
}

0 commit comments

Comments
 (0)