Skip to content

Commit

Permalink
fix(client): Time retries and socket change waits
Browse files Browse the repository at this point in the history
Closes: enisdenjo#85
  • Loading branch information
enisdenjo committed Dec 11, 2020
1 parent bdd8a34 commit 7c707db
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,17 @@ export function createClient(options: ClientOptions): Client {
cancellerRef: CancellerRef,
callDepth = 0,
): ConnectReturn {
// prevents too many recursive calls when reavaluating/re-connecting
if (callDepth > 10) {
throw new Error('Kept trying to connect but the socket never settled.');
if (callDepth) {
// prevents too many recursive calls when reavaluating/re-connecting
if (callDepth > 10) {
throw new Error('Kept trying to connect but the socket never settled.');
}
// wait a bit for socket state changes in recursive calls
await new Promise((resolve) => setTimeout(resolve, callDepth * 50));
}

// retry wait strategy only on root caller
if (state.retrying && callDepth === 0) {
// retry wait strategy for all callers
if (state.retrying) {
if (retryWaiting.length) {
// if others are waiting for retry, I'll wait too
await new Promise<void>((resolve) => retryWaiting.push(resolve));
Expand All @@ -283,27 +287,24 @@ export function createClient(options: ClientOptions): Client {
});
// use retry wait strategy
await retryWait(state.retries);
state = {
...state,
retrying: false, // avoid reading to waiting queue
retries: state.retries + 1, // is about to create a new WebSocket
};
// complete all waiting and clear the queue
while (retryWaiting.length) {
retryWaiting.pop()?.();
}
}
}

// if recursive call, wait a bit for socket change
if (callDepth) {
await new Promise((resolve) => setTimeout(resolve, callDepth * 50));
}

// socket already exists. can be ready or pending, check and behave accordingly
if (state.socket) {
switch (state.socket.readyState) {
case WebSocketImpl.OPEN: {
// if the socket is not acknowledged, wait a bit and reavaluate
if (!state.acknowledged) {
return connect(cancellerRef, callDepth + 1);
}

if (!state.acknowledged) return connect(cancellerRef, callDepth + 1);
return makeConnectReturn(state.socket, cancellerRef);
}
case WebSocketImpl.CONNECTING: {
Expand All @@ -327,7 +328,6 @@ export function createClient(options: ClientOptions): Client {
...state,
acknowledged: false,
socket,
retries: state.retries + (state.retrying ? 1 : 0),
};
emitter.emit('connecting');

Expand Down
49 changes: 49 additions & 0 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,55 @@ describe('reconnecting', () => {
EventEmitter.defaultMaxListeners = defaultMaxListeners; // reset
});

it('should resubscribe all subscribers on silent reconnect when using retry wait delay', async () => {
const defaultMaxListeners = EventEmitter.defaultMaxListeners;
EventEmitter.defaultMaxListeners = 50; // for test

const { url, ...server } = await startTServer();

const client = createClient({
url,
retryAttempts: 3,
retryWait: () => new Promise((resolve) => setTimeout(resolve, 500)),
});

// add subscribers
const subs: TSubscribe<unknown>[] = [];
for (let i = 0; i < EventEmitter.defaultMaxListeners - 1; i++) {
subs.push(
tsubscribe(client, {
query: `subscription Sub${i} { ping(key: "${i}") }`,
}),
);
await server.waitForOperation();
}

// connected
await server.waitForClient((client) => {
client.close();
});

// reconnected
await server.waitForClient((client) => {
client.close();
});
// once more
await server.waitForClient((client) => {
client.close();
});

await server.waitForClient();

// wait for all active subscribers to reconnect
for (const _ of subs) {
await server.waitForOperation();
}

client.dispose();

EventEmitter.defaultMaxListeners = defaultMaxListeners; // reset
});

it('should report some close events immediately and not reconnect', async () => {
const { url, ...server } = await startTServer();

Expand Down

0 comments on commit 7c707db

Please sign in to comment.