Skip to content

Commit

Permalink
fix: Stop sending messages after receiving complete (enisdenjo#65)
Browse files Browse the repository at this point in the history
* Do not send Complete event from client after Complete received from server

* Apply suggestions

* Remove check for socket.readyState

* Server updates

* refactor: test simply doesnt want a message

* refactor: unnecessary

* test: server wait for complete

* fix: dont rely on timeouts

* style: small change

* refactor: simplify

* docs: reminded*

* test: client should stop dispatching after complete

Co-authored-by: Denis Badurina <badurinadenis@gmail.com>
  • Loading branch information
rpastro and enisdenjo authored Nov 12, 2020
1 parent ff54d5d commit 3f4f836
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 21 deletions.
18 changes: 11 additions & 7 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ export function createClient(options: ClientOptions): Client {
on: emitter.on,
subscribe(payload, sink) {
const id = generateID();
let completed = false;
const cancellerRef: CancellerRef = { current: null };

const messageListener = ({ data }: MessageEvent) => {
Expand Down Expand Up @@ -464,6 +465,7 @@ export function createClient(options: ClientOptions): Client {
}
case MessageType.Complete: {
if (message.id === id) {
completed = true;
// the canceller must be set at this point
// because you cannot receive a message
// if there is no existing connection
Expand Down Expand Up @@ -495,13 +497,15 @@ export function createClient(options: ClientOptions): Client {
// either the canceller will be called and the promise resolved
// or the socket closed and the promise rejected
await throwOnCloseOrWaitForCancel(() => {
// send complete message to server on cancel
socket.send(
stringifyMessage<MessageType.Complete>({
id: id,
type: MessageType.Complete,
}),
);
// if not completed already, send complete message to server on cancel
if (!completed) {
socket.send(
stringifyMessage<MessageType.Complete>({
id: id,
type: MessageType.Complete,
}),
);
}
});

socket.removeEventListener('message', messageListener);
Expand Down
14 changes: 10 additions & 4 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,15 @@ export function createServer(
}
await sendMessage<MessageType.Error>(ctx, errorMessage);
},
complete: async () => {
complete: async (notifyClient: boolean) => {
const completeMessage: CompleteMessage = {
id: message.id,
type: MessageType.Complete,
};
await onComplete?.(ctx, completeMessage);
await sendMessage<MessageType.Complete>(ctx, completeMessage);
if (notifyClient) {
await sendMessage<MessageType.Complete>(ctx, completeMessage);
}
},
};

Expand Down Expand Up @@ -680,18 +682,22 @@ export function createServer(
for await (const result of operationResult) {
await emit.next(result, execArgs);
}
await emit.complete();

// lack of subscription at this point indicates that the client
// completed the stream, he doesnt need to be reminded
await emit.complete(Boolean(ctx.subscriptions[message.id]));
delete ctx.subscriptions[message.id];
} else {
/** single emitted result */

await emit.next(operationResult, execArgs);
await emit.complete();
await emit.complete(true);
}
break;
}
case MessageType.Complete: {
await ctx.subscriptions[message.id]?.return?.();
delete ctx.subscriptions[message.id]; // deleting the subscription means no further action
break;
}
default:
Expand Down
24 changes: 24 additions & 0 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,30 @@ describe('subscription operation', () => {

expect(server.clients.size).toBe(0);
});

it('should stop dispatching messages after completing a subscription', async () => {
const {
url,
server,
waitForOperation,
waitForComplete,
} = await startTServer();

const sub = tsubscribe(createClient({ url }), {
query: 'subscription { greetings }',
});
await waitForOperation();

for (const client of server.webSocketServer.clients) {
client.once('message', () => {
// no more messages from the client
fail("Shouldn't have dispatched a message");
});
}

await waitForComplete();
await sub.waitForComplete();
});
});

describe('"concurrency"', () => {
Expand Down
28 changes: 27 additions & 1 deletion src/tests/fixtures/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export interface TServer {
expire?: number,
) => Promise<void>;
waitForOperation: (test?: () => void, expire?: number) => Promise<void>;
waitForComplete: (test?: () => void, expire?: number) => Promise<void>;
waitForClientClose: (test?: () => void, expire?: number) => Promise<void>;
dispose: Dispose;
}
Expand Down Expand Up @@ -137,7 +138,8 @@ export async function startTServer(
});

// create server and hook up for tracking operations
let pendingOperations = 0;
let pendingOperations = 0,
pendingCompletes = 0;
const server = await createServer(
{
schema,
Expand All @@ -155,6 +157,11 @@ export async function startTServer(
emitter.emit('operation');
return maybeResult;
},
onComplete: async (...args) => {
pendingCompletes++;
await options?.onComplete?.(...args);
emitter.emit('compl');
},
},
{
server: httpServer,
Expand Down Expand Up @@ -263,6 +270,25 @@ export async function startTServer(
}
});
},
waitForComplete(test, expire) {
return new Promise((resolve) => {
function done() {
pendingCompletes--;
test?.();
resolve();
}
if (pendingCompletes > 0) {
return done();
}
emitter.once('compl', done);
if (expire) {
setTimeout(() => {
emitter.off('compl', done); // expired
resolve();
}, expire);
}
});
},
waitForClientClose(test, expire) {
return new Promise((resolve) => {
function done() {
Expand Down
12 changes: 3 additions & 9 deletions src/tests/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1446,28 +1446,22 @@ describe('Subscribe', () => {
});
});

// complete
// send complete
client.ws.send(
stringifyMessage<MessageType.Complete>({
id: '1',
type: MessageType.Complete,
}),
);

// confirm complete
await client.waitForMessage(({ data }) => {
expect(parseMessage(data)).toEqual({
id: '1',
type: MessageType.Complete,
});
});
await server.waitForComplete();

server.pong();
server.pong();
server.pong();

await client.waitForMessage(() => {
fail('Shouldnt have received a message');
fail("Shouldn't have received a message");
}, 30);
});

Expand Down

0 comments on commit 3f4f836

Please sign in to comment.