From 319ab87e2e7985644177d7fa6ff2625d24fb6403 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 14 Mar 2024 10:16:26 +0200 Subject: [PATCH 1/2] support handling invalidation push messages --- packages/client/lib/client/commands-queue.ts | 26 ++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index a4029779fc..9a2d6e1de3 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,7 +1,7 @@ import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; -import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types'; +import { CommandArguments, TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; import { MonitorCallback } from '.'; @@ -56,6 +56,8 @@ export default class RedisCommandsQueue { return this.#pubSub.isActive; } + #invalidateCallback?: (key: RedisArgument | null) => unknown; + constructor( respVersion: RespVersions, maxLength: number | null | undefined, @@ -109,13 +111,33 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), onPush: push => { if (!this.#onPush(push)) { - + switch (push[0].toString()) { + case "invalidate": { + console.log("invalidate push message"); + if (this.#invalidateCallback) { + if (push[1] !== null) { + for (const key of push[1]) { + console.log(`invalidating key ${key}`); + this.#invalidateCallback(key); + } + } else { + console.log(`invalidating all keys`); + this.#invalidateCallback(null); + } + } + break; + } + } } }, getTypeMapping: () => this.#getTypeMapping() }); } + setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) { + this.#invalidateCallback = callback; + } + addCommand( args: CommandArguments, options?: CommandOptions From 8d827d7b4f8a5c480d9cf0137a8c12def42f72d9 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Wed, 20 Mar 2024 10:47:12 +0200 Subject: [PATCH 2/2] update to be more generic - would argue that existing push message handlers should be handled in this manner, with the registration function not allowing one to overwrite "built in" handlers. --- packages/client/lib/client/commands-queue.ts | 34 ++++++++------------ 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 9a2d6e1de3..842725e427 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,7 +1,7 @@ import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; -import { CommandArguments, TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; +import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types'; import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; import { MonitorCallback } from '.'; @@ -56,7 +56,7 @@ export default class RedisCommandsQueue { return this.#pubSub.isActive; } - #invalidateCallback?: (key: RedisArgument | null) => unknown; + #pushHandlers: Map unknown>; constructor( respVersion: RespVersions, @@ -67,6 +67,7 @@ export default class RedisCommandsQueue { this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; this.decoder = this.#initiateDecoder(); + this.#pushHandlers = new Map unknown>(); } #onReply(reply: ReplyUnion) { @@ -111,31 +112,24 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), onPush: push => { if (!this.#onPush(push)) { - switch (push[0].toString()) { - case "invalidate": { - console.log("invalidate push message"); - if (this.#invalidateCallback) { - if (push[1] !== null) { - for (const key of push[1]) { - console.log(`invalidating key ${key}`); - this.#invalidateCallback(key); - } - } else { - console.log(`invalidating all keys`); - this.#invalidateCallback(null); - } - } - break; - } + const handler = this.#pushHandlers.get(push[0].toString()); + if (handler === undefined) { + return; } + handler(push) } }, getTypeMapping: () => this.#getTypeMapping() }); } - setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) { - this.#invalidateCallback = callback; + setPushCallback(type: string, callback?: (push: any[]) => unknown) { + if (callback === undefined) { + this.#pushHandlers.delete(type); + return; + } + + this.#pushHandlers.set(type, callback); } addCommand(