Skip to content

Commit 244297c

Browse files
authored
fix(standard-server): improve event-iterator cleanup (#466)
and clean some code <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Added a utility for standardizing HTTP header formatting. - **Bug Fixes** - Improved consistency in HTTP header formatting across the application. - **Refactor** - Simplified and unified header handling logic for better maintainability. - Streamlined event iterator and cancellation logic for improved reliability. - **Tests** - Enhanced and updated test coverage to reflect changes in header formatting and event iterator behavior. - Added tests for multipart form-data content type and the new header utility. - **Chores** - Updated internal utilities and imports for consistency. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent d5f6b77 commit 244297c

File tree

12 files changed

+117
-192
lines changed

12 files changed

+117
-192
lines changed

packages/server/src/plugins/cors.test.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ describe('corsPlugin', () => {
3838
expect(response.status).toBe(204)
3939
expect(response.headers.get('access-control-allow-origin')).toBe('https://example.com')
4040
expect(response.headers.get('vary')).toBe('origin')
41-
expect(response.headers.get('access-control-allow-methods')).toBe('GET,HEAD,PUT,POST,DELETE,PATCH')
41+
expect(response.headers.get('access-control-allow-methods')).toBe('GET, HEAD, PUT, POST, DELETE, PATCH')
4242
expect(response.headers.get('access-control-max-age')).toBeNull()
4343
})
4444

@@ -78,8 +78,8 @@ describe('corsPlugin', () => {
7878

7979
assertResponse(response)
8080
expect(response.headers.get('access-control-max-age')).toBe('600')
81-
expect(response.headers.get('access-control-allow-methods')).toBe('GET,HEAD,PUT,POST,DELETE,PATCH')
82-
expect(response.headers.get('access-control-allow-headers')).toBe('Content-Type,Authorization')
81+
expect(response.headers.get('access-control-allow-methods')).toBe('GET, HEAD, PUT, POST, DELETE, PATCH')
82+
expect(response.headers.get('access-control-allow-headers')).toBe('Content-Type, Authorization')
8383
})
8484

8585
it('sets allowed origin only when custom origin function approves', async () => {
@@ -167,7 +167,7 @@ describe('corsPlugin', () => {
167167
}))
168168
assertResponse(response)
169169
expect(response.headers.get('access-control-allow-credentials')).toBe('true')
170-
expect(response.headers.get('access-control-expose-headers')).toBe('X-Custom-Header,X-Another-Header')
170+
expect(response.headers.get('access-control-expose-headers')).toBe('X-Custom-Header, X-Another-Header')
171171
})
172172

