Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - fix #2141 - cluster redirect if client closed #2701

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 69 additions & 41 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ export type OnShardedChannelMovedError = (
listeners?: ChannelListeners
) => void;

type RedisClusterSlotsState<
M extends RedisModules,
F extends RedisFunctions,
S extends RedisScripts
> = {
slots: Array<Shard<M, F, S>>;
shards: Array<Shard<M, F, S>>;
masters: Array<ShardNode<M, F, S>>;
replicas: Array<ShardNode<M, F, S>>;
randomNodeIterator?: IterableIterator<ShardNode<M, F, S>>;
}

export default class RedisClusterSlots<
M extends RedisModules,
F extends RedisFunctions,
Expand All @@ -100,13 +112,20 @@ export default class RedisClusterSlots<
readonly #options: RedisClusterOptions<M, F, S>;
readonly #Client: InstantiableRedisClient<M, F, S>;
readonly #emit: EventEmitter['emit'];
slots = new Array<Shard<M, F, S>>(RedisClusterSlots.#SLOTS);
shards = new Array<Shard<M, F, S>>();
masters = new Array<ShardNode<M, F, S>>();
replicas = new Array<ShardNode<M, F, S>>();

state: RedisClusterSlotsState<M, F, S> = {
slots: new Array(RedisClusterSlots.#SLOTS),
shards: [],
masters: [],
replicas: [],
randomNodeIterator: undefined
}

readonly nodeByAddress = new Map<string, MasterNode<M, F, S> | ShardNode<M, F, S>>();
pubSubNode?: PubSubNode<M, F, S>;

lastDiscovered = Date.now();

#isOpen = false;

get isOpen() {
Expand Down Expand Up @@ -150,15 +169,24 @@ export default class RedisClusterSlots<
}

#resetSlots() {
this.slots = new Array(RedisClusterSlots.#SLOTS);
this.shards = [];
this.masters = [];
this.replicas = [];
this.#randomNodeIterator = undefined;
this.state = {
slots: new Array(RedisClusterSlots.#SLOTS),
shards: [],
masters: [],
replicas: [],
randomNodeIterator: undefined
}
}

async #discover(rootNode?: RedisClusterClientOptions) {
this.#resetSlots();
this.lastDiscovered = Date.now();
const newState: RedisClusterSlotsState<M, F, S> = {
slots: new Array(RedisClusterSlots.#SLOTS),
shards: [],
masters: [],
replicas: [],
randomNodeIterator: undefined,
}
const addressesInUse = new Set<string>();

try {
Expand All @@ -167,19 +195,19 @@ export default class RedisClusterSlots<
eagerConnect = this.#options.minimizeConnections !== true;
for (const { from, to, master, replicas } of shards) {
const shard: Shard<M, F, S> = {
master: this.#initiateSlotNode(master, false, eagerConnect, addressesInUse, promises)
master: this.#initiateSlotNode(newState, master, false, eagerConnect, addressesInUse, promises)
};

if (this.#options.useReplicas) {
shard.replicas = replicas.map(replica =>
this.#initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises)
this.#initiateSlotNode(newState,replica, true, eagerConnect, addressesInUse, promises)
);
}

this.shards.push(shard);
newState.shards.push(shard);

for (let i = from; i <= to; i++) {
this.slots[i] = shard;
newState.slots[i] = shard;
}
}

Expand All @@ -197,7 +225,7 @@ export default class RedisClusterSlots<

if (channelsListeners.size || patternsListeners.size) {
promises.push(
this.#initiatePubSubClient({
this.#initiatePubSubClient(newState, {
[PubSubType.CHANNELS]: channelsListeners,
[PubSubType.PATTERNS]: patternsListeners
})
Expand Down Expand Up @@ -227,6 +255,7 @@ export default class RedisClusterSlots<

await Promise.all(promises);

this.state = newState;
return true;
} catch (err) {
this.#emit('error', err);
Expand Down Expand Up @@ -296,6 +325,7 @@ export default class RedisClusterSlots<
}

#initiateSlotNode(
state: RedisClusterSlotsState<M, F, S>,
{ id, ip, port }: ClusterSlotsNode,
readonly: boolean,
eagerConnent: boolean,
Expand Down Expand Up @@ -323,7 +353,7 @@ export default class RedisClusterSlots<
this.nodeByAddress.set(address, node);
}

(readonly ? this.replicas : this.masters).push(node);
(readonly ? state.replicas : state.masters).push(node);

return node;
}
Expand Down Expand Up @@ -392,7 +422,7 @@ export default class RedisClusterSlots<
this.#isOpen = false;

const promises = [];
for (const { master, replicas } of this.shards) {
for (const { master, replicas } of this.state.shards) {
if (master.client) {
promises.push(
this.#execOnNodeClient(master.client, fn)
Expand Down Expand Up @@ -446,45 +476,43 @@ export default class RedisClusterSlots<

const slotNumber = calculateSlot(firstKey);
if (!isReadonly) {
return this.nodeClient(this.slots[slotNumber].master);
return this.nodeClient(this.state.slots[slotNumber].master);
}

return this.nodeClient(this.getSlotRandomNode(slotNumber));
}

*#iterateAllNodes() {
let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length));
if (i < this.masters.length) {
let i = Math.floor(Math.random() * (this.state.masters.length + this.state.replicas.length));
if (i < this.state.masters.length) {
do {
yield this.masters[i];
} while (++i < this.masters.length);
yield this.state.masters[i];
} while (++i < this.state.masters.length);

for (const replica of this.replicas) {
for (const replica of this.state.replicas) {
yield replica;
}
} else {
i -= this.masters.length;
i -= this.state.masters.length;
do {
yield this.replicas[i];
} while (++i < this.replicas.length);
yield this.state.replicas[i];
} while (++i < this.state.replicas.length);
}

while (true) {
for (const master of this.masters) {
for (const master of this.state.masters) {
yield master;
}

for (const replica of this.replicas) {
for (const replica of this.state.replicas) {
yield replica;
}
}
}

#randomNodeIterator?: IterableIterator<ShardNode<M, F, S>>;

getRandomNode() {
this.#randomNodeIterator ??= this.#iterateAllNodes();
return this.#randomNodeIterator.next().value as ShardNode<M, F, S>;
this.state.randomNodeIterator ??= this.#iterateAllNodes();
return this.state.randomNodeIterator.next().value as ShardNode<M, F, S>;
}

*#slotNodesIterator(slot: ShardWithReplicas<M, F, S>) {
Expand All @@ -505,7 +533,7 @@ export default class RedisClusterSlots<
}

getSlotRandomNode(slotNumber: number) {
const slot = this.slots[slotNumber];
const slot = this.state.slots[slotNumber];
if (!slot.replicas?.length) {
return slot.master;
}
Expand All @@ -524,14 +552,14 @@ export default class RedisClusterSlots<
getPubSubClient() {
return this.pubSubNode ?
this.pubSubNode.client :
this.#initiatePubSubClient();
this.#initiatePubSubClient(this.state);
}

async #initiatePubSubClient(toResubscribe?: PubSubToResubscribe) {
const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)),
node = index < this.masters.length ?
this.masters[index] :
this.replicas[index - this.masters.length];
async #initiatePubSubClient(state: RedisClusterSlotsState<M,F,S>, toResubscribe?: PubSubToResubscribe) {
const index = Math.floor(Math.random() * (state.masters.length + state.replicas.length)),
node = index < state.masters.length ?
state.masters[index] :
state.replicas[index - state.masters.length];

this.pubSubNode = {
address: node.address,
Expand Down Expand Up @@ -569,7 +597,7 @@ export default class RedisClusterSlots<
}

getShardedPubSubClient(channel: string) {
const { master } = this.slots[calculateSlot(channel)];
const { master } = this.state.slots[calculateSlot(channel)];
return master.pubSubClient ?? this.#initiateShardedPubSubClient(master);
}

Expand Down Expand Up @@ -607,7 +635,7 @@ export default class RedisClusterSlots<
channel: string,
unsubscribe: (client: RedisClientType<M, F, S>) => Promise<void>
): Promise<void> {
const { master } = this.slots[calculateSlot(channel)];
const { master } = this.state.slots[calculateSlot(channel)];
if (!master.pubSubClient) return Promise.resolve();

const client = await master.pubSubClient;
Expand Down
31 changes: 26 additions & 5 deletions packages/client/lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { EventEmitter } from 'events';
import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import { PubSubListener } from '../client/pub-sub';
import { ErrorReply } from '../errors';
import { ClientClosedError, ErrorReply, ConnectionTimeoutError } from '../errors';

export type RedisClusterClientOptions = Omit<
RedisClientOptions,
Expand Down Expand Up @@ -100,19 +100,19 @@ export default class RedisCluster<
readonly #slots: RedisClusterSlots<M, F, S>;

get slots() {
return this.#slots.slots;
return this.#slots.state.slots;
}

get shards() {
return this.#slots.shards;
return this.#slots.state.shards;
}

get masters() {
return this.#slots.masters;
return this.#slots.state.masters;
}

get replicas() {
return this.#slots.replicas;
return this.#slots.state.replicas;
}

get nodeByAddress() {
Expand Down Expand Up @@ -243,12 +243,33 @@ export default class RedisCluster<
isReadonly: boolean | undefined,
executor: (client: RedisClientType<M, F, S>) => Promise<Reply>
): Promise<Reply> {
if (this.#slots.lastDiscovered + 10000 < Date.now()) {
const discover = async () => {
let client = await this.#slots.nodeClient(this.#slots.getRandomNode());
this.#slots.rediscover(client).catch(() => { /* ignore */ })
}
discover(); // don't wait for it
}
const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16;
let client = await this.#slots.getClient(firstKey, isReadonly);
for (let i = 0;; i++) {
try {
return await executor(client);
} catch (err) {
if (err instanceof ClientClosedError) {
i-- // don't count this attempt

// attempt to reconnect closed client
client.connect().catch(() => { /* ignore */ });

// try again with a different client
let oldClient = client
while (client === oldClient) {
client = await this.#slots.getClient(undefined, isReadonly)
}
continue;
}

if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) {
throw err;
}
Expand Down