diff --git a/package-lock.json b/package-lock.json index 25a1dc9d51c..064b1158f50 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,9 @@ "workspaces": [ "./packages/*" ], + "dependencies": { + "ts-node": "^10.9.2" + }, "devDependencies": { "@istanbuljs/nyc-config-typescript": "^1.0.2", "@types/mocha": "^10.0.6", @@ -664,6 +667,28 @@ "node": ">=6.9.0" } }, + "node_modules/@cspotcode/source-map-support": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz", + "integrity": "sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==", + "license": "MIT", + "dependencies": { + "@jridgewell/trace-mapping": "0.3.9" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/@cspotcode/source-map-support/node_modules/@jridgewell/trace-mapping": { + "version": "0.3.9", + "resolved": "https://registry.npmjs.org/@jridgewell/trace-mapping/-/trace-mapping-0.3.9.tgz", + "integrity": "sha512-3Belt6tdc8bPgAtbcmdtNJlirVoTmEb5e2gC94PnkwEW9jI6CAHUeoG85tjWP5WquqfavoMtMwiG4P926ZKKuQ==", + "license": "MIT", + "dependencies": { + "@jridgewell/resolve-uri": "^3.0.3", + "@jridgewell/sourcemap-codec": "^1.4.10" + } + }, "node_modules/@esbuild/linux-x64": { "version": "0.19.12", "cpu": [ @@ -804,7 +829,6 @@ }, "node_modules/@jridgewell/resolve-uri": { "version": "3.1.1", - "dev": true, "license": "MIT", "engines": { "node": ">=6.0.0" @@ -820,7 +844,6 @@ }, "node_modules/@jridgewell/sourcemap-codec": { "version": "1.4.15", - "dev": true, "license": "MIT" }, "node_modules/@jridgewell/trace-mapping": { @@ -1067,10 +1090,6 @@ "resolved": "packages/entraid", "link": true }, - "node_modules/@redis/graph": { - "resolved": "packages/graph", - "link": true - }, "node_modules/@redis/json": { "resolved": "packages/json", "link": true @@ -1164,6 +1183,30 @@ "dev": true, "license": "MIT" }, + "node_modules/@tsconfig/node10": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.11.tgz", + "integrity": "sha512-DcRjDCujK/kCk/cUe8Xz8ZSpm8mS3mNNpta+jGCA6USEDfktlNvm1+IuZ9eTcDbNk41BHwpHHeW+N1lKCz4zOw==", + "license": "MIT" + }, + "node_modules/@tsconfig/node12": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.11.tgz", + "integrity": "sha512-cqefuRsh12pWyGsIoBKJA9luFu3mRxCA+ORZvA4ktLSzIuCUtWVxGIuXigEwO5/ywWFMZ2QEGKWvkZG1zDMTag==", + "license": "MIT" + }, + "node_modules/@tsconfig/node14": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.3.tgz", + "integrity": "sha512-ysT8mhdixWK6Hw3i1V2AeRqZ5WfXg1G43mqoYlM2nc6388Fq5jcXyr5mRsqViLx/GJYdoL0bfXD8nmF+Zn/Iow==", + "license": "MIT" + }, + "node_modules/@tsconfig/node16": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.4.tgz", + "integrity": "sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==", + "license": "MIT" + }, "node_modules/@types/body-parser": { "version": "1.19.5", "resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.5.tgz", @@ -1247,7 +1290,6 @@ }, "node_modules/@types/node": { "version": "20.11.16", - "dev": true, "license": "MIT", "dependencies": { "undici-types": "~5.26.4" @@ -1330,6 +1372,30 @@ "node": ">= 0.6" } }, + "node_modules/acorn": { + "version": "8.14.1", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.14.1.tgz", + "integrity": "sha512-OvQ/2pUDKmgfCg++xsTX1wGxfTaszcHVcTctW4UJB4hibJx2HXxxO5UmVgyjMa+ZDsiaf5wWLXYpRWMmBI0QHg==", + "license": "MIT", + "bin": { + "acorn": "bin/acorn" + }, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/acorn-walk": { + "version": "8.3.4", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.3.4.tgz", + "integrity": "sha512-ueEepnujpqee2o5aIYnvHU6C0A42MNdsIDeqy5BydrkuC5R1ZuUFnm27EeFJGoEHJQgn3uleRvmTXaJgfXbt4g==", + "license": "MIT", + "dependencies": { + "acorn": "^8.11.0" + }, + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/agent-base": { "version": "7.1.0", "license": "MIT", @@ -1448,6 +1514,12 @@ "dev": true, "license": "MIT" }, + "node_modules/arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", + "license": "MIT" + }, "node_modules/argparse": { "version": "2.0.1", "dev": true, @@ -2315,6 +2387,12 @@ } } }, + "node_modules/create-require": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", + "license": "MIT" + }, "node_modules/cross-spawn": { "version": "7.0.3", "dev": true, @@ -5088,6 +5166,12 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", + "license": "ISC" + }, "node_modules/marked": { "version": "4.3.0", "dev": true, @@ -7576,6 +7660,58 @@ "node": ">=0.8.0" } }, + "node_modules/ts-node": { + "version": "10.9.2", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.9.2.tgz", + "integrity": "sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==", + "license": "MIT", + "dependencies": { + "@cspotcode/source-map-support": "^0.8.0", + "@tsconfig/node10": "^1.0.7", + "@tsconfig/node12": "^1.0.7", + "@tsconfig/node14": "^1.0.0", + "@tsconfig/node16": "^1.0.2", + "acorn": "^8.4.1", + "acorn-walk": "^8.1.1", + "arg": "^4.1.0", + "create-require": "^1.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "v8-compile-cache-lib": "^3.0.1", + "yn": "3.1.1" + }, + "bin": { + "ts-node": "dist/bin.js", + "ts-node-cwd": "dist/bin-cwd.js", + "ts-node-esm": "dist/bin-esm.js", + "ts-node-script": "dist/bin-script.js", + "ts-node-transpile-only": "dist/bin-transpile.js", + "ts-script": "dist/bin-script-deprecated.js" + }, + "peerDependencies": { + "@swc/core": ">=1.2.50", + "@swc/wasm": ">=1.2.50", + "@types/node": "*", + "typescript": ">=2.7" + }, + "peerDependenciesMeta": { + "@swc/core": { + "optional": true + }, + "@swc/wasm": { + "optional": true + } + } + }, + "node_modules/ts-node/node_modules/diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "license": "BSD-3-Clause", + "engines": { + "node": ">=0.3.1" + } + }, "node_modules/tslib": { "version": "2.6.2", "license": "0BSD" @@ -7741,7 +7877,6 @@ }, "node_modules/typescript": { "version": "5.3.3", - "dev": true, "license": "Apache-2.0", "bin": { "tsc": "bin/tsc", @@ -7780,7 +7915,6 @@ }, "node_modules/undici-types": { "version": "5.26.5", - "dev": true, "license": "MIT" }, "node_modules/unicorn-magic": { @@ -7956,6 +8090,12 @@ "uuid": "dist/bin/uuid" } }, + "node_modules/v8-compile-cache-lib": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", + "integrity": "sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==", + "license": "MIT" + }, "node_modules/vary": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", @@ -8324,6 +8464,15 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", + "license": "MIT", + "engines": { + "node": ">=6" + } + }, "node_modules/yocto-queue": { "version": "0.1.0", "dev": true, @@ -8353,7 +8502,7 @@ }, "packages/bloom": { "name": "@redis/bloom", - "version": "5.0.0-next.6", + "version": "5.0.0-next.7", "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -8362,12 +8511,12 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.0.0-next.6" + "@redis/client": "^5.0.0-next.7" } }, "packages/client": { "name": "@redis/client", - "version": "5.0.0-next.6", + "version": "5.0.0-next.7", "license": "MIT", "dependencies": { "cluster-key-slot": "1.1.2" @@ -8383,7 +8532,7 @@ }, "packages/entraid": { "name": "@redis/entraid", - "version": "5.0.0-next.6", + "version": "5.0.0-next.7", "license": "MIT", "dependencies": { "@azure/identity": "4.7.0", @@ -8402,7 +8551,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.0.0-next.6" + "@redis/client": "^5.0.0-next.7" } }, "packages/entraid/node_modules/@types/node": { @@ -8425,6 +8574,7 @@ "packages/graph": { "name": "@redis/graph", "version": "5.0.0-next.6", + "extraneous": true, "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -8438,7 +8588,7 @@ }, "packages/json": { "name": "@redis/json", - "version": "5.0.0-next.6", + "version": "5.0.0-next.7", "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -8447,19 +8597,18 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.0.0-next.6" + "@redis/client": "^5.0.0-next.7" } }, "packages/redis": { - "version": "5.0.0-next.6", + "version": "5.0.0-next.7", "license": "MIT", "dependencies": { - "@redis/bloom": "5.0.0-next.6", - "@redis/client": "5.0.0-next.6", - "@redis/graph": "5.0.0-next.6", - "@redis/json": "5.0.0-next.6", - "@redis/search": "5.0.0-next.6", - "@redis/time-series": "5.0.0-next.6" + "@redis/bloom": "5.0.0-next.7", + "@redis/client": "5.0.0-next.7", + "@redis/json": "5.0.0-next.7", + "@redis/search": "5.0.0-next.7", + "@redis/time-series": "5.0.0-next.7" }, "engines": { "node": ">= 18" @@ -8467,7 +8616,7 @@ }, "packages/search": { "name": "@redis/search", - "version": "5.0.0-next.6", + "version": "5.0.0-next.7", "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -8476,7 +8625,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.0.0-next.6" + "@redis/client": "^5.0.0-next.7" } }, "packages/test-utils": { @@ -8545,7 +8694,7 @@ }, "packages/time-series": { "name": "@redis/time-series", - "version": "5.0.0-next.6", + "version": "5.0.0-next.7", "license": "MIT", "devDependencies": { "@redis/test-utils": "*" @@ -8554,7 +8703,7 @@ "node": ">= 18" }, "peerDependencies": { - "@redis/client": "^5.0.0-next.6" + "@redis/client": "^5.0.0-next.7" } } } diff --git a/package.json b/package.json index 0a29c71f831..a574893d906 100644 --- a/package.json +++ b/package.json @@ -22,5 +22,8 @@ "tsx": "^4.7.0", "typedoc": "^0.25.7", "typescript": "^5.3.3" + }, + "dependencies": { + "ts-node": "^10.9.2" } } diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 5dae1271ecb..f48e03d0c19 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -299,6 +299,9 @@ export default class RedisClient< #monitorCallback?: MonitorCallback; private _self = this; private _commandOptions?: CommandOptions; + // flag used to annotate that the client + // was in a watch transaction when + // a topology change occured #dirtyWatch?: string; #epoch: number; #watchEpoch?: number; @@ -325,6 +328,20 @@ export default class RedisClient< return this._self.#watchEpoch !== undefined; } + /** + * Indicates whether the client's WATCH command has been invalidated by a topology change. + * When this returns true, any transaction using WATCH will fail with a WatchError. + * @returns true if the watched keys have been modified, false otherwise + */ + get isDirtyWatch(): boolean { + return this._self.#dirtyWatch !== undefined + } + + /** + * Marks the client's WATCH command as invalidated due to a topology change. + * This will cause any subsequent EXEC in a transaction to fail with a WatchError. + * @param msg - The error message explaining why the WATCH is dirty + */ setDirtyWatch(msg: string) { this._self.#dirtyWatch = msg; } diff --git a/packages/client/lib/sentinel/index.spec.ts b/packages/client/lib/sentinel/index.spec.ts index be5522bdd8d..d9c77c5dfe8 100644 --- a/packages/client/lib/sentinel/index.spec.ts +++ b/packages/client/lib/sentinel/index.spec.ts @@ -1,29 +1,333 @@ import { strict as assert } from 'node:assert'; import { setTimeout } from 'node:timers/promises'; +import testUtils, { GLOBAL, MATH_FUNCTION } from '../test-utils'; +import { RESP_TYPES } from '../RESP/decoder'; import { WatchError } from "../errors"; import { RedisSentinelConfig, SentinelFramework } from "./test-util"; -import { RedisNode, RedisSentinelClientType, RedisSentinelEvent, RedisSentinelType } from "./types"; -import { RedisSentinelFactory } from '.'; -import { RedisClientType } from '../client'; +import { RedisSentinelEvent, RedisSentinelType, RedisSentinelClientType, RedisNode } from "./types"; import { RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping, NumberReply } from '../RESP/types'; - import { promisify } from 'node:util'; import { exec } from 'node:child_process'; -import { RESP_TYPES } from '../RESP/decoder'; -import { defineScript } from '../lua-script'; -import { MATH_FUNCTION } from '../commands/FUNCTION_LOAD.spec'; -import RedisBloomModules from '@redis/bloom'; -import { RedisTcpSocketOptions } from '../client/socket'; -import { SQUARE_SCRIPT } from '../client/index.spec'; - const execAsync = promisify(exec); -/* used to ensure test environment resets to normal state - i.e. - - all redis nodes are active and are part of the topology - before allowing things to continue. -*/ +[GLOBAL.SENTINEL.OPEN, GLOBAL.SENTINEL.PASSWORD].forEach(testOptions => { + describe(`test with password - ${testOptions.password}`, () => { + testUtils.testWithClientSentinel('client should be authenticated', async sentinel => { + await assert.doesNotReject(sentinel.set('x', 1)); + }, testOptions); + + testUtils.testWithClientSentinel('try to connect multiple times', async sentinel => { + await assert.rejects(sentinel.connect()); + }, testOptions); + + + testUtils.testWithClientSentinel('should respect type mapping', async sentinel => { + const typeMapped = sentinel.withTypeMapping({ + [RESP_TYPES.SIMPLE_STRING]: Buffer + }); + + const resp = await typeMapped.ping(); + assert.deepEqual(resp, Buffer.from('PONG')); + }, testOptions); + + testUtils.testWithClientSentinel('many readers', async sentinel => { + await sentinel.set("x", 1); + for (let i = 0; i < 10; i++) { + if (await sentinel.get("x") == "1") { + break; + } + await setTimeout(1000); + } + + const promises: Array> = []; + for (let i = 0; i < 500; i++) { + promises.push(sentinel.get("x")); + } + + const resp = await Promise.all(promises); + assert.equal(resp.length, 500); + for (let i = 0; i < 500; i++) { + assert.equal(resp[i], "1", `failed on match at ${i}`); + } + }, testOptions); + + testUtils.testWithClientSentinel('use', async sentinel => { + await sentinel.use( + async (client: any ) => { + await assert.doesNotReject(client.get('x')); + } + ); + }, testOptions); + + testUtils.testWithClientSentinel('watch does not carry over leases', async sentinel => { + assert.equal(await sentinel.use(client => client.watch("x")), 'OK') + assert.equal(await sentinel.use(client => client.set('x', 1)), 'OK'); + assert.deepEqual(await sentinel.use(client => client.multi().get('x').exec()), ['1']); + }, testOptions); + + testUtils.testWithClientSentinel('plain pubsub - channel', async sentinel => { + let pubSubResolve; + const pubSubPromise = new Promise((res) => { + pubSubResolve = res; + }); + + let tester = false; + await sentinel.subscribe('test', () => { + tester = true; + pubSubResolve(1); + }) + + await sentinel.publish('test', 'hello world'); + await pubSubPromise; + assert.equal(tester, true); + + // now unsubscribe + tester = false; + await sentinel.unsubscribe('test') + await sentinel.publish('test', 'hello world'); + await setTimeout(1000); + + assert.equal(tester, false); + }, testOptions); + + testUtils.testWithClientSentinel('plain pubsub - pattern', async sentinel => { + let pubSubResolve; + const pubSubPromise = new Promise((res) => { + pubSubResolve = res; + }); + + let tester = false; + await sentinel.pSubscribe('test*', () => { + tester = true; + pubSubResolve(1); + }) + + await sentinel.publish('testy', 'hello world'); + await pubSubPromise; + assert.equal(tester, true); + + // now unsubscribe + tester = false; + await sentinel.pUnsubscribe('test*'); + await sentinel.publish('testy', 'hello world'); + await setTimeout(1000); + + assert.equal(tester, false); + }, testOptions) + }); +}); + +describe(`test with scripts`, () => { + testUtils.testWithClientSentinel('with script', async sentinel => { + const [, reply] = await Promise.all([ + sentinel.set('key', '2'), + sentinel.square('key') + ]); + + assert.equal(reply, 4); + }, GLOBAL.SENTINEL.WITH_SCRIPT); + + testUtils.testWithClientSentinel('with script multi', async sentinel => { + const reply = await sentinel.multi().set('key', 2).square('key').exec(); + assert.deepEqual(reply, ['OK', 4]); + }, GLOBAL.SENTINEL.WITH_SCRIPT); + + testUtils.testWithClientSentinel('use with script', async sentinel => { + const reply = await sentinel.use( + async (client: any) => { + assert.equal(await client.set('key', '2'), 'OK'); + assert.equal(await client.get('key'), '2'); + return client.square('key') + } + ); + }, GLOBAL.SENTINEL.WITH_SCRIPT) +}); + + +describe(`test with functions`, () => { + testUtils.testWithClientSentinel('with function', async sentinel => { + await sentinel.functionLoad( + MATH_FUNCTION.code, + { REPLACE: true } + ); + + await sentinel.set('key', '2'); + const resp = await sentinel.math.square('key'); + + assert.equal(resp, 4); + }, GLOBAL.SENTINEL.WITH_FUNCTION); + + testUtils.testWithClientSentinel('with function multi', async sentinel => { + await sentinel.functionLoad( + MATH_FUNCTION.code, + { REPLACE: true } + ); + + const reply = await sentinel.multi().set('key', 2).math.square('key').exec(); + assert.deepEqual(reply, ['OK', 4]); + }, GLOBAL.SENTINEL.WITH_FUNCTION); + + testUtils.testWithClientSentinel('use with function', async sentinel => { + await sentinel.functionLoad( + MATH_FUNCTION.code, + { REPLACE: true } + ); + + const reply = await sentinel.use( + async (client: any) => { + await client.set('key', '2'); + return client.math.square('key'); + } + ); + + assert.equal(reply, 4); + }, GLOBAL.SENTINEL.WITH_FUNCTION); +}); + +describe(`test with modules`, () => { + testUtils.testWithClientSentinel('with module', async sentinel => { + const resp = await sentinel.bf.add('key', 'item') + assert.equal(resp, true); + }, GLOBAL.SENTINEL.WITH_MODULE); + + testUtils.testWithClientSentinel('with module multi', async sentinel => { + const resp = await sentinel.multi().bf.add('key', 'item').exec(); + assert.deepEqual(resp, [true]); + }, GLOBAL.SENTINEL.WITH_MODULE); + + testUtils.testWithClientSentinel('use with module', async sentinel => { + const reply = await sentinel.use( + async (client: any) => { + return client.bf.add('key', 'item'); + } + ); + + assert.equal(reply, true); + }, GLOBAL.SENTINEL.WITH_MODULE); +}); + +describe(`test with replica pool size 1`, () => { + testUtils.testWithClientSentinel('client lease', async sentinel => { + sentinel.on("error", () => { }); + + const clientLease = await sentinel.aquire(); + clientLease.set('x', 456); + + let matched = false; + /* waits for replication */ + for (let i = 0; i < 15; i++) { + try { + assert.equal(await sentinel.get("x"), '456'); + matched = true; + break; + } catch (err) { + await setTimeout(1000); + } + } + + clientLease.release(); + assert.equal(matched, true); + }, GLOBAL.SENTINEL.WITH_REPLICA_POOL_SIZE_1); + + testUtils.testWithClientSentinel('block on pool', async sentinel => { + const promise = sentinel.use( + async client => { + await setTimeout(1000); + return await client.get("x"); + } + ) + + await sentinel.set("x", 1); + assert.equal(await promise, null); + }, GLOBAL.SENTINEL.WITH_REPLICA_POOL_SIZE_1); + + testUtils.testWithClientSentinel('pipeline', async sentinel => { + const resp = await sentinel.multi().set('x', 1).get('x').execAsPipeline(); + assert.deepEqual(resp, ['OK', '1']); + }, GLOBAL.SENTINEL.WITH_REPLICA_POOL_SIZE_1); +}); + +describe(`test with masterPoolSize 2, reserve client true`, () => { + // TODO: flaky test, sometimes fails with `promise1 === null` + testUtils.testWithClientSentinel('reserve client, takes a client out of pool', async sentinel => { + const promise1 = sentinel.use( + async client => { + const val = await client.get("x"); + await client.set("x", 2); + return val; + } + ) + + const promise2 = sentinel.use( + async client => { + return client.get("x"); + } + ) + + await sentinel.set("x", 1); + assert.equal(await promise1, "1"); + assert.equal(await promise2, "2"); + }, Object.assign(GLOBAL.SENTINEL.WITH_RESERVE_CLIENT_MASTER_POOL_SIZE_2, {skipTest: true})); +}); + +describe(`test with masterPoolSize 2`, () => { + testUtils.testWithClientSentinel('multple clients', async sentinel => { + sentinel.on("error", () => { }); + + const promise = sentinel.use( + async client => { + await sentinel!.set("x", 1); + await client.get("x"); + } + ) + + await assert.doesNotReject(promise); + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); + + testUtils.testWithClientSentinel('use - watch - clean', async sentinel => { + let promise = sentinel.use(async (client) => { + await client.set("x", 1); + await client.watch("x"); + return client.multi().get("x").exec(); + }); + + assert.deepEqual(await promise, ['1']); + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); + + testUtils.testWithClientSentinel('use - watch - dirty', async sentinel => { + let promise = sentinel.use(async (client) => { + await client.set('x', 1); + await client.watch('x'); + await sentinel!.set('x', 2); + return client.multi().get('x').exec(); + }); + + await assert.rejects(promise, new WatchError()); + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); + + testUtils.testWithClientSentinel('lease - watch - clean', async sentinel => { + const leasedClient = await sentinel.aquire(); + await leasedClient.set('x', 1); + await leasedClient.watch('x'); + assert.deepEqual(await leasedClient.multi().get('x').exec(), ['1']) + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); + + testUtils.testWithClientSentinel('lease - watch - dirty', async sentinel => { + const leasedClient = await sentinel.aquire(); + await leasedClient.set('x', 1); + await leasedClient.watch('x'); + await leasedClient.set('x', 2); + + await assert.rejects(leasedClient.multi().get('x').exec(), new WatchError()); + }, GLOBAL.SENTINEL.WITH_MASTER_POOL_SIZE_2); +}); + + +// TODO: Figure out how to modify the test utils +// so it would have fine grained controll over +// sentinel +// it should somehow replicate the `SentinelFramework` object functionallities async function steadyState(frame: SentinelFramework) { let checkedMaster = false; let checkedReplicas = false; @@ -34,7 +338,6 @@ async function steadyState(frame: SentinelFramework) { checkedMaster = true; } } - if (!checkedReplicas) { const replicas = (await frame.sentinelReplicas()); checkedReplicas = true; @@ -43,17 +346,14 @@ async function steadyState(frame: SentinelFramework) { } } } - let nodeResolve, nodeReject; const nodePromise = new Promise((res, rej) => { nodeResolve = res; nodeReject = rej; }) - const seenNodes = new Set(); let sentinel: RedisSentinelType | undefined; const tracer = []; - try { sentinel = frame.getSentinelClient({ replicaPoolSize: 1, scanInterval: 2000 }, false) .on('topology-change', (event: RedisSentinelEvent) => { @@ -66,7 +366,6 @@ async function steadyState(frame: SentinelFramework) { }).on('error', err => { }); sentinel.setTracer(tracer); await sentinel.connect(); - await nodePromise; await sentinel.flushAll(); @@ -77,1189 +376,593 @@ async function steadyState(frame: SentinelFramework) { } } -["redis-sentinel-test-password", undefined].forEach(function (password) { - describe.skip(`Sentinel - password = ${password}`, () => { - const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password }; - const frame = new SentinelFramework(config); - let tracer = new Array(); - let stopMeasuringBlocking = false; - let longestDelta = 0; - let longestTestDelta = 0; - let last: number; - - before(async function () { - this.timeout(15000); - - last = Date.now(); - - function deltaMeasurer() { - const delta = Date.now() - last; - if (delta > longestDelta) { - longestDelta = delta; - } - if (delta > longestTestDelta) { - longestTestDelta = delta; +describe('legacy tests', () => { + const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: undefined }; + const frame = new SentinelFramework(config); + let tracer = new Array(); + let stopMeasuringBlocking = false; + let longestDelta = 0; + let longestTestDelta = 0; + let last: number; + + before(async function () { + this.timeout(15000); + + last = Date.now(); + + function deltaMeasurer() { + const delta = Date.now() - last; + if (delta > longestDelta) { + longestDelta = delta; + } + if (delta > longestTestDelta) { + longestTestDelta = delta; + } + if (!stopMeasuringBlocking) { + last = Date.now(); + setImmediate(deltaMeasurer); + } + } + setImmediate(deltaMeasurer); + await frame.spawnRedisSentinel(); + }); + + after(async function () { + this.timeout(15000); + + stopMeasuringBlocking = true; + + await frame.cleanup(); + }) + + describe('Sentinel Client', function () { + let sentinel: RedisSentinelType | undefined; + + beforeEach(async function () { + this.timeout(0); + + await frame.getAllRunning(); + await steadyState(frame); + longestTestDelta = 0; + }) + + afterEach(async function () { + this.timeout(60000); + // avoid errors in afterEach that end testing + if (sentinel !== undefined) { + sentinel.on('error', () => { }); + } + + if (this!.currentTest!.state === 'failed') { + console.log(`longest event loop blocked delta: ${longestDelta}`); + console.log(`longest event loop blocked in failing test: ${longestTestDelta}`); + console.log("trace:"); + for (const line of tracer) { + console.log(line); } - if (!stopMeasuringBlocking) { - last = Date.now(); - setImmediate(deltaMeasurer); + console.log(`sentinel object state:`) + console.log(`master: ${JSON.stringify(sentinel?.getMasterNode())}`) + console.log(`replicas: ${JSON.stringify(sentinel?.getReplicaNodes().entries)}`) + const results = await Promise.all([ + frame.sentinelSentinels(), + frame.sentinelMaster(), + frame.sentinelReplicas() + ]) + + console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`); + console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`); + console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`); + const { stdout, stderr } = await execAsync("docker ps -a"); + console.log(`docker stdout:\n${stdout}`); + const ids = frame.getAllDockerIds(); + console.log("docker logs"); + for (const [id, port] of ids) { + console.log(`${id}/${port}\n`); + const { stdout, stderr } = await execAsync(`docker logs ${id}`, {maxBuffer: 8192 * 8192 * 4}); + console.log(stdout); } } + tracer.length = 0; - setImmediate(deltaMeasurer); + if (sentinel !== undefined) { + await sentinel.destroy(); + sentinel = undefined; + } + }) + + it('use', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); + sentinel.on("error", () => { }); + await sentinel.connect(); - await frame.spawnRedisSentinel(); + await sentinel.use( + async (client: RedisSentinelClientType, ) => { + const masterNode = sentinel!.getMasterNode(); + await frame.stopNode(masterNode!.port.toString()); + await assert.doesNotReject(client.get('x')); + } + ); }); - - after(async function () { - this.timeout(15000); - - stopMeasuringBlocking = true; - - await frame.cleanup(); - }) - - describe('Sentinel Client', function () { - let sentinel: RedisSentinelType< RedisModules, RedisFunctions, RedisScripts, RespVersions, TypeMapping> | undefined; - - beforeEach(async function () { - this.timeout(0); - - await frame.getAllRunning(); - await steadyState(frame); - longestTestDelta = 0; + // stops master to force sentinel to update + it('stop master', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + + tracer.push(`connected`); + + let masterChangeResolve; + const masterChangePromise = new Promise((res) => { + masterChangeResolve = res; }) - - afterEach(async function () { - this.timeout(30000); - // avoid errors in afterEach that end testing - if (sentinel !== undefined) { - sentinel.on('error', () => { }); - } - - if (this!.currentTest!.state === 'failed') { - console.log(`longest event loop blocked delta: ${longestDelta}`); - console.log(`longest event loop blocked in failing test: ${longestTestDelta}`); - console.log("trace:"); - for (const line of tracer) { - console.log(line); - } - console.log(`sentinel object state:`) - console.log(`master: ${JSON.stringify(sentinel?.getMasterNode())}`) - console.log(`replicas: ${JSON.stringify(sentinel?.getReplicaNodes().entries)}`) - const results = await Promise.all([ - frame.sentinelSentinels(), - frame.sentinelMaster(), - frame.sentinelReplicas() - ]) - console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`); - console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`); - console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`); - const { stdout, stderr } = await execAsync("docker ps -a"); - console.log(`docker stdout:\n${stdout}`); - - const ids = frame.getAllDockerIds(); - console.log("docker logs"); - - for (const [id, port] of ids) { - console.log(`${id}/${port}\n`); - const { stdout, stderr } = await execAsync(`docker logs ${id}`, {maxBuffer: 8192 * 8192 * 4}); - console.log(stdout); - } - } - tracer.length = 0; - - if (sentinel !== undefined) { - await sentinel.destroy(); - sentinel = undefined; + const masterNode = await sentinel.getMasterNode(); + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push(`got expected master change event`); + masterChangeResolve(event.node); } + }); + + tracer.push(`stopping master node`); + await frame.stopNode(masterNode!.port.toString()); + tracer.push(`stopped master node`); + + tracer.push(`waiting on master change promise`); + const newMaster = await masterChangePromise as RedisNode; + tracer.push(`got new master node of ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + }); + + // if master changes, client should make sure user knows watches are invalid + it('watch across master change', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + + tracer.push("connected"); + + const client = await sentinel.aquire(); + tracer.push("aquired lease"); + + await client.set("x", 1); + await client.watch("x"); + + tracer.push("did a watch on lease"); + + let resolve; + const promise = new Promise((res) => { + resolve = res; }) - - it('basic bootstrap', async function () { - sentinel = frame.getSentinelClient(); - await sentinel.connect(); - await assert.doesNotReject(sentinel.set('x', 1)); + const masterNode = sentinel.getMasterNode(); + tracer.push(`got masterPort as ${masterNode!.port}`); - }); - - it('basic teardown worked', async function () { - const nodePorts = frame.getAllNodesPort(); - const sentinelPorts = frame.getAllSentinelsPort(); - - assert.notEqual(nodePorts.length, 0); - assert.notEqual(sentinelPorts.length, 0); - - sentinel = frame.getSentinelClient(); - await sentinel.connect(); - - await assert.doesNotReject(sentinel.get('x')); - }); - - it('try to connect multiple times', async function () { - sentinel = frame.getSentinelClient(); - const connectPromise = sentinel.connect(); - await assert.rejects(sentinel.connect()); - await connectPromise; - }); - - it('with type mapping', async function () { - const commandOptions = { - typeMapping: { - [RESP_TYPES.SIMPLE_STRING]: Buffer - } + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push("resolving promise"); + resolve(event.node); } - sentinel = frame.getSentinelClient({ commandOptions: commandOptions }); - await sentinel.connect(); - - const resp = await sentinel.ping(); - assert.deepEqual(resp, Buffer.from('PONG')) + }); + + tracer.push("stopping master node"); + await frame.stopNode(masterNode!.port.toString()); + tracer.push("stopped master node and waiting on promise"); + + const newMaster = await promise as RedisNode; + tracer.push(`promise returned, newMaster = ${JSON.stringify(newMaster)}`); + assert.notEqual(masterNode!.port, newMaster.port); + tracer.push(`newMaster does not equal old master`); + + tracer.push(`waiting to assert that a multi/exec now fails`); + await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); + tracer.push(`asserted that a multi/exec now fails`); + }); + + // same as above, but set a watch before and after master change, shouldn't change the fact that watches are invalid + it('watch before and after master change', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + tracer.push("connected"); + + const client = await sentinel.aquire(); + tracer.push("got leased client"); + await client.set("x", 1); + await client.watch("x"); + + tracer.push("set and watched x"); + + let resolve; + const promise = new Promise((res) => { + resolve = res; }) - - it('with a script', async function () { - const options = { - scripts: { - square: SQUARE_SCRIPT - } + + const masterNode = sentinel.getMasterNode(); + tracer.push(`initial masterPort = ${masterNode!.port} `); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push("got a master change event that is not the same as before"); + resolve(event.node); } - - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const [, reply] = await Promise.all([ - sentinel.set('key', '2'), - sentinel.square('key') - ]); - - assert.equal(reply, 4); + }); + + tracer.push("stopping master"); + await frame.stopNode(masterNode!.port.toString()); + tracer.push("stopped master"); + + tracer.push("waiting on master change promise"); + const newMaster = await promise as RedisNode; + tracer.push(`got master change port as ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + + tracer.push("watching again, shouldn't matter"); + await client.watch("y"); + + tracer.push("expecting multi to be rejected"); + await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); + tracer.push("multi was rejected"); + }); + + + // pubsub continues to work, even with a master change + it('pubsub - channel - with master change', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + tracer.push(`connected`); + + let pubSubResolve; + const pubSubPromise = new Promise((res) => { + pubSubResolve = res; }) - it('multi with a script', async function () { - const options = { - scripts: { - square: SQUARE_SCRIPT - } + let tester = false; + await sentinel.subscribe('test', () => { + tracer.push(`got pubsub message`); + tester = true; + pubSubResolve(1); + }) + + let masterChangeResolve; + const masterChangePromise = new Promise((res) => { + masterChangeResolve = res; + }) + + const masterNode = sentinel.getMasterNode(); + tracer.push(`got masterPort as ${masterNode!.port}`); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push("got a master change event that is not the same as before"); + masterChangeResolve(event.node); } - - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); + }); + + tracer.push("stopping master"); + await frame.stopNode(masterNode!.port.toString()); + tracer.push("stopped master and waiting on change promise"); - const reply = await sentinel.multi().set('key', 2).square('key').exec(); + const newMaster = await masterChangePromise as RedisNode; + tracer.push(`got master change port as ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); - assert.deepEqual(reply, ['OK', 4]); + tracer.push(`publishing pubsub message`); + await sentinel.publish('test', 'hello world'); + tracer.push(`published pubsub message and waiting pn pubsub promise`); + await pubSubPromise; + tracer.push(`got pubsub promise`); + + assert.equal(tester, true); + + // now unsubscribe + tester = false + await sentinel.unsubscribe('test') + await sentinel.publish('test', 'hello world'); + await setTimeout(1000); + + assert.equal(tester, false); + }); + + it('pubsub - pattern - with master change', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + tracer.push(`connected`); + + let pubSubResolve; + const pubSubPromise = new Promise((res) => { + pubSubResolve = res; }) - - it('with a function', async function () { - const options = { - functions: { - math: MATH_FUNCTION.library - } - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); - - await sentinel.set('key', '2'); - const resp = await sentinel.math.square('key'); - - assert.equal(resp, 4); + + let tester = false; + await sentinel.pSubscribe('test*', () => { + tracer.push(`got pubsub message`); + tester = true; + pubSubResolve(1); }) - it('multi with a function', async function () { - const options = { - functions: { - math: MATH_FUNCTION.library - } + let masterChangeResolve; + const masterChangePromise = new Promise((res) => { + masterChangeResolve = res; + }) + + const masterNode = sentinel.getMasterNode(); + tracer.push(`got masterPort as ${masterNode!.port}`); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + tracer.push("got a master change event that is not the same as before"); + masterChangeResolve(event.node); } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); + }); + + tracer.push("stopping master"); + await frame.stopNode(masterNode!.port.toString()); + tracer.push("stopped master and waiting on master change promise"); + + const newMaster = await masterChangePromise as RedisNode; + tracer.push(`got master change port as ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + + tracer.push(`publishing pubsub message`); + await sentinel.publish('testy', 'hello world'); + tracer.push(`published pubsub message and waiting on pubsub promise`); + await pubSubPromise; + tracer.push(`got pubsub promise`); + assert.equal(tester, true); + + // now unsubscribe + tester = false + await sentinel.pUnsubscribe('test*'); + await sentinel.publish('testy', 'hello world'); + await setTimeout(1000); - const reply = await sentinel.multi().set('key', 2).math.square('key').exec(); - assert.deepEqual(reply, ['OK', 4]); + assert.equal(tester, false); + }); + + // if we stop a node, the comand should "retry" until we reconfigure topology and execute on new topology + it('command immeaditely after stopping master', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + + tracer.push("connected"); + + let masterChangeResolve; + const masterChangePromise = new Promise((res) => { + masterChangeResolve = res; }) - - it('with a module', async function () { - const options = { - modules: RedisBloomModules + + const masterNode = sentinel.getMasterNode(); + tracer.push(`original master port = ${masterNode!.port}`); + + let changeCount = 0; + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { + changeCount++; + tracer.push(`got topology-change event we expected`); + masterChangeResolve(event.node); } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const resp = await sentinel.bf.add('key', 'item') - assert.equal(resp, true); + }); + + tracer.push(`stopping masterNode`); + await frame.stopNode(masterNode!.port.toString()); + tracer.push(`stopped masterNode`); + assert.equal(await sentinel.set('x', 123), 'OK'); + tracer.push(`did the set operation`); + const presumamblyNewMaster = sentinel.getMasterNode(); + tracer.push(`new master node seems to be ${presumamblyNewMaster?.port} and waiting on master change promise`); + + const newMaster = await masterChangePromise as RedisNode; + tracer.push(`got new masternode event saying master is at ${newMaster.port}`); + assert.notEqual(masterNode!.port, newMaster.port); + + tracer.push(`doing the get`); + const val = await sentinel.get('x'); + tracer.push(`did the get and got ${val}`); + const newestMaster = sentinel.getMasterNode() + tracer.push(`after get, we see master as ${newestMaster?.port}`); + + switch (changeCount) { + case 1: + // if we only changed masters once, we should have the proper value + assert.equal(val, '123'); + break; + case 2: + // we changed masters twice quickly, so probably didn't replicate + // therefore, this is soewhat flakey, but the above is the common case + assert(val == '123' || val == null); + break; + default: + assert(false, "unexpected case"); + } + }); + + it('shutdown sentinel node', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient(); + sentinel.setTracer(tracer); + sentinel.on("error", () => { }); + await sentinel.connect(); + tracer.push("connected"); + + let sentinelChangeResolve; + const sentinelChangePromise = new Promise((res) => { + sentinelChangeResolve = res; }) - it('multi with a module', async function () { - const options = { - modules: RedisBloomModules + const sentinelNode = sentinel.getSentinelNode(); + tracer.push(`sentinelNode = ${sentinelNode?.port}`) + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "SENTINEL_CHANGE") { + tracer.push("got sentinel change event"); + sentinelChangeResolve(event.node); } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const resp = await sentinel.multi().bf.add('key', 'item').exec(); - assert.deepEqual(resp, [true]); + }); + + tracer.push("Stopping sentinel node"); + await frame.stopSentinel(sentinelNode!.port.toString()); + tracer.push("Stopped sentinel node and waiting on sentinel change promise"); + const newSentinel = await sentinelChangePromise as RedisNode; + tracer.push("got sentinel change promise"); + assert.notEqual(sentinelNode!.port, newSentinel.port); + }); + + it('timer works, and updates sentinel list', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient({ scanInterval: 1000 }); + sentinel.setTracer(tracer); + await sentinel.connect(); + tracer.push("connected"); + + let sentinelChangeResolve; + const sentinelChangePromise = new Promise((res) => { + sentinelChangeResolve = res; }) - - it('many readers', async function () { - this.timeout(10000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 8 }); - await sentinel.connect(); - - await sentinel.set("x", 1); - for (let i = 0; i < 10; i++) { - if (await sentinel.get("x") == "1") { - break; - } - await setTimeout(1000); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "SENTINE_LIST_CHANGE" && event.size == 4) { + tracer.push(`got sentinel list change event with right size`); + sentinelChangeResolve(event.size); } - - const promises: Array> = []; - for (let i = 0; i < 500; i++) { - promises.push(sentinel.get("x")); - } - - const resp = await Promise.all(promises); - assert.equal(resp.length, 500); - for (let i = 0; i < 500; i++) { - assert.equal(resp[i], "1", `failed on match at ${i}`); - } - }); - - it('use', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); - await sentinel.connect(); - - await sentinel.use( - async (client: RedisSentinelClientType, ) => { - const masterNode = sentinel!.getMasterNode(); - await frame.stopNode(masterNode!.port.toString()); - await assert.doesNotReject(client.get('x')); - } - ); - }); - - it('use with script', async function () { - this.timeout(10000); - - const options = { - scripts: { - square: SQUARE_SCRIPT - } - } - - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const reply = await sentinel.use( - async (client: RedisSentinelClientType) => { - assert.equal(await client.set('key', '2'), 'OK'); - assert.equal(await client.get('key'), '2'); - return client.square('key') - } - ); - - assert.equal(reply, 4); - }) - - it('use with a function', async function () { - this.timeout(10000); - - const options = { - functions: { - math: MATH_FUNCTION.library - } - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - await sentinel.functionLoad( - MATH_FUNCTION.code, - { REPLACE: true } - ); - - const reply = await sentinel.use( - async (client: RedisSentinelClientType) => { - await client.set('key', '2'); - return client.math.square('key'); - } - ); - - assert.equal(reply, 4); - }) - - it('use with a module', async function () { - const options = { - modules: RedisBloomModules - } - sentinel = frame.getSentinelClient(options); - await sentinel.connect(); - - const reply = await sentinel.use( - async (client: RedisSentinelClientType) => { - return client.bf.add('key', 'item'); - } - ); - - assert.equal(reply, true); - }) - - it('block on pool', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); - await sentinel.connect(); - - const promise = sentinel.use( - async client => { - await setTimeout(1000); - return await client.get("x"); - } - ) - - await sentinel.set("x", 1); - assert.equal(await promise, null); }); - it('reserve client, takes a client out of pool', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2, reserveClient: true }); - await sentinel.connect(); - - const promise1 = sentinel.use( - async client => { - const val = await client.get("x"); - await client.set("x", 2); - return val; - } - ) + tracer.push(`adding sentinel`); + await frame.addSentinel(); + tracer.push(`added sentinel and waiting on sentinel change promise`); + const newSentinelSize = await sentinelChangePromise as number; - const promise2 = sentinel.use( - async client => { - return client.get("x"); - } - ) + assert.equal(newSentinelSize, 4); + }); + + it('stop replica, bring back replica', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); + sentinel.setTracer(tracer); + sentinel.on('error', err => { }); + await sentinel.connect(); + tracer.push("connected"); - await sentinel.set("x", 1); - assert.equal(await promise1, "1"); - assert.equal(await promise2, "2"); + let sentinelRemoveResolve; + const sentinelRemovePromise = new Promise((res) => { + sentinelRemoveResolve = res; }) - - it('multiple clients', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.on("error", () => { }); - await sentinel.connect(); - - let set = false; - - const promise = sentinel.use( - async client => { - await sentinel!.set("x", 1); - await client.get("x"); - } - ) - - await assert.doesNotReject(promise); - }); - - // by taking a lease, we know we will block on master as no clients are available, but as read occuring, means replica read occurs - it('replica reads', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.on("error", () => { }); - await sentinel.connect(); - - const clientLease = await sentinel.aquire(); - clientLease.set('x', 456); - - let matched = false; - /* waits for replication */ - for (let i = 0; i < 15; i++) { - try { - assert.equal(await sentinel.get("x"), '456'); - matched = true; - break; - } catch (err) { - await setTimeout(1000); + + const replicaPort = await frame.getRandonNonMasterNode(); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "REPLICA_REMOVE") { + if (event.node.port.toString() == replicaPort) { + tracer.push("got expected replica removed event"); + sentinelRemoveResolve(event.node); + } else { + tracer.push(`got replica removed event for a different node: ${event.node.port}`); } } - - clientLease.release(); - - assert.equal(matched, true); }); - - it('pipeline', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - await sentinel.connect(); - - const resp = await sentinel.multi().set('x', 1).get('x').execAsPipeline(); - - assert.deepEqual(resp, ['OK', '1']); + + tracer.push(`replicaPort = ${replicaPort} and stopping it`); + await frame.stopNode(replicaPort); + tracer.push("stopped replica and waiting on sentinel removed promise"); + const stoppedNode = await sentinelRemovePromise as RedisNode; + tracer.push("got removed promise"); + assert.equal(stoppedNode.port, Number(replicaPort)); + + let sentinelRestartedResolve; + const sentinelRestartedPromise = new Promise((res) => { + sentinelRestartedResolve = res; }) - - it('use - watch - clean', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); - - let promise = sentinel.use(async (client) => { - await client.set("x", 1); - await client.watch("x"); - return client.multi().get("x").exec(); - }); - - assert.deepEqual(await promise, ['1']); - }); - - it('use - watch - dirty', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); - - let promise = sentinel.use(async (client) => { - await client.set('x', 1); - await client.watch('x'); - await sentinel!.set('x', 2); - return client.multi().get('x').exec(); - }); - - await assert.rejects(promise, new WatchError()); - }); - - it('lease - watch - clean', async function () { - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); - - const leasedClient = await sentinel.aquire(); - await leasedClient.set('x', 1); - await leasedClient.watch('x'); - assert.deepEqual(await leasedClient.multi().get('x').exec(), ['1']) - }); - - it('lease - watch - dirty', async function () { - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - await sentinel.connect(); - - const leasedClient = await sentinel.aquire(); - await leasedClient.set('x', 1); - await leasedClient.watch('x'); - await leasedClient.set('x', 2); - - await assert.rejects(leasedClient.multi().get('x').exec(), new WatchError()); - }); - - - it('watch does not carry through leases', async function () { - this.timeout(10000); - sentinel = frame.getSentinelClient(); - await sentinel.connect(); - - // each of these commands is an independent lease - assert.equal(await sentinel.use(client => client.watch("x")), 'OK') - assert.equal(await sentinel.use(client => client.set('x', 1)), 'OK'); - assert.deepEqual(await sentinel.use(client => client.multi().get('x').exec()), ['1']); - }); - - // stops master to force sentinel to update - it('stop master', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push(`connected`); - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = await sentinel.getMasterNode(); - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push(`got expected master change event`); - masterChangeResolve(event.node); - } - }); - - tracer.push(`stopping master node`); - await frame.stopNode(masterNode!.port.toString()); - tracer.push(`stopped master node`); - - tracer.push(`waiting on master change promise`); - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got new master node of ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - }); - - // if master changes, client should make sure user knows watches are invalid - it('watch across master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push("connected"); - - const client = await sentinel.aquire(); - tracer.push("aquired lease"); - - await client.set("x", 1); - await client.watch("x"); - - tracer.push("did a watch on lease"); - - let resolve; - const promise = new Promise((res) => { - resolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("resolving promise"); - resolve(event.node); - } - }); - - tracer.push("stopping master node"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master node and waiting on promise"); - - const newMaster = await promise as RedisNode; - tracer.push(`promise returned, newMaster = ${JSON.stringify(newMaster)}`); - assert.notEqual(masterNode!.port, newMaster.port); - tracer.push(`newMaster does not equal old master`); - - tracer.push(`waiting to assert that a multi/exec now fails`); - await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); - tracer.push(`asserted that a multi/exec now fails`); - }); - - // same as above, but set a watch before and after master change, shouldn't change the fact that watches are invalid - it('watch before and after master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ masterPoolSize: 2 }); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push("connected"); - - const client = await sentinel.aquire(); - tracer.push("got leased client"); - await client.set("x", 1); - await client.watch("x"); - - tracer.push("set and watched x"); - - let resolve; - const promise = new Promise((res) => { - resolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`initial masterPort = ${masterNode!.port} `); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - resolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master"); - - tracer.push("waiting on master change promise"); - const newMaster = await promise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push("watching again, shouldn't matter"); - await client.watch("y"); - - tracer.push("expecting multi to be rejected"); - await assert.rejects(async () => { await client.multi().get("x").exec() }, new Error("sentinel config changed in middle of a WATCH Transaction")); - tracer.push("multi was rejected"); - }); - - it('plain pubsub - channel', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }); - - let tester = false; - await sentinel.subscribe('test', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - tracer.push(`publishing pubsub message`); - await sentinel.publish('test', 'hello world'); - tracer.push(`waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - tracer.push(`unsubscribing pubsub listener`); - await sentinel.unsubscribe('test') - tracer.push(`pubishing pubsub message`); - await sentinel.publish('test', 'hello world'); - await setTimeout(1000); - - tracer.push(`ensuring pubsub was unsubscribed via an assert`); - assert.equal(tester, false); - }); - - it('plain pubsub - pattern', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }); - - let tester = false; - await sentinel.pSubscribe('test*', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - tracer.push(`publishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - tracer.push(`waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - tracer.push(`unsubscribing pubsub listener`); - await sentinel.pUnsubscribe('test*'); - tracer.push(`pubishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - await setTimeout(1000); - - tracer.push(`ensuring pubsub was unsubscribed via an assert`); - assert.equal(tester, false); - }); - - // pubsub continues to work, even with a master change - it('pubsub - channel - with master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }) - - let tester = false; - await sentinel.subscribe('test', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - masterChangeResolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master and waiting on change promise"); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`publishing pubsub message`); - await sentinel.publish('test', 'hello world'); - tracer.push(`published pubsub message and waiting pn pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - - assert.equal(tester, true); - - // now unsubscribe - tester = false - await sentinel.unsubscribe('test') - await sentinel.publish('test', 'hello world'); - await setTimeout(1000); - - assert.equal(tester, false); - }); - - it('pubsub - pattern - with master change', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push(`connected`); - - let pubSubResolve; - const pubSubPromise = new Promise((res) => { - pubSubResolve = res; - }) - - let tester = false; - await sentinel.pSubscribe('test*', () => { - tracer.push(`got pubsub message`); - tester = true; - pubSubResolve(1); - }) - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`got masterPort as ${masterNode!.port}`); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - tracer.push("got a master change event that is not the same as before"); - masterChangeResolve(event.node); - } - }); - - tracer.push("stopping master"); - await frame.stopNode(masterNode!.port.toString()); - tracer.push("stopped master and waiting on master change promise"); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got master change port as ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`publishing pubsub message`); - await sentinel.publish('testy', 'hello world'); - tracer.push(`published pubsub message and waiting on pubsub promise`); - await pubSubPromise; - tracer.push(`got pubsub promise`); - assert.equal(tester, true); - - // now unsubscribe - tester = false - await sentinel.pUnsubscribe('test*'); - await sentinel.publish('testy', 'hello world'); - await setTimeout(1000); - - assert.equal(tester, false); - }); - - // if we stop a node, the comand should "retry" until we reconfigure topology and execute on new topology - it('command immeaditely after stopping master', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - - tracer.push("connected"); - - let masterChangeResolve; - const masterChangePromise = new Promise((res) => { - masterChangeResolve = res; - }) - - const masterNode = sentinel.getMasterNode(); - tracer.push(`original master port = ${masterNode!.port}`); - - let changeCount = 0; - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "MASTER_CHANGE" && event.node.port != masterNode!.port) { - changeCount++; - tracer.push(`got topology-change event we expected`); - masterChangeResolve(event.node); - } - }); - - tracer.push(`stopping masterNode`); - await frame.stopNode(masterNode!.port.toString()); - tracer.push(`stopped masterNode`); - assert.equal(await sentinel.set('x', 123), 'OK'); - tracer.push(`did the set operation`); - const presumamblyNewMaster = sentinel.getMasterNode(); - tracer.push(`new master node seems to be ${presumamblyNewMaster?.port} and waiting on master change promise`); - - const newMaster = await masterChangePromise as RedisNode; - tracer.push(`got new masternode event saying master is at ${newMaster.port}`); - assert.notEqual(masterNode!.port, newMaster.port); - - tracer.push(`doing the get`); - const val = await sentinel.get('x'); - tracer.push(`did the get and got ${val}`); - const newestMaster = sentinel.getMasterNode() - tracer.push(`after get, we see master as ${newestMaster?.port}`); - - switch (changeCount) { - case 1: - // if we only changed masters once, we should have the proper value - assert.equal(val, '123'); - break; - case 2: - // we changed masters twice quickly, so probably didn't replicate - // therefore, this is soewhat flakey, but the above is the common case - assert(val == '123' || val == null); - break; - default: - assert(false, "unexpected case"); + + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "REPLICA_ADD") { + tracer.push("got replica added event"); + sentinelRestartedResolve(event.node); } }); - - it('shutdown sentinel node', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient(); - sentinel.setTracer(tracer); - sentinel.on("error", () => { }); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelChangeResolve; - const sentinelChangePromise = new Promise((res) => { - sentinelChangeResolve = res; - }) - - const sentinelNode = sentinel.getSentinelNode(); - tracer.push(`sentinelNode = ${sentinelNode?.port}`) - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "SENTINEL_CHANGE") { - tracer.push("got sentinel change event"); - sentinelChangeResolve(event.node); - } - }); - - tracer.push("Stopping sentinel node"); - await frame.stopSentinel(sentinelNode!.port.toString()); - tracer.push("Stopped sentinel node and waiting on sentinel change promise"); - const newSentinel = await sentinelChangePromise as RedisNode; - tracer.push("got sentinel change promise"); - assert.notEqual(sentinelNode!.port, newSentinel.port); - }); - - it('timer works, and updates sentinel list', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ scanInterval: 1000 }); - sentinel.setTracer(tracer); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelChangeResolve; - const sentinelChangePromise = new Promise((res) => { - sentinelChangeResolve = res; - }) - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "SENTINE_LIST_CHANGE" && event.size == 4) { - tracer.push(`got sentinel list change event with right size`); - sentinelChangeResolve(event.size); - } - }); - - tracer.push(`adding sentinel`); - await frame.addSentinel(); - tracer.push(`added sentinel and waiting on sentinel change promise`); - const newSentinelSize = await sentinelChangePromise as number; - - assert.equal(newSentinelSize, 4); - }); - - it('stop replica, bring back replica', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ replicaPoolSize: 1 }); - sentinel.setTracer(tracer); - sentinel.on('error', err => { }); - await sentinel.connect(); - tracer.push("connected"); - - let sentinelRemoveResolve; - const sentinelRemovePromise = new Promise((res) => { - sentinelRemoveResolve = res; - }) - - const replicaPort = await frame.getRandonNonMasterNode(); - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_REMOVE") { - if (event.node.port.toString() == replicaPort) { - tracer.push("got expected replica removed event"); - sentinelRemoveResolve(event.node); - } else { - tracer.push(`got replica removed event for a different node: ${event.node.port}`); - } - } - }); - - tracer.push(`replicaPort = ${replicaPort} and stopping it`); - await frame.stopNode(replicaPort); - tracer.push("stopped replica and waiting on sentinel removed promise"); - const stoppedNode = await sentinelRemovePromise as RedisNode; - tracer.push("got removed promise"); - assert.equal(stoppedNode.port, Number(replicaPort)); - - let sentinelRestartedResolve; - const sentinelRestartedPromise = new Promise((res) => { - sentinelRestartedResolve = res; - }) - - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_ADD") { - tracer.push("got replica added event"); - sentinelRestartedResolve(event.node); - } - }); - - tracer.push("restarting replica"); - await frame.restartNode(replicaPort); - tracer.push("restarted replica and waiting on restart promise"); - const restartedNode = await sentinelRestartedPromise as RedisNode; - tracer.push("got restarted promise"); - assert.equal(restartedNode.port, Number(replicaPort)); - }) - - it('add a node / new replica', async function () { - this.timeout(30000); - - sentinel = frame.getSentinelClient({ scanInterval: 2000, replicaPoolSize: 1 }); - sentinel.setTracer(tracer); - // need to handle errors, as the spawning a new docker node can cause existing connections to time out - sentinel.on('error', err => { }); - await sentinel.connect(); - tracer.push("connected"); - - let nodeAddedResolve: (value: RedisNode) => void; - const nodeAddedPromise = new Promise((res) => { - nodeAddedResolve = res as (value: RedisNode) => void; - }); - - const portSet = new Set(); - for (const port of frame.getAllNodesPort()) { - portSet.add(port); - } - - // "on" and not "once" as due to connection timeouts, can happen multiple times, and want right one - sentinel.on('topology-change', (event: RedisSentinelEvent) => { - tracer.push(`got topology-change event: ${JSON.stringify(event)}`); - if (event.type === "REPLICA_ADD") { - if (!portSet.has(event.node.port)) { - tracer.push("got expected replica added event"); - nodeAddedResolve(event.node); - } - } - }); - - tracer.push("adding node"); - await frame.addNode(); - tracer.push("added node and waiting on added promise"); - await nodeAddedPromise; - }) + + tracer.push("restarting replica"); + await frame.restartNode(replicaPort); + tracer.push("restarted replica and waiting on restart promise"); + const restartedNode = await sentinelRestartedPromise as RedisNode; + tracer.push("got restarted promise"); + assert.equal(restartedNode.port, Number(replicaPort)); }) - - describe('Sentinel Factory', function () { - let master: RedisClientType | undefined; - let replica: RedisClientType | undefined; - - beforeEach(async function () { - this.timeout(0); - await frame.getAllRunning(); - - await steadyState(frame); - longestTestDelta = 0; - }) - - afterEach(async function () { - if (this!.currentTest!.state === 'failed') { - console.log(`longest event loop blocked delta: ${longestDelta}`); - console.log(`longest event loop blocked in failing test: ${longestTestDelta}`); - console.log("trace:"); - for (const line of tracer) { - console.log(line); - } - const results = await Promise.all([ - frame.sentinelSentinels(), - frame.sentinelMaster(), - frame.sentinelReplicas() - ]) - console.log(`sentinel sentinels:\n${JSON.stringify(results[0], undefined, '\t')}`); - console.log(`sentinel master:\n${JSON.stringify(results[1], undefined, '\t')}`); - console.log(`sentinel replicas:\n${JSON.stringify(results[2], undefined, '\t')}`); - const { stdout, stderr } = await execAsync("docker ps -a"); - console.log(`docker stdout:\n${stdout}`); - console.log(`docker stderr:\n${stderr}`); - } - tracer.length = 0; - - if (master !== undefined) { - if (master.isOpen) { - master.destroy(); - } - master = undefined; - } - - if (replica !== undefined) { - if (replica.isOpen) { - replica.destroy(); + it('add a node / new replica', async function () { + this.timeout(60000); + + sentinel = frame.getSentinelClient({ scanInterval: 2000, replicaPoolSize: 1 }); + sentinel.setTracer(tracer); + // need to handle errors, as the spawning a new docker node can cause existing connections to time out + sentinel.on('error', err => { }); + await sentinel.connect(); + tracer.push("connected"); + + let nodeAddedResolve: (value: RedisNode) => void; + const nodeAddedPromise = new Promise((res) => { + nodeAddedResolve = res as (value: RedisNode) => void; + }); + + const portSet = new Set(); + for (const port of frame.getAllNodesPort()) { + portSet.add(port); + } + + // "on" and not "once" as due to connection timeouts, can happen multiple times, and want right one + sentinel.on('topology-change', (event: RedisSentinelEvent) => { + tracer.push(`got topology-change event: ${JSON.stringify(event)}`); + if (event.type === "REPLICA_ADD") { + if (!portSet.has(event.node.port)) { + tracer.push("got expected replica added event"); + nodeAddedResolve(event.node); } - replica = undefined; - } - }) - - it('sentinel factory - master', async function () { - const sentinelPorts = frame.getAllSentinelsPort(); - const sentinels: Array = []; - for (const port of sentinelPorts) { - sentinels.push({ host: "localhost", port: port }); - } - - const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} }) - await factory.updateSentinelRootNodes(); - - master = await factory.getMasterClient(); - await master.connect(); - - assert.equal(await master.set("x", 1), 'OK'); - }) - - it('sentinel factory - replica', async function () { - const sentinelPorts = frame.getAllSentinelsPort(); - const sentinels: Array = []; - - for (const port of sentinelPorts) { - sentinels.push({ host: "localhost", port: port }); - } - - const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} }) - await factory.updateSentinelRootNodes(); - - const masterNode = await factory.getMasterNode(); - replica = await factory.getReplicaClient(); - const replicaSocketOptions = replica.options?.socket as unknown as RedisTcpSocketOptions | undefined; - assert.notEqual(masterNode.port, replicaSocketOptions?.port) - }) - - it('sentinel factory - bad node', async function () { - const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: [{ host: "locahost", port: 1 }] }); - await assert.rejects(factory.updateSentinelRootNodes(), new Error("Couldn't connect to any sentinel node")); - }) - - it('sentinel factory - invalid db name', async function () { - this.timeout(15000); - - const sentinelPorts = frame.getAllSentinelsPort(); - const sentinels: Array = []; - - for (const port of sentinelPorts) { - sentinels.push({ host: "localhost", port: port }); - } - - const factory = new RedisSentinelFactory({ name: "invalid-name", sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} }) - await assert.rejects(factory.updateSentinelRootNodes(), new Error("ERR No such master with that name")); - }) - - it('sentinel factory - no available nodes', async function () { - this.timeout(15000); - - const sentinelPorts = frame.getAllSentinelsPort(); - const sentinels: Array = []; - - for (const port of sentinelPorts) { - sentinels.push({ host: "localhost", port: port }); } - - const factory = new RedisSentinelFactory({ name: frame.config.sentinelName, sentinelRootNodes: sentinels, sentinelClientOptions: {password: password}, nodeClientOptions: {password: password} }) - - for (const node of frame.getAllNodesPort()) { - await frame.stopNode(node.toString()); - } - - await setTimeout(1000); - - await assert.rejects(factory.getMasterNode(), new Error("Master Node Not Enumerated")); - }) + }); + + tracer.push("adding node"); + await frame.addNode(); + tracer.push("added node and waiting on added promise"); + await nodeAddedPromise; }) - }) + }); }); + + + diff --git a/packages/client/lib/sentinel/index.ts b/packages/client/lib/sentinel/index.ts index 92a87fbb145..73c4fffd070 100644 --- a/packages/client/lib/sentinel/index.ts +++ b/packages/client/lib/sentinel/index.ts @@ -345,9 +345,12 @@ export default class RedisSentinel< key: K, value: V ) { - const proxy = Object.create(this._self); - proxy._commandOptions = Object.create(this._self.#commandOptions ?? null); - proxy._commandOptions[key] = value; + const proxy = Object.create(this); + // Create new commandOptions object with the inherited properties + proxy._self.#commandOptions = { + ...(this._self.#commandOptions || {}), + [key]: value + }; return proxy as RedisSentinelType< M, F, @@ -682,9 +685,10 @@ class RedisSentinelInternal< async #connect() { let count = 0; - while (true) { + while (true) { this.#trace("starting connect loop"); + count+=1; if (this.#destroy) { this.#trace("in #connect and want to destroy") return; @@ -1109,7 +1113,7 @@ class RedisSentinelInternal< this.#trace(`transform: destroying old masters if open`); for (const client of this.#masterClients) { - masterWatches.push(client.isWatching); + masterWatches.push(client.isWatching || client.isDirtyWatch); if (client.isOpen) { client.destroy() @@ -1460,4 +1464,4 @@ export class RedisSentinelFactory extends EventEmitter { } }); } -} +} \ No newline at end of file diff --git a/packages/client/lib/sentinel/test-util.ts b/packages/client/lib/sentinel/test-util.ts index 25dd4c4371a..2bc8bfca900 100644 --- a/packages/client/lib/sentinel/test-util.ts +++ b/packages/client/lib/sentinel/test-util.ts @@ -55,7 +55,7 @@ export interface RedisServerDocker { abstract class DockerBase { async spawnRedisServerDocker({ image, version }: RedisServerDockerConfig, serverArguments: Array, environment?: string): Promise { const port = (await portIterator.next()).value; - let cmdLine = `docker run --init -d --network host `; + let cmdLine = `docker run --init -d --network host -e PORT=${port.toString()} `; if (environment !== undefined) { cmdLine += `-e ${environment} `; } @@ -80,7 +80,7 @@ abstract class DockerBase { async dockerRemove(dockerId: string): Promise { try { - await this.dockerStop(dockerId); `` + await this.dockerStop(dockerId); } catch (err) { // its ok if stop failed, as we are just going to remove, will just be slower console.log(`dockerStop failed in remove: ${err}`); @@ -180,9 +180,14 @@ export class SentinelFramework extends DockerBase { RedisScripts, RespVersions, TypeMapping>>, errors = true) { - if (opts?.sentinelRootNodes !== undefined) { - throw new Error("cannot specify sentinelRootNodes here"); - } + // remove this safeguard + // in order to test the case when + // connecting to sentinel fails + + // if (opts?.sentinelRootNodes !== undefined) { + // throw new Error("cannot specify sentinelRootNodes here"); + // } + if (opts?.name !== undefined) { throw new Error("cannot specify sentinel db name here"); } @@ -245,13 +250,13 @@ export class SentinelFramework extends DockerBase { } protected async spawnRedisSentinelNodeDocker() { - const imageInfo: RedisServerDockerConfig = this.config.nodeDockerConfig ?? { image: "redis/redis-stack-server", version: "latest" }; + const imageInfo: RedisServerDockerConfig = this.config.nodeDockerConfig ?? { image: "redislabs/client-libs-test", version: "8.0-M05-pre" }; const serverArguments: Array = this.config.nodeServerArguments ?? []; let environment; if (this.config.password !== undefined) { - environment = `REDIS_ARGS="{port} --requirepass ${this.config.password}"`; + environment = `REDIS_PASSWORD=${this.config.password}`; } else { - environment = 'REDIS_ARGS="{port}"'; + environment = undefined; } const docker = await this.spawnRedisServerDocker(imageInfo, serverArguments, environment); @@ -276,9 +281,6 @@ export class SentinelFramework extends DockerBase { for (let i = 0; i < (this.config.numberOfNodes ?? 0) - 1; i++) { promises.push( this.spawnRedisSentinelNodeDocker().then(async node => { - if (this.config.password !== undefined) { - await node.client.configSet({'masterauth': this.config.password}) - } await node.client.replicaOf('127.0.0.1', master.docker.port); return node; }) @@ -347,34 +349,156 @@ export class SentinelFramework extends DockerBase { ); } - return [ - ...await Promise.all(promises) - ] + const sentinels = await Promise.all(promises); + + await this.validateSentinelConfiguration(sentinels); + + return sentinels; } - async getAllRunning() { + // Add this new method to validate sentinel configuration +private async validateSentinelConfiguration( + sentinels: Awaited>[] +): Promise { + const maxRetries = 10; + const retryDelay = 2000; + + // Wait for all sentinels to recognize each other + for (let retry = 0; retry < maxRetries; retry++) { + try { + let allConfigured = true; + + // Check that each sentinel recognizes all other sentinels + for (const sentinel of sentinels) { + if (!sentinel.client.isReady) { + allConfigured = false; + break; + } + + // Check if this sentinel can see the master + const masterInfo = await sentinel.client.sentinel.sentinelMaster(this.config.sentinelName) + .catch(() => null); + + if (!masterInfo) { + allConfigured = false; + break; + } + + // Check if this sentinel can see other sentinels + const knownSentinels = await sentinel.client.sentinel.sentinelSentinels(this.config.sentinelName) + .catch(() => []); + + // Ensure this sentinel knows about all other sentinels (minus itself) + if (knownSentinels.length < sentinels.length - 1) { + allConfigured = false; + break; + } + } + + if (allConfigured) { + // All sentinels are properly configured + return; + } + + // Wait before retrying + await setTimeout(retryDelay); + } catch (err) { + // Wait before retrying after an error + await setTimeout(retryDelay); + } + } + + throw new Error('Sentinel configuration did not propagate correctly within the timeout period'); +} + + async getAllRunning(): Promise { + const MAX_RETRIES = 5; + const RETRY_DELAY = 500; + + // Fix for Redis nodes for (const port of this.getAllNodesPort()) { - let first = true; - while (await isPortAvailable(port)) { - if (!first) { - console.log(`problematic restart ${port}`); - await setTimeout(500); - } else { - first = false; + let retries = 0; + + while (await isPortAvailable(port)) { + if (retries >= MAX_RETRIES) { + throw new Error(`Failed to restart Redis node at port ${port} after ${MAX_RETRIES} attempts`); } - await this.restartNode(port.toString()); + + try { + await this.restartNode(port.toString()); + await setTimeout(RETRY_DELAY); // Give the node time to start + } catch (err) { + console.error(`Error restarting Redis node at port ${port}:`, err); + } + + retries++; } } + // Fix for Sentinel nodes for (const port of this.getAllSentinelsPort()) { - let first = true; - while (await isPortAvailable(port)) { - if (!first) { - await setTimeout(500); - } else { - first = false; + let retries = 0; + + while (await isPortAvailable(port)) { + if (retries >= MAX_RETRIES) { + throw new Error(`Failed to restart Sentinel node at port ${port} after ${MAX_RETRIES} attempts`); + } + + try { + await this.restartSentinel(port.toString()); + await setTimeout(RETRY_DELAY); // Give the sentinel time to start + } catch (err) { + console.error(`Error restarting Sentinel at port ${port}:`, err); + } + + retries++; + } + } + + // Verify all nodes are actually responsive + await this.verifyNodesResponsive(); + } + + // Add a method to verify nodes are responsive + private async verifyNodesResponsive(): Promise { + const MAX_ATTEMPTS = 10; + const ATTEMPT_DELAY = 2000; + + // Check Redis nodes + for (const nodeInfo of this.#nodeMap.values()) { + if (!nodeInfo.client.isReady) { + // Try to reconnect client if not ready + for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + try { + await nodeInfo.client.connect(); + await nodeInfo.client.ping(); + break; + } catch (err) { + if (attempt === MAX_ATTEMPTS - 1) { + throw new Error(`Node at port ${nodeInfo.docker.port} is not responsive after ${MAX_ATTEMPTS} attempts`); + } + await setTimeout(ATTEMPT_DELAY); + } + } + } + } + + // Check Sentinel nodes + for (const sentinelInfo of this.#sentinelMap.values()) { + if (!sentinelInfo.client.isReady) { + // Try to reconnect client if not ready + for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + try { + await sentinelInfo.client.connect(); + await sentinelInfo.client.ping(); + break; + } catch (err) { + if (attempt === MAX_ATTEMPTS - 1) { + throw new Error(`Sentinel at port ${sentinelInfo.docker.port} is not responsive after ${MAX_ATTEMPTS} attempts`); + } + await setTimeout(ATTEMPT_DELAY); + } } - await this.restartSentinel(port.toString()); } } } @@ -401,9 +525,6 @@ export class SentinelFramework extends DockerBase { const masterPort = await this.getMasterPort(); const newNode = await this.spawnRedisSentinelNodeDocker(); - if (this.config.password !== undefined) { - await newNode.client.configSet({'masterauth': this.config.password}) - } await newNode.client.replicaOf('127.0.0.1', masterPort); this.#nodeList.push(newNode); @@ -487,8 +608,16 @@ export class SentinelFramework extends DockerBase { if (node === undefined) { throw new Error("unknown node: " + id); } - + + let masterPort: number | null = null; + try { + masterPort = await this.getMasterPort(); + } catch (err) { + console.log(`Could not determine master before restarting node ${id}: ${err}`); + } + await this.dockerStart(node.docker.dockerId); + if (!node.client.isOpen) { node.client = await RedisClient.create({ password: this.config.password, @@ -497,6 +626,17 @@ export class SentinelFramework extends DockerBase { } }).on("error", () => { }).connect(); } + + // Wait for node to be ready + await setTimeout(500); + + if (masterPort && node.docker.port !== masterPort) { + try { + await node.client.replicaOf('127.0.0.1', masterPort); + } catch (err) { + console.error(`Failed to reconfigure node ${id} as replica: ${err}`); + } + } } async stopSentinel(id: string) { diff --git a/packages/client/lib/test-utils.ts b/packages/client/lib/test-utils.ts index f7862a9d685..2b27e702050 100644 --- a/packages/client/lib/test-utils.ts +++ b/packages/client/lib/test-utils.ts @@ -2,9 +2,10 @@ import TestUtils from '@redis/test-utils'; import { SinonSpy } from 'sinon'; import { setTimeout } from 'node:timers/promises'; import { CredentialsProvider } from './authx'; -import { Command } from './RESP/types'; -import { BasicCommandParser } from './client/parser'; - +import { Command, NumberReply } from './RESP/types'; +import { BasicCommandParser, CommandParser } from './client/parser'; +import { defineScript } from './lua-script'; +import RedisBloomModules from '@redis/bloom'; const utils = TestUtils.createFromConfig({ dockerImageName: 'redislabs/client-libs-test', dockerImageVersionArgument: 'redis-version', @@ -42,6 +43,45 @@ const streamingCredentialsProvider: CredentialsProvider = } as const; +const SQUARE_SCRIPT = defineScript({ + SCRIPT: + `local number = redis.call('GET', KEYS[1]) + return number * number`, + NUMBER_OF_KEYS: 1, + FIRST_KEY_INDEX: 0, + parseCommand(parser: CommandParser, key: string) { + parser.pushKey(key); + }, + transformReply: undefined as unknown as () => NumberReply +}); + +export const MATH_FUNCTION = { + name: 'math', + engine: 'LUA', + code: + `#!LUA name=math + redis.register_function { + function_name = "square", + callback = function(keys, args) + local number = redis.call('GET', keys[1]) + return number * number + end, + flags = { "no-writes" } + }`, + library: { + square: { + NAME: 'square', + IS_READ_ONLY: true, + NUMBER_OF_KEYS: 1, + FIRST_KEY_INDEX: 0, + parseCommand(parser: CommandParser, key: string) { + parser.pushKey(key); + }, + transformReply: undefined as unknown as () => NumberReply + } + } +}; + export const GLOBAL = { SERVERS: { OPEN: { @@ -86,6 +126,51 @@ export const GLOBAL = { useReplicas: true } } + }, + SENTINEL: { + OPEN: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + }, + PASSWORD: { + serverArguments: [...DEBUG_MODE_ARGS], + password: 'test_password', + }, + WITH_SCRIPT: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + scripts: { + square: SQUARE_SCRIPT, + }, + }, + WITH_FUNCTION: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + functions: { + math: MATH_FUNCTION.library, + }, + }, + WITH_MODULE: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + modules: RedisBloomModules, + }, + WITH_REPLICA_POOL_SIZE_1: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + replicaPoolSize: 1, + }, + WITH_RESERVE_CLIENT_MASTER_POOL_SIZE_2: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + masterPoolSize: 2, + reserveClient: true, + }, + WITH_MASTER_POOL_SIZE_2: { + serverArguments: [...DEBUG_MODE_ARGS], + password: undefined, + masterPoolSize: 2, + } } }; diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index e3ff5edc38b..88a707bff70 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -4,9 +4,11 @@ import { once } from 'node:events'; import { createClient } from '@redis/client/index'; import { setTimeout } from 'node:timers/promises'; // import { ClusterSlotsReply } from '@redis/client/dist/lib/commands/CLUSTER_SLOTS'; - import { execFile as execFileCallback } from 'node:child_process'; import { promisify } from 'node:util'; +import * as fs from 'node:fs'; +import * as os from 'node:os'; +import * as path from 'node:path'; const execAsync = promisify(execFileCallback); @@ -38,37 +40,68 @@ const portIterator = (async function* (): AsyncIterableIterator { throw new Error('All ports are in use'); })(); -export interface RedisServerDockerConfig { +interface RedisServerDockerConfig { image: string; version: string; } +interface SentinelConfig { + mode: "sentinel"; + mounts: Array; + port: number; +} + +interface ServerConfig { + mode: "server"; +} + +export type RedisServerDockerOptions = RedisServerDockerConfig & (SentinelConfig | ServerConfig) + export interface RedisServerDocker { port: number; dockerId: string; } -async function spawnRedisServerDocker({ - image, - version -}: RedisServerDockerConfig, serverArguments: Array): Promise { - const port = (await portIterator.next()).value; +async function spawnRedisServerDocker( +options: RedisServerDockerOptions, serverArguments: Array, dockerEnv?: Map): Promise { + let port; + if (options.mode == "sentinel") { + port = options.port; + } else { + port = (await portIterator.next()).value; + } + const portStr = port.toString(); const dockerArgs = [ 'run', - '-e', `PORT=${portStr}`, + '--init', + '-e', `PORT=${portStr}` + ]; + + if (options.mode == "sentinel") { + options.mounts.forEach(mount => { + dockerArgs.push('-v', mount); + }); + } + + dockerEnv?.forEach((key: string, value: string) => { + dockerArgs.push('-e', `${key}:${value}`); + }); + + dockerArgs.push( '-d', '--network', 'host', - `${image}:${version}`, - '--port', portStr - ]; + `${options.image}:${options.version}` + ); if (serverArguments.length > 0) { - dockerArgs.push(...serverArguments); + for (let i = 0; i < serverArguments.length; i++) { + dockerArgs.push(serverArguments[i]) + } } - console.log(`[Docker] Spawning Redis container - Image: ${image}:${version}, Port: ${port}`); + console.log(`[Docker] Spawning Redis container - Image: ${options.image}:${options.version}, Port: ${port}, Mode: ${options.mode}`); const { stdout, stderr } = await execAsync('docker', dockerArgs); @@ -87,7 +120,7 @@ async function spawnRedisServerDocker({ } const RUNNING_SERVERS = new Map, ReturnType>(); -export function spawnRedisServer(dockerConfig: RedisServerDockerConfig, serverArguments: Array): Promise { +export function spawnRedisServer(dockerConfig: RedisServerDockerOptions, serverArguments: Array): Promise { const runningServer = RUNNING_SERVERS.get(serverArguments); if (runningServer) { return runningServer; @@ -113,7 +146,7 @@ after(() => { ); }); -export interface RedisClusterDockersConfig extends RedisServerDockerConfig { +export type RedisClusterDockersConfig = RedisServerDockerOptions & { numberOfMasters?: number; numberOfReplicas?: number; } @@ -178,7 +211,7 @@ async function spawnRedisClusterNodeDockers( } async function spawnRedisClusterNodeDocker( - dockersConfig: RedisClusterDockersConfig, + dockersConfig: RedisServerDockerOptions, serverArguments: Array, clientConfig?: Partial ) { @@ -291,3 +324,109 @@ after(() => { }) ); }); + + +const RUNNING_NODES = new Map, Array>(); +const RUNNING_SENTINELS = new Map, Array>(); + +export async function spawnRedisSentinel( + dockerConfigs: RedisServerDockerOptions, + serverArguments: Array, + password?: string, +): Promise> { + const runningNodes = RUNNING_SENTINELS.get(serverArguments); + if (runningNodes) { + return runningNodes; + } + + const dockerEnv: Map = new Map(); + + if (password !== undefined) { + dockerEnv.set("REDIS_PASSWORD", password); + serverArguments.push("--requirepass", password) + } + + const master = await spawnRedisServerDocker(dockerConfigs, serverArguments, dockerEnv); + const redisNodes: Array = [master]; + const replicaPromises: Array> = []; + + const replicasCount = 2; + for (let i = 0; i < replicasCount; i++) { + replicaPromises.push((async () => { + const replica = await spawnRedisServerDocker(dockerConfigs, serverArguments, dockerEnv); + const client = createClient({ + socket: { + port: replica.port + }, + password: password + }); + + await client.connect(); + await client.replicaOf("127.0.0.1", master.port); + await client.close(); + + return replica; + })()); + } + + const replicas = await Promise.all(replicaPromises); + redisNodes.push(...replicas); + RUNNING_NODES.set(serverArguments, redisNodes); + + const sentinelPromises: Array> = []; + const sentinelCount = 3; + + const appPrefix = 'sentinel-config-dir'; + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), appPrefix)); + + for (let i = 0; i < sentinelCount; i++) { + sentinelPromises.push((async () => { + const port = (await portIterator.next()).value; + + let sentinelConfig = `port ${port} +sentinel monitor mymaster 127.0.0.1 ${master.port} 2 +sentinel down-after-milliseconds mymaster 5000 +sentinel failover-timeout mymaster 6000 +`; + if (password !== undefined) { + sentinelConfig += `requirepass ${password}\n`; + sentinelConfig += `sentinel auth-pass mymaster ${password}\n`; + } + + const dir = fs.mkdtempSync(path.join(tmpDir, i.toString())); + fs.writeFile(`${dir}/redis.conf`, sentinelConfig, err => { + if (err) { + console.error("failed to create temporary config file", err); + } + }); + + return await spawnRedisServerDocker( + { + image: dockerConfigs.image, + version: dockerConfigs.version, + mode: "sentinel", + mounts: [`${dir}/redis.conf:/redis/config/node-sentinel-1/redis.conf`], + port: port, + }, serverArguments, dockerEnv); + })()); + } + + const sentinelNodes = await Promise.all(sentinelPromises); + RUNNING_SENTINELS.set(serverArguments, sentinelNodes); + + if (tmpDir) { + fs.rmSync(tmpDir, { recursive: true }); + } + + return sentinelNodes; +} + +after(() => { + return Promise.all( + [...RUNNING_NODES.values(), ...RUNNING_SENTINELS.values()].map(async dockersPromise => { + return Promise.all( + dockersPromise.map(({ dockerId }) => dockerRemove(dockerId)) + ); + }) + ); +}); diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 1c564749ff2..bf3c01d627a 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -6,8 +6,11 @@ import { TypeMapping, // CommandPolicies, createClient, + createSentinel, RedisClientOptions, RedisClientType, + RedisSentinelOptions, + RedisSentinelType, RedisPoolOptions, RedisClientPoolType, createClientPool, @@ -15,7 +18,8 @@ import { RedisClusterOptions, RedisClusterType } from '@redis/client/index'; -import { RedisServerDockerConfig, spawnRedisServer, spawnRedisCluster } from './dockers'; +import { RedisNode } from '@redis/client/lib/sentinel/types' +import { spawnRedisServer, spawnRedisCluster, spawnRedisSentinel, RedisServerDockerOptions } from './dockers'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; @@ -68,6 +72,25 @@ interface ClientTestOptions< disableClientSetup?: boolean; } +interface SentinelTestOptions< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts, + RESP extends RespVersions, + TYPE_MAPPING extends TypeMapping +> extends CommonTestOptions { + sentinelOptions?: Partial>; + clientOptions?: Partial>; + scripts?: S; + functions?: F; + modules?: M; + password?: string; + disableClientSetup?: boolean; + replicaPoolSize?: number; + masterPoolSize?: number; + reserveClient?: boolean; +} + interface ClientPoolTestOptions< M extends RedisModules, F extends RedisFunctions, @@ -148,13 +171,14 @@ export default class TestUtils { } readonly #VERSION_NUMBERS: Array; - readonly #DOCKER_IMAGE: RedisServerDockerConfig; + readonly #DOCKER_IMAGE: RedisServerDockerOptions; constructor({ string, numbers }: Version, dockerImageName: string) { this.#VERSION_NUMBERS = numbers; this.#DOCKER_IMAGE = { image: dockerImageName, - version: string + version: string, + mode: "server" }; } @@ -179,7 +203,6 @@ export default class TestUtils { } isVersionGreaterThanHook(minimumVersion: Array | undefined): void { - const isVersionGreaterThanHook = this.isVersionGreaterThan.bind(this); const versionNumber = this.#VERSION_NUMBERS.join('.'); const minimumVersionString = minimumVersion?.join('.'); @@ -274,6 +297,76 @@ export default class TestUtils { }); } + testWithClientSentinel< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} + >( + title: string, + fn: (sentinel: RedisSentinelType) => unknown, + options: SentinelTestOptions + ): void { + let dockerPromises: ReturnType; + + if (this.isVersionGreaterThan(options.minimumDockerVersion)) { + const dockerImage = this.#DOCKER_IMAGE; + before(function () { + this.timeout(30000); + + dockerPromises = spawnRedisSentinel(dockerImage, options.serverArguments, options?.password); + return dockerPromises; + }); + } + + it(title, async function () { + this.timeout(30000); + if (options.skipTest) return this.skip(); + if (!dockerPromises) return this.skip(); + + + const promises = await dockerPromises; + const rootNodes: Array = promises.map(promise => ({ + host: "127.0.0.1", + port: promise.port + })); + + const sentinel = createSentinel({ + name: 'mymaster', + sentinelRootNodes: rootNodes, + nodeClientOptions: { + password: options?.password || undefined, + }, + sentinelClientOptions: { + password: options?.password || undefined, + }, + replicaPoolSize: options?.replicaPoolSize || 0, + scripts: options?.scripts || {}, + modules: options?.modules || {}, + functions: options?.functions || {}, + masterPoolSize: options?.masterPoolSize || undefined, + reserveClient: options?.reserveClient || false, + }) as RedisSentinelType; + + if (options.disableClientSetup) { + return fn(sentinel); + } + + await sentinel.connect(); + + try { + await sentinel.flushAll(); + await fn(sentinel); + } finally { + if (sentinel.isOpen) { + await sentinel.flushAll(); + sentinel.destroy(); + } + } + }); + } + testWithClientIfVersionWithinRange< M extends RedisModules = {}, F extends RedisFunctions = {}, @@ -292,8 +385,27 @@ export default class TestUtils { } else { console.warn(`Skipping test ${title} because server version ${this.#VERSION_NUMBERS.join('.')} is not within range ${range[0].join(".")} - ${range[1] !== 'LATEST' ? range[1].join(".") : 'LATEST'}`) } + } + testWithClienSentineltIfVersionWithinRange< + M extends RedisModules = {}, + F extends RedisFunctions = {}, + S extends RedisScripts = {}, + RESP extends RespVersions = 2, + TYPE_MAPPING extends TypeMapping = {} +>( + range: ([minVersion: Array, maxVersion: Array] | [minVersion: Array, 'LATEST']), + title: string, + fn: (sentinel: RedisSentinelType) => unknown, + options: SentinelTestOptions +): void { + + if (this.isVersionInRange(range[0], range[1] === 'LATEST' ? [Infinity, Infinity, Infinity] : range[1])) { + return this.testWithClientSentinel(`${title} [${range[0].join('.')}] - [${(range[1] === 'LATEST') ? range[1] : range[1].join(".")}] `, fn, options) + } else { + console.warn(`Skipping test ${title} because server version ${this.#VERSION_NUMBERS.join('.')} is not within range ${range[0].join(".")} - ${range[1] !== 'LATEST' ? range[1].join(".") : 'LATEST'}`) } +} testWithClientPool< M extends RedisModules = {},