diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 8af200314b8..addc29e5afe 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -53,16 +53,6 @@ interface PubSubListeners { type PubSubListenersMap = Map; -interface PubSubState { - subscribing: number; - subscribed: number; - unsubscribing: number; - listeners: { - channels: PubSubListenersMap; - patterns: PubSubListenersMap; - }; -} - export default class RedisCommandsQueue { static #flushQueue(queue: LinkedList, err: Error): void { while (queue.length) { @@ -98,7 +88,16 @@ export default class RedisCommandsQueue { readonly #waitingForReply = new LinkedList(); - #pubSubState: PubSubState | undefined; + readonly #pubSubState = { + isActive: false, + subscribing: 0, + subscribed: 0, + unsubscribing: 0, + listeners: { + channels: new Map(), + patterns: new Map() + } + }; static readonly #PUB_SUB_MESSAGES = { message: Buffer.from('message'), @@ -111,7 +110,7 @@ export default class RedisCommandsQueue { readonly #parser = new RedisParser({ returnReply: (reply: unknown) => { - if (this.#pubSubState && Array.isArray(reply)) { + if (this.#pubSubState.isActive && Array.isArray(reply)) { if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) { return RedisCommandsQueue.#emitPubSubMessage( this.#pubSubState.listeners.channels, @@ -150,7 +149,7 @@ export default class RedisCommandsQueue { } addCommand(args: RedisCommandArguments, options?: QueueCommandOptions): Promise { - if (this.#pubSubState && !options?.ignorePubSubMode) { + if (this.#pubSubState.isActive && !options?.ignorePubSubMode) { return Promise.reject(new Error('Cannot send commands in PubSub mode')); } else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); @@ -190,27 +189,16 @@ export default class RedisCommandsQueue { }); } - #initiatePubSubState(): PubSubState { - return this.#pubSubState ??= { - subscribed: 0, - subscribing: 0, - unsubscribing: 0, - listeners: { - channels: new Map(), - patterns: new Map() - } - }; - } - subscribe( command: PubSubSubscribeCommands, channels: RedisCommandArgument | Array, listener: PubSubListener, returnBuffers?: T ): Promise { - const pubSubState = this.#initiatePubSubState(), - channelsToSubscribe: Array = [], - listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? pubSubState.listeners.channels : pubSubState.listeners.patterns; + const channelsToSubscribe: Array = [], + listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? + this.#pubSubState.listeners.channels : + this.#pubSubState.listeners.patterns; for (const channel of (Array.isArray(channels) ? channels : [channels])) { const channelString = typeof channel === 'string' ? channel : channel.toString(); let listeners = listenersMap.get(channelString); @@ -230,6 +218,7 @@ export default class RedisCommandsQueue { if (!channelsToSubscribe.length) { return Promise.resolve(); } + return this.#pushPubSubCommand(command, channelsToSubscribe); } @@ -239,10 +228,6 @@ export default class RedisCommandsQueue { listener?: PubSubListener, returnBuffers?: T ): Promise { - if (!this.#pubSubState) { - return Promise.resolve(); - } - const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? this.#pubSubState.listeners.channels : this.#pubSubState.listeners.patterns; @@ -280,8 +265,7 @@ export default class RedisCommandsQueue { #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array): Promise { return new Promise((resolve, reject) => { - const pubSubState = this.#initiatePubSubState(), - isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, + const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing', commandArgs: Array = [command]; @@ -293,38 +277,42 @@ export default class RedisCommandsQueue { channelsCounter = channels.length; } - pubSubState[inProgressKey] += channelsCounter; + this.#pubSubState.isActive = true; + this.#pubSubState[inProgressKey] += channelsCounter; this.#waitingToBeSent.push({ args: commandArgs, channelsCounter, returnBuffers: true, resolve: () => { - pubSubState[inProgressKey] -= channelsCounter; - if (isSubscribe) { - pubSubState.subscribed += channelsCounter; - } else { - pubSubState.subscribed -= channelsCounter; - if (!pubSubState.subscribed && !pubSubState.subscribing && !pubSubState.subscribed) { - this.#pubSubState = undefined; - } - } + this.#pubSubState[inProgressKey] -= channelsCounter; + this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1); + this.#updatePubSubActiveState(); resolve(); }, reject: err => { - pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); + this.#pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); + this.#updatePubSubActiveState(); reject(err); } }); }); } - resubscribe(): Promise | undefined { - if (!this.#pubSubState) { - return; + #updatePubSubActiveState(): void { + if ( + !this.#pubSubState.subscribed && + !this.#pubSubState.subscribing && + !this.#pubSubState.subscribed + ) { + this.#pubSubState.isActive = false; } + } + resubscribe(): Promise | undefined { this.#pubSubState.subscribed = 0; + this.#pubSubState.subscribing = 0; + this.#pubSubState.unsubscribing = 0; const promises = [], { channels, patterns } = this.#pubSubState.listeners; @@ -369,8 +357,7 @@ export default class RedisCommandsQueue { #setReturnBuffers() { this.#parser.setReturnBuffers( !!this.#waitingForReply.head?.value.returnBuffers || - !!this.#pubSubState?.subscribed || - !!this.#pubSubState?.subscribing + !!this.#pubSubState.isActive ); } @@ -390,6 +377,7 @@ export default class RedisCommandsQueue { } flushWaitingForReply(err: Error): void { + this.#parser.reset(); RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); if (!this.#chainInExecution) return; diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index c48505c7586..09b974c910b 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -3,7 +3,7 @@ import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisClient, { RedisClientType } from '.'; import { RedisClientMultiCommandType } from './multi-command'; import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisScripts } from '../commands'; -import { AbortError, AuthError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors'; +import { AbortError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors'; import { defineScript } from '../lua-script'; import { spy } from 'sinon'; import { once } from 'events'; @@ -87,30 +87,6 @@ describe('Client', () => { ); }, GLOBAL.SERVERS.PASSWORD); - testUtils.testWithClient('should not retry connecting if failed due to wrong auth', async client => { - let message; - if (testUtils.isVersionGreaterThan([6, 2])) { - message = 'WRONGPASS invalid username-password pair or user is disabled.'; - } else if (testUtils.isVersionGreaterThan([6])) { - message = 'WRONGPASS invalid username-password pair'; - } else { - message = 'ERR invalid password'; - } - - await assert.rejects( - client.connect(), - new AuthError(message) - ); - - assert.equal(client.isOpen, false); - }, { - ...GLOBAL.SERVERS.PASSWORD, - clientOptions: { - password: 'wrongpassword' - }, - disableClientSetup: true - }); - testUtils.testWithClient('should execute AUTH before SELECT', async client => { assert.equal( (await client.clientInfo()).db, @@ -300,7 +276,8 @@ describe('Client', () => { await client.multi() .sAdd('a', ['b', 'c']) .v4.exec(), - [2]) + [2] + ); }, { ...GLOBAL.SERVERS.OPEN, clientOptions: { @@ -681,10 +658,6 @@ describe('Client', () => { const listener = spy(); await subscriber.subscribe('channel', listener); - subscriber.on('error', err => { - console.error('subscriber err', err.message); - }); - await Promise.all([ once(subscriber, 'error'), publisher.clientKill({ diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 1dd74fa1afe..25535e0728e 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -11,7 +11,7 @@ import { ScanCommandOptions } from '../commands/SCAN'; import { HScanTuple } from '../commands/HSCAN'; import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander'; import { Pool, Options as PoolOptions, createPool } from 'generic-pool'; -import { ClientClosedError, DisconnectsClientError, AuthError } from '../errors'; +import { ClientClosedError, DisconnectsClientError } from '../errors'; import { URL } from 'url'; import { TcpSocketConnectOpts } from 'net'; @@ -254,9 +254,7 @@ export default class RedisClient password: this.#options.password ?? '' }), { asap: true } - ).catch(err => { - throw new AuthError(err.message); - }) + ) ); } diff --git a/packages/client/lib/client/socket.spec.ts b/packages/client/lib/client/socket.spec.ts index 4c5cfd1d9b3..54f84eb9fe0 100644 --- a/packages/client/lib/client/socket.spec.ts +++ b/packages/client/lib/client/socket.spec.ts @@ -21,10 +21,13 @@ describe('Socket', () => { return time; }); - const socket = new RedisSocket(undefined, { - host: 'error', - reconnectStrategy - }); + const socket = new RedisSocket( + () => Promise.resolve(), + { + host: 'error', + reconnectStrategy + } + ); socket.on('error', () => { // ignore errors diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 0366b2b86e1..b04950a0724 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -3,7 +3,7 @@ import * as net from 'net'; import * as tls from 'tls'; import { encodeCommand } from '../commander'; import { RedisCommandArguments } from '../commands'; -import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, AuthError, ReconnectStrategyError } from '../errors'; +import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError } from '../errors'; import { promiseTimeout } from '../utils'; export interface RedisSocketCommonOptions { @@ -53,7 +53,7 @@ export default class RedisSocket extends EventEmitter { return (options as RedisTlsSocketOptions).tls === true; } - readonly #initiator?: RedisSocketInitiator; + readonly #initiator: RedisSocketInitiator; readonly #options: RedisSocketOptions; @@ -79,7 +79,7 @@ export default class RedisSocket extends EventEmitter { return this.#writableNeedDrain; } - constructor(initiator?: RedisSocketInitiator, options?: RedisSocketOptions) { + constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); this.#initiator = initiator; @@ -91,70 +91,40 @@ export default class RedisSocket extends EventEmitter { throw new Error('Socket already opened'); } - return this.#connect(); + return this.#connect(0); } - async #connect(hadError?: boolean): Promise { + async #connect(retries: number, hadError?: boolean): Promise { + if (retries > 0 || hadError) { + this.emit('reconnecting'); + } + try { this.#isOpen = true; - this.#socket = await this.#retryConnection(0, hadError); + this.#socket = await this.#createSocket(); this.#writableNeedDrain = false; - } catch (err) { - this.#isOpen = false; - this.emit('error', err); - this.emit('end'); - throw err; - } + this.emit('connect'); - if (!this.#isOpen) { - this.disconnect(); - return; - } - - this.emit('connect'); - - if (this.#initiator) { try { await this.#initiator(); } catch (err) { this.#socket.destroy(); this.#socket = undefined; - - if (err instanceof AuthError) { - this.#isOpen = false; - } - throw err; } - - if (!this.#isOpen) return; - } - - this.#isReady = true; - - this.emit('ready'); - } - - async #retryConnection(retries: number, hadError?: boolean): Promise { - if (retries > 0 || hadError) { - this.emit('reconnecting'); - } - - try { - return await this.#createSocket(); + this.#isReady = true; + this.emit('ready'); } catch (err) { - if (!this.#isOpen) { - throw err; - } + this.emit('error', err); const retryIn = (this.#options?.reconnectStrategy ?? RedisSocket.#defaultReconnectStrategy)(retries); if (retryIn instanceof Error) { + this.#isOpen = false; throw new ReconnectStrategyError(retryIn, err); } - this.emit('error', err); await promiseTimeout(retryIn); - return this.#retryConnection(retries + 1); + return this.#connect(retries + 1); } } @@ -212,7 +182,7 @@ export default class RedisSocket extends EventEmitter { this.#isReady = false; this.emit('error', err); - this.#connect(true).catch(() => { + this.#connect(0, true).catch(() => { // the error was already emitted, silently ignore it }); } diff --git a/packages/client/lib/errors.ts b/packages/client/lib/errors.ts index e43dbc81422..01dff992290 100644 --- a/packages/client/lib/errors.ts +++ b/packages/client/lib/errors.ts @@ -34,12 +34,6 @@ export class SocketClosedUnexpectedlyError extends Error { } } -export class AuthError extends Error { - constructor(message: string) { - super(message); - } -} - export class RootNodesUnavailableError extends Error { constructor() { super('All the root nodes are unavailable');