Skip to content

Commit

Permalink
fix(server): Enforce ID uniqueness across all operations and during t…
Browse files Browse the repository at this point in the history
…he whole subscription life (enisdenjo#96)

BREAKING CHANGE: The `Context.subscriptions` record value can be either an `AsyncIterator` or a `Promise`.
  • Loading branch information
enisdenjo authored Jan 12, 2021
1 parent 5483fac commit 65d1bfa
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 28 deletions.
2 changes: 1 addition & 1 deletion PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Direction: **Client -> Server**

Requests an operation specified in the message `payload`. This message provides a unique ID field to connect published messages to the operation requested by this message.

If there is already an active subscriber for a streaming operation matching the provided ID, the server will close the socket immediately with the event `4409: Subscriber for <unique-operation-id> already exists`. The server may not assert this rule for operations returning a single result as they do not require reservations for additional future events.
If there is already an active subscriber for an operation matching the provided ID, regardless of the operation type, the server must close the socket immediately with the event `4409: Subscriber for <unique-operation-id> already exists`.

```typescript
interface SubscribeMessage {
Expand Down
13 changes: 9 additions & 4 deletions docs/interfaces/_server_.context.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ ___

### subscriptions

`Readonly` **subscriptions**: Record<[ID](../modules/_types_.md#id), AsyncIterator<unknown\>\>
`Readonly` **subscriptions**: Record<[ID](../modules/_types_.md#id), AsyncIterator<unknown\> \| null\>

Holds the active subscriptions for this context.
Subscriptions are for **streaming operations only**,
those that resolve once wont be added here.
Holds the active subscriptions for this context. **All operations**
that are taking place are aggregated here. The user is _subscribed_
to an operation when waiting for result(s).

If the subscription behind an ID is an `AsyncIterator` - the operation
is streaming; on the contrary, if the subscription is `null` - it is simply
a reservation, meaning - the operation resolves to a single result or is still
pending/being prepared.
44 changes: 23 additions & 21 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,16 @@ export interface Context<E = unknown> {
/** The parameters passed during the connection initialisation. */
readonly connectionParams?: Readonly<Record<string, unknown>>;
/**
* Holds the active subscriptions for this context.
* Subscriptions are for **streaming operations only**,
* those that resolve once wont be added here.
* Holds the active subscriptions for this context. **All operations**
* that are taking place are aggregated here. The user is _subscribed_
* to an operation when waiting for result(s).
*
* If the subscription behind an ID is an `AsyncIterator` - the operation
* is streaming; on the contrary, if the subscription is `null` - it is simply
* a reservation, meaning - the operation resolves to a single result or is still
* pending/being prepared.
*/
readonly subscriptions: Record<ID, AsyncIterator<unknown>>;
readonly subscriptions: Record<ID, AsyncIterator<unknown> | null>;
/**
* An extra field where you can store your own context values
* to pass between callbacks.
Expand Down Expand Up @@ -470,6 +475,14 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {
}

const id = message.id;
if (id in ctx.subscriptions) {
return socket.close(4409, `Subscriber for ${id} already exists`);
}

// if this turns out to be a streaming operation, the subscription value
// will change to an `AsyncIterable`, otherwise it will stay as is
ctx.subscriptions[id] = null;

const emit = {
next: async (result: ExecutionResult, args: ExecutionArgs) => {
let nextMessage: NextMessage = {
Expand Down Expand Up @@ -610,30 +623,19 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {

if (isAsyncIterable(operationResult)) {
/** multiple emitted results */

// iterable subscriptions are distinct on ID
if (ctx.subscriptions[id]) {
return socket.close(
4409,
`Subscriber for ${id} already exists`,
);
}
ctx.subscriptions[id] = operationResult;

for await (const result of operationResult) {
await emit.next(result, execArgs);
}

// lack of subscription at this point indicates that the client
// completed the stream, he doesnt need to be reminded
await emit.complete(Boolean(ctx.subscriptions[id]));
delete ctx.subscriptions[id];
} else {
/** single emitted result */

await emit.next(operationResult, execArgs);
await emit.complete(true);
}

// lack of subscription at this point indicates that the client
// completed the subscription, he doesnt need to be reminded
await emit.complete(id in ctx.subscriptions);
delete ctx.subscriptions[id];
break;
}
case MessageType.Complete: {
Expand All @@ -652,7 +654,7 @@ export function makeServer<E = unknown>(options: ServerOptions<E>): Server<E> {
return async () => {
if (connectionInitWait) clearTimeout(connectionInitWait);
for (const sub of Object.values(ctx.subscriptions)) {
await sub.return?.();
await sub?.return?.();
}
};
},
Expand Down
49 changes: 47 additions & 2 deletions src/tests/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ describe('Subscribe', () => {
}, 30);
});

it('should close the socket on duplicate `subscription` operation subscriptions request', async () => {
it('should close the socket on duplicate operation requests', async () => {
const { url } = await startTServer();

const client = await createTClient(url);
Expand Down Expand Up @@ -1130,7 +1130,52 @@ describe('Subscribe', () => {
id: 'not-unique',
type: MessageType.Subscribe,
payload: {
query: 'subscription { greetings }',
query: 'query { getValue }',
},
}),
);

await client.waitForClose((event) => {
expect(event.code).toBe(4409);
expect(event.reason).toBe('Subscriber for not-unique already exists');
expect(event.wasClean).toBeTruthy();
});
});

it('should close the socket on duplicate operation requests even if one is still preparing', async () => {
const { url } = await startTServer({
onSubscribe: () =>
new Promise(() => {
/* i never resolve, the subscription will be preparing forever */
}),
});

const client = await createTClient(url);
client.ws.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
}),
);

await client.waitForMessage(({ data }) => {
expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
client.ws.send(
stringifyMessage<MessageType.Subscribe>({
id: 'not-unique',
type: MessageType.Subscribe,
payload: {
query: 'query { getValue }',
},
}),
);
});

client.ws.send(
stringifyMessage<MessageType.Subscribe>({
id: 'not-unique',
type: MessageType.Subscribe,
payload: {
query: 'query { getValue }',
},
}),
);
Expand Down

0 comments on commit 65d1bfa

Please sign in to comment.