diff --git a/packages/client/index.ts b/packages/client/index.ts index 56cdf703ca..d4dceb357f 100644 --- a/packages/client/index.ts +++ b/packages/client/index.ts @@ -20,6 +20,9 @@ import RedisSentinel from './lib/sentinel'; export { RedisSentinelOptions, RedisSentinelType } from './lib/sentinel/types'; export const createSentinel = RedisSentinel.create; +import { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache'; +export { BasicClientSideCache, BasicPooledClientSideCache }; + // export { GeoReplyWith } from './lib/commands/generic-transformers'; // export { SetOptions } from './lib/commands/SET'; diff --git a/packages/client/lib/client/README-cache.md b/packages/client/lib/client/README-cache.md new file mode 100644 index 0000000000..01a43cf459 --- /dev/null +++ b/packages/client/lib/client/README-cache.md @@ -0,0 +1,64 @@ +# Client Side Caching Support + +Client Side Caching enables Redis Servers and Clients to work together to enable a client to cache results from command sent to a server and be informed by the server when the cached result is no longer valid. + +## Usage + +node-redis supports two ways of instantiating client side caching support + +Note: Client Side Caching is only supported with RESP3. + +### Anonymous Cache + +```javascript +const client = createClient({RESP: 3, clientSideCache: {ttl: 0, maxEntries: 0, lru: false}}) +``` + +In this instance, the cache is opaque to the user, and they have no control over it. + +### Controllable Cache + +```javascript +const ttl = 0, maxEntries = 0, lru = false; +const cache = new BasicClientSideCache(ttl, maxEntries, lru); +const client = createClient({RESP: 3, clientSideCache: cache}); +``` + +In this instance, the user has full control over the cache, as they have access to the cache object. + +They can manually invalidate keys + +```javascript +cache.invalidate(key); +``` + +they can clear the entire cache +g +```javascript +cache.clear(); +``` + +as well as get cache metrics + +```typescript +const hits: number = cache.cacheHits(); +const misses: number = cache.cacheMisses(); +``` + +## Pooled Caching + +Similar to individual clients, node-redis also supports caching for its pooled client object, with the cache being able to be instantiated in an anonymous manner or a controllable manner. + +### Anonymous Cache + +```javascript +const client = createClientPool({RESP: 3}, {clientSideCache: {ttl: 0, maxEntries: 0, lru: false}, minimum: 8}); +``` + +### Controllable Cache + +```javascript +const ttl = 0, maxEntries = 0, lru = false; +const cache = new BasicPooledClientSideCache(ttl, maxEntries, lru); +const client = createClientPool({RESP: 3}, {clientSideCache: cache, minimum: 8}); +``` \ No newline at end of file diff --git a/packages/client/lib/client/cache.spec.ts b/packages/client/lib/client/cache.spec.ts new file mode 100644 index 0000000000..258104ed4c --- /dev/null +++ b/packages/client/lib/client/cache.spec.ts @@ -0,0 +1,460 @@ +import assert from "assert"; +import testUtils, { GLOBAL } from "../test-utils" +import { BasicClientSideCache, BasicPooledClientSideCache } from "./cache" +import { REDIS_FLUSH_MODES } from "../commands/FLUSHALL"; +import { once } from 'events'; + +describe("Client Side Cache", () => { + describe('Basic Cache', () => { + const csc = new BasicClientSideCache({ maxEntries: 10 }); + + /* cacheNotEmpty */ + testUtils.testWithClient('Basic Cache Miss', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* cacheUsedTest */ + testUtils.testWithClient('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 1, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Max Cache Entries', async client => { + csc.clear(); + + await client.set('1', 1); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('2'), null); + assert.equal(await client.get('3'), null); + assert.equal(await client.get('4'), null); + assert.equal(await client.get('5'), null); + assert.equal(await client.get('6'), null); + assert.equal(await client.get('7'), null); + assert.equal(await client.get('8'), null); + assert.equal(await client.get('9'), null); + assert.equal(await client.get('10'), null); + assert.equal(await client.get('11'), null); + assert.equal(await client.get('1'), '1'); + + assert.equal(csc.cacheMisses(), 12, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('LRU works correctly', async client => { + csc.clear(); + + await client.set('1', 1); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('2'), null); + assert.equal(await client.get('3'), null); + assert.equal(await client.get('4'), null); + assert.equal(await client.get('5'), null); + assert.equal(await client.get('1'), '1'); + assert.equal(await client.get('6'), null); + assert.equal(await client.get('7'), null); + assert.equal(await client.get('8'), null); + assert.equal(await client.get('9'), null); + assert.equal(await client.get('10'), null); + assert.equal(await client.get('11'), null); + assert.equal(await client.get('1'), '1'); + + assert.equal(csc.cacheMisses(), 11, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + csc.clear(); + await client.get("x"); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Null Invalidate acts as clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + csc.invalidate(null); + await client.get("x"); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('flushdb causes an invalidate null', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + await client.flushDb(REDIS_FLUSH_MODES.SYNC); + assert.equal(await client.get("x"), null); + + assert.equal(csc.cacheMisses(), 2, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + testUtils.testWithClient('Basic Cache Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1', 'first get'); + await client.set("x", 2); + assert.equal(await client.get("x"), '2', 'second get'); + await client.set("x", 3); + assert.equal(await client.get("x"), '3', 'third get'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* immutableCacheEntriesTest */ + testUtils.testWithClient("Cached Replies Don't Mutate", async client => { + csc.clear(); + + await client.set("x", 1); + await client.set('y', 2); + const ret1 = await client.mGet(['x', 'y']); + assert.deepEqual(ret1, ['1', '2'], 'first mGet'); + ret1[0] = '4'; + const ret2 = await client.mGet(['x', 'y']); + assert.deepEqual(ret2, ['1', '2'], 'second mGet'); + ret2[0] = '8'; + const ret3 = await client.mGet(['x', 'y']); + assert.deepEqual(ret3, ['1', '2'], 'third mGet'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* clearOnDisconnectTest */ + testUtils.testWithClient("Cached cleared on disconnect", async client => { + csc.clear(); + + await client.set("x", 1); + await client.set('y', 2); + const ret1 = await client.mGet(['x', 'y']); + assert.deepEqual(ret1, ['1', '2'], 'first mGet'); + + assert.equal(csc.cacheMisses(), 1, "first Cache Misses"); + assert.equal(csc.cacheHits(), 0, "first Cache Hits"); + + await client.close(); + + await client.connect(); + + const ret2 = await client.mGet(['x', 'y']); + assert.deepEqual(ret2, ['1', '2'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 1, "second Cache Misses"); + assert.equal(csc.cacheHits(), 0, "second Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + }); + + describe("Pooled Cache", () => { + const csc = new BasicPooledClientSideCache(); + + testUtils.testWithClient('Virtual Pool Disconnect', async client1 => { + const client2 = client1.duplicate(); + await client2.connect() + + assert.equal(await client2.get("x"), null); + assert.equal(await client1.get("x"), null); + + assert.equal(1, csc.cacheMisses(), "Cache Misses"); + assert.equal(1, csc.cacheHits(), "Cache Hits"); + + await client2.close(); + + assert.equal(await client1.get("x"), null); + assert.equal(await client1.get("x"), null); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(2, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + clientSideCache: csc + } + }); + + /* cacheNotEmpty */ + testUtils.testWithClientPool('Basic Cache Miss and Clear', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + + assert.equal(1, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* cacheUsedTest */ + testUtils.testWithClientPool('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 2, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* invalidationTest 1 */ + testUtils.testWithClientPool('Basic Cache Manually Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + + assert.equal(await client.get("x"), '1', 'first get'); + + let p: Promise> = once(csc, 'invalidate'); + await client.set("x", 2); + let [i] = await p; + + assert.equal(await client.get("x"), '2', 'second get'); + + p = once(csc, 'invalidate'); + await client.set("x", 3); + [i] = await p; + + assert.equal(await client.get("x"), '3'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + + /* invalidationTest 2 */ + testUtils.testWithClientPool('Basic Cache Invalidate via message', async client => { + csc.clear(); + + await client.set('x', 1); + await client.set('y', 2); + + assert.deepEqual(await client.mGet(['x', 'y']), ['1', '2'], 'first mGet'); + + assert.equal(csc.cacheMisses(), 1, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + + let p: Promise> = once(csc, 'invalidate'); + await client.set("x", 3); + let [i] = await p; + + assert.equal(i, 'x'); + + assert.deepEqual(await client.mGet(['x', 'y']), ['3', '2'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 2, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + + p = once(csc, 'invalidate'); + await client.set("y", 4); + [i] = await p; + + assert.equal(i, 'y'); + + assert.deepEqual(await client.mGet(['x', 'y']), ['3', '4'], 'second mGet'); + + assert.equal(csc.cacheMisses(), 3, "Cache Misses"); + assert.equal(csc.cacheHits(), 0, "Cache Hits"); + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3, + }, + poolOptions: { + minimum: 5, + maximum: 5, + acquireTimeout: 0, + cleanupDelay: 1, + clientSideCache: csc + } + }) + }); + + describe('Cluster Caching', () => { + const csc = new BasicPooledClientSideCache(); + + testUtils.testWithCluster('Basic Cache Miss and Clear', async client => { + csc.clear(); + + await client.set("x", 1); + await client.get("x"); + await client.set("y", 1); + await client.get("y"); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + + testUtils.testWithCluster('Basic Cache Hit', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + assert.equal(await client.get("x"), '1'); + await client.set("y", 1); + assert.equal(await client.get("y"), '1'); + assert.equal(await client.get("y"), '1'); + assert.equal(await client.get("y"), '1'); + + assert.equal(2, csc.cacheMisses(), "Cache Misses"); + assert.equal(4, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + + testUtils.testWithCluster('Basic Cache Invalidate', async client => { + csc.clear(); + + await client.set("x", 1); + assert.equal(await client.get("x"), '1'); + await client.set("x", 2); + assert.equal(await client.get("x"), '2'); + await client.set("x", 3); + assert.equal(await client.get("x"), '3'); + + await client.set("y", 1); + assert.equal(await client.get("y"), '1'); + await client.set("y", 2); + assert.equal(await client.get("y"), '2'); + await client.set("y", 3); + assert.equal(await client.get("y"), '3'); + + assert.equal(6, csc.cacheMisses(), "Cache Misses"); + assert.equal(0, csc.cacheHits(), "Cache Hits"); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + RESP: 3, + clientSideCache: csc + } + }) + }); +}); \ No newline at end of file diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts new file mode 100644 index 0000000000..f17743ff07 --- /dev/null +++ b/packages/client/lib/client/cache.ts @@ -0,0 +1,451 @@ +import { EventEmitter } from 'stream'; +import RedisClient from '.'; +import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types'; +import { BasicCommandParser } from './parser'; + +type CachingClient = RedisClient; +type CmdFunc = () => Promise; + +type EvictionPolicy = "LRU" | "FIFO" + +export interface ClientSideCacheConfig { + ttl?: number; + maxEntries?: number; + evictPolocy?: EvictionPolicy; +} + +type CacheCreator = { + epoch: number; + client: CachingClient; +}; + +interface ClientSideCacheEntry { + invalidate(): void; + validate(): boolean; +} + +function generateCacheKey(redisArgs: ReadonlyArray): string { + const tmp = new Array(redisArgs.length*2); + + for (let i = 0; i < redisArgs.length; i++) { + tmp[i] = redisArgs[i].length; + tmp[i+redisArgs.length] = redisArgs[i]; + } + + return tmp.join('_'); +} + +abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry { + #invalidated = false; + readonly #expireTime: number; + + constructor(ttl: number) { + if (ttl == 0) { + this.#expireTime = 0; + } else { + this.#expireTime = Date.now() + ttl; + } + } + + invalidate(): void { + this.#invalidated = true; + } + + validate(): boolean { + return !this.#invalidated && (this.#expireTime == 0 || (Date.now() < this.#expireTime)) + } +} + +class ClientSideCacheEntryValue extends ClientSideCacheEntryBase { + readonly #value: any; + + get value() { + return this.#value; + } + + constructor(ttl: number, value: any) { + super(ttl); + this.#value = value; + } +} + +class ClientSideCacheEntryPromise extends ClientSideCacheEntryBase { + readonly #sendCommandPromise: Promise; + + get promise() { + return this.#sendCommandPromise; + } + + constructor(ttl: number, sendCommandPromise: Promise) { + super(ttl); + this.#sendCommandPromise = sendCommandPromise; + } +} + +export abstract class ClientSideCacheProvider extends EventEmitter { + abstract handleCache(client: CachingClient, parser: BasicCommandParser, fn: CmdFunc, transformReply: TransformReply | undefined, typeMapping: TypeMapping | undefined): Promise; + abstract trackingOn(): Array; + abstract invalidate(key: RedisArgument | null): void; + abstract clear(): void; + abstract cacheHits(): number; + abstract cacheMisses(): number; + abstract onError(): void; + abstract onClose(): void; +} + +export class BasicClientSideCache extends ClientSideCacheProvider { + #cacheKeyToEntryMap: Map; + #keyToCacheKeySetMap: Map>; + readonly ttl: number; + readonly maxEntries: number; + readonly lru: boolean; + #cacheHits = 0; + #cacheMisses = 0; + + constructor(config?: ClientSideCacheConfig) { + super(); + + this.#cacheKeyToEntryMap = new Map(); + this.#keyToCacheKeySetMap = new Map>(); + this.ttl = config?.ttl ?? 0; + this.maxEntries = config?.maxEntries ?? 0; + this.lru = config?.evictPolocy !== "FIFO" + } + + /* logic of how caching works: + + 1. commands use a CommandParser + it enables us to define/retrieve + cacheKey - a unique key that corresponds to this command and its arguments + redisKeys - an array of redis keys as strings that if the key is modified, will cause redis to invalidate this result when cached + 2. check if cacheKey is in our cache + 2b1. if its a value cacheEntry - return it + 2b2. if it's a promise cache entry - wait on promise and then go to 3c. + 3. if cacheEntry is not in cache + 3a. send the command save the promise into a a cacheEntry and then wait on result + 3b. transform reply (if required) based on transformReply + 3b. check the cacheEntry is still valid - in cache and hasn't been deleted) + 3c. if valid - overwrite with value entry + 4. return previously non cached result + */ + override async handleCache( + client: CachingClient, + parser: BasicCommandParser, + fn: CmdFunc, + transformReply: TransformReply | undefined, + typeMapping: TypeMapping | undefined + ) { + let reply: ReplyUnion; + + const cacheKey = generateCacheKey(parser.redisArgs); + + // "2" + let cacheEntry = this.get(cacheKey); + if (cacheEntry) { + // If instanceof is "too slow", can add a "type" and then use an "as" cast to call proper getters. + if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1" + this.#cacheHit(); + + return structuredClone(cacheEntry.value); + } else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b2 + // unsure if this should be considered a cache hit, a miss, or neither? + reply = await cacheEntry.promise; + } else { + throw new Error("unknown cache entry type"); + } + } else { // 3/3a + this.#cacheMiss(); + + const promise = fn(); + + cacheEntry = this.createPromiseEntry(client, promise); + this.set(cacheKey, cacheEntry, parser.keys); + + try { + reply = await promise; + } catch (err) { + if (cacheEntry.validate()) { // on error, have to remove promise from cache + this.delete(cacheKey!); + } + + throw err; + } + } + + // 3b + let val; + if (transformReply) { + val = transformReply(reply, parser.preserve, typeMapping); + } else { + val = reply; + } + + // 3c + if (cacheEntry.validate()) { // revalidating promise entry (dont save value, if promise entry has been invalidated) + // 3d + cacheEntry = this.createValueEntry(client, val); + this.set(cacheKey, cacheEntry, parser.keys); + this.emit("cached-key", cacheKey); + } else { +// console.log("cache entry for key got invalidated between execution and saving, so not saving"); + } + + return structuredClone(val); + } + + override trackingOn() { + return ['CLIENT', 'TRACKING', 'ON']; + } + + override invalidate(key: RedisArgument | null) { + if (key === null) { + this.clear(false); + this.emit("invalidate", key); + + return; + } + + const keySet = this.#keyToCacheKeySetMap.get(key.toString()); + if (keySet) { + for (const cacheKey of keySet) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + } + this.#cacheKeyToEntryMap.delete(cacheKey); + } + this.#keyToCacheKeySetMap.delete(key.toString()); + } + + this.emit('invalidate', key); + } + + override clear(resetStats = true) { + this.#cacheKeyToEntryMap.clear(); + this.#keyToCacheKeySetMap.clear(); + if (resetStats) { + this.#cacheHits = 0; + this.#cacheMisses = 0; + } + } + + get(cacheKey: string) { + const val = this.#cacheKeyToEntryMap.get(cacheKey); + + if (val && !val.validate()) { + this.delete(cacheKey); + this.emit("cache-evict", cacheKey); + + return undefined; + } + + if (val !== undefined && this.lru) { + this.#cacheKeyToEntryMap.delete(cacheKey); + this.#cacheKeyToEntryMap.set(cacheKey, val); + } + + return val; + } + + delete(cacheKey: string) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + this.#cacheKeyToEntryMap.delete(cacheKey); + } + } + + has(cacheKey: string) { + return this.#cacheKeyToEntryMap.has(cacheKey); + } + + set(cacheKey: string, cacheEntry: ClientSideCacheEntry, keys: Array) { + let count = this.#cacheKeyToEntryMap.size; + const oldEntry = this.#cacheKeyToEntryMap.get(cacheKey); + + if (oldEntry) { + count--; // overwriting, so not incrementig + oldEntry.invalidate(); + } + + if (this.maxEntries > 0 && count >= this.maxEntries) { + this.deleteOldest(); + } + + this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry); + + for (const key of keys) { + if (!this.#keyToCacheKeySetMap.has(key.toString())) { + this.#keyToCacheKeySetMap.set(key.toString(), new Set()); + } + + const cacheKeySet = this.#keyToCacheKeySetMap.get(key.toString()); + cacheKeySet!.add(cacheKey); + } + } + + size() { + return this.#cacheKeyToEntryMap.size; + } + + createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue { + return new ClientSideCacheEntryValue(this.ttl, value); + } + + createPromiseEntry(client: CachingClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + return new ClientSideCacheEntryPromise(this.ttl, sendCommandPromise); + } + + #cacheHit(): void { + this.#cacheHits++; + } + + #cacheMiss(): void { + this.#cacheMisses++; + } + + override cacheHits(): number { + return this.#cacheHits; + } + + override cacheMisses(): number { + return this.#cacheMisses; + } + + override onError(): void { + this.clear(); + } + + override onClose() { + this.clear(); + } + + /** + * @internal + */ + deleteOldest() { + const it = this.#cacheKeyToEntryMap[Symbol.iterator](); + const n = it.next(); + if (!n.done) { + this.#cacheKeyToEntryMap.delete(n.value[0]); + } + } + + /** + * @internal + */ + entryEntries() { + return this.#cacheKeyToEntryMap.entries(); + } + + /** + * @internal + */ + keySetEntries() { + return this.#keyToCacheKeySetMap.entries(); + } +} + +export abstract class PooledClientSideCacheProvider extends BasicClientSideCache { + #disabled = false; + + disable() { + this.#disabled = true; + } + + enable() { + this.#disabled = false; + } + + override get(cacheKey: string) { + if (this.#disabled) { + return undefined; + } + + return super.get(cacheKey); + } + + override has(cacheKey: string) { + if (this.#disabled) { + return false; + } + + return super.has(cacheKey); + } + + onPoolClose() { + this.clear(); + }; +} + +// doesn't do anything special in pooling, clears cache on every client disconnect +export class BasicPooledClientSideCache extends PooledClientSideCacheProvider { + override onError() { + this.clear(false); + } + + override onClose() { + this.clear(false); + } +} + +class PooledClientSideCacheEntryValue extends ClientSideCacheEntryValue { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, value: any) { + super(ttl, value); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + if (this.#creator) { + ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } + + return ret; + } +} + +class PooledClientSideCacheEntryPromise extends ClientSideCacheEntryPromise { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, sendCommandPromise: Promise) { + super(ttl, sendCommandPromise); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + + return ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } +} + +// Doesn't clear cache on client disconnect, validates entries on retrieval +export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache { + override createValueEntry(client: CachingClient, value: any): ClientSideCacheEntryValue { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryValue(this.ttl, creator, value); + } + + override createPromiseEntry(client: CachingClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryPromise(this.ttl, creator, sendCommandPromise); + } + + // don't clear cache on error here + override onError() {} + + override onClose() {} +} \ No newline at end of file diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 15e8a747b9..0af8113506 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -56,6 +56,8 @@ export default class RedisCommandsQueue { return this.#pubSub.isActive; } + #invalidateCallback?: (key: RedisArgument | null) => unknown; + constructor( respVersion: RespVersions, maxLength: number | null | undefined, @@ -109,13 +111,31 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), onPush: push => { if (!this.#onPush(push)) { - + // currently only supporting "invalidate" over RESP3 push messages + switch (push[0].toString()) { + case "invalidate": { + if (this.#invalidateCallback) { + if (push[1] !== null) { + for (const key of push[1]) { + this.#invalidateCallback(key); + } + } else { + this.#invalidateCallback(null); + } + } + break; + } + } } }, getTypeMapping: () => this.#getTypeMapping() }); } + setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) { + this.#invalidateCallback = callback; + } + addCommand( args: ReadonlyArray, options?: CommandOptions diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 55355a133d..437f9f2e43 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -15,6 +15,7 @@ import { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; import { RedisPoolOptions, RedisClientPool } from './pool'; import { RedisVariadicArgument, parseArgs, pushVariadicArguments } from '../commands/generic-transformers'; +import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache'; import { BasicCommandParser, CommandParser } from './parser'; export interface RedisClientOptions< @@ -72,6 +73,10 @@ export interface RedisClientOptions< * TODO */ commandOptions?: CommandOptions; + /** + * TODO + */ + clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig; } type WithCommands< @@ -280,10 +285,15 @@ export default class RedisClient< #monitorCallback?: MonitorCallback; private _self = this; private _commandOptions?: CommandOptions; + #dirtyWatch?: string; - #epoch: number; #watchEpoch?: number; + #clientSideCache?: ClientSideCacheProvider; + get clientSideCache() { + return this._self.#clientSideCache; + } + get options(): RedisClientOptions | undefined { return this._self.#options; } @@ -300,6 +310,10 @@ export default class RedisClient< return this._self.#queue.isPubSubActive; } + get socketEpoch() { + return this._self.#socket.socketEpoch; + } + get isWatching() { return this._self.#watchEpoch !== undefined; } @@ -310,10 +324,20 @@ export default class RedisClient< constructor(options?: RedisClientOptions) { super(); + this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); - this.#epoch = 0; + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof ClientSideCacheProvider) { + this.#clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = new BasicClientSideCache(cscConfig); + } + this.#queue.setInvalidateCallback(this.#clientSideCache.invalidate.bind(this.#clientSideCache)); + } } #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { @@ -347,7 +371,6 @@ export default class RedisClient< #handshake(selectedDB: number) { const commands = []; - if (this.#options?.RESP) { const hello: HelloOptions = {}; @@ -392,6 +415,10 @@ export default class RedisClient< ); } + if (this.#clientSideCache) { + commands.push(this.#clientSideCache.trackingOn()); + } + return commands; } @@ -445,6 +472,7 @@ export default class RedisClient< }) .on('error', err => { this.emit('error', err); + this.#clientSideCache?.onError(); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { this.#queue.flushWaitingForReply(err); } else { @@ -453,7 +481,6 @@ export default class RedisClient< }) .on('connect', () => this.emit('connect')) .on('ready', () => { - this.#epoch++; this.emit('ready'); this.#setPingTimer(); this.#maybeScheduleWrite(); @@ -581,13 +608,21 @@ export default class RedisClient< commandOptions: CommandOptions | undefined, transformReply: TransformReply | undefined, ) { - const reply = await this.sendCommand(parser.redisArgs, commandOptions); + const csc = this._self.#clientSideCache; + const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions; - if (transformReply) { - return transformReply(reply, parser.preserve, commandOptions?.typeMapping); - } + const fn = () => { return this.sendCommand(parser.redisArgs, commandOptions) }; - return reply; + if (csc && command.CACHEABLE && defaultTypeMapping) { + return await csc.handleCache(this._self, parser as BasicCommandParser, fn, transformReply, commandOptions?.typeMapping); + } else { + const reply = await fn(); + + if (transformReply) { + return transformReply(reply, parser.preserve, commandOptions?.typeMapping); + } + return reply; + } } /** @@ -752,7 +787,7 @@ export default class RedisClient< const reply = await this._self.sendCommand( pushVariadicArguments(['WATCH'], key) ); - this._self.#watchEpoch ??= this._self.#epoch; + this._self.#watchEpoch ??= this._self.socketEpoch; return reply as unknown as ReplyWithTypeMapping, TYPE_MAPPING>; } @@ -855,7 +890,7 @@ export default class RedisClient< throw new WatchError(dirtyWatch); } - if (watchEpoch && watchEpoch !== this._self.#epoch) { + if (watchEpoch && watchEpoch !== this._self.socketEpoch) { throw new WatchError('Client reconnected after WATCH'); } @@ -1075,6 +1110,7 @@ export default class RedisClient< return new Promise(resolve => { clearTimeout(this._self.#pingTimer); this._self.#socket.close(); + this._self.#clientSideCache?.onClose(); if (this._self.#queue.isEmpty()) { this._self.#socket.destroySocket(); @@ -1099,6 +1135,7 @@ export default class RedisClient< clearTimeout(this._self.#pingTimer); this._self.#queue.flushAll(new DisconnectsClientError()); this._self.#socket.destroy(); + this._self.#clientSideCache?.onClose(); } ref() { diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index ac1d021be9..29678f027b 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -114,6 +114,7 @@ export class DoublyLinkedList { export interface SinglyLinkedNode { value: T; next: SinglyLinkedNode | undefined; + removed: boolean; } export class SinglyLinkedList { @@ -140,7 +141,8 @@ export class SinglyLinkedList { const node = { value, - next: undefined + next: undefined, + removed: false }; if (this.#head === undefined) { @@ -151,6 +153,9 @@ export class SinglyLinkedList { } remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { + if (node.removed) { + throw new Error("node already removed"); + } --this.#length; if (this.#head === node) { @@ -165,6 +170,8 @@ export class SinglyLinkedList { } else { parent!.next = node.next; } + + node.removed = true; } shift() { @@ -177,6 +184,7 @@ export class SinglyLinkedList { this.#head = node.next; } + node.removed = true; return node.value; } diff --git a/packages/client/lib/client/parser.ts b/packages/client/lib/client/parser.ts index 12eec45773..3e82023042 100644 --- a/packages/client/lib/client/parser.ts +++ b/packages/client/lib/client/parser.ts @@ -33,6 +33,17 @@ export class BasicCommandParser implements CommandParser { return this.#keys[0]; } + get cacheKey() { + const tmp = new Array(this.#redisArgs.length*2); + + for (let i = 0; i < this.#redisArgs.length; i++) { + tmp[i] = this.#redisArgs[i].length; + tmp[i+this.#redisArgs.length] = this.#redisArgs[i]; + } + + return tmp.join('_'); + } + push(...arg: Array) { this.#redisArgs.push(...arg); }; diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index a08377e3d3..07553775ec 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -7,6 +7,7 @@ import { TimeoutError } from '../errors'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { CommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; +import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider } from './cache'; import { BasicCommandParser } from './parser'; export interface RedisPoolOptions { @@ -26,6 +27,10 @@ export interface RedisPoolOptions { * TODO */ cleanupDelay: number; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; /** * TODO */ @@ -117,7 +122,7 @@ export class RedisClientPool< RESP extends RespVersions, TYPE_MAPPING extends TypeMapping = {} >( - clientOptions?: RedisClientOptions, + clientOptions?: Omit, "clientSideCache">, options?: Partial ) { const Pool = attachConfig({ @@ -135,7 +140,7 @@ export class RedisClientPool< // returning a "proxy" to prevent the namespaces._self to leak between "proxies" return Object.create( new Pool( - RedisClient.factory(clientOptions).bind(undefined, clientOptions), + clientOptions, options ) ) as RedisClientPoolType; @@ -209,22 +214,41 @@ export class RedisClientPool< return this._self.#isClosing; } + #clientSideCache?: PooledClientSideCacheProvider; + get clientSideCache() { + return this._self.#clientSideCache; + } + /** * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, * {@link RedisClientPool.fromClient `RedisClientPool.fromClient`}, * or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}... */ constructor( - clientFactory: () => RedisClientType, + clientOptions?: RedisClientOptions, options?: Partial ) { super(); - this.#clientFactory = clientFactory; this.#options = { ...RedisClientPool.#DEFAULTS, ...options }; + if (options?.clientSideCache) { + if (clientOptions === undefined) { + clientOptions = {}; + } + + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = clientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); +// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + } + } + + this.#clientFactory = RedisClient.factory(clientOptions).bind(undefined, clientOptions) as () => RedisClientType; } private _self = this; @@ -288,7 +312,6 @@ export class RedisClientPool< async connect() { if (this._self.#isOpen) return; // TODO: throw error? - this._self.#isOpen = true; const promises = []; @@ -298,11 +321,12 @@ export class RedisClientPool< try { await Promise.all(promises); - return this as unknown as RedisClientPoolType; } catch (err) { this.destroy(); throw err; } + + return this as unknown as RedisClientPoolType; } async #create() { @@ -312,7 +336,8 @@ export class RedisClientPool< ); try { - await node.value.connect(); + const client = node.value; + await client.connect(); } catch (err) { this._self.#clientsInUse.remove(node); throw err; @@ -401,7 +426,8 @@ export class RedisClientPool< const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum); for (let i = 0; i < toDestroy; i++) { // TODO: shift vs pop - this.#idleClients.shift()!.destroy(); + const client = this.#idleClients.shift()! + client.destroy(); } } @@ -439,8 +465,10 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { promises.push(client.close()); } - + await Promise.all(promises); + + this.#clientSideCache?.onPoolClose(); this._self.#idleClients.reset(); this._self.#clientsInUse.reset(); @@ -460,6 +488,9 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { client.destroy(); } + + this._self.#clientSideCache?.onPoolClose(); + this._self.#clientsInUse.reset(); this._self.#isOpen = false; diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 36afa36c04..603416cf9e 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -72,6 +72,12 @@ export default class RedisSocket extends EventEmitter { #isSocketUnrefed = false; + #socketEpoch = 0; + + get socketEpoch() { + return this.#socketEpoch; + } + constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); @@ -212,6 +218,7 @@ export default class RedisSocket extends EventEmitter { throw err; } this.#isReady = true; + this.#socketEpoch++; this.emit('ready'); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 824cf2ae81..8eef696aeb 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -6,6 +6,7 @@ import { ChannelListeners, PUBSUB_TYPE, PubSubTypeListeners } from '../client/pu import { RedisArgument, RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from '../RESP/types'; import calculateSlot from 'cluster-key-slot'; import { RedisSocketOptions } from '../client/socket'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; interface NodeAddress { host: string; @@ -111,6 +112,7 @@ export default class RedisClusterSlots< replicas = new Array>(); readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; + clientSideCache?: PooledClientSideCacheProvider; #isOpen = false; @@ -123,7 +125,16 @@ export default class RedisClusterSlots< emit: EventEmitter['emit'] ) { this.#options = options; - this.#clientFactory = RedisClient.factory(options); + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.clientSideCache = options.clientSideCache; + } else { + this.clientSideCache = new BasicPooledClientSideCache(options.clientSideCache) + } + } + + this.#clientFactory = RedisClient.factory(this.#options); this.#emit = emit; } @@ -164,6 +175,8 @@ export default class RedisClusterSlots< } async #discover(rootNode: RedisClusterClientOptions) { + this.clientSideCache?.clear(); + this.clientSideCache?.disable(); this.#resetSlots(); try { const addressesInUse = new Set(), @@ -218,6 +231,7 @@ export default class RedisClusterSlots< } await Promise.all(promises); + this.clientSideCache?.enable(); return true; } catch (err) { @@ -313,6 +327,8 @@ export default class RedisClusterSlots< #createClient(node: ShardNode, readonly = node.readonly) { return this.#clientFactory( this.#clientOptionsDefaults({ + clientSideCache: this.clientSideCache, + RESP: this.#options.RESP, socket: this.#getNodeAddress(node.address) ?? { host: node.host, port: node.port diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 12928e71f1..ae1cec0f08 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -9,9 +9,9 @@ import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi- import { PubSubListener } from '../client/pub-sub'; import { ErrorReply } from '../errors'; import { RedisTcpSocketOptions } from '../client/socket'; -import ASKING from '../commands/ASKING'; +import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache'; import { BasicCommandParser } from '../client/parser'; -import { parseArgs } from '../commands/generic-transformers'; +import { ASKING_CMD } from '../commands/ASKING'; interface ClusterCommander< M extends RedisModules, @@ -66,6 +66,10 @@ export interface RedisClusterOptions< * Useful when the cluster is running on another network */ nodeAddressMap?: NodeAddressMap; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } // remove once request & response policies are ready @@ -148,6 +152,7 @@ export default class RedisCluster< > extends EventEmitter { static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); + return async function (this: ProxyCluster, ...args: Array) { const parser = new BasicCommandParser(); command.parseCommand(parser, ...args); @@ -265,6 +270,10 @@ export default class RedisCluster< return this._self.#slots.slots; } + get clientSideCache() { + return this._self.#slots.clientSideCache; + } + /** * An array of the cluster masters. * Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node. @@ -382,6 +391,27 @@ export default class RedisCluster< // return this._commandOptionsProxy('policies', policies); // } + #handleAsk( + fn: (client: RedisClientType, opts?: ClusterCommandOptions) => Promise + ) { + return async (client: RedisClientType, options?: ClusterCommandOptions) => { + const chainId = Symbol("asking chain"); + const opts = options ? {...options} : {}; + opts.chainId = chainId; + + + + const ret = await Promise.all( + [ + client.sendCommand([ASKING_CMD], {chainId: chainId}), + fn(client, opts) + ] + ); + + return ret[1]; + }; + } + async #execute( firstKey: RedisArgument | undefined, isReadonly: boolean | undefined, @@ -391,14 +421,15 @@ export default class RedisCluster< const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; let client = await this.#slots.getClient(firstKey, isReadonly); let i = 0; - let myOpts = options; + + let myFn = fn; while (true) { try { - return await fn(client, myOpts); + return await myFn(client, options); } catch (err) { - // reset to passed in options, if changed by an ask request - myOpts = options; + myFn = fn; + // TODO: error class if (++i > maxCommandRedirections || !(err instanceof Error)) { throw err; @@ -417,13 +448,7 @@ export default class RedisCluster< } client = redirectTo; - - const chainId = Symbol('Asking Chain'); - myOpts = options ? {...options} : {}; - myOpts.chainId = chainId; - - client.sendCommand(parseArgs(ASKING), {chainId: chainId}).catch(err => { console.log(`Asking Failed: ${err}`) } ); - + myFn = this.#handleAsk(fn); continue; } @@ -574,10 +599,12 @@ export default class RedisCluster< } close() { + this._self.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.close(); } destroy() { + this._self.#slots.clientSideCache?.onPoolClose(); return this._self.#slots.destroy(); } diff --git a/packages/client/lib/commands/GEOSEARCH.ts b/packages/client/lib/commands/GEOSEARCH.ts index 8c77fd8923..869dc60bec 100644 --- a/packages/client/lib/commands/GEOSEARCH.ts +++ b/packages/client/lib/commands/GEOSEARCH.ts @@ -29,12 +29,7 @@ export function parseGeoSearchArguments( from: GeoSearchFrom, by: GeoSearchBy, options?: GeoSearchOptions, - store?: RedisArgument ) { - if (store !== undefined) { - parser.pushKey(store); - } - parser.pushKey(key); if (typeof from === 'string' || from instanceof Buffer) { diff --git a/packages/client/lib/commands/GEOSEARCHSTORE.ts b/packages/client/lib/commands/GEOSEARCHSTORE.ts index eb8e12abe6..34c6e0988e 100644 --- a/packages/client/lib/commands/GEOSEARCHSTORE.ts +++ b/packages/client/lib/commands/GEOSEARCHSTORE.ts @@ -17,7 +17,12 @@ export default { options?: GeoSearchStoreOptions ) { parser.push('GEOSEARCHSTORE'); - parseGeoSearchArguments(parser, source, from, by, options, destination); + + if (destination !== undefined) { + parser.pushKey(destination); + } + + parseGeoSearchArguments(parser, source, from, by, options); if (options?.STOREDIST) { parser.push('STOREDIST'); diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index be5522bdd8..0cf8d80abe 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -10,11 +10,12 @@ import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, import { promisify } from 'node:util'; import { exec } from 'node:child_process'; import { RESP_TYPES } from '../RESP/decoder'; -import { defineScript } from '../lua-script'; import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec'; import RedisBloomModules from '@redis/bloom'; +import { BasicPooledClientSideCache } from '../client/cache'; import { RedisTcpSocketOptions } from '../client/socket'; import { SQUARE_SCRIPT } from '../client/index.spec'; +import { once } from 'node:events'; const execAsync = promisify(exec); @@ -78,7 +79,7 @@ async function steadyState(frame: SentinelFramework) { } ["redis-sentinel-test-password", undefined].forEach(function (password) { - describe.skip(`Sentinel - password = ${password}`, () => { + describe(`Sentinel - password = ${password}`, () => { const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password }; const frame = new SentinelFramework(config); let tracer = new Array(); @@ -1136,6 +1137,34 @@ async function steadyState(frame: SentinelFramework) { tracer.push("added node and waiting on added promise"); await nodeAddedPromise; }) + + it('with client side caching', async function() { + this.timeout(30000); + const csc = new BasicPooledClientSideCache(); + + sentinel = frame.getSentinelClient({nodeClientOptions: {RESP: 3}, clientSideCache: csc, masterPoolSize: 5}); + await sentinel.connect(); + + await sentinel.set('x', 1); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + + assert.equal(1, csc.cacheMisses()); + assert.equal(3, csc.cacheHits()); + + const invalidatePromise = once(csc, 'invalidate'); + await sentinel.set('x', 2); + await invalidatePromise; + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + await sentinel.get('x'); + + assert.equal(csc.cacheMisses(), 2); + assert.equal(csc.cacheHits(), 6); + }) }) describe('Sentinel Factory', function () { diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index d25fa03e55..9d9674478f 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -16,6 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers'; import { WaitQueue } from './wait-queue'; import { TcpNetConnectOpts } from 'node:net'; import { RedisTcpSocketOptions } from '../client/socket'; +import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache'; interface ClientInfo { id: number; @@ -265,6 +266,10 @@ export default class RedisSentinel< #masterClientCount = 0; #masterClientInfo?: ClientInfo; + get clientSideCache() { + return this._self.#internal.clientSideCache; + } + constructor(options: RedisSentinelOptions) { super(); @@ -272,7 +277,7 @@ export default class RedisSentinel< this.#options = options; - if (options?.commandOptions) { + if (options.commandOptions) { this.#commandOptions = options.commandOptions; } @@ -557,7 +562,7 @@ class RedisSentinelInternal< readonly #name: string; readonly #nodeClientOptions: RedisClientOptions; - readonly #sentinelClientOptions: RedisClientOptions; + readonly #sentinelClientOptions: RedisClientOptions; readonly #scanInterval: number; readonly #passthroughClientErrorEvents: boolean; @@ -590,6 +595,11 @@ class RedisSentinelInternal< #trace: (msg: string) => unknown = () => { }; + #clientSideCache?: PooledClientSideCacheProvider; + get clientSideCache() { + return this.#clientSideCache; + } + constructor(options: RedisSentinelOptions) { super(); @@ -602,11 +612,21 @@ class RedisSentinelInternal< this.#scanInterval = options.scanInterval ?? 0; this.#passthroughClientErrorEvents = options.passthroughClientErrorEvents ?? false; - this.#nodeClientOptions = options.nodeClientOptions ? Object.assign({} as RedisClientOptions, options.nodeClientOptions) : {}; + this.#nodeClientOptions = (options.nodeClientOptions ? {...options.nodeClientOptions} : {}) as RedisClientOptions; if (this.#nodeClientOptions.url !== undefined) { throw new Error("invalid nodeClientOptions for Sentinel"); } + if (options.clientSideCache) { + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig); +// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig); + } + } + this.#sentinelClientOptions = options.sentinelClientOptions ? Object.assign({} as RedisClientOptions, options.sentinelClientOptions) : {}; this.#sentinelClientOptions.modules = RedisSentinelModule; @@ -827,6 +847,8 @@ class RedisSentinelInternal< this.#isReady = false; + this.#clientSideCache?.onPoolClose(); + if (this.#scanTimer) { clearInterval(this.#scanTimer); this.#scanTimer = undefined; @@ -875,6 +897,8 @@ class RedisSentinelInternal< this.#isReady = false; + this.#clientSideCache?.onPoolClose(); + if (this.#scanTimer) { clearInterval(this.#scanTimer); this.#scanTimer = undefined; diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index 25dd4c4371..de6c90a70b 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -188,18 +188,22 @@ export class SentinelFramework extends DockerBase { } const options: RedisSentinelOptions = { + ...opts, name: this.config.sentinelName, sentinelRootNodes: this.#sentinelList.map((sentinel) => { return { host: '127.0.0.1', port: sentinel.docker.port } }), passthroughClientErrorEvents: errors } if (this.config.password !== undefined) { - options.nodeClientOptions = {password: this.config.password}; - options.sentinelClientOptions = {password: this.config.password}; - } + if (!options.nodeClientOptions) { + options.nodeClientOptions = {}; + } + options.nodeClientOptions.password = this.config.password; - if (opts) { - Object.assign(options, opts); + if (!options.sentinelClientOptions) { + options.sentinelClientOptions = {}; + } + options.sentinelClientOptions = {password: this.config.password}; } return RedisSentinel.create(options); diff --git a/packages/client/lib/sentinel/types.ts b/packages/client/lib/sentinel/types.ts index 1f868ec517..05fe77060c 100644 --- a/packages/client/lib/sentinel/types.ts +++ b/packages/client/lib/sentinel/types.ts @@ -4,6 +4,7 @@ import { CommandSignature, CommanderConfig, RedisFunctions, RedisModules, RedisS import COMMANDS from '../commands'; import RedisSentinel, { RedisSentinelClient } from '.'; import { RedisTcpSocketOptions } from '../client/socket'; +import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache'; export interface RedisNode { host: string; @@ -59,6 +60,10 @@ export interface RedisSentinelOptions< * When `false`, the sentinel object will wait for the first available client from the pool. */ reserveClient?: boolean; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } export interface SentinelCommander< diff --git a/packages/client/lib/sentinel/utils.ts b/packages/client/lib/sentinel/utils.ts index 90b789ddca..7e2404c2f7 100644 --- a/packages/client/lib/sentinel/utils.ts +++ b/packages/client/lib/sentinel/utils.ts @@ -1,5 +1,5 @@ -import { BasicCommandParser } from '../client/parser'; import { ArrayReply, Command, RedisFunction, RedisScript, RespVersions, UnwrapReply } from '../RESP/types'; +import { BasicCommandParser } from '../client/parser'; import { RedisSocketOptions, RedisTcpSocketOptions } from '../client/socket'; import { functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { NamespaceProxySentinel, NamespaceProxySentinelClient, ProxySentinel, ProxySentinelClient, RedisNode } from './types';