Skip to content

Commit

Permalink
feat(client): Async iterator for subscriptions (#486)
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo authored Jun 22, 2023
1 parent a49c5b5 commit fb4b967
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 96 deletions.
259 changes: 259 additions & 0 deletions src/__tests__/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
} from '../common';
import { startRawServer, startWSTServer as startTServer } from './utils';
import { ExecutionResult } from 'graphql';
import { pong } from './fixtures/simple';

// silence console.error calls for nicer tests overview
const consoleError = console.error;
Expand Down Expand Up @@ -2324,3 +2325,261 @@ describe('events', () => {
expect(expected).toBeCalledTimes(6);
});
});

describe('iterate', () => {
it('should iterate a single result query', async () => {
const { url } = await startTServer();

const client = createClient({
url,
retryAttempts: 0,
onNonLazyError: noop,
});

const iterator = client.iterate({
query: '{ getValue }',
});

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"data": {
"getValue": "value",
},
},
}
`);

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);
});

it('should iterate over subscription events', async () => {
const { url } = await startTServer();

const client = createClient({
url,
retryAttempts: 0,
onNonLazyError: noop,
});

const iterator = client.iterate({
query: 'subscription { greetings }',
});

// Hi
await expect(iterator.next()).resolves.toBeDefined();
// Bonjour
await expect(iterator.next()).resolves.toBeDefined();
// Hola
await expect(iterator.next()).resolves.toBeDefined();
// Ciao
await expect(iterator.next()).resolves.toBeDefined();
// Zdravo
await expect(iterator.next()).resolves.toBeDefined();

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);
});

it('should report execution errors to iterator', async () => {
const { url } = await startTServer();

const client = createClient({
url,
retryAttempts: 0,
onNonLazyError: noop,
});

const iterator = client.iterate({
query: 'subscription { throwing }',
});

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"errors": [
{
"locations": [
{
"column": 16,
"line": 1,
},
],
"message": "Kaboom!",
"path": [
"throwing",
],
},
],
},
}
`);

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);
});

it('should throw in iterator connection errors', async () => {
const { url, ...server } = await startTServer();

const client = createClient({
url,
retryAttempts: 0,
onNonLazyError: noop,
});

const pingKey = Math.random().toString();
const iterator = client.iterate({
query: `subscription { ping(key: "${pingKey}") }`,
});

pong(pingKey);
await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"data": {
"ping": "pong",
},
},
}
`);

await server.dispose(false);

await expect(iterator.next()).rejects.toEqual(
// forceful close
expect.objectContaining({
code: 1006,
reason: '',
}),
);
});

it('should complete subscription when iterator loop breaks', async () => {
const { url, ...server } = await startTServer();

const client = createClient({
url,
retryAttempts: 0,
onNonLazyError: noop,
});

const pingKey = Math.random().toString();
const iterator = client.iterate({
query: `subscription { ping(key: "${pingKey}") }`,
});
iterator.return = jest.fn(iterator.return);

setTimeout(() => pong(pingKey), 0);

for await (const val of iterator) {
expect(val).toMatchInlineSnapshot(`
{
"data": {
"ping": "pong",
},
}
`);
break;
}

expect(iterator.return).toHaveBeenCalled();

await server.waitForClientClose();
});

it('should complete subscription when iterator loop throws', async () => {
const { url, ...server } = await startTServer();

const client = createClient({
url,
retryAttempts: 0,
onNonLazyError: noop,
});

const pingKey = Math.random().toString();
const iterator = client.iterate({
query: `subscription { ping(key: "${pingKey}") }`,
});
iterator.return = jest.fn(iterator.return);

setTimeout(() => pong(pingKey), 0);

await expect(async () => {
for await (const val of iterator) {
expect(val).toMatchInlineSnapshot(`
{
"data": {
"ping": "pong",
},
}
`);
throw new Error(':)');
}
}).rejects.toBeDefined();

expect(iterator.return).toHaveBeenCalled();

await server.waitForClientClose();
});

it('should complete subscription when calling return directly on iterator', async () => {
const { url, ...server } = await startTServer();

const client = createClient({
url,
retryAttempts: 0,
onNonLazyError: noop,
});

const pingKey = Math.random().toString();
const iterator = client.iterate({
query: `subscription { ping(key: "${pingKey}") }`,
});

pong(pingKey);

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"data": {
"ping": "pong",
},
},
}
`);

await expect(iterator.return?.()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);

await server.waitForClientClose();
});
});
6 changes: 6 additions & 0 deletions src/__tests__/fixtures/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ export const schemaConfig: GraphQLSchemaConfig = {
};
},
},
throwing: {
type: new GraphQLNonNull(GraphQLString),
subscribe: async function () {
throw new Error('Kaboom!');
},
},
},
}),
};
Expand Down
74 changes: 74 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,13 @@ export interface Client extends Disposable {
payload: SubscribePayload,
sink: Sink<ExecutionResult<Data, Extensions>>,
): () => void;
/**
* Subscribes and iterates over emitted results from the WebSocket
* through the returned async iterator.
*/
iterate<Data = Record<string, unknown>, Extensions = unknown>(
payload: SubscribePayload,
): AsyncIterableIterator<ExecutionResult<Data, Extensions>>;
/**
* Terminates the WebSocket abruptly and immediately.
*
Expand Down Expand Up @@ -972,6 +979,73 @@ export function createClient<
if (!done) releaser();
};
},
iterate(request) {
const pending: ExecutionResult<
// TODO: how to not use `any` and not have a redundant function signature?
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any
>[] = [];
const deferred = {
done: false,
error: null as unknown,
resolve: () => {
// noop
},
};
const dispose = this.subscribe(request, {
next(val) {
pending.push(val);
deferred.resolve();
},
error(err) {
deferred.done = true;
deferred.error = err;
deferred.resolve();
},
complete() {
deferred.done = true;
deferred.resolve();
},
});

const iterator = (async function* iterator() {
for (;;) {
if (!pending.length) {
// only wait if there are no pending messages available
await new Promise<void>((resolve) => (deferred.resolve = resolve));
}
// first flush
while (pending.length) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
yield pending.shift()!;
}
// then error
if (deferred.error) {
throw deferred.error;
}
// or complete
if (deferred.done) {
return;
}
}
})();
iterator.throw = async (err) => {
if (!deferred.done) {
deferred.done = true;
deferred.error = err;
deferred.resolve();
}
return { done: true, value: undefined };
};
iterator.return = async () => {
dispose();
return { done: true, value: undefined };
};

return iterator;
},
async dispose() {
disposed = true;
if (connecting) {
Expand Down
Loading

0 comments on commit fb4b967

Please sign in to comment.