Skip to content

Commit 96d6445

Browse files
authored
fix(ssubscribe): properly resubscribe in case of shard failover (redis#3098)
* fix(ssubscribe): properly resubscribe in case of shard failover 1) when RE failover happens, there is a disconnect 2) affected Client reconnects and tries to resubscribe all existing listeners ISSUE #1: CROSSSLOT Error - client was doing ssubscribe ch1 ch2.. chN which, after the failover could result in CROSSSLOT ( naturally, becasuse now some slots could be owned by other shards ) FIX: send one ssubscribe command per channel instead of one ssubscribe for all channels ISSUE #2: MOVED Error - some/all of the channels might be moved somewhere else FIX: 1: Propagate the error to the Cluster. 2: Cluster rediscovers topology. 3: Cluster resubscribes all listeners of the failed client ( possibly some/all of those will end up in a different client after the rediscovery ) fixes: redis#2902
1 parent bd11e38 commit 96d6445

File tree

9 files changed

+824
-21
lines changed

9 files changed

+824
-21
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,10 @@ export default class RedisCommandsQueue {
338338
return this.#addPubSubCommand(command);
339339
}
340340

341+
removeAllPubSubListeners() {
342+
return this.#pubSub.removeAllListeners();
343+
}
344+
341345
resubscribe(chainId?: symbol) {
342346
const commands = this.#pubSub.resubscribe();
343347
if (!commands.length) return;

packages/client/lib/client/index.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -765,17 +765,17 @@ export default class RedisClient<
765765
}
766766
});
767767
}
768-
768+
769769
if (this.#clientSideCache) {
770770
commands.push({cmd: this.#clientSideCache.trackingOn()});
771771
}
772772

773773
if (this.#options?.emitInvalidate) {
774774
commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']});
775775
}
776-
776+
777777
const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options);
778-
778+
779779
if(maintenanceHandshakeCmd) {
780780
commands.push(maintenanceHandshakeCmd);
781781
};
@@ -818,6 +818,11 @@ export default class RedisClient<
818818
chainId = Symbol('Socket Initiator');
819819

820820
const resubscribePromise = this.#queue.resubscribe(chainId);
821+
resubscribePromise?.catch(error => {
822+
if (error.message && error.message.startsWith('MOVED')) {
823+
this.emit('__MOVED', this._self.#queue.removeAllPubSubListeners());
824+
}
825+
});
821826
if (resubscribePromise) {
822827
promises.push(resubscribePromise);
823828
}

packages/client/lib/client/pub-sub.ts

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,25 +323,50 @@ export class PubSub {
323323
}
324324

325325
resubscribe() {
326-
const commands = [];
326+
const commands: PubSubCommand[] = [];
327327
for (const [type, listeners] of Object.entries(this.listeners)) {
328328
if (!listeners.size) continue;
329329

330330
this.#isActive = true;
331+
332+
if(type === PUBSUB_TYPE.SHARDED) {
333+
this.#shardedResubscribe(commands, listeners);
334+
} else {
335+
this.#normalResubscribe(commands, type, listeners);
336+
}
337+
}
338+
339+
return commands;
340+
}
341+
342+
#normalResubscribe(commands: PubSubCommand[], type: string, listeners: PubSubTypeListeners) {
343+
this.#subscribing++;
344+
const callback = () => this.#subscribing--;
345+
commands.push({
346+
args: [
347+
COMMANDS[type as PubSubType].subscribe,
348+
...listeners.keys()
349+
],
350+
channelsCounter: listeners.size,
351+
resolve: callback,
352+
reject: callback
353+
});
354+
}
355+
356+
#shardedResubscribe(commands: PubSubCommand[], listeners: PubSubTypeListeners) {
357+
const callback = () => this.#subscribing--;
358+
for(const channel of listeners.keys()) {
331359
this.#subscribing++;
332-
const callback = () => this.#subscribing--;
333360
commands.push({
334361
args: [
335-
COMMANDS[type as PubSubType].subscribe,
336-
...listeners.keys()
362+
COMMANDS[PUBSUB_TYPE.SHARDED].subscribe,
363+
channel
337364
],
338-
channelsCounter: listeners.size,
365+
channelsCounter: 1,
339366
resolve: callback,
340367
reject: callback
341-
} satisfies PubSubCommand);
368+
})
342369
}
343-
344-
return commands;
345370
}
346371

347372
handleMessageReply(reply: Array<Buffer>): boolean {
@@ -379,6 +404,22 @@ export class PubSub {
379404
return listeners;
380405
}
381406

407+
removeAllListeners() {
408+
const result = {
409+
[PUBSUB_TYPE.CHANNELS]: this.listeners[PUBSUB_TYPE.CHANNELS],
410+
[PUBSUB_TYPE.PATTERNS]: this.listeners[PUBSUB_TYPE.PATTERNS],
411+
[PUBSUB_TYPE.SHARDED]: this.listeners[PUBSUB_TYPE.SHARDED]
412+
}
413+
414+
this.#updateIsActive();
415+
416+
this.listeners[PUBSUB_TYPE.CHANNELS] = new Map();
417+
this.listeners[PUBSUB_TYPE.PATTERNS] = new Map();
418+
this.listeners[PUBSUB_TYPE.SHARDED] = new Map();
419+
420+
return result;
421+
}
422+
382423
#emitPubSubMessage(
383424
type: PubSubType,
384425
message: Buffer,

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { RedisClusterClientOptions, RedisClusterOptions } from '.';
22
import { RootNodesUnavailableError } from '../errors';
33
import RedisClient, { RedisClientOptions, RedisClientType } from '../client';
44
import { EventEmitter } from 'node:stream';
5-
import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pub-sub';
5+
import { ChannelListeners, PUBSUB_TYPE, PubSubListeners, PubSubTypeListeners } from '../client/pub-sub';
66
import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types';
77
import calculateSlot from 'cluster-key-slot';
88
import { RedisSocketOptions } from '../client/socket';
@@ -185,6 +185,7 @@ export default class RedisClusterSlots<
185185
async #discover(rootNode: RedisClusterClientOptions) {
186186
this.clientSideCache?.clear();
187187
this.clientSideCache?.disable();
188+
188189
try {
189190
const addressesInUse = new Set<string>(),
190191
promises: Array<Promise<unknown>> = [],
@@ -224,6 +225,7 @@ export default class RedisClusterSlots<
224225
}
225226
}
226227

228+
//Keep only the nodes that are still in use
227229
for (const [address, node] of this.nodeByAddress.entries()) {
228230
if (addressesInUse.has(address)) continue;
229231

@@ -337,23 +339,29 @@ export default class RedisClusterSlots<
337339
const socket =
338340
this.#getNodeAddress(node.address) ??
339341
{ host: node.host, port: node.port, };
340-
const client = Object.freeze({
342+
const clientInfo = Object.freeze({
341343
host: socket.host,
342344
port: socket.port,
343345
});
344346
const emit = this.#emit;
345-
return this.#clientFactory(
347+
const client = this.#clientFactory(
346348
this.#clientOptionsDefaults({
347349
clientSideCache: this.clientSideCache,
348350
RESP: this.#options.RESP,
349351
socket,
350352
readonly,
351353
}))
352-
.on('error', error => emit('node-error', error, client))
353-
.on('reconnecting', () => emit('node-reconnecting', client))
354-
.once('ready', () => emit('node-ready', client))
355-
.once('connect', () => emit('node-connect', client))
356-
.once('end', () => emit('node-disconnect', client));
354+
.on('error', error => emit('node-error', error, clientInfo))
355+
.on('reconnecting', () => emit('node-reconnecting', clientInfo))
356+
.once('ready', () => emit('node-ready', clientInfo))
357+
.once('connect', () => emit('node-connect', clientInfo))
358+
.once('end', () => emit('node-disconnect', clientInfo))
359+
.on('__MOVED', async (allPubSubListeners: PubSubListeners) => {
360+
await this.rediscover(client);
361+
this.#emit('__resubscribeAllPubSubListeners', allPubSubListeners);
362+
});
363+
364+
return client;
357365
}
358366

359367
#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
@@ -374,7 +382,9 @@ export default class RedisClusterSlots<
374382

375383
async rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> {
376384
this.#runningRediscoverPromise ??= this.#rediscover(startWith)
377-
.finally(() => this.#runningRediscoverPromise = undefined);
385+
.finally(() => {
386+
this.#runningRediscoverPromise = undefined
387+
});
378388
return this.#runningRediscoverPromise;
379389
}
380390

packages/client/lib/cluster/index.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { EventEmitter } from 'node:events';
66
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
77
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
88
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
9-
import { PubSubListener } from '../client/pub-sub';
9+
import { PubSubListener, PubSubListeners } from '../client/pub-sub';
1010
import { ErrorReply } from '../errors';
1111
import { RedisTcpSocketOptions } from '../client/socket';
1212
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
@@ -310,6 +310,7 @@ export default class RedisCluster<
310310

311311
this._options = options;
312312
this._slots = new RedisClusterSlots(options, this.emit.bind(this));
313+
this.on('__resubscribeAllPubSubListeners', this.resubscribeAllPubSubListeners.bind(this));
313314

314315
if (options?.commandOptions) {
315316
this._commandOptions = options.commandOptions;
@@ -584,6 +585,33 @@ export default class RedisCluster<
584585
);
585586
}
586587

588+
resubscribeAllPubSubListeners(allListeners: PubSubListeners) {
589+
for(const [channel, listeners] of allListeners.CHANNELS) {
590+
listeners.buffers.forEach(bufListener => {
591+
this.subscribe(channel, bufListener, true);
592+
});
593+
listeners.strings.forEach(strListener => {
594+
this.subscribe(channel, strListener);
595+
});
596+
};
597+
for (const [channel, listeners] of allListeners.PATTERNS) {
598+
listeners.buffers.forEach(bufListener => {
599+
this.pSubscribe(channel, bufListener, true);
600+
});
601+
listeners.strings.forEach(strListener => {
602+
this.pSubscribe(channel, strListener);
603+
});
604+
};
605+
for (const [channel, listeners] of allListeners.SHARDED) {
606+
listeners.buffers.forEach(bufListener => {
607+
this.sSubscribe(channel, bufListener, true);
608+
});
609+
listeners.strings.forEach(strListener => {
610+
this.sSubscribe(channel, strListener);
611+
});
612+
};
613+
}
614+
587615
sUnsubscribe = this.SUNSUBSCRIBE;
588616

589617
/**

0 commit comments

Comments
 (0)