From 8741125e5e577a6f031980a81c380a12cb9f40f7 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Thu, 16 Feb 2023 13:23:29 +0300 Subject: [PATCH 1/9] GraphQL SSE Distinct Connections support --- .changeset/tricky-teachers-sin.md | 5 ++ .../__integration-tests__/fastify.spec.ts | 4 + .../generic-auth.spec.ts | 6 +- .../__integration-tests__/node-ts.spec.ts | 2 + .../__integration-tests__/pothos.spec.ts | 2 + .../graphql-sse-client.spec.ts | 87 +++++++++++++++++++ .../__tests__/subscriptions.spec.ts | 25 +++--- packages/graphql-yoga/package.json | 1 + .../src/plugins/resultProcessor/push.ts | 78 ----------------- .../src/plugins/resultProcessor/sse.ts | 86 ++++++++++++++++++ .../src/plugins/useResultProcessor.ts | 23 +++-- packages/graphql-yoga/src/server.ts | 13 ++- .../__tests__/apollo-inline-trace.spec.ts | 2 +- pnpm-lock.yaml | 11 +++ .../src/pages/docs/features/subscriptions.mdx | 41 ++++++++- 15 files changed, 282 insertions(+), 104 deletions(-) create mode 100644 .changeset/tricky-teachers-sin.md create mode 100644 packages/graphql-yoga/__integration-tests__/graphql-sse-client.spec.ts delete mode 100644 packages/graphql-yoga/src/plugins/resultProcessor/push.ts create mode 100644 packages/graphql-yoga/src/plugins/resultProcessor/sse.ts diff --git a/.changeset/tricky-teachers-sin.md b/.changeset/tricky-teachers-sin.md new file mode 100644 index 0000000000..0eb3f75d73 --- /dev/null +++ b/.changeset/tricky-teachers-sin.md @@ -0,0 +1,5 @@ +--- +'graphql-yoga': minor +--- + +GraphQL SSE Distinct Connections mode support with `sse.graphqlSSEDistinctConnections` flag diff --git a/examples/fastify/__integration-tests__/fastify.spec.ts b/examples/fastify/__integration-tests__/fastify.spec.ts index 96940ef6e9..ad6f2b75b7 100644 --- a/examples/fastify/__integration-tests__/fastify.spec.ts +++ b/examples/fastify/__integration-tests__/fastify.spec.ts @@ -161,6 +161,8 @@ describe('fastify example integration', () => { data: {"data":{"countdown":0}} + event: complete + " `) }) @@ -202,6 +204,8 @@ describe('fastify example integration', () => { data: {"data":{"countdown":0}} + event: complete + " `) }) diff --git a/examples/generic-auth/__integration-tests__/generic-auth.spec.ts b/examples/generic-auth/__integration-tests__/generic-auth.spec.ts index 005599d15d..082e6df2a4 100644 --- a/examples/generic-auth/__integration-tests__/generic-auth.spec.ts +++ b/examples/generic-auth/__integration-tests__/generic-auth.spec.ts @@ -70,7 +70,7 @@ describe('graphql-auth example integration', () => { for await (const chunk of response.body!) { const chunkString = Buffer.from(chunk).toString('utf-8') if (chunkString.includes('data:')) { - expect(chunkString.trim()).toBe('data: {"data":{"public":"hi"}}') + expect(chunkString.trim()).toContain('data: {"data":{"public":"hi"}}') break } } @@ -91,7 +91,7 @@ describe('graphql-auth example integration', () => { for await (const chunk of response.body!) { const chunkStr = Buffer.from(chunk).toString('utf-8') if (chunkStr.startsWith('data:')) { - expect(chunkStr.trim()).toBe( + expect(chunkStr.trim()).toContain( 'data: {"data":{"requiresAuth":"hi foo@foo.com"}}', ) break @@ -112,7 +112,7 @@ describe('graphql-auth example integration', () => { for await (const chunk of response.body!) { const chunkStr = Buffer.from(chunk).toString('utf-8') if (chunkStr.startsWith('data:')) { - expect(chunkStr.trim()).toBe( + expect(chunkStr.trim()).toContain( 'data: {"data":null,"errors":[{"message":"Accessing \'Subscription.requiresAuth\' requires authentication.","locations":[{"line":1,"column":14}]}]}', ) break diff --git a/examples/node-ts/__integration-tests__/node-ts.spec.ts b/examples/node-ts/__integration-tests__/node-ts.spec.ts index 0dd2f38f03..34f0b9f1de 100644 --- a/examples/node-ts/__integration-tests__/node-ts.spec.ts +++ b/examples/node-ts/__integration-tests__/node-ts.spec.ts @@ -24,6 +24,8 @@ describe('node-ts example integration', () => { expect(await response.text()).toMatchInlineSnapshot(` "data: {"errors":[{"message":"Subscriptions have been disabled"}]} + event: complete + " `) }) diff --git a/examples/pothos/__integration-tests__/pothos.spec.ts b/examples/pothos/__integration-tests__/pothos.spec.ts index e96b353fc3..ba791b539f 100644 --- a/examples/pothos/__integration-tests__/pothos.spec.ts +++ b/examples/pothos/__integration-tests__/pothos.spec.ts @@ -32,6 +32,8 @@ describe('pothos example integration', () => { data: {"data":{"greetings":"Zdravo"}} + event: complete + " `) }) diff --git a/packages/graphql-yoga/__integration-tests__/graphql-sse-client.spec.ts b/packages/graphql-yoga/__integration-tests__/graphql-sse-client.spec.ts new file mode 100644 index 0000000000..40af4d578a --- /dev/null +++ b/packages/graphql-yoga/__integration-tests__/graphql-sse-client.spec.ts @@ -0,0 +1,87 @@ +import { ExecutionResult } from 'graphql' +import { createClient } from 'graphql-sse' +import { createSchema, createYoga } from '../src' + +describe('GraphQL SSE Client compatibility', () => { + describe('Distinct Connections', () => { + const yoga = createYoga({ + legacySse: false, + schema: createSchema({ + typeDefs: /* GraphQL */ ` + type Query { + hello: String + } + type Subscription { + greetings: String + } + `, + resolvers: { + Query: { + hello: () => 'world', + }, + Subscription: { + greetings: { + async *subscribe() { + yield { greetings: 'Hi' } + await new Promise((resolve) => setTimeout(resolve, 300)) + yield { greetings: 'Bonjour' } + await new Promise((resolve) => setTimeout(resolve, 300)) + yield { greetings: 'Hola' } + await new Promise((resolve) => setTimeout(resolve, 300)) + yield { greetings: 'Ciao' } + await new Promise((resolve) => setTimeout(resolve, 300)) + yield { greetings: 'Hallo' } + }, + }, + }, + }, + }), + }) + const client = createClient({ + url: 'http://localhost:4000/graphql', + fetchFn: yoga.fetch, + abortControllerImpl: yoga.fetchAPI.AbortController, + retryAttempts: 0, + }) + let unsubscribe: () => void + afterAll(() => { + unsubscribe?.() + client.dispose() + }) + it('handle queries', async () => { + const result = await new Promise((resolve, reject) => { + let result: ExecutionResult, unknown> + unsubscribe = client.subscribe( + { + query: '{ hello }', + }, + { + next: (data) => (result = data), + error: reject, + complete: () => resolve(result), + }, + ) + }) + + expect(result).toEqual({ data: { hello: 'world' } }) + }) + it('handle subscriptions', async () => { + const onNext = jest.fn() + + await new Promise((resolve, reject) => { + unsubscribe = client.subscribe( + { + query: 'subscription { greetings }', + }, + { + next: onNext, + error: reject, + complete: resolve, + }, + ) + }) + + expect(onNext).toBeCalledTimes(5) // we say "Hi" in 5 languages + }) + }) +}) diff --git a/packages/graphql-yoga/__tests__/subscriptions.spec.ts b/packages/graphql-yoga/__tests__/subscriptions.spec.ts index 1a860a34e0..46a94343a6 100644 --- a/packages/graphql-yoga/__tests__/subscriptions.spec.ts +++ b/packages/graphql-yoga/__tests__/subscriptions.spec.ts @@ -143,21 +143,24 @@ describe('Subscription', () => { } expect(results).toMatchInlineSnapshot(` - [ - ": + [ + ": + + ", + ": - ", - ": + ", + ": - ", - ": + ", + "data: {"data":{"hi":"hi"}} - ", - "data: {"data":{"hi":"hi"}} + ", + "event: complete - ", - ] - `) + ", + ] + `) }) test('should issue pings event if event source never publishes anything', async () => { diff --git a/packages/graphql-yoga/package.json b/packages/graphql-yoga/package.json index f7c863e03e..675a550c64 100644 --- a/packages/graphql-yoga/package.json +++ b/packages/graphql-yoga/package.json @@ -73,6 +73,7 @@ "graphql": "^16.0.1", "graphql-http": "^1.7.2", "graphql-scalars": "1.20.4", + "graphql-sse": "2.0.0", "html-minifier-terser": "7.1.0", "json-bigint-patch": "0.0.8", "puppeteer": "19.6.0" diff --git a/packages/graphql-yoga/src/plugins/resultProcessor/push.ts b/packages/graphql-yoga/src/plugins/resultProcessor/push.ts deleted file mode 100644 index 5cf2a3b169..0000000000 --- a/packages/graphql-yoga/src/plugins/resultProcessor/push.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { ExecutionResult } from 'graphql' -import { isAsyncIterable } from '@envelop/core' - -import { getResponseInitByRespectingErrors } from '../../error.js' -import { FetchAPI, MaybeArray } from '../../types.js' -import { ResultProcessorInput } from '../types.js' -import { jsonStringifyResultWithoutInternals } from './stringify.js' - -export function processPushResult( - result: ResultProcessorInput, - fetchAPI: FetchAPI, -): Response { - let pingIntervalMs = 12_000 - - // for testing the pings, reduce the timeout - if (globalThis.process?.env?.NODE_ENV === 'test') { - pingIntervalMs = 5 - } - - const headersInit = { - 'Content-Type': 'text/event-stream', - Connection: 'keep-alive', - 'Cache-Control': 'no-cache', - 'Content-Encoding': 'none', - } - - const responseInit = getResponseInitByRespectingErrors(result, headersInit) - - let iterator: AsyncIterator> - - let pingInterval: number - const textEncoder = new fetchAPI.TextEncoder() - const readableStream = new fetchAPI.ReadableStream({ - start(controller) { - // ping client every 12 seconds to keep the connection alive - pingInterval = setInterval(() => { - if (!controller.desiredSize) { - clearInterval(pingInterval) - return - } - - controller.enqueue(textEncoder.encode(':\n\n')) - }, pingIntervalMs) as unknown as number - - if (isAsyncIterable(result)) { - iterator = result[Symbol.asyncIterator]() - } else { - let finished = false - iterator = { - next: () => { - if (finished) { - return Promise.resolve({ done: true, value: null }) - } - finished = true - return Promise.resolve({ done: false, value: result }) - }, - } - } - }, - async pull(controller) { - const { done, value } = await iterator.next() - - if (value != null) { - const chunk = jsonStringifyResultWithoutInternals(value) - controller.enqueue(textEncoder.encode(`data: ${chunk}\n\n`)) - } - if (done) { - clearInterval(pingInterval) - controller.close() - } - }, - async cancel(e) { - clearInterval(pingInterval) - await iterator.return?.(e) - }, - }) - return new fetchAPI.Response(readableStream, responseInit) -} diff --git a/packages/graphql-yoga/src/plugins/resultProcessor/sse.ts b/packages/graphql-yoga/src/plugins/resultProcessor/sse.ts new file mode 100644 index 0000000000..21fd4013f2 --- /dev/null +++ b/packages/graphql-yoga/src/plugins/resultProcessor/sse.ts @@ -0,0 +1,86 @@ +import { ExecutionResult } from 'graphql' +import { isAsyncIterable } from '@envelop/core' + +import { getResponseInitByRespectingErrors } from '../../error.js' +import { FetchAPI, MaybeArray } from '../../types.js' +import { ResultProcessor, ResultProcessorInput } from '../types.js' +import { jsonStringifyResultWithoutInternals } from './stringify.js' + +export interface SSEProcessorOptions { + legacySSE: boolean +} + +export function getSSEProcessor(opts: SSEProcessorOptions): ResultProcessor { + return function processSSEResult( + result: ResultProcessorInput, + fetchAPI: FetchAPI, + ): Response { + let pingIntervalMs = 12_000 + + // for testing the pings, reduce the timeout + if (globalThis.process?.env?.NODE_ENV === 'test') { + pingIntervalMs = 300 + } + + const headersInit = { + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Cache-Control': 'no-cache', + 'Content-Encoding': 'none', + } + + const responseInit = getResponseInitByRespectingErrors(result, headersInit) + + let iterator: AsyncIterator> + + let pingInterval: number + const textEncoder = new fetchAPI.TextEncoder() + const readableStream = new fetchAPI.ReadableStream({ + start(controller) { + // ping client every 12 seconds to keep the connection alive + pingInterval = setInterval(() => { + if (!controller.desiredSize) { + clearInterval(pingInterval) + return + } + controller.enqueue(textEncoder.encode(':\n\n')) + }, pingIntervalMs) as unknown as number + + if (isAsyncIterable(result)) { + iterator = result[Symbol.asyncIterator]() + } else { + let finished = false + iterator = { + next: () => { + if (finished) { + return Promise.resolve({ done: true, value: null }) + } + finished = true + return Promise.resolve({ done: false, value: result }) + }, + } + } + }, + async pull(controller) { + const { done, value } = await iterator.next() + if (value != null) { + if (!opts.legacySSE) { + controller.enqueue(textEncoder.encode(`event: next\n`)) + } + const chunk = jsonStringifyResultWithoutInternals(value) + controller.enqueue(textEncoder.encode(`data: ${chunk}\n\n`)) + } + if (done) { + controller.enqueue(textEncoder.encode(`event: complete\n\n`)) + clearInterval(pingInterval) + controller.close() + } + }, + async cancel(e) { + clearInterval(pingInterval) + await iterator.return?.(e) + }, + }) + return new fetchAPI.Response(readableStream, responseInit) + } +} diff --git a/packages/graphql-yoga/src/plugins/useResultProcessor.ts b/packages/graphql-yoga/src/plugins/useResultProcessor.ts index 66a7b785b0..289ef0e3ca 100644 --- a/packages/graphql-yoga/src/plugins/useResultProcessor.ts +++ b/packages/graphql-yoga/src/plugins/useResultProcessor.ts @@ -5,7 +5,7 @@ import { isMatchingMediaType, } from './resultProcessor/accept.js' import { processMultipartResult } from './resultProcessor/multipart.js' -import { processPushResult } from './resultProcessor/push.js' +import { getSSEProcessor, SSEProcessorOptions } from './resultProcessor/sse.js' import { processRegularResult } from './resultProcessor/regular.js' import { Plugin, ResultProcessor } from './types.js' @@ -21,10 +21,14 @@ const multipart: ResultProcessorConfig = { processResult: processMultipartResult, } -const textEventStream: ResultProcessorConfig = { - mediaTypes: ['text/event-stream'], - asyncIterables: true, - processResult: processPushResult, +function getSSEProcessorConfig( + opts: SSEProcessorOptions, +): ResultProcessorConfig { + return { + mediaTypes: ['text/event-stream'], + asyncIterables: true, + processResult: getSSEProcessor(opts), + } } const regular: ResultProcessorConfig = { @@ -33,11 +37,12 @@ const regular: ResultProcessorConfig = { processResult: processRegularResult, } -const defaultList = [textEventStream, multipart, regular] -const subscriptionList = [multipart, textEventStream, regular] - -export function useResultProcessors(): Plugin { +export function useResultProcessors(opts: SSEProcessorOptions): Plugin { const isSubscriptionRequestMap = new WeakMap() + + const sse = getSSEProcessorConfig(opts) + const defaultList = [sse, multipart, regular] + const subscriptionList = [multipart, sse, regular] return { onSubscribe({ args: { contextValue } }) { if (contextValue.request) { diff --git a/packages/graphql-yoga/src/server.ts b/packages/graphql-yoga/src/server.ts index 2d028f056e..e7a32f66d2 100644 --- a/packages/graphql-yoga/src/server.ts +++ b/packages/graphql-yoga/src/server.ts @@ -170,6 +170,15 @@ export type YogaServerOptions = { * @default false */ batching?: BatchingOptions + /** + * Whether to use the legacy Yoga Server-Sent Events and not + * the GraphQL over SSE spec's distinct connection mode. + * + * @default true + * + * @deprecated Consider using GraphQL over SSE spec instead by setting this to `false`. Starting with the next major release, this flag will default to `false`. + */ + legacySse?: boolean } export type BatchingOptions = @@ -344,7 +353,9 @@ export class YogaServer< parse: parsePOSTFormUrlEncodedRequest, }), // Middlewares after the GraphQL execution - useResultProcessors(), + useResultProcessors({ + legacySSE: options?.legacySse !== false, + }), useErrorHandling((error, request) => { const errors = handleError(error, this.maskedErrorsOpts, this.logger) diff --git a/packages/plugins/apollo-inline-trace/__tests__/apollo-inline-trace.spec.ts b/packages/plugins/apollo-inline-trace/__tests__/apollo-inline-trace.spec.ts index a4ec2c49a3..bd8e0decfb 100644 --- a/packages/plugins/apollo-inline-trace/__tests__/apollo-inline-trace.spec.ts +++ b/packages/plugins/apollo-inline-trace/__tests__/apollo-inline-trace.spec.ts @@ -469,7 +469,7 @@ describe('Inline Trace', () => { expect(response.ok).toBe(true) const result = await response.text() - expect(result).toBe('data: {"data":{"hello":"world"}}\n\n') + expect(result).toContain('data: {"data":{"hello":"world"}}\n\n') }) }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2ee89b6d7b..e11be6db12 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1064,6 +1064,7 @@ importers: graphql: 16.6.0 graphql-http: ^1.7.2 graphql-scalars: 1.20.4 + graphql-sse: 2.0.0 html-minifier-terser: 7.1.0 json-bigint-patch: 0.0.8 lru-cache: ^8.0.0 @@ -1093,6 +1094,7 @@ importers: graphql: 16.6.0 graphql-http: 1.7.2_graphql@16.6.0 graphql-scalars: 1.20.4_graphql@16.6.0 + graphql-sse: 2.0.0_graphql@16.6.0 html-minifier-terser: 7.1.0 json-bigint-patch: 0.0.8 puppeteer: 19.6.0 @@ -18061,6 +18063,15 @@ packages: graphql: '>=0.11 <=16' dev: false + /graphql-sse/2.0.0_graphql@16.6.0: + resolution: {integrity: sha512-TTdFwxGM9RY68s22XWyhc+SyQn3PLbELDD2So0K6Cc6EIlBAyPuNV8VlPfNKa/la7gEf2SwHY7JoJplOmOY4LA==} + engines: {node: '>=12'} + peerDependencies: + graphql: '>=0.11 <=16' + dependencies: + graphql: 16.6.0 + dev: true + /graphql-tag/2.12.6: resolution: {integrity: sha512-FdSNcu2QQcWnM2VNvSCCDCVS5PpPqpzgFT8+GXzqJuoDd0CBncxCY278u4mhRO7tMgo2JjgJA5aZ+nWSQ/Z+xg==} engines: {node: '>=10'} diff --git a/website/src/pages/docs/features/subscriptions.mdx b/website/src/pages/docs/features/subscriptions.mdx index 0b03145e4e..b90a419c23 100644 --- a/website/src/pages/docs/features/subscriptions.mdx +++ b/website/src/pages/docs/features/subscriptions.mdx @@ -129,6 +129,10 @@ eventsource.onmessage = function (event) { const data = JSON.parse(event.data) console.log(data) // This will result something like `{ "data": { "countdown": 0 } }` } + +eventsource.addEventListener('complete', () => { + eventsource.close() // If operation ends, close the connection and prevent the client from reconnecting +}) ``` ### Client Usage with Apollo @@ -214,6 +218,9 @@ class SSELink extends ApolloLink { eventsource.onerror = function (error) { sink.error(error) } + eventsource.addEventListener('complete', () => { + eventsource.close() // If operation ends, close the connection and prevent the client from reconnecting + }) return () => eventsource.close() }) } @@ -306,6 +313,9 @@ const client = createClient({ eventsource.onerror = (error) => { sink.error(error) } + eventsource.addEventListener('complete', () => { + eventsource.close() // If operation ends, close the connection and prevent the client from reconnecting + }) return { unsubscribe: () => eventsource.close() } @@ -362,6 +372,9 @@ const executeSubscription = ( console.error(ev) sink.error(new Error('Unexpected error.')) } + eventsource.addEventListener('complete', () => { + eventsource.close() // If operation ends, close the connection and prevent the client from reconnecting + }) return () => source.close() }) } @@ -371,7 +384,33 @@ const network = Network.create(executeQueryOrMutation, executeSubscription) ## GraphQL over Server-Sent Events Protocol (via `graphql-sse`) -In case you want the subscriptions to be transported following the [GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md), you simply use the `@graphql-yoga/plugin-graphql-sse` plugin for GraphQL Yoga that exposes an additional endpoint (defaulting to `/graphql/stream`) used for [graphql-sse](https://github.com/enisdenjo/graphql-sse) clients. +There are two different modes in [GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md). You can see the differences in the [protocol specification](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md). + +### Distinct Connection Mode + +GraphQL Yoga supports [GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode) only in distinct connection mode with an `sse.graphqlSSEDistinctConnections` flag. + +```ts filename="yoga.ts" +import { createServer } from 'node:http' +import { createYoga } from 'graphql-yoga' + +const yogaApp = createYoga({ + // ... + sse: { + graphqlSSEDistinctConnections: true + } +}) +``` + + + When you enable this, simple SSE recipes won't work because it changes the + data events format. You should use refer to [`graphql-sse` client + recipes](https://github.com/enisdenjo/graphql-sse#recipes). + + +### Single Connection Mode + +In case you want the subscriptions to be transported following the [GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#single-connection-mode) also in the single connection mode, you simply use the `@graphql-yoga/plugin-graphql-sse` plugin for GraphQL Yoga that exposes an additional endpoint (defaulting to `/graphql/stream`) used for [graphql-sse](https://github.com/enisdenjo/graphql-sse) clients. The plugin will hijack the request from the `onRequest` hook and will use **all** envelop plugins provided. From 92eb617ca1ea5d67e36d7257806050c3423838c7 Mon Sep 17 00:00:00 2001 From: enisdenjo Date: Mon, 20 Mar 2023 16:31:50 +0100 Subject: [PATCH 2/9] add graphql-sse spec --- .../__tests__/graphql-sse.spec.ts | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 packages/graphql-yoga/__tests__/graphql-sse.spec.ts diff --git a/packages/graphql-yoga/__tests__/graphql-sse.spec.ts b/packages/graphql-yoga/__tests__/graphql-sse.spec.ts new file mode 100644 index 0000000000..2bee443af8 --- /dev/null +++ b/packages/graphql-yoga/__tests__/graphql-sse.spec.ts @@ -0,0 +1,188 @@ +import { createSchema, createYoga } from '../src/index.js' +import { createClient } from 'graphql-sse' + +describe('GraphQL over SSE', () => { + const schema = createSchema({ + typeDefs: /* GraphQL */ ` + type Query { + hello: String! + } + type Subscription { + greetings: String! + waitForPings: String! + } + `, + resolvers: { + Query: { + async hello() { + return 'world' + }, + }, + Subscription: { + greetings: { + async *subscribe() { + for (const hi of ['Hi', 'Bonjour', 'Hola', 'Ciao', 'Zdravo']) { + yield { greetings: hi } + } + }, + }, + waitForPings: { + // eslint-disable-next-line require-yield + async *subscribe() { + // a ping is issued every 5ms, wait for a few and just return + await new Promise((resolve) => setTimeout(resolve, 35)) + return + }, + }, + }, + }, + }) + + const yoga = createYoga({ + schema, + legacySse: false, + maskedErrors: false, + }) + + describe('Distinct connections mode', () => { + test('should issue pings while connected', async () => { + const res = await yoga.fetch( + 'http://yoga/graphql?query=subscription{waitForPings}', + { + headers: { + accept: 'text/event-stream', + }, + }, + ) + expect(res.ok).toBeTruthy() + await expect(res.text()).resolves.toMatchInlineSnapshot(` + ": + + : + + : + + event: complete + + " + `) + }) + + it('should support single result operations', async () => { + const client = createClient({ + url: 'http://yoga/graphql', + fetchFn: yoga.fetch, + abortControllerImpl: yoga.fetchAPI.AbortController, + singleConnection: false, // distinct connection mode + retryAttempts: 0, + }) + + await expect( + new Promise((resolve, reject) => { + let result: unknown + client.subscribe( + { + query: /* GraphQL */ ` + { + hello + } + `, + }, + { + next: (msg) => (result = msg), + error: reject, + complete: () => resolve(result), + }, + ) + }), + ).resolves.toMatchInlineSnapshot(` + { + "data": { + "hello": "world", + }, + } + `) + + client.dispose() + }) + + it('should support streaming operations', async () => { + const client = createClient({ + url: 'http://yoga/graphql', + fetchFn: yoga.fetch, + abortControllerImpl: yoga.fetchAPI.AbortController, + singleConnection: false, // distinct connection mode + retryAttempts: 0, + }) + + await expect( + new Promise((resolve, reject) => { + const msgs: unknown[] = [] + client.subscribe( + { + query: /* GraphQL */ ` + subscription { + greetings + } + `, + }, + { + next: (msg) => msgs.push(msg), + error: reject, + complete: () => resolve(msgs), + }, + ) + }), + ).resolves.toMatchInlineSnapshot(` + [ + { + "data": { + "greetings": "Hi", + }, + }, + { + "data": { + "greetings": "Bonjour", + }, + }, + { + "data": { + "greetings": "Hola", + }, + }, + { + "data": { + "greetings": "Ciao", + }, + }, + { + "data": { + "greetings": "Zdravo", + }, + }, + ] + `) + + client.dispose() + }) + + it('should report errors through the stream', async () => { + const res = await yoga.fetch('http://yoga/graphql?query={nope}', { + headers: { + accept: 'text/event-stream', + }, + }) + expect(res.ok).toBeTruthy() + await expect(res.text()).resolves.toMatchInlineSnapshot(` + "event: next + data: {"errors":[{"message":"Cannot query field \\"nope\\" on type \\"Query\\".","locations":[{"line":1,"column":2}]}]} + + event: complete + + " + `) + }) + }) + + it.todo('Single connections mode') +}) From f97a3d53b6a0eebde3321ff5eedc2ea73d76051a Mon Sep 17 00:00:00 2001 From: enisdenjo Date: Mon, 20 Mar 2023 16:34:39 +0100 Subject: [PATCH 3/9] drop graphql-sse-client spec --- .../graphql-sse-client.spec.ts | 87 ------------------- 1 file changed, 87 deletions(-) delete mode 100644 packages/graphql-yoga/__integration-tests__/graphql-sse-client.spec.ts diff --git a/packages/graphql-yoga/__integration-tests__/graphql-sse-client.spec.ts b/packages/graphql-yoga/__integration-tests__/graphql-sse-client.spec.ts deleted file mode 100644 index 40af4d578a..0000000000 --- a/packages/graphql-yoga/__integration-tests__/graphql-sse-client.spec.ts +++ /dev/null @@ -1,87 +0,0 @@ -import { ExecutionResult } from 'graphql' -import { createClient } from 'graphql-sse' -import { createSchema, createYoga } from '../src' - -describe('GraphQL SSE Client compatibility', () => { - describe('Distinct Connections', () => { - const yoga = createYoga({ - legacySse: false, - schema: createSchema({ - typeDefs: /* GraphQL */ ` - type Query { - hello: String - } - type Subscription { - greetings: String - } - `, - resolvers: { - Query: { - hello: () => 'world', - }, - Subscription: { - greetings: { - async *subscribe() { - yield { greetings: 'Hi' } - await new Promise((resolve) => setTimeout(resolve, 300)) - yield { greetings: 'Bonjour' } - await new Promise((resolve) => setTimeout(resolve, 300)) - yield { greetings: 'Hola' } - await new Promise((resolve) => setTimeout(resolve, 300)) - yield { greetings: 'Ciao' } - await new Promise((resolve) => setTimeout(resolve, 300)) - yield { greetings: 'Hallo' } - }, - }, - }, - }, - }), - }) - const client = createClient({ - url: 'http://localhost:4000/graphql', - fetchFn: yoga.fetch, - abortControllerImpl: yoga.fetchAPI.AbortController, - retryAttempts: 0, - }) - let unsubscribe: () => void - afterAll(() => { - unsubscribe?.() - client.dispose() - }) - it('handle queries', async () => { - const result = await new Promise((resolve, reject) => { - let result: ExecutionResult, unknown> - unsubscribe = client.subscribe( - { - query: '{ hello }', - }, - { - next: (data) => (result = data), - error: reject, - complete: () => resolve(result), - }, - ) - }) - - expect(result).toEqual({ data: { hello: 'world' } }) - }) - it('handle subscriptions', async () => { - const onNext = jest.fn() - - await new Promise((resolve, reject) => { - unsubscribe = client.subscribe( - { - query: 'subscription { greetings }', - }, - { - next: onNext, - error: reject, - complete: resolve, - }, - ) - }) - - expect(onNext).toBeCalledTimes(5) // we say "Hi" in 5 languages - }) - }) -}) From aa8e7e1c5010fa0395a9728b0e74f91939afc53d Mon Sep 17 00:00:00 2001 From: enisdenjo Date: Mon, 20 Mar 2023 16:39:38 +0100 Subject: [PATCH 4/9] adapt pings timeout --- packages/graphql-yoga/__tests__/graphql-sse.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/graphql-yoga/__tests__/graphql-sse.spec.ts b/packages/graphql-yoga/__tests__/graphql-sse.spec.ts index 2bee443af8..7029564a3e 100644 --- a/packages/graphql-yoga/__tests__/graphql-sse.spec.ts +++ b/packages/graphql-yoga/__tests__/graphql-sse.spec.ts @@ -29,8 +29,8 @@ describe('GraphQL over SSE', () => { waitForPings: { // eslint-disable-next-line require-yield async *subscribe() { - // a ping is issued every 5ms, wait for a few and just return - await new Promise((resolve) => setTimeout(resolve, 35)) + // a ping is issued every 300ms, wait for a few and just return + await new Promise((resolve) => setTimeout(resolve, 300 * 3 + 100)) return }, }, From e7a54397f858f73fb82ae59ff2b71f749497f956 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 20 Mar 2023 18:44:57 +0300 Subject: [PATCH 5/9] Always use 200 per GraphQL SSE spec --- packages/graphql-yoga/src/plugins/resultProcessor/sse.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/graphql-yoga/src/plugins/resultProcessor/sse.ts b/packages/graphql-yoga/src/plugins/resultProcessor/sse.ts index 21fd4013f2..68cd1e540a 100644 --- a/packages/graphql-yoga/src/plugins/resultProcessor/sse.ts +++ b/packages/graphql-yoga/src/plugins/resultProcessor/sse.ts @@ -29,7 +29,11 @@ export function getSSEProcessor(opts: SSEProcessorOptions): ResultProcessor { 'Content-Encoding': 'none', } - const responseInit = getResponseInitByRespectingErrors(result, headersInit) + const responseInit = getResponseInitByRespectingErrors( + result, + headersInit, + !opts.legacySSE, + ) let iterator: AsyncIterator> From 582ed3d36f99e1c76a1f1b673536ace3651bf600 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Mon, 20 Mar 2023 19:08:22 +0300 Subject: [PATCH 6/9] Hapi hapi hapi chulo --- examples/hapi/__integration-tests__/hapi.spec.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/hapi/__integration-tests__/hapi.spec.ts b/examples/hapi/__integration-tests__/hapi.spec.ts index 36d840a623..167c4386de 100644 --- a/examples/hapi/__integration-tests__/hapi.spec.ts +++ b/examples/hapi/__integration-tests__/hapi.spec.ts @@ -68,6 +68,8 @@ describe('hapi example integration', () => { data: {"data":{"greetings":"Zdravo"}} + event: complete + " `) }) From 979906861ab124bbecf1026f81426896e2383e36 Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 21 Mar 2023 09:21:33 -0400 Subject: [PATCH 7/9] Update website/src/pages/docs/features/subscriptions.mdx Co-authored-by: Denis Badurina --- website/src/pages/docs/features/subscriptions.mdx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/website/src/pages/docs/features/subscriptions.mdx b/website/src/pages/docs/features/subscriptions.mdx index b90a419c23..bd544299bf 100644 --- a/website/src/pages/docs/features/subscriptions.mdx +++ b/website/src/pages/docs/features/subscriptions.mdx @@ -396,9 +396,7 @@ import { createYoga } from 'graphql-yoga' const yogaApp = createYoga({ // ... - sse: { - graphqlSSEDistinctConnections: true - } + legacySse: false }) ``` From bba3ca6229196af5265d50673515fbf2e057fabf Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 21 Mar 2023 09:21:41 -0400 Subject: [PATCH 8/9] Update website/src/pages/docs/features/subscriptions.mdx Co-authored-by: Denis Badurina --- website/src/pages/docs/features/subscriptions.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/src/pages/docs/features/subscriptions.mdx b/website/src/pages/docs/features/subscriptions.mdx index bd544299bf..3ed493f13c 100644 --- a/website/src/pages/docs/features/subscriptions.mdx +++ b/website/src/pages/docs/features/subscriptions.mdx @@ -388,7 +388,7 @@ There are two different modes in [GraphQL over Server-Sent Events Protocol](http ### Distinct Connection Mode -GraphQL Yoga supports [GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode) only in distinct connection mode with an `sse.graphqlSSEDistinctConnections` flag. +GraphQL Yoga supports [GraphQL over Server-Sent Events Protocol](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md#distinct-connections-mode) only in distinct connection mode with an `legacySse = false` flag. ```ts filename="yoga.ts" import { createServer } from 'node:http' From a12ef36618aee6af096d03d8688fb949d5b1603a Mon Sep 17 00:00:00 2001 From: Arda TANRIKULU Date: Tue, 21 Mar 2023 09:21:48 -0400 Subject: [PATCH 9/9] Update .changeset/tricky-teachers-sin.md Co-authored-by: Denis Badurina --- .changeset/tricky-teachers-sin.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/tricky-teachers-sin.md b/.changeset/tricky-teachers-sin.md index 0eb3f75d73..04c3c823f5 100644 --- a/.changeset/tricky-teachers-sin.md +++ b/.changeset/tricky-teachers-sin.md @@ -2,4 +2,4 @@ 'graphql-yoga': minor --- -GraphQL SSE Distinct Connections mode support with `sse.graphqlSSEDistinctConnections` flag +GraphQL SSE Distinct Connections mode support with `legacySse = false` flag