Skip to content

Commit

Permalink
feat(client): Async iterator for subscriptions (enisdenjo#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
enisdenjo authored Jun 22, 2023
1 parent d760f2d commit fb8bf11
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 98 deletions.
260 changes: 260 additions & 0 deletions src/__tests__/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,3 +636,263 @@ describe('retries', () => {
expect(retryFn).not.toHaveBeenCalled();
});
});

describe('iterate', () => {
it('should iterate a single result query', async () => {
const { fetch } = createTFetch();

const client = createClient({
url: 'http://localhost',
fetchFn: fetch,
retryAttempts: 0,
});

const iterator = client.iterate({
query: '{ getValue }',
});

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"data": {
"getValue": "value",
},
},
}
`);

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);
});

it('should iterate over subscription events', async () => {
const { fetch } = createTFetch();

const client = createClient({
url: 'http://localhost',
fetchFn: fetch,
retryAttempts: 0,
});

const iterator = client.iterate({
query: 'subscription { greetings }',
});

// Hi
await expect(iterator.next()).resolves.toBeDefined();
// Bonjour
await expect(iterator.next()).resolves.toBeDefined();
// Hola
await expect(iterator.next()).resolves.toBeDefined();
// Ciao
await expect(iterator.next()).resolves.toBeDefined();
// Zdravo
await expect(iterator.next()).resolves.toBeDefined();

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);
});

it('should report execution errors to iterator', async () => {
const { fetch } = createTFetch();

const client = createClient({
url: 'http://localhost',
fetchFn: fetch,
retryAttempts: 0,
});

const iterator = client.iterate({
query: 'subscription { throwing }',
});

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"errors": [
{
"locations": [
{
"column": 16,
"line": 1,
},
],
"message": "Kaboom!",
"path": [
"throwing",
],
},
],
},
}
`);

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);
});

it('should throw in iterator connection errors', async () => {
const { fetch, dispose } = createTFetch();

const client = createClient({
fetchFn: fetch,
url: 'http://localhost',
retryAttempts: 0,
});

const pingKey = Math.random().toString();
const iterator = client.iterate({
query: `subscription { ping(key: "${pingKey}") }`,
});

pong(pingKey);
await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"data": {
"ping": "pong",
},
},
}
`);

await dispose();

await expect(iterator.next()).rejects.toMatchInlineSnapshot(
`[NetworkError: Connection closed while having active streams]`,
);
});

it('should complete subscription when iterator loop breaks', async () => {
const { fetch, waitForRequest } = createTFetch();

const client = createClient({
fetchFn: fetch,
url: 'http://localhost',
retryAttempts: 0,
});

const pingKey = Math.random().toString();
const iterator = client.iterate({
query: `subscription { ping(key: "${pingKey}") }`,
});
iterator.return = jest.fn(iterator.return);

const req = await waitForRequest();

setTimeout(() => pong(pingKey), 0);

for await (const val of iterator) {
expect(val).toMatchInlineSnapshot(`
{
"data": {
"ping": "pong",
},
}
`);
break;
}

expect(iterator.return).toHaveBeenCalled();

expect(req.signal.aborted).toBeTruthy();
});

it('should complete subscription when iterator loop throws', async () => {
const { fetch, waitForRequest } = createTFetch();

const client = createClient({
fetchFn: fetch,
url: 'http://localhost',
retryAttempts: 0,
});

const pingKey = Math.random().toString();
const iterator = client.iterate({
query: `subscription { ping(key: "${pingKey}") }`,
});
iterator.return = jest.fn(iterator.return);

const req = await waitForRequest();

setTimeout(() => pong(pingKey), 0);

await expect(async () => {
for await (const val of iterator) {
expect(val).toMatchInlineSnapshot(`
{
"data": {
"ping": "pong",
},
}
`);
throw new Error(':)');
}
}).rejects.toBeDefined();

expect(iterator.return).toHaveBeenCalled();

expect(req.signal.aborted).toBeTruthy();
});

it('should complete subscription when calling return directly on iterator', async () => {
const { fetch, waitForRequest } = createTFetch();

const client = createClient({
fetchFn: fetch,
url: 'http://localhost',
retryAttempts: 0,
});

const pingKey = Math.random().toString();
const iterator = client.iterate({
query: `subscription { ping(key: "${pingKey}") }`,
});

const req = await waitForRequest();

pong(pingKey);

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": {
"data": {
"ping": "pong",
},
},
}
`);

await expect(iterator.return?.()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);

await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
{
"done": true,
"value": undefined,
}
`);

expect(req.signal.aborted).toBeTruthy();
});
});
74 changes: 74 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ export interface Client {
request: RequestParams,
sink: Sink<ExecutionResult<Data, Extensions>>,
): () => void;
/**
* Subscribes and iterates over emitted results from an SSE connection
* through the returned async iterator.
*/
iterate<Data = Record<string, unknown>, Extensions = unknown>(
request: RequestParams,
): AsyncIterableIterator<ExecutionResult<Data, Extensions>>;
/**
* Dispose of the client, destroy connections and clean up resources.
*/
Expand Down Expand Up @@ -636,6 +643,73 @@ export function createClient<SingleConnection extends boolean = false>(

return () => control.abort();
},
iterate(request) {
const pending: ExecutionResult<
// TODO: how to not use `any` and not have a redundant function signature?
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any
>[] = [];
const deferred = {
done: false,
error: null as unknown,
resolve: () => {
// noop
},
};
const dispose = this.subscribe(request, {
next(val) {
pending.push(val);
deferred.resolve();
},
error(err) {
deferred.done = true;
deferred.error = err;
deferred.resolve();
},
complete() {
deferred.done = true;
deferred.resolve();
},
});

const iterator = (async function* iterator() {
for (;;) {
if (!pending.length) {
// only wait if there are no pending messages available
await new Promise<void>((resolve) => (deferred.resolve = resolve));
}
// first flush
while (pending.length) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
yield pending.shift()!;
}
// then error
if (deferred.error) {
throw deferred.error;
}
// or complete
if (deferred.done) {
return;
}
}
})();
iterator.throw = async (err) => {
if (!deferred.done) {
deferred.done = true;
deferred.error = err;
deferred.resolve();
}
return { done: true, value: undefined };
};
iterator.return = async () => {
dispose();
return { done: true, value: undefined };
};

return iterator;
},
dispose() {
client.dispose();
},
Expand Down
Loading

0 comments on commit fb8bf11

Please sign in to comment.