Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Stop sending messages after receiving complete #65

Merged
merged 13 commits into from
Nov 12, 2020
18 changes: 11 additions & 7 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ export function createClient(options: ClientOptions): Client {
on: emitter.on,
subscribe(payload, sink) {
const id = generateID();
let completed = false;
const cancellerRef: CancellerRef = { current: null };

const messageListener = ({ data }: MessageEvent) => {
Expand Down Expand Up @@ -464,6 +465,7 @@ export function createClient(options: ClientOptions): Client {
}
case MessageType.Complete: {
if (message.id === id) {
completed = true;
// the canceller must be set at this point
// because you cannot receive a message
// if there is no existing connection
Expand Down Expand Up @@ -495,13 +497,15 @@ export function createClient(options: ClientOptions): Client {
// either the canceller will be called and the promise resolved
// or the socket closed and the promise rejected
await throwOnCloseOrWaitForCancel(() => {
// send complete message to server on cancel
socket.send(
stringifyMessage<MessageType.Complete>({
id: id,
type: MessageType.Complete,
}),
);
// if not completed already, send complete message to server on cancel
if (!completed) {
socket.send(
stringifyMessage<MessageType.Complete>({
id: id,
type: MessageType.Complete,
}),
);
}
});

socket.removeEventListener('message', messageListener);
Expand Down
14 changes: 10 additions & 4 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,15 @@ export function createServer(
}
await sendMessage<MessageType.Error>(ctx, errorMessage);
},
complete: async () => {
complete: async (notifyClient: boolean) => {
const completeMessage: CompleteMessage = {
id: message.id,
type: MessageType.Complete,
};
await onComplete?.(ctx, completeMessage);
await sendMessage<MessageType.Complete>(ctx, completeMessage);
if (notifyClient) {
await sendMessage<MessageType.Complete>(ctx, completeMessage);
}
},
};

Expand Down Expand Up @@ -680,18 +682,22 @@ export function createServer(
for await (const result of operationResult) {
await emit.next(result, execArgs);
}
await emit.complete();

// lack of subscription at this point indicates that the client
// completed the stream, he doesnt need to be reminded
await emit.complete(Boolean(ctx.subscriptions[message.id]));
delete ctx.subscriptions[message.id];
} else {
/** single emitted result */

await emit.next(operationResult, execArgs);
await emit.complete();
await emit.complete(true);
}
break;
}
case MessageType.Complete: {
await ctx.subscriptions[message.id]?.return?.();
delete ctx.subscriptions[message.id]; // deleting the subscription means no further action
break;
}
default:
Expand Down
24 changes: 24 additions & 0 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,30 @@ describe('subscription operation', () => {

expect(server.clients.size).toBe(0);
});

it('should stop dispatching messages after completing a subscription', async () => {
const {
url,
server,
waitForOperation,
waitForComplete,
} = await startTServer();

const sub = tsubscribe(createClient({ url }), {
query: 'subscription { greetings }',
});
await waitForOperation();

for (const client of server.webSocketServer.clients) {
client.once('message', () => {
// no more messages from the client
fail("Shouldn't have dispatched a message");
});
}

await waitForComplete();
await sub.waitForComplete();
});
});

describe('"concurrency"', () => {
Expand Down
28 changes: 27 additions & 1 deletion src/tests/fixtures/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export interface TServer {
expire?: number,
) => Promise<void>;
waitForOperation: (test?: () => void, expire?: number) => Promise<void>;
waitForComplete: (test?: () => void, expire?: number) => Promise<void>;
waitForClientClose: (test?: () => void, expire?: number) => Promise<void>;
dispose: Dispose;
}
Expand Down Expand Up @@ -137,7 +138,8 @@ export async function startTServer(
});

// create server and hook up for tracking operations
let pendingOperations = 0;
let pendingOperations = 0,
pendingCompletes = 0;
const server = await createServer(
{
schema,
Expand All @@ -155,6 +157,11 @@ export async function startTServer(
emitter.emit('operation');
return maybeResult;
},
onComplete: async (...args) => {
pendingCompletes++;
await options?.onComplete?.(...args);
emitter.emit('compl');
},
},
{
server: httpServer,
Expand Down Expand Up @@ -263,6 +270,25 @@ export async function startTServer(
}
});
},
waitForComplete(test, expire) {
return new Promise((resolve) => {
function done() {
pendingCompletes--;
test?.();
resolve();
}
if (pendingCompletes > 0) {
return done();
}
emitter.once('compl', done);
if (expire) {
setTimeout(() => {
emitter.off('compl', done); // expired
resolve();
}, expire);
}
});
},
waitForClientClose(test, expire) {
return new Promise((resolve) => {
function done() {
Expand Down
12 changes: 3 additions & 9 deletions src/tests/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1446,28 +1446,22 @@ describe('Subscribe', () => {
});
});

// complete
// send complete
client.ws.send(
stringifyMessage<MessageType.Complete>({
id: '1',
type: MessageType.Complete,
}),
);

// confirm complete
await client.waitForMessage(({ data }) => {
expect(parseMessage(data)).toEqual({
id: '1',
type: MessageType.Complete,
});
});
await server.waitForComplete();

server.pong();
server.pong();
server.pong();

await client.waitForMessage(() => {
fail('Shouldnt have received a message');
fail("Shouldn't have received a message");
}, 30);
});

Expand Down