173173
it('returns "*" for access-control-allow-origin when origin function returns "*"', async () => {

packages/server/src/plugins/cors.ts

+6-10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { StandardHeaders } from '@orpc/standard-server'
33
import type { StandardHandlerInterceptorOptions, StandardHandlerOptions, StandardHandlerPlugin } from '../adapters/standard'
44
import type { Context } from '../context'
55
import { value } from '@orpc/shared'
6+
import { flattenHeader } from '@orpc/standard-server'
67

78
export interface CORSOptions<T extends Context> {
89
origin?: Value<string | readonly string[] | null | undefined, [origin: string, options: StandardHandlerInterceptorOptions<T>]>
@@ -48,16 +49,13 @@ export class CORSPlugin<T extends Context> implements StandardHandlerPlugin<T> {
4849
}
4950

5051
if (this.options.allowMethods?.length) {
51-
resHeaders['access-control-allow-methods'] = this.options.allowMethods.join(',')
52+
resHeaders['access-control-allow-methods'] = flattenHeader(this.options.allowMethods)
5253
}
5354

5455
const allowHeaders = this.options.allowHeaders ?? interceptorOptions.request.headers['access-control-request-headers']
5556

56-
if (Array.isArray(allowHeaders) && allowHeaders.length) {
57-
resHeaders['access-control-allow-headers'] = allowHeaders.join(',')
58-
}
59-
else if (typeof allowHeaders === 'string') {
60-
resHeaders['access-control-allow-headers'] = allowHeaders
57+
if (typeof allowHeaders === 'string' || allowHeaders?.length) {
58+
resHeaders['access-control-allow-headers'] = flattenHeader(allowHeaders)
6159
}
6260

6361
return {
@@ -80,9 +78,7 @@ export class CORSPlugin<T extends Context> implements StandardHandlerPlugin<T> {
8078
return result
8179
}
8280

83-
const origin = Array.isArray(interceptorOptions.request.headers.origin)
84-
? interceptorOptions.request.headers.origin.join(',') // the array case is never happen, but we make it for type safety
85-
: interceptorOptions.request.headers.origin || ''
81+
const origin = flattenHeader(interceptorOptions.request.headers.origin) ?? ''
8682

8783
const allowedOrigin = await value(this.options.origin, origin, interceptorOptions)
8884
const allowedOriginArr = Array.isArray(allowedOrigin) ? allowedOrigin : [allowedOrigin]
@@ -113,7 +109,7 @@ export class CORSPlugin<T extends Context> implements StandardHandlerPlugin<T> {
113109
}
114110

115111
if (this.options.exposeHeaders?.length) {
116-
result.response.headers['access-control-expose-headers'] = this.options.exposeHeaders.join(',')
112+
result.response.headers['access-control-expose-headers'] = flattenHeader(this.options.exposeHeaders)
117113
}
118114

119115
return result

packages/standard-server-fetch/src/event-iterator.test.ts

+2-38
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,7 @@ describe('toEventStream', () => {
227227
expect((await reader.read()).done).toEqual(true)
228228
})
229229

230-
it('when canceled from client without region', async () => {
231-
let hasError: any
230+
it('when canceled from client - return', async () => {
232231
let hasFinally = false
233232

234233
async function* gen() {
@@ -238,9 +237,6 @@ describe('toEventStream', () => {
238237
await new Promise(resolve => setTimeout(resolve, 10))
239238
yield 2
240239
}
241-
catch (err) {
242-
hasError = err
243-
}
244240
finally {
245241
hasFinally = true
246242
}
@@ -253,12 +249,11 @@ describe('toEventStream', () => {
253249
await reader.cancel()
254250

255251
await vi.waitFor(() => {
256-
expect(hasError).toBe(undefined)
257252
expect(hasFinally).toBe(true)
258253
})
259254
})
260255

261-
it('when canceled from client without region - throw', async () => {
256+
it('when canceled from client - throw', async () => {
262257
let hasFinally = false
263258

264259
async function* gen() {
@@ -284,37 +279,6 @@ describe('toEventStream', () => {
284279
})
285280
})
286281

287-
it('when canceled from client with region', async () => {
288-
let hasError: any
289-
let hasFinally = false
290-
291-
async function* gen() {
292-
try {
293-
yield 1
294-
yield undefined
295-
return { value: true }
296-
}
297-
catch (err) {
298-
hasError = err
299-
}
300-
finally {
301-
hasFinally = true
302-
}
303-
}
304-
305-
const stream = toEventStream(gen(), {})
306-
307-
const reason = new Error('reason')
308-
const reader = stream.getReader()
309-
await reader.read()
310-
await reader.cancel(reason)
311-
312-
await vi.waitFor(() => {
313-
expect(hasError).toBe(reason)
314-
expect(hasFinally).toBe(true)
315-
})
316-
})
317-
318282
it('keep alive', { retry: 5 }, async () => {
319283
async function* gen() {
320284
while (true) {

packages/standard-server-fetch/src/event-iterator.ts

+33-44
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { isTypescriptObject, parseEmptyableJSON, stringifyJSON } from '@orpc/shared'
1+
import { createAsyncIteratorObject, isTypescriptObject, parseEmptyableJSON, stringifyJSON } from '@orpc/shared'
22
import {
33
encodeEventMessage,
44
ErrorEvent,
@@ -9,62 +9,56 @@ import {
99

1010
export function toEventIterator(
1111
stream: ReadableStream<Uint8Array>,
12-
): AsyncGenerator<unknown | void, unknown | void, void> {
12+
): AsyncIteratorObject<unknown | void, unknown | void, void> & AsyncGenerator<unknown | void, unknown | void, void> {
1313
const eventStream = stream
1414
.pipeThrough(new TextDecoderStream())
1515
.pipeThrough(new EventDecoderStream())
1616

1717
const reader = eventStream.getReader()
1818

19-
async function* gen() {
20-
try {
21-
while (true) {
22-
const { done, value } = await reader.read()
19+
return createAsyncIteratorObject(async () => {
20+
while (true) {
21+
const { done, value } = await reader.read()
2322

24-
if (done) {
25-
return
26-
}
27-
28-
switch (value.event) {
29-
case 'message': {
30-
let message = parseEmptyableJSON(value.data)
23+
if (done) {
24+
return { done: true, value: undefined }
25+
}
3126

32-
if (isTypescriptObject(message)) {
33-
message = withEventMeta(message, value)
34-
}
27+
switch (value.event) {
28+
case 'message': {
29+
let message = parseEmptyableJSON(value.data)
3530

36-
yield message
37-
break
31+
if (isTypescriptObject(message)) {
32+
message = withEventMeta(message, value)
3833
}
3934

40-
case 'error': {
41-
let error = new ErrorEvent({
42-
data: parseEmptyableJSON(value.data),
43-
})
35+
return { done: false, value: message }
36+
}
4437

45-
error = withEventMeta(error, value)
38+
case 'error': {
39+
let error = new ErrorEvent({
40+
data: parseEmptyableJSON(value.data),
41+
})
4642

47-
throw error
48-
}
43+
error = withEventMeta(error, value)
4944

50-
case 'done': {
51-
let done = parseEmptyableJSON(value.data)
45+
throw error
46+
}
5247

53-
if (isTypescriptObject(done)) {
54-
done = withEventMeta(done, value)
55-
}
48+
case 'done': {
49+
let done = parseEmptyableJSON(value.data)
5650

57-
return done
51+
if (isTypescriptObject(done)) {
52+
done = withEventMeta(done, value)
5853
}
54+
55+
return { done: true, value: done }
5956
}
6057
}
6158
}
62-
finally {
63-
await reader.cancel()
64-
}
65-
}
66-
67-
return gen()
59+
}, async () => {
60+
await reader.cancel()
61+
})
6862
}
6963

7064
export interface ToEventStreamOptions {
@@ -150,16 +144,11 @@ export function toEventStream(
150144
controller.close()
151145
}
152146
},
153-
async cancel(reason) {
147+
async cancel() {
154148
cancelled = true
155149
clearInterval(timeout)
156150

157-
if (reason) {
158-
await iterator.throw?.(reason)
159-
}
160-
else {
161-
await iterator.return?.()
162-
}
151+
await iterator.return?.()
163152
},
164153
}).pipeThrough(new TextEncoderStream())
165154

packages/standard-server-node/src/body.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import type { Buffer } from 'node:buffer'
33
import type { ToEventStreamOptions } from './event-iterator'
44
import type { NodeHttpRequest } from './types'
55
import { Readable } from 'node:stream'
6-
import { isAsyncIteratorObject, parseEmptyableJSON, stringifyJSON, toArray } from '@orpc/shared'
7-
import { generateContentDisposition, getFilenameFromContentDisposition } from '@orpc/standard-server'
6+
import { isAsyncIteratorObject, parseEmptyableJSON, stringifyJSON } from '@orpc/shared'
7+
import { flattenHeader, generateContentDisposition, getFilenameFromContentDisposition } from '@orpc/standard-server'
88
import { toEventIterator, toEventStream } from './event-iterator'
99

1010
export async function toStandardBody(req: NodeHttpRequest): Promise<StandardBody> {
@@ -60,7 +60,7 @@ export function toNodeHttpBody(
6060
headers: StandardHeaders,
6161
options: ToNodeHttpBodyOptions = {},
6262
): Readable | undefined | string {
63-
const currentContentDisposition = toArray(headers['content-disposition'])[0]
63+
const currentContentDisposition = flattenHeader(headers['content-disposition'])
6464

6565
delete headers['content-type']
6666
delete headers['content-disposition']

packages/standard-server-node/src/event-iterator.test.ts

+2-38
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,7 @@ describe('toEventStream', () => {
227227
expect((await reader.read()).done).toEqual(true)
228228
})
229229

230-
it('when canceled from client without region', async () => {
231-
let hasError: any
230+
it('when canceled from client - yield', async () => {
232231
let hasFinally = false
233232

234233
async function* gen() {
@@ -238,9 +237,6 @@ describe('toEventStream', () => {
238237
await new Promise(resolve => setTimeout(resolve, 100))
239238
yield 2
240239
}
241-
catch (err) {
242-
hasError = err
243-
}
244240
finally {
245241
hasFinally = true
246242
}
@@ -254,12 +250,11 @@ describe('toEventStream', () => {
254250
await stream.destroy() // use stream.destroy() instead of reader.cancel() to improve node compatibility
255251

256252
await vi.waitFor(() => {
257-
expect(hasError).toBe(undefined)
258253
expect(hasFinally).toBe(true)
259254
})
260255
})
261256

262-
it('when canceled from client without region - throw', async () => {
257+
it('when canceled from client - throw', async () => {
263258
let hasFinally = false
264259

265260
async function* gen() {
@@ -286,37 +281,6 @@ describe('toEventStream', () => {
286281
})
287282
})
288283

289-
it('when canceled from client with region', async () => {
290-
let hasError: any
291-
let hasFinally = false
292-
293-
async function* gen() {
294-
try {
295-
yield 1
296-
yield undefined
297-
return { value: true }
298-
}
299-
catch (err) {
300-
hasError = err
301-
}
302-
finally {
303-
hasFinally = true
304-
}
305-
}
306-
307-
const stream = toEventStream(gen(), {})
308-
309-
const reason = new Error('reason')
310-
const reader = Readable.toWeb(stream).getReader()
311-
await reader.read()
312-
await stream.destroy(reason) // use stream.destroy() instead of reader.cancel() to improve node compatibility
313-
314-
await vi.waitFor(() => {
315-
expect(hasError).toBe(reason)
316-
expect(hasFinally).toBe(true)
317-
})
318-
})
319-
320284
it('keep alive', { retry: 5 }, async () => {
321285
async function* gen() {
322286
while (true) {

0 commit comments

Comments
 (0)