diff --git a/src/client.ts b/src/client.ts index f3cbd718..1e8558b5 100644 --- a/src/client.ts +++ b/src/client.ts @@ -436,6 +436,7 @@ export function createClient(options: ClientOptions): Client { on: emitter.on, subscribe(payload, sink) { const id = generateID(); + let completed = false; const cancellerRef: CancellerRef = { current: null }; const messageListener = ({ data }: MessageEvent) => { @@ -464,6 +465,7 @@ export function createClient(options: ClientOptions): Client { } case MessageType.Complete: { if (message.id === id) { + completed = true; // the canceller must be set at this point // because you cannot receive a message // if there is no existing connection @@ -495,13 +497,15 @@ export function createClient(options: ClientOptions): Client { // either the canceller will be called and the promise resolved // or the socket closed and the promise rejected await throwOnCloseOrWaitForCancel(() => { - // send complete message to server on cancel - socket.send( - stringifyMessage({ - id: id, - type: MessageType.Complete, - }), - ); + // if not completed already, send complete message to server on cancel + if (!completed) { + socket.send( + stringifyMessage({ + id: id, + type: MessageType.Complete, + }), + ); + } }); socket.removeEventListener('message', messageListener); diff --git a/src/server.ts b/src/server.ts index b245da15..b0d76c23 100644 --- a/src/server.ts +++ b/src/server.ts @@ -571,13 +571,15 @@ export function createServer( } await sendMessage(ctx, errorMessage); }, - complete: async () => { + complete: async (notifyClient: boolean) => { const completeMessage: CompleteMessage = { id: message.id, type: MessageType.Complete, }; await onComplete?.(ctx, completeMessage); - await sendMessage(ctx, completeMessage); + if (notifyClient) { + await sendMessage(ctx, completeMessage); + } }, }; @@ -680,18 +682,22 @@ export function createServer( for await (const result of operationResult) { await emit.next(result, execArgs); } - await emit.complete(); + + // lack of subscription at this point indicates that the client + // completed the stream, he doesnt need to be reminded + await emit.complete(Boolean(ctx.subscriptions[message.id])); delete ctx.subscriptions[message.id]; } else { /** single emitted result */ await emit.next(operationResult, execArgs); - await emit.complete(); + await emit.complete(true); } break; } case MessageType.Complete: { await ctx.subscriptions[message.id]?.return?.(); + delete ctx.subscriptions[message.id]; // deleting the subscription means no further action break; } default: diff --git a/src/tests/client.ts b/src/tests/client.ts index 7f075f78..c7b5891c 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -380,6 +380,30 @@ describe('subscription operation', () => { expect(server.clients.size).toBe(0); }); + + it('should stop dispatching messages after completing a subscription', async () => { + const { + url, + server, + waitForOperation, + waitForComplete, + } = await startTServer(); + + const sub = tsubscribe(createClient({ url }), { + query: 'subscription { greetings }', + }); + await waitForOperation(); + + for (const client of server.webSocketServer.clients) { + client.once('message', () => { + // no more messages from the client + fail("Shouldn't have dispatched a message"); + }); + } + + await waitForComplete(); + await sub.waitForComplete(); + }); }); describe('"concurrency"', () => { diff --git a/src/tests/fixtures/simple.ts b/src/tests/fixtures/simple.ts index 68ecf18b..17f97686 100644 --- a/src/tests/fixtures/simple.ts +++ b/src/tests/fixtures/simple.ts @@ -33,6 +33,7 @@ export interface TServer { expire?: number, ) => Promise; waitForOperation: (test?: () => void, expire?: number) => Promise; + waitForComplete: (test?: () => void, expire?: number) => Promise; waitForClientClose: (test?: () => void, expire?: number) => Promise; dispose: Dispose; } @@ -137,7 +138,8 @@ export async function startTServer( }); // create server and hook up for tracking operations - let pendingOperations = 0; + let pendingOperations = 0, + pendingCompletes = 0; const server = await createServer( { schema, @@ -155,6 +157,11 @@ export async function startTServer( emitter.emit('operation'); return maybeResult; }, + onComplete: async (...args) => { + pendingCompletes++; + await options?.onComplete?.(...args); + emitter.emit('compl'); + }, }, { server: httpServer, @@ -263,6 +270,25 @@ export async function startTServer( } }); }, + waitForComplete(test, expire) { + return new Promise((resolve) => { + function done() { + pendingCompletes--; + test?.(); + resolve(); + } + if (pendingCompletes > 0) { + return done(); + } + emitter.once('compl', done); + if (expire) { + setTimeout(() => { + emitter.off('compl', done); // expired + resolve(); + }, expire); + } + }); + }, waitForClientClose(test, expire) { return new Promise((resolve) => { function done() { diff --git a/src/tests/server.ts b/src/tests/server.ts index 07dc9257..4706e0df 100644 --- a/src/tests/server.ts +++ b/src/tests/server.ts @@ -1446,7 +1446,7 @@ describe('Subscribe', () => { }); }); - // complete + // send complete client.ws.send( stringifyMessage({ id: '1', @@ -1454,20 +1454,14 @@ describe('Subscribe', () => { }), ); - // confirm complete - await client.waitForMessage(({ data }) => { - expect(parseMessage(data)).toEqual({ - id: '1', - type: MessageType.Complete, - }); - }); + await server.waitForComplete(); server.pong(); server.pong(); server.pong(); await client.waitForMessage(() => { - fail('Shouldnt have received a message'); + fail("Shouldn't have received a message"); }, 30); });