Skip to content

Commit 1330730

Browse files
authored
fix(server): SSE with keepalive crashes server on client disconnect (#464)
Fixes #463 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Bug Fixes** - Improved event stream handling to ensure that no additional events or errors are processed after a stream is cancelled, providing more reliable cancellation behavior. - **Tests** - Enhanced test coverage for event stream cancellation, including scenarios where errors are thrown and ensuring cleanup logic executes correctly. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 95b67ec commit 1330730

File tree

4 files changed

+84
-4
lines changed

4 files changed

+84
-4
lines changed

packages/standard-server-fetch/src/event-iterator.test.ts

+29-2
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,10 @@ describe('toEventStream', () => {
233233

234234
async function* gen() {
235235
try {
236+
await new Promise(resolve => setTimeout(resolve, 10))
236237
yield 1
237-
yield undefined
238-
return { value: true }
238+
await new Promise(resolve => setTimeout(resolve, 10))
239+
yield 2
239240
}
240241
catch (err) {
241242
hasError = err
@@ -257,6 +258,32 @@ describe('toEventStream', () => {
257258
})
258259
})
259260

261+
it('when canceled from client without region - throw', async () => {
262+
let hasFinally = false
263+
264+
async function* gen() {
265+
try {
266+
await new Promise(resolve => setTimeout(resolve, 10))
267+
yield 1
268+
await new Promise(resolve => setTimeout(resolve, 10))
269+
throw new Error('something')
270+
}
271+
finally {
272+
hasFinally = true
273+
}
274+
}
275+
276+
const stream = toEventStream(gen(), {})
277+
278+
const reader = stream.getReader()
279+
await reader.read()
280+
await reader.cancel()
281+
282+
await vi.waitFor(() => {
283+
expect(hasFinally).toBe(true)
284+
})
285+
})
286+
260287
it('when canceled from client with region', async () => {
261288
let hasError: any
262289
let hasFinally = false

packages/standard-server-fetch/src/event-iterator.ts

+12
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ export function toEventStream(
9898
const keepAliveInterval = options.eventIteratorKeepAliveInterval ?? 5000
9999
const keepAliveComment = options.eventIteratorKeepAliveComment ?? ''
100100

101+
let cancelled = false
101102
let timeout: ReturnType<typeof setInterval> | undefined
102103

103104
const stream = new ReadableStream<string>({
@@ -115,6 +116,10 @@ export function toEventStream(
115116

116117
clearInterval(timeout)
117118

119+
if (cancelled) {
120+
return
121+
}
122+
118123
const meta = getEventMeta(value.value)
119124

120125
if (!value.done || value.value !== undefined || meta !== undefined) {
@@ -132,6 +137,10 @@ export function toEventStream(
132137
catch (err) {
133138
clearInterval(timeout)
134139

140+
if (cancelled) {
141+
return
142+
}
143+
135144
controller.enqueue(encodeEventMessage({
136145
...getEventMeta(err),
137146
event: 'error',
@@ -142,6 +151,9 @@ export function toEventStream(
142151
}
143152
},
144153
async cancel(reason) {
154+
cancelled = true
155+
clearInterval(timeout)
156+
145157
if (reason) {
146158
await iterator.throw?.(reason)
147159
}

packages/standard-server-node/src/event-iterator.test.ts

+31-2
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,10 @@ describe('toEventStream', () => {
233233

234234
async function* gen() {
235235
try {
236+
await new Promise(resolve => setTimeout(resolve, 100))
236237
yield 1
237-
yield undefined
238-
return { value: true }
238+
await new Promise(resolve => setTimeout(resolve, 100))
239+
yield 2
239240
}
240241
catch (err) {
241242
hasError = err
@@ -249,6 +250,7 @@ describe('toEventStream', () => {
249250

250251
const reader = Readable.toWeb(stream).getReader()
251252
await reader.read()
253+
await new Promise(resolve => setTimeout(resolve, 1))
252254
await stream.destroy() // use stream.destroy() instead of reader.cancel() to improve node compatibility
253255

254256
await vi.waitFor(() => {
@@ -257,6 +259,33 @@ describe('toEventStream', () => {
257259
})
258260
})
259261

262+
it('when canceled from client without region - throw', async () => {
263+
let hasFinally = false
264+
265+
async function* gen() {
266+
try {
267+
await new Promise(resolve => setTimeout(resolve, 10))
268+
yield 1
269+
await new Promise(resolve => setTimeout(resolve, 10))
270+
throw new Error('something')
271+
}
272+
finally {
273+
hasFinally = true
274+
}
275+
}
276+
277+
const stream = toEventStream(gen(), {})
278+
279+
const reader = Readable.toWeb(stream).getReader()
280+
await reader.read()
281+
await new Promise(resolve => setTimeout(resolve, 1))
282+
await stream.destroy() // use stream.destroy() instead of reader.cancel() to improve node compatibility
283+
284+
await vi.waitFor(() => {
285+
expect(hasFinally).toBe(true)
286+
})
287+
})
288+
260289
it('when canceled from client with region', async () => {
261290
let hasError: any
262291
let hasFinally = false

packages/standard-server-node/src/event-iterator.ts

+12
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ export function toEventStream(
9999
const keepAliveInterval = options.eventIteratorKeepAliveInterval ?? 5000
100100
const keepAliveComment = options.eventIteratorKeepAliveComment ?? ''
101101

102+
let cancelled = false
102103
let timeout: ReturnType<typeof setInterval> | undefined
103104

104105
const stream = new ReadableStream<string>({
@@ -116,6 +117,10 @@ export function toEventStream(
116117

117118
clearInterval(timeout)
118119

120+
if (cancelled) {
121+
return
122+
}
123+
119124
const meta = getEventMeta(value.value)
120125

121126
if (!value.done || value.value !== undefined || meta !== undefined) {
@@ -133,6 +138,10 @@ export function toEventStream(
133138
catch (err) {
134139
clearInterval(timeout)
135140

141+
if (cancelled) {
142+
return
143+
}
144+
136145
controller.enqueue(encodeEventMessage({
137146
...getEventMeta(err),
138147
event: 'error',
@@ -143,6 +152,9 @@ export function toEventStream(
143152
}
144153
},
145154
async cancel(reason) {
155+
cancelled = true
156+
clearInterval(timeout)
157+
146158
if (reason) {
147159
await iterator.throw?.(reason)
148160
}

0 commit comments

Comments
 (0)