From fb4b9673ed6689dda230c9f9141750b2cb538cb8 Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Thu, 22 Jun 2023 14:07:40 +0200 Subject: [PATCH] feat(client): Async iterator for subscriptions (#486) --- src/__tests__/client.ts | 259 ++++++++++++++++++++++++++++++ src/__tests__/fixtures/simple.ts | 6 + src/client.ts | 74 +++++++++ website/src/pages/get-started.mdx | 45 ++---- website/src/pages/recipes.mdx | 71 +------- 5 files changed, 359 insertions(+), 96 deletions(-) diff --git a/src/__tests__/client.ts b/src/__tests__/client.ts index e7256081..92a0c357 100644 --- a/src/__tests__/client.ts +++ b/src/__tests__/client.ts @@ -14,6 +14,7 @@ import { } from '../common'; import { startRawServer, startWSTServer as startTServer } from './utils'; import { ExecutionResult } from 'graphql'; +import { pong } from './fixtures/simple'; // silence console.error calls for nicer tests overview const consoleError = console.error; @@ -2324,3 +2325,261 @@ describe('events', () => { expect(expected).toBeCalledTimes(6); }); }); + +describe('iterate', () => { + it('should iterate a single result query', async () => { + const { url } = await startTServer(); + + const client = createClient({ + url, + retryAttempts: 0, + onNonLazyError: noop, + }); + + const iterator = client.iterate({ + query: '{ getValue }', + }); + + await expect(iterator.next()).resolves.toMatchInlineSnapshot(` + { + "done": false, + "value": { + "data": { + "getValue": "value", + }, + }, + } + `); + + await expect(iterator.next()).resolves.toMatchInlineSnapshot(` + { + "done": true, + "value": undefined, + } + `); + }); + + it('should iterate over subscription events', async () => { + const { url } = await startTServer(); + + const client = createClient({ + url, + retryAttempts: 0, + onNonLazyError: noop, + }); + + const iterator = client.iterate({ + query: 'subscription { greetings }', + }); + + // Hi + await expect(iterator.next()).resolves.toBeDefined(); + // Bonjour + await expect(iterator.next()).resolves.toBeDefined(); + // Hola + await expect(iterator.next()).resolves.toBeDefined(); + // Ciao + await expect(iterator.next()).resolves.toBeDefined(); + // Zdravo + await expect(iterator.next()).resolves.toBeDefined(); + + await expect(iterator.next()).resolves.toMatchInlineSnapshot(` + { + "done": true, + "value": undefined, + } + `); + }); + + it('should report execution errors to iterator', async () => { + const { url } = await startTServer(); + + const client = createClient({ + url, + retryAttempts: 0, + onNonLazyError: noop, + }); + + const iterator = client.iterate({ + query: 'subscription { throwing }', + }); + + await expect(iterator.next()).resolves.toMatchInlineSnapshot(` + { + "done": false, + "value": { + "errors": [ + { + "locations": [ + { + "column": 16, + "line": 1, + }, + ], + "message": "Kaboom!", + "path": [ + "throwing", + ], + }, + ], + }, + } + `); + + await expect(iterator.next()).resolves.toMatchInlineSnapshot(` + { + "done": true, + "value": undefined, + } + `); + }); + + it('should throw in iterator connection errors', async () => { + const { url, ...server } = await startTServer(); + + const client = createClient({ + url, + retryAttempts: 0, + onNonLazyError: noop, + }); + + const pingKey = Math.random().toString(); + const iterator = client.iterate({ + query: `subscription { ping(key: "${pingKey}") }`, + }); + + pong(pingKey); + await expect(iterator.next()).resolves.toMatchInlineSnapshot(` + { + "done": false, + "value": { + "data": { + "ping": "pong", + }, + }, + } + `); + + await server.dispose(false); + + await expect(iterator.next()).rejects.toEqual( + // forceful close + expect.objectContaining({ + code: 1006, + reason: '', + }), + ); + }); + + it('should complete subscription when iterator loop breaks', async () => { + const { url, ...server } = await startTServer(); + + const client = createClient({ + url, + retryAttempts: 0, + onNonLazyError: noop, + }); + + const pingKey = Math.random().toString(); + const iterator = client.iterate({ + query: `subscription { ping(key: "${pingKey}") }`, + }); + iterator.return = jest.fn(iterator.return); + + setTimeout(() => pong(pingKey), 0); + + for await (const val of iterator) { + expect(val).toMatchInlineSnapshot(` + { + "data": { + "ping": "pong", + }, + } + `); + break; + } + + expect(iterator.return).toHaveBeenCalled(); + + await server.waitForClientClose(); + }); + + it('should complete subscription when iterator loop throws', async () => { + const { url, ...server } = await startTServer(); + + const client = createClient({ + url, + retryAttempts: 0, + onNonLazyError: noop, + }); + + const pingKey = Math.random().toString(); + const iterator = client.iterate({ + query: `subscription { ping(key: "${pingKey}") }`, + }); + iterator.return = jest.fn(iterator.return); + + setTimeout(() => pong(pingKey), 0); + + await expect(async () => { + for await (const val of iterator) { + expect(val).toMatchInlineSnapshot(` + { + "data": { + "ping": "pong", + }, + } + `); + throw new Error(':)'); + } + }).rejects.toBeDefined(); + + expect(iterator.return).toHaveBeenCalled(); + + await server.waitForClientClose(); + }); + + it('should complete subscription when calling return directly on iterator', async () => { + const { url, ...server } = await startTServer(); + + const client = createClient({ + url, + retryAttempts: 0, + onNonLazyError: noop, + }); + + const pingKey = Math.random().toString(); + const iterator = client.iterate({ + query: `subscription { ping(key: "${pingKey}") }`, + }); + + pong(pingKey); + + await expect(iterator.next()).resolves.toMatchInlineSnapshot(` + { + "done": false, + "value": { + "data": { + "ping": "pong", + }, + }, + } + `); + + await expect(iterator.return?.()).resolves.toMatchInlineSnapshot(` + { + "done": true, + "value": undefined, + } + `); + + await expect(iterator.next()).resolves.toMatchInlineSnapshot(` + { + "done": true, + "value": undefined, + } + `); + + await server.waitForClientClose(); + }); +}); diff --git a/src/__tests__/fixtures/simple.ts b/src/__tests__/fixtures/simple.ts index 4cf5b978..c50ebc31 100644 --- a/src/__tests__/fixtures/simple.ts +++ b/src/__tests__/fixtures/simple.ts @@ -100,6 +100,12 @@ export const schemaConfig: GraphQLSchemaConfig = { }; }, }, + throwing: { + type: new GraphQLNonNull(GraphQLString), + subscribe: async function () { + throw new Error('Kaboom!'); + }, + }, }, }), }; diff --git a/src/client.ts b/src/client.ts index 500334af..18992fed 100644 --- a/src/client.ts +++ b/src/client.ts @@ -440,6 +440,13 @@ export interface Client extends Disposable { payload: SubscribePayload, sink: Sink>, ): () => void; + /** + * Subscribes and iterates over emitted results from the WebSocket + * through the returned async iterator. + */ + iterate, Extensions = unknown>( + payload: SubscribePayload, + ): AsyncIterableIterator>; /** * Terminates the WebSocket abruptly and immediately. * @@ -972,6 +979,73 @@ export function createClient< if (!done) releaser(); }; }, + iterate(request) { + const pending: ExecutionResult< + // TODO: how to not use `any` and not have a redundant function signature? + // eslint-disable-next-line @typescript-eslint/no-explicit-any + any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + any + >[] = []; + const deferred = { + done: false, + error: null as unknown, + resolve: () => { + // noop + }, + }; + const dispose = this.subscribe(request, { + next(val) { + pending.push(val); + deferred.resolve(); + }, + error(err) { + deferred.done = true; + deferred.error = err; + deferred.resolve(); + }, + complete() { + deferred.done = true; + deferred.resolve(); + }, + }); + + const iterator = (async function* iterator() { + for (;;) { + if (!pending.length) { + // only wait if there are no pending messages available + await new Promise((resolve) => (deferred.resolve = resolve)); + } + // first flush + while (pending.length) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + yield pending.shift()!; + } + // then error + if (deferred.error) { + throw deferred.error; + } + // or complete + if (deferred.done) { + return; + } + } + })(); + iterator.throw = async (err) => { + if (!deferred.done) { + deferred.done = true; + deferred.error = err; + deferred.resolve(); + } + return { done: true, value: undefined }; + }; + iterator.return = async () => { + dispose(); + return { done: true, value: undefined }; + }; + + return iterator; + }, async dispose() { disposed = true; if (connecting) { diff --git a/website/src/pages/get-started.mdx b/website/src/pages/get-started.mdx index c645ad38..2ad80119 100644 --- a/website/src/pages/get-started.mdx +++ b/website/src/pages/get-started.mdx @@ -186,46 +186,25 @@ const client = createClient({ // query (async () => { - const result = await new Promise((resolve, reject) => { - let result; - client.subscribe( - { - query: '{ hello }', - }, - { - next: (data) => (result = data), - error: reject, - complete: () => resolve(result), - }, - ); + const query = client.iterate({ + query: '{ hello }', }); - expect(result).toEqual({ hello: 'Hello World!' }); + const { value } = await query.next(); + expect(value).toEqual({ hello: 'world' }); })(); // subscription (async () => { - const onNext = () => { - /* handle incoming values */ - }; - - let unsubscribe = () => { - /* complete the subscription */ - }; - - await new Promise((resolve, reject) => { - unsubscribe = client.subscribe( - { - query: 'subscription { greetings }', - }, - { - next: onNext, - error: reject, - complete: resolve, - }, - ); + const subscription = client.iterate({ + query: 'subscription { greetings }', }); - expect(onNext).toBeCalledTimes(5); // we say "Hi" in 5 languages + for await (const event of subscription) { + expect(event).toEqual({ greetings: 'Hi' }); + + // complete a running subscription by breaking the iterator loop + break; + } })(); ``` diff --git a/website/src/pages/recipes.mdx b/website/src/pages/recipes.mdx index 1149b660..81f91f07 100644 --- a/website/src/pages/recipes.mdx +++ b/website/src/pages/recipes.mdx @@ -11,25 +11,15 @@ const client = createClient({ url: 'ws://hey.there:4000/graphql', }); -async function execute(payload: SubscribePayload) { - return new Promise((resolve, reject) => { - let result: T; - client.subscribe(payload, { - next: (data) => (result = data), - error: reject, - complete: () => resolve(result), - }); +(async () => { + const query = client.iterate({ + query: '{ hello }', }); -} -// use -(async () => { try { - const result = await execute({ - query: '{ hello }', - }); + const { value } = await query.next(); + // next = value = { data: { hello: 'Hello World!' } } // complete - // next = result = { data: { hello: 'Hello World!' } } } catch (err) { // error } @@ -45,60 +35,15 @@ const client = createClient({ url: 'ws://iterators.ftw:4000/graphql', }); -function subscribe(payload: SubscribePayload): AsyncGenerator { - let deferred: { - resolve: (done: boolean) => void; - reject: (err: unknown) => void; - } | null = null; - const pending: T[] = []; - let throwMe: unknown = null, - done = false; - const dispose = client.subscribe(payload, { - next: (data) => { - pending.push(data); - deferred?.resolve(false); - }, - error: (err) => { - throwMe = err; - deferred?.reject(throwMe); - }, - complete: () => { - done = true; - deferred?.resolve(true); - }, - }); - return { - [Symbol.asyncIterator]() { - return this; - }, - async next() { - if (done) return { done: true, value: undefined }; - if (throwMe) throw throwMe; - if (pending.length) return { value: pending.shift()! }; - return (await new Promise( - (resolve, reject) => (deferred = { resolve, reject }), - )) - ? { done: true, value: undefined } - : { value: pending.shift()! }; - }, - async throw(err) { - throw err; - }, - async return() { - dispose(); - return { done: true, value: undefined }; - }, - }; -} - (async () => { - const subscription = subscribe({ + const subscription = client.iterate({ query: 'subscription { greetings }', }); - // subscription.return() to dispose + // "subscription.return()" to dispose for await (const result of subscription) { // next = result = { data: { greetings: 5x } } + // "break" to dispose } // complete })();