From fb4d8e9efdfdd0cbe3b7cc34ddadbad3a795ae35 Mon Sep 17 00:00:00 2001 From: Denis Badurina Date: Thu, 1 Oct 2020 22:49:50 +0200 Subject: [PATCH] fix(client): Dispose of subscription on complete or error messages (#23) * fix: subscription should be canceled on complete * docs: update readme * fix: actually dispose on error or complete --- README.md | 23 +++++++-------------- src/client.ts | 15 ++++++++++++-- src/tests/client.ts | 50 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index c8554ee9..94aba789 100644 --- a/README.md +++ b/README.md @@ -89,17 +89,14 @@ const client = createClient({ (async () => { const result = await new Promise((resolve, reject) => { let result; - const dispose = client.subscribe( + client.subscribe( { query: '{ hello }', }, { next: (data) => (result = data), error: reject, - complete: () => { - dispose(); - resolve(result); - }, + complete: () => resolve(result), }, ); }); @@ -114,17 +111,14 @@ const client = createClient({ }; await new Promise((resolve, reject) => { - const dispose = client.subscribe( + client.subscribe( { query: 'subscription { greetings }', }, { next: onNext, error: reject, - complete: () => { - dispose(); - resolve(); - }, + complete: resolve, }, ); }); @@ -148,13 +142,10 @@ const client = createClient({ async function execute(payload: SubscribePayload) { return new Promise((resolve, reject) => { let result: T; - const dispose = client.subscribe(payload, { + client.subscribe(payload, { next: (data) => (result = data), error: reject, - complete: () => { - dispose(); - resolve(result); - }, + complete: () => resolve(result), }); }); } @@ -165,7 +156,7 @@ async function execute(payload: SubscribePayload) { const result = await execute({ query: '{ hello }', }); - // complete and dispose + // complete // next = result = { data: { hello: 'Hello World!' } } } catch (err) { // error diff --git a/src/client.ts b/src/client.ts index 2799735e..43e36c4e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -424,6 +424,7 @@ export function createClient(options: ClientOptions): Client { on: emitter.on, subscribe(payload, sink) { const id = generateID(); + const cancellerRef: CancellerRef = { current: null }; const messageHandler = ({ data }: MessageEvent) => { const message = memoParseMessage(data); @@ -438,19 +439,28 @@ export function createClient(options: ClientOptions): Client { case MessageType.Error: { if (message.id === id) { sink.error(message.payload); + // the canceller must be set at this point + // because you cannot receive a message + // if there is no existing connection + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + cancellerRef.current!(); } return; } case MessageType.Complete: { if (message.id === id) { sink.complete(); + // the canceller must be set at this point + // because you cannot receive a message + // if there is no existing connection + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + cancellerRef.current!(); } return; } } }; - const cancellerRef: CancellerRef = { current: null }; (async () => { for (;;) { try { @@ -511,7 +521,8 @@ export function createClient(options: ClientOptions): Client { } })() .catch(sink.error) - .then(sink.complete); // resolves on cancel or normal closure + .then(sink.complete) // resolves on cancel or normal closure + .finally(() => (cancellerRef.current = null)); // when this promise settles there is nothing to cancel return () => { if (cancellerRef.current) { diff --git a/src/tests/client.ts b/src/tests/client.ts index 8c843684..3acd1d72 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -281,6 +281,56 @@ describe('subscription operation', () => { expect(generateIDFn).toBeCalled(); }); + + it('should dispose of the subscription on complete', async () => { + const client = createClient({ url }); + + const completeFn = jest.fn(); + client.subscribe( + { + query: `{ + getValue + }`, + }, + { + next: noop, + error: () => { + fail(`Unexpected error call`); + }, + complete: completeFn, + }, + ); + await wait(20); + + expect(completeFn).toBeCalled(); + + await wait(20); + expect(server.webSocketServer.clients.size).toBe(0); + }); + + it('should dispose of the subscription on error', async () => { + const client = createClient({ url }); + + const errorFn = jest.fn(); + client.subscribe( + { + query: `{ + iDontExist + }`, + }, + { + next: noop, + error: errorFn, + complete: noop, + }, + ); + await wait(20); + + expect(errorFn).toBeCalled(); + + await wait(20); + expect(server.webSocketServer.clients.size).toBe(0); + }); }); describe('"concurrency"', () => {