Skip to content

Commit

Permalink
fix(server): Client can complete/cancel any operation
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo committed Jan 12, 2021
1 parent 65d1bfa commit 0ad1c4c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
2 changes: 1 addition & 1 deletion PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ Direction: **bidirectional**

- **Server -> Client** indicates that the requested operation execution has completed. If the server dispatched the `Error` message relative to the original `Subscribe` message, no `Complete` message will be emitted.

- **Client -> Server** indicates that the client has stopped listening and wants to complete the source stream. No further events, relevant to the original subscription, should be sent through.
- **Client -> Server** indicates that the client has stopped listening and wants to complete the subscription. No further events, relevant to the original subscription, should be sent through. Even if the client completed a single result operation before it resolved, the result should not be sent through once it does.

```typescript
interface CompleteMessage {
Expand Down
7 changes: 5 additions & 2 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,10 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {
}
} else {
/** single emitted result */
await emit.next(operationResult, execArgs);
// if the client completed the subscription before the single result
// became available, he effectively canceled it and no data should be sent
if (id in ctx.subscriptions)
await emit.next(operationResult, execArgs);
}

// lack of subscription at this point indicates that the client
Expand All @@ -640,7 +643,7 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {
}
case MessageType.Complete: {
await ctx.subscriptions[message.id]?.return?.();
delete ctx.subscriptions[message.id]; // deleting the subscription means no further action
delete ctx.subscriptions[message.id]; // deleting the subscription means no further activity should take place
break;
}
default:
Expand Down
64 changes: 64 additions & 0 deletions src/tests/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
subscribe,
GraphQLError,
ExecutionArgs,
ExecutionResult,
} from 'graphql';
import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol';
import { MessageType, parseMessage, stringifyMessage } from '../message';
Expand Down Expand Up @@ -943,6 +944,69 @@ describe('Subscribe', () => {
});
});

it('should be able to complete a long running query before the result becomes available', async () => {
let resultIsHere = (_result: ExecutionResult) => {
/* noop for calming typescript */
},
execute = () => {
/* noop for calming typescript */
};
const waitForExecute = new Promise<void>((resolve) => (execute = resolve));

const { url, ws } = await startTServer({
schema,
execute: () =>
new Promise<ExecutionResult>((resolve) => {
resultIsHere = resolve;
execute();
}),
});

const client = await createTClient(url);
client.ws.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
}),
);

await client.waitForMessage(({ data }) => {
expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
client.ws.send(
stringifyMessage<MessageType.Subscribe>({
id: '1',
type: MessageType.Subscribe,
payload: {
query: 'query { getValue }',
},
}),
);
});

await waitForExecute;

// complete before resolve
client.ws.send(
stringifyMessage<MessageType.Complete>({
id: '1',
type: MessageType.Complete,
}),
);

// will be just one client and the only next message can be "complete"
for (const client of ws.clients) {
await new Promise<void>((resolve) =>
client.once('message', () => resolve()),
);
}

// result became available after complete
resultIsHere({ data: { getValue: 'nope' } });

await client.waitForMessage(() => {
fail('No further activity expected after complete');
}, 30);
});

it('should execute the query and "error" out because of validation errors', async () => {
const { url } = await startTServer({
schema,
Expand Down

0 comments on commit 0ad1c4c

Please sign in to comment.