Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): Respect retry attempts when server goes away after connecting in single connection mode #59

Merged
merged 4 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 76 additions & 34 deletions src/__tests__/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,40 +155,6 @@ it('should report error to sink if server goes away', async () => {
);
});

it('should respect retry attempts when server goes away after connecting', async () => {
const { fetch, waitForRequest, dispose } = createTFetch();

const client = createClient({
fetchFn: fetch,
url: 'http://localhost',
retryAttempts: 2,
retry: () => Promise.resolve(),
});

const sub = tsubscribe(client, {
query: `subscription { ping(key: "${Math.random()}") }`,
});

// start
await waitForRequest();
await dispose();

// 1st retry
await waitForRequest();
await dispose();

// 2nd retry
await waitForRequest();
await dispose();

// no more retries
await expect(
Promise.race([waitForRequest(), sub.waitForError()]),
).resolves.toMatchInlineSnapshot(
`[NetworkError: Connection closed while having active streams]`,
);
});

it('should report error to sink if server goes away during generator emission', async () => {
const { fetch, dispose } = createTFetch();

Expand Down Expand Up @@ -384,6 +350,47 @@ describe('single connection mode', () => {
expect(stream.signal.aborted).toBeTruthy();
});
});

it('should respect retry attempts when server goes away after connecting', async () => {
const { fetch, waitForRequest, dispose } = createTFetch();

const client = createClient({
singleConnection: true,
fetchFn: fetch,
url: 'http://localhost',
retryAttempts: 2,
retry: () => Promise.resolve(),
});

const sub = tsubscribe(client, {
query: `subscription { ping(key: "${Math.random()}") }`,
});

// start
await waitForRequest(); // reservation
await waitForRequest(); // connect
await waitForRequest(); // operation
await dispose();

// 1st retry
await waitForRequest(); // reservation
await waitForRequest(); // connect
await waitForRequest(); // operation
await dispose();

// 2nd retry
await waitForRequest(); // reservation
await waitForRequest(); // connect
await waitForRequest(); // operation
await dispose();

// no more retries
await expect(
Promise.race([waitForRequest(), sub.waitForError()]),
).resolves.toMatchInlineSnapshot(
`[NetworkError: Connection closed while having active streams]`,
);
});
});

describe('distinct connections mode', () => {
Expand Down Expand Up @@ -468,6 +475,41 @@ describe('distinct connections mode', () => {
expect(stream1.signal.aborted).toBeTruthy();
expect(stream2.signal.aborted).toBeTruthy();
});

it('should respect retry attempts when server goes away after connecting', async () => {
const { fetch, waitForRequest, dispose } = createTFetch();

const client = createClient({
singleConnection: false,
fetchFn: fetch,
url: 'http://localhost',
retryAttempts: 2,
retry: () => Promise.resolve(),
});

const sub = tsubscribe(client, {
query: `subscription { ping(key: "${Math.random()}") }`,
});

// start
await waitForRequest();
await dispose();

// 1st retry
await waitForRequest();
await dispose();

// 2nd retry
await waitForRequest();
await dispose();

// no more retries
await expect(
Promise.race([waitForRequest(), sub.waitForError()]),
).resolves.toMatchInlineSnapshot(
`[NetworkError: Connection closed while having active streams]`,
);
});
});

describe('retries', () => {
Expand Down
10 changes: 8 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,6 @@ export function createClient<SingleConnection extends boolean = false>(
fetchFn,
onMessage,
});
retryingErr = null; // future connects are not retries
retries = 0; // reset the retries on connect

connected.waitForThrow().catch(() => (conn = undefined));

Expand Down Expand Up @@ -577,6 +575,14 @@ export function createClient<SingleConnection extends boolean = false>(
signal: control.signal,
operationId,
})) {
// only after receiving results are future connects not considered retries.
// this is because a client might successfully connect, but the server
// ends up terminating the connection afterwards before streaming anything.
// of course, if the client completes the subscription, this loop will
// break and therefore stop the stream (it wont reconnect)
retryingErr = null;
retries = 0;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
sink.next(result as any);
}
Expand Down