diff --git a/src/client.ts b/src/client.ts index 3b2b54f7..9ea82372 100644 --- a/src/client.ts +++ b/src/client.ts @@ -267,13 +267,17 @@ export function createClient(options: ClientOptions): Client { cancellerRef: CancellerRef, callDepth = 0, ): ConnectReturn { - // prevents too many recursive calls when reavaluating/re-connecting - if (callDepth > 10) { - throw new Error('Kept trying to connect but the socket never settled.'); + if (callDepth) { + // prevents too many recursive calls when reavaluating/re-connecting + if (callDepth > 10) { + throw new Error('Kept trying to connect but the socket never settled.'); + } + // wait a bit for socket state changes in recursive calls + await new Promise((resolve) => setTimeout(resolve, callDepth * 50)); } - // retry wait strategy only on root caller - if (state.retrying && callDepth === 0) { + // retry wait strategy for all callers + if (state.retrying) { if (retryWaiting.length) { // if others are waiting for retry, I'll wait too await new Promise((resolve) => retryWaiting.push(resolve)); @@ -283,6 +287,11 @@ export function createClient(options: ClientOptions): Client { }); // use retry wait strategy await retryWait(state.retries); + state = { + ...state, + retrying: false, // avoid reading to waiting queue + retries: state.retries + 1, // is about to create a new WebSocket + }; // complete all waiting and clear the queue while (retryWaiting.length) { retryWaiting.pop()?.(); @@ -290,20 +299,12 @@ export function createClient(options: ClientOptions): Client { } } - // if recursive call, wait a bit for socket change - if (callDepth) { - await new Promise((resolve) => setTimeout(resolve, callDepth * 50)); - } - // socket already exists. can be ready or pending, check and behave accordingly if (state.socket) { switch (state.socket.readyState) { case WebSocketImpl.OPEN: { // if the socket is not acknowledged, wait a bit and reavaluate - if (!state.acknowledged) { - return connect(cancellerRef, callDepth + 1); - } - + if (!state.acknowledged) return connect(cancellerRef, callDepth + 1); return makeConnectReturn(state.socket, cancellerRef); } case WebSocketImpl.CONNECTING: { @@ -327,7 +328,6 @@ export function createClient(options: ClientOptions): Client { ...state, acknowledged: false, socket, - retries: state.retries + (state.retrying ? 1 : 0), }; emitter.emit('connecting'); diff --git a/src/tests/client.ts b/src/tests/client.ts index 4757c9c6..4b596217 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -754,6 +754,55 @@ describe('reconnecting', () => { EventEmitter.defaultMaxListeners = defaultMaxListeners; // reset }); + it('should resubscribe all subscribers on silent reconnect when using retry wait delay', async () => { + const defaultMaxListeners = EventEmitter.defaultMaxListeners; + EventEmitter.defaultMaxListeners = 50; // for test + + const { url, ...server } = await startTServer(); + + const client = createClient({ + url, + retryAttempts: 3, + retryWait: () => new Promise((resolve) => setTimeout(resolve, 500)), + }); + + // add subscribers + const subs: TSubscribe[] = []; + for (let i = 0; i < EventEmitter.defaultMaxListeners - 1; i++) { + subs.push( + tsubscribe(client, { + query: `subscription Sub${i} { ping(key: "${i}") }`, + }), + ); + await server.waitForOperation(); + } + + // connected + await server.waitForClient((client) => { + client.close(); + }); + + // reconnected + await server.waitForClient((client) => { + client.close(); + }); + // once more + await server.waitForClient((client) => { + client.close(); + }); + + await server.waitForClient(); + + // wait for all active subscribers to reconnect + for (const _ of subs) { + await server.waitForOperation(); + } + + client.dispose(); + + EventEmitter.defaultMaxListeners = defaultMaxListeners; // reset + }); + it('should report some close events immediately and not reconnect', async () => { const { url, ...server } = await startTServer();