Skip to content

Commit

Permalink
GraphQL SSE Distinct Connections support
Browse files Browse the repository at this point in the history
  • Loading branch information
ardatan committed Mar 20, 2023
1 parent fb94474 commit 8741125
Show file tree
Hide file tree
Showing 15 changed files with 282 additions and 104 deletions.
5 changes: 5 additions & 0 deletions .changeset/tricky-teachers-sin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'graphql-yoga': minor
---

GraphQL SSE Distinct Connections mode support with `sse.graphqlSSEDistinctConnections` flag
4 changes: 4 additions & 0 deletions examples/fastify/__integration-tests__/fastify.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ describe('fastify example integration', () => {
data: {"data":{"countdown":0}}
event: complete
"
`)
})
Expand Down Expand Up @@ -202,6 +204,8 @@ describe('fastify example integration', () => {
data: {"data":{"countdown":0}}
event: complete
"
`)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ describe('graphql-auth example integration', () => {
for await (const chunk of response.body!) {
const chunkString = Buffer.from(chunk).toString('utf-8')
if (chunkString.includes('data:')) {
expect(chunkString.trim()).toBe('data: {"data":{"public":"hi"}}')
expect(chunkString.trim()).toContain('data: {"data":{"public":"hi"}}')
break
}
}
Expand All @@ -91,7 +91,7 @@ describe('graphql-auth example integration', () => {
for await (const chunk of response.body!) {
const chunkStr = Buffer.from(chunk).toString('utf-8')
if (chunkStr.startsWith('data:')) {
expect(chunkStr.trim()).toBe(
expect(chunkStr.trim()).toContain(
'data: {"data":{"requiresAuth":"hi foo@foo.com"}}',
)
break
Expand All @@ -112,7 +112,7 @@ describe('graphql-auth example integration', () => {
for await (const chunk of response.body!) {
const chunkStr = Buffer.from(chunk).toString('utf-8')
if (chunkStr.startsWith('data:')) {
expect(chunkStr.trim()).toBe(
expect(chunkStr.trim()).toContain(
'data: {"data":null,"errors":[{"message":"Accessing \'Subscription.requiresAuth\' requires authentication.","locations":[{"line":1,"column":14}]}]}',
)
break
Expand Down
2 changes: 2 additions & 0 deletions examples/node-ts/__integration-tests__/node-ts.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ describe('node-ts example integration', () => {
expect(await response.text()).toMatchInlineSnapshot(`
"data: {"errors":[{"message":"Subscriptions have been disabled"}]}
event: complete
"
`)
})
Expand Down
2 changes: 2 additions & 0 deletions examples/pothos/__integration-tests__/pothos.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ describe('pothos example integration', () => {
data: {"data":{"greetings":"Zdravo"}}
event: complete
"
`)
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { ExecutionResult } from 'graphql'
import { createClient } from 'graphql-sse'
import { createSchema, createYoga } from '../src'

describe('GraphQL SSE Client compatibility', () => {
describe('Distinct Connections', () => {
const yoga = createYoga({
legacySse: false,
schema: createSchema({
typeDefs: /* GraphQL */ `
type Query {
hello: String
}
type Subscription {
greetings: String
}
`,
resolvers: {
Query: {
hello: () => 'world',
},
Subscription: {
greetings: {
async *subscribe() {
yield { greetings: 'Hi' }
await new Promise((resolve) => setTimeout(resolve, 300))
yield { greetings: 'Bonjour' }
await new Promise((resolve) => setTimeout(resolve, 300))
yield { greetings: 'Hola' }
await new Promise((resolve) => setTimeout(resolve, 300))
yield { greetings: 'Ciao' }
await new Promise((resolve) => setTimeout(resolve, 300))
yield { greetings: 'Hallo' }
},
},
},
},
}),
})
const client = createClient({
url: 'http://localhost:4000/graphql',
fetchFn: yoga.fetch,
abortControllerImpl: yoga.fetchAPI.AbortController,
retryAttempts: 0,
})
let unsubscribe: () => void
afterAll(() => {
unsubscribe?.()
client.dispose()
})
it('handle queries', async () => {
const result = await new Promise((resolve, reject) => {
let result: ExecutionResult<Record<string, unknown>, unknown>
unsubscribe = client.subscribe(
{
query: '{ hello }',
},
{
next: (data) => (result = data),
error: reject,
complete: () => resolve(result),
},
)
})

expect(result).toEqual({ data: { hello: 'world' } })
})
it('handle subscriptions', async () => {
const onNext = jest.fn()

await new Promise<void>((resolve, reject) => {
unsubscribe = client.subscribe(
{
query: 'subscription { greetings }',
},
{
next: onNext,
error: reject,
complete: resolve,
},
)
})

expect(onNext).toBeCalledTimes(5) // we say "Hi" in 5 languages
})
})
})
25 changes: 14 additions & 11 deletions packages/graphql-yoga/__tests__/subscriptions.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,21 +143,24 @@ describe('Subscription', () => {
}

expect(results).toMatchInlineSnapshot(`
[
":
[
":
",
":
",
":
",
":
",
":
",
"data: {"data":{"hi":"hi"}}
",
"data: {"data":{"hi":"hi"}}
",
"event: complete
",
]
`)
",
]
`)
})

test('should issue pings event if event source never publishes anything', async () => {
Expand Down
1 change: 1 addition & 0 deletions packages/graphql-yoga/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"graphql": "^16.0.1",
"graphql-http": "^1.7.2",
"graphql-scalars": "1.20.4",
"graphql-sse": "2.0.0",
"html-minifier-terser": "7.1.0",
"json-bigint-patch": "0.0.8",
"puppeteer": "19.6.0"
Expand Down
78 changes: 0 additions & 78 deletions packages/graphql-yoga/src/plugins/resultProcessor/push.ts

This file was deleted.

86 changes: 86 additions & 0 deletions packages/graphql-yoga/src/plugins/resultProcessor/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { ExecutionResult } from 'graphql'
import { isAsyncIterable } from '@envelop/core'

import { getResponseInitByRespectingErrors } from '../../error.js'
import { FetchAPI, MaybeArray } from '../../types.js'
import { ResultProcessor, ResultProcessorInput } from '../types.js'
import { jsonStringifyResultWithoutInternals } from './stringify.js'

export interface SSEProcessorOptions {
legacySSE: boolean
}

export function getSSEProcessor(opts: SSEProcessorOptions): ResultProcessor {
return function processSSEResult(
result: ResultProcessorInput,
fetchAPI: FetchAPI,
): Response {
let pingIntervalMs = 12_000

// for testing the pings, reduce the timeout
if (globalThis.process?.env?.NODE_ENV === 'test') {
pingIntervalMs = 300
}

const headersInit = {
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
'Content-Encoding': 'none',
}

const responseInit = getResponseInitByRespectingErrors(result, headersInit)

let iterator: AsyncIterator<MaybeArray<ExecutionResult>>

let pingInterval: number
const textEncoder = new fetchAPI.TextEncoder()
const readableStream = new fetchAPI.ReadableStream({
start(controller) {
// ping client every 12 seconds to keep the connection alive
pingInterval = setInterval(() => {
if (!controller.desiredSize) {
clearInterval(pingInterval)
return
}
controller.enqueue(textEncoder.encode(':\n\n'))
}, pingIntervalMs) as unknown as number

if (isAsyncIterable(result)) {
iterator = result[Symbol.asyncIterator]()
} else {
let finished = false
iterator = {
next: () => {
if (finished) {
return Promise.resolve({ done: true, value: null })
}
finished = true
return Promise.resolve({ done: false, value: result })
},
}
}
},
async pull(controller) {
const { done, value } = await iterator.next()
if (value != null) {
if (!opts.legacySSE) {
controller.enqueue(textEncoder.encode(`event: next\n`))
}
const chunk = jsonStringifyResultWithoutInternals(value)
controller.enqueue(textEncoder.encode(`data: ${chunk}\n\n`))
}
if (done) {
controller.enqueue(textEncoder.encode(`event: complete\n\n`))
clearInterval(pingInterval)
controller.close()
}
},
async cancel(e) {
clearInterval(pingInterval)
await iterator.return?.(e)
},
})
return new fetchAPI.Response(readableStream, responseInit)
}
}
Loading

0 comments on commit 8741125

Please sign in to comment.