Skip to content

Commit 096368a

Browse files
committed
GraphQL SSE Distinct Connections support
1 parent fb94474 commit 096368a

File tree

15 files changed

+284
-104
lines changed

15 files changed

+284
-104
lines changed

.changeset/tricky-teachers-sin.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'graphql-yoga': minor
3+
---
4+
5+
GraphQL SSE Distinct Connections mode support with `sse.graphqlSSEDistinctConnections` flag

examples/fastify/__integration-tests__/fastify.spec.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ describe('fastify example integration', () => {
161161
162162
data: {"data":{"countdown":0}}
163163
164+
event: complete
165+
164166
"
165167
`)
166168
})
@@ -202,6 +204,8 @@ describe('fastify example integration', () => {
202204
203205
data: {"data":{"countdown":0}}
204206
207+
event: complete
208+
205209
"
206210
`)
207211
})

examples/generic-auth/__integration-tests__/generic-auth.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ describe('graphql-auth example integration', () => {
7070
for await (const chunk of response.body!) {
7171
const chunkString = Buffer.from(chunk).toString('utf-8')
7272
if (chunkString.includes('data:')) {
73-
expect(chunkString.trim()).toBe('data: {"data":{"public":"hi"}}')
73+
expect(chunkString.trim()).toContain('data: {"data":{"public":"hi"}}')
7474
break
7575
}
7676
}
@@ -91,7 +91,7 @@ describe('graphql-auth example integration', () => {
9191
for await (const chunk of response.body!) {
9292
const chunkStr = Buffer.from(chunk).toString('utf-8')
9393
if (chunkStr.startsWith('data:')) {
94-
expect(chunkStr.trim()).toBe(
94+
expect(chunkStr.trim()).toContain(
9595
'data: {"data":{"requiresAuth":"hi foo@foo.com"}}',
9696
)
9797
break
@@ -112,7 +112,7 @@ describe('graphql-auth example integration', () => {
112112
for await (const chunk of response.body!) {
113113
const chunkStr = Buffer.from(chunk).toString('utf-8')
114114
if (chunkStr.startsWith('data:')) {
115-
expect(chunkStr.trim()).toBe(
115+
expect(chunkStr.trim()).toContain(
116116
'data: {"data":null,"errors":[{"message":"Accessing \'Subscription.requiresAuth\' requires authentication.","locations":[{"line":1,"column":14}]}]}',
117117
)
118118
break

examples/node-ts/__integration-tests__/node-ts.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ describe('node-ts example integration', () => {
2424
expect(await response.text()).toMatchInlineSnapshot(`
2525
"data: {"errors":[{"message":"Subscriptions have been disabled"}]}
2626
27+
event: complete
28+
2729
"
2830
`)
2931
})

examples/pothos/__integration-tests__/pothos.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ describe('pothos example integration', () => {
3232
3333
data: {"data":{"greetings":"Zdravo"}}
3434
35+
event: complete
36+
3537
"
3638
`)
3739
})
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { ExecutionResult } from 'graphql'
2+
import { createClient } from 'graphql-sse'
3+
import { createSchema, createYoga } from '../src'
4+
5+
describe('GraphQL SSE Client compatibility', () => {
6+
describe('Distinct Connections', () => {
7+
const yoga = createYoga({
8+
sse: {
9+
graphqlSSEDistinctConnections: true,
10+
},
11+
schema: createSchema({
12+
typeDefs: /* GraphQL */ `
13+
type Query {
14+
hello: String
15+
}
16+
type Subscription {
17+
greetings: String
18+
}
19+
`,
20+
resolvers: {
21+
Query: {
22+
hello: () => 'world',
23+
},
24+
Subscription: {
25+
greetings: {
26+
async *subscribe() {
27+
yield { greetings: 'Hi' }
28+
await new Promise((resolve) => setTimeout(resolve, 300))
29+
yield { greetings: 'Bonjour' }
30+
await new Promise((resolve) => setTimeout(resolve, 300))
31+
yield { greetings: 'Hola' }
32+
await new Promise((resolve) => setTimeout(resolve, 300))
33+
yield { greetings: 'Ciao' }
34+
await new Promise((resolve) => setTimeout(resolve, 300))
35+
yield { greetings: 'Hallo' }
36+
},
37+
},
38+
},
39+
},
40+
}),
41+
})
42+
const client = createClient({
43+
url: 'http://localhost:4000/graphql',
44+
fetchFn: yoga.fetch,
45+
abortControllerImpl: yoga.fetchAPI.AbortController,
46+
retryAttempts: 0,
47+
})
48+
let unsubscribe: () => void
49+
afterAll(() => {
50+
unsubscribe?.()
51+
client.dispose()
52+
})
53+
it('handle queries', async () => {
54+
const result = await new Promise((resolve, reject) => {
55+
let result: ExecutionResult<Record<string, unknown>, unknown>
56+
unsubscribe = client.subscribe(
57+
{
58+
query: '{ hello }',
59+
},
60+
{
61+
next: (data) => (result = data),
62+
error: reject,
63+
complete: () => resolve(result),
64+
},
65+
)
66+
})
67+
68+
expect(result).toEqual({ data: { hello: 'world' } })
69+
})
70+
it('handle subscriptions', async () => {
71+
const onNext = jest.fn()
72+
73+
await new Promise<void>((resolve, reject) => {
74+
unsubscribe = client.subscribe(
75+
{
76+
query: 'subscription { greetings }',
77+
},
78+
{
79+
next: onNext,
80+
error: reject,
81+
complete: resolve,
82+
},
83+
)
84+
})
85+
86+
expect(onNext).toBeCalledTimes(5) // we say "Hi" in 5 languages
87+
})
88+
})
89+
})

packages/graphql-yoga/__tests__/subscriptions.spec.ts

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -143,21 +143,24 @@ describe('Subscription', () => {
143143
}
144144

145145
expect(results).toMatchInlineSnapshot(`
146-
[
147-
":
146+
[
147+
":
148+
149+
",
150+
":
148151
149-
",
150-
":
152+
",
153+
":
151154
152-
",
153-
":
155+
",
156+
"data: {"data":{"hi":"hi"}}
154157
155-
",
156-
"data: {"data":{"hi":"hi"}}
158+
",
159+
"event: complete
157160
158-
",
159-
]
160-
`)
161+
",
162+
]
163+
`)
161164
})
162165

163166
test('should issue pings event if event source never publishes anything', async () => {

packages/graphql-yoga/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
"graphql": "^16.0.1",
7474
"graphql-http": "^1.7.2",
7575
"graphql-scalars": "1.20.4",
76+
"graphql-sse": "2.0.0",
7677
"html-minifier-terser": "7.1.0",
7778
"json-bigint-patch": "0.0.8",
7879
"puppeteer": "19.6.0"

packages/graphql-yoga/src/plugins/resultProcessor/push.ts

Lines changed: 0 additions & 78 deletions
This file was deleted.
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { ExecutionResult } from 'graphql'
2+
import { isAsyncIterable } from '@envelop/core'
3+
4+
import { getResponseInitByRespectingErrors } from '../../error.js'
5+
import { FetchAPI, MaybeArray } from '../../types.js'
6+
import { ResultProcessor, ResultProcessorInput } from '../types.js'
7+
import { jsonStringifyResultWithoutInternals } from './stringify.js'
8+
9+
export interface SSEProcessorOptions {
10+
legacySSE: boolean
11+
}
12+
13+
export function getSSEProcessor(opts: SSEProcessorOptions): ResultProcessor {
14+
return function processSSEResult(
15+
result: ResultProcessorInput,
16+
fetchAPI: FetchAPI,
17+
): Response {
18+
let pingIntervalMs = 12_000
19+
20+
// for testing the pings, reduce the timeout
21+
if (globalThis.process?.env?.NODE_ENV === 'test') {
22+
pingIntervalMs = 300
23+
}
24+
25+
const headersInit = {
26+
'Content-Type': 'text/event-stream',
27+
Connection: 'keep-alive',
28+
'Cache-Control': 'no-cache',
29+
'Content-Encoding': 'none',
30+
}
31+
32+
const responseInit = getResponseInitByRespectingErrors(result, headersInit)
33+
34+
let iterator: AsyncIterator<MaybeArray<ExecutionResult>>
35+
36+
let pingInterval: number
37+
const textEncoder = new fetchAPI.TextEncoder()
38+
const readableStream = new fetchAPI.ReadableStream({
39+
start(controller) {
40+
// ping client every 12 seconds to keep the connection alive
41+
pingInterval = setInterval(() => {
42+
if (!controller.desiredSize) {
43+
clearInterval(pingInterval)
44+
return
45+
}
46+
controller.enqueue(textEncoder.encode(':\n\n'))
47+
}, pingIntervalMs) as unknown as number
48+
49+
if (isAsyncIterable(result)) {
50+
iterator = result[Symbol.asyncIterator]()
51+
} else {
52+
let finished = false
53+
iterator = {
54+
next: () => {
55+
if (finished) {
56+
return Promise.resolve({ done: true, value: null })
57+
}
58+
finished = true
59+
return Promise.resolve({ done: false, value: result })
60+
},
61+
}
62+
}
63+
},
64+
async pull(controller) {
65+
const { done, value } = await iterator.next()
66+
if (value != null) {
67+
if (!opts.legacySSE) {
68+
controller.enqueue(textEncoder.encode(`event: next\n`))
69+
}
70+
const chunk = jsonStringifyResultWithoutInternals(value)
71+
controller.enqueue(textEncoder.encode(`data: ${chunk}\n\n`))
72+
}
73+
if (done) {
74+
controller.enqueue(textEncoder.encode(`event: complete\n\n`))
75+
clearInterval(pingInterval)
76+
controller.close()
77+
}
78+
},
79+
async cancel(e) {
80+
clearInterval(pingInterval)
81+
await iterator.return?.(e)
82+
},
83+
})
84+
return new fetchAPI.Response(readableStream, responseInit)
85+
}
86+
}

0 commit comments

Comments
 (0)