Skip to content

Commit 80c96c1

Browse files
committed
Add an example for streaming with an async generator function
1 parent f55f458 commit 80c96c1

File tree

2 files changed

+85
-20
lines changed

2 files changed

+85
-20
lines changed

examples/react/start-typed-readable-stream/src/routes/index.tsx

Lines changed: 76 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ import { createServerFn } from '@tanstack/react-start'
33
import { useCallback, useState } from 'react'
44
import { z } from 'zod'
55

6-
// This schema will be used to define the type
7-
// of each chunk in the `ReadableStream`.
8-
// (It mimics OpenAi's streaming response format.)
6+
/**
7+
This schema will be used to define the type
8+
of each chunk in the `ReadableStream`.
9+
(It mimics OpenAI's streaming response format.)
10+
*/
911
const textPartSchema = z.object({
1012
choices: z.array(
1113
z.object({
@@ -20,13 +22,11 @@ const textPartSchema = z.object({
2022

2123
export type TextPart = z.infer<typeof textPartSchema>
2224

23-
function sleep(ms: number) {
24-
return new Promise((resolve) => setTimeout(resolve, ms))
25-
}
26-
27-
const streamingResponseFn = createServerFn({
28-
method: 'GET',
29-
}).handler(async () => {
25+
/**
26+
This helper function generates the array of messages
27+
that we'll stream to the client.
28+
*/
29+
function generateMessages() {
3030
const messages = Array.from({ length: 10 }, () =>
3131
Math.floor(Math.random() * 100),
3232
).map((n, i) =>
@@ -40,31 +40,71 @@ const streamingResponseFn = createServerFn({
4040
],
4141
}),
4242
)
43+
return messages
44+
}
45+
46+
/**
47+
This helper function is used to simulate the
48+
delay between each message being sent.
49+
*/
50+
function sleep(ms: number) {
51+
return new Promise((resolve) => setTimeout(resolve, ms))
52+
}
4353

44-
// This `ReadableStream` is typed, so each chunk
54+
/**
55+
This server function returns a `ReadableStream`
56+
that streams `TextPart` chunks to the client.
57+
*/
58+
const streamingResponseFn = createServerFn({
59+
method: 'GET',
60+
}).handler(async () => {
61+
const messages = generateMessages()
62+
// This `ReadableStream` is typed, so each
4563
// will be of type `TextPart`.
4664
const stream = new ReadableStream<TextPart>({
4765
async start(controller) {
4866
for (const message of messages) {
67+
// simulate network latency
4968
await sleep(500)
5069
controller.enqueue(message)
5170
}
52-
sleep(500)
5371
controller.close()
5472
},
5573
})
5674

5775
return stream
5876
})
5977

78+
/**
79+
You can also use an async generator function to stream
80+
typed chunks to the client.
81+
*/
82+
const streamingWithAnAsyncGeneratorFn = createServerFn().handler(
83+
async function* () {
84+
const messages = generateMessages()
85+
for (const msg of messages) {
86+
// Notice how we defined the type of the streamed chunks
87+
// in the generic passed down the Promise constructor
88+
yield new Promise<TextPart>(async (r) => {
89+
// simulate network latency
90+
await sleep(500)
91+
return r(msg)
92+
})
93+
}
94+
},
95+
)
96+
6097
export const Route = createFileRoute('/')({
6198
component: RouteComponent,
6299
})
63100

64101
function RouteComponent() {
65-
const [message, setMessage] = useState('')
102+
const [readableStreamMessages, setReadableStreamMessages] = useState('')
66103

67-
const getStreamingResponse = useCallback(async () => {
104+
const [asyncGeneratorFuncMessages, setAsyncGeneratorFuncMessages] =
105+
useState('')
106+
107+
const getTypedReadableStreamResponse = useCallback(async () => {
68108
const response = await streamingResponseFn()
69109

70110
if (!response) {
@@ -73,7 +113,7 @@ function RouteComponent() {
73113

74114
const reader = response.getReader()
75115
let done = false
76-
setMessage('')
116+
setReadableStreamMessages('')
77117
while (!done) {
78118
const { value, done: doneReading } = await reader.read()
79119
done = doneReading
@@ -82,19 +122,35 @@ function RouteComponent() {
82122
// here, because it's coming from the typed `ReadableStream`
83123
const chunk = value?.choices[0].delta.content
84124
if (chunk) {
85-
setMessage((prev) => prev + chunk)
125+
setReadableStreamMessages((prev) => prev + chunk)
86126
}
87127
}
88128
}
89129
}, [])
90130

131+
const getResponseFromTheAsyncGenerator = useCallback(async () => {
132+
setAsyncGeneratorFuncMessages('')
133+
for await (const m of await streamingWithAnAsyncGeneratorFn()) {
134+
const chunk = m?.choices[0].delta.content
135+
if (chunk) {
136+
setAsyncGeneratorFuncMessages((prev) => prev + chunk)
137+
}
138+
}
139+
}, [])
140+
91141
return (
92142
<main>
93143
<h1>Typed Readable Stream</h1>
94-
<button onClick={() => getStreamingResponse()}>
95-
Get 10 random numbers
96-
</button>
97-
<pre>{message}</pre>
144+
<div id="streamed-results">
145+
<button onClick={() => getTypedReadableStreamResponse()}>
146+
Get 10 random numbers
147+
</button>
148+
<button onClick={() => getResponseFromTheAsyncGenerator()}>
149+
Get messages
150+
</button>
151+
<pre>{readableStreamMessages}</pre>
152+
<pre>{asyncGeneratorFuncMessages}</pre>
153+
</div>
98154
</main>
99155
)
100156
}

examples/react/start-typed-readable-stream/src/styles/app.css

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,12 @@ main {
1313
padding: 1em;
1414
margin: 0 auto;
1515
}
16+
17+
#streamed-results {
18+
display: grid;
19+
grid-template-columns: 1fr 1fr;
20+
}
21+
22+
#streamed-results>button {
23+
margin: auto;
24+
}

0 commit comments

Comments
 (0)