From 7a2876d79aee52b0ec35899dd1c4f66154ee1d03 Mon Sep 17 00:00:00 2001 From: Tuan Le <23419763+tumile@users.noreply.github.com> Date: Wed, 8 Jul 2020 12:06:13 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=95=B8=EF=B8=8F=20Implement=20cluster=20c?= =?UTF-8?q?ommands=20(#110)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 9 +++ command.ts | 32 +++++++++-- redis.ts | 105 ++++++++++++++++++++++++++++++++- redis_test.ts | 1 + testdata/redis.conf | 6 ++ tests/cluster_test.ts | 131 ++++++++++++++++++++++++++++++++++++++++++ tests/test_util.ts | 119 ++++++++++++++++++++++++++++++++++---- 7 files changed, 387 insertions(+), 16 deletions(-) create mode 100644 testdata/redis.conf create mode 100644 tests/cluster_test.ts diff --git a/README.md b/README.md index cd722061..e047d92a 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,15 @@ const plz = msgFV.get("yes"); const thx = msgFV.get("no"); ``` +**Cluster** + +```ts +await redis.meet("127.0.0.1", 6380); +await redis.nodes(); +// ... 127.0.0.1:6379@16379 myself,master - 0 1593978765000 0 connected +// ... 127.0.0.1:6380@16380 master - 0 1593978766503 1 connected +``` + ## Advanced Usage ### Retriable connection diff --git a/command.ts b/command.ts index a357f435..3b0f787f 100644 --- a/command.ts +++ b/command.ts @@ -291,9 +291,6 @@ export type RedisCommands = { pubsub_channels(pattern: string): Promise; pubsub_numsubs(...channels: string[]): Promise<[BulkString, Integer][]>; pubsub_numpat(): Promise; - // Cluster - readonly(): Promise; - readwrite(): Promise; // Set sadd(key: string, ...members: string[]): Promise; scard(key: string): Promise; @@ -505,7 +502,7 @@ XGROUP DESTROY test-man-000 test-group
 XGROUP SETID mystream consumer-group-name 0
 
- * + * * @param key stream key * @param groupName the consumer group * @param xid the XId to use for the next message delivered @@ -768,7 +765,32 @@ XRANGE somestream - + }, ): Promise; // Cluster - // cluster // + cluster_addslots(...slots: number[]): Promise; + cluster_countfailurereports(node_id: string): Promise; + cluster_countkeysinslot(slot: number): Promise; + cluster_delslots(...slots: number[]): Promise; + cluster_failover(opt?: "FORCE" | "TAKEOVER"): Promise; + cluster_flushslots(): Promise; + cluster_forget(node_id: string): Promise; + cluster_getkeysinslot(slot: number, count: number): Promise; + cluster_info(): Promise; + cluster_keyslot(key: string): Promise; + cluster_meet(ip: string, port: number): Promise; + cluster_myid(): Promise; + cluster_nodes(): Promise; + cluster_replicas(node_id: string): Promise; + cluster_replicate(node_id: string): Promise; + cluster_reset(opt?: "HARD" | "SOFT"): Promise; + cluster_saveconfig(): Promise; + cluster_setslot( + slot: number, + subcommand: "IMPORTING" | "MIGRATING" | "NODE" | "STABLE", + node_id?: string, + ): Promise; + cluster_slaves(node_id: string): Promise; + cluster_slots(): Promise; + readonly(): Promise; + readwrite(): Promise; // Server acl_cat(parameter?: string): Promise; acl_deluser(parameter: string): Promise; diff --git a/redis.ts b/redis.ts index 4aac9f33..2c207ade 100644 --- a/redis.ts +++ b/redis.ts @@ -293,6 +293,110 @@ class RedisImpl implements RedisCommands { } } + cluster_addslots(...slots: number[]): Promise { + return this.execStatusReply("CLUSTER", "ADDSLOTS", ...slots); + } + + cluster_countfailurereports(node_id: string): Promise { + return this.execIntegerReply("CLUSTER", "COUNT-FAILURE-REPORTS", node_id); + } + + cluster_countkeysinslot(slot: number): Promise { + return this.execIntegerReply("CLUSTER", "COUNTKEYSINSLOT", slot); + } + + cluster_delslots(...slots: number[]): Promise { + return this.execStatusReply("CLUSTER", "DELSLOTS", ...slots); + } + + cluster_failover(opt?: "FORCE" | "TAKEOVER"): Promise { + if (opt) { + return this.execStatusReply("CLUSTER", "FAILOVER", opt); + } + return this.execStatusReply("CLUSTER", "FAILOVER"); + } + + cluster_flushslots(): Promise { + return this.execStatusReply("CLUSTER", "FLUSHSLOTS"); + } + + cluster_forget(node_id: string): Promise { + return this.execStatusReply("CLUSTER", "FORGET", node_id); + } + + cluster_getkeysinslot(slot: number, count: number): Promise { + return this.execArrayReply( + "CLUSTER", + "GETKEYSINSLOT", + slot, + count, + ); + } + + cluster_info(): Promise { + return this.execStatusReply("CLUSTER", "INFO"); + } + + cluster_keyslot(key: string): Promise { + return this.execIntegerReply("CLUSTER", "KEYSLOT", key); + } + + cluster_meet(ip: string, port: number): Promise { + return this.execStatusReply("CLUSTER", "MEET", ip, port); + } + + cluster_myid(): Promise { + return this.execStatusReply("CLUSTER", "MYID"); + } + + cluster_nodes(): Promise { + return this.execBulkReply("CLUSTER", "NODES"); + } + + cluster_replicas(node_id: string): Promise { + return this.execArrayReply("CLUSTER", "REPLICAS", node_id); + } + + cluster_replicate(node_id: string): Promise { + return this.execStatusReply("CLUSTER", "REPLICATE", node_id); + } + + cluster_reset(opt?: "HARD" | "SOFT"): Promise { + if (opt) { + return this.execStatusReply("CLUSTER", "RESET", opt); + } + return this.execStatusReply("CLUSTER", "RESET"); + } + + cluster_saveconfig(): Promise { + return this.execStatusReply("CLUSTER", "SAVECONFIG"); + } + + cluster_setslot( + slot: number, + subcommand: "IMPORTING" | "MIGRATING" | "NODE" | "STABLE", + node_id?: string, + ): Promise { + if (node_id) { + return this.execStatusReply( + "CLUSTER", + "SETSLOT", + slot, + subcommand, + node_id, + ); + } + return this.execStatusReply("CLUSTER", "SETSLOT", slot, subcommand); + } + + cluster_slaves(node_id: string): Promise { + return this.execArrayReply("CLUSTER", "SLAVES", node_id); + } + + cluster_slots(): Promise { + return this.execArrayReply("CLUSTER", "SLOTS"); + } + command() { return this.execArrayReply("COMMAND") as Promise< [BulkString, Integer, BulkString[], Integer, Integer, Integer] @@ -2036,7 +2140,6 @@ export async function connect({ ); await connection.connect(); - return new RedisImpl(connection); } diff --git a/redis_test.ts b/redis_test.ts index 50b4b317..5d7dc24e 100644 --- a/redis_test.ts +++ b/redis_test.ts @@ -10,3 +10,4 @@ import "./tests/string_test.ts"; import "./tests/key_test.ts"; import "./tests/stream_test.ts"; import "./tests/acl_cmd_test.ts"; +import "./tests/cluster_test.ts"; diff --git a/testdata/redis.conf b/testdata/redis.conf new file mode 100644 index 00000000..66ffd813 --- /dev/null +++ b/testdata/redis.conf @@ -0,0 +1,6 @@ +# Base configuration +daemonize no +appendonly yes +cluster-config-file nodes.conf +cluster-node-timeout 30000 +maxclients 1001 diff --git a/tests/cluster_test.ts b/tests/cluster_test.ts new file mode 100644 index 00000000..f060379e --- /dev/null +++ b/tests/cluster_test.ts @@ -0,0 +1,131 @@ +import { + assert, + assertEquals, + assertStringContains, +} from "../vendor/https/deno.land/std/testing/asserts.ts"; +import { newClient, startRedis, stopRedis, TestSuite } from "./test_util.ts"; + +const suite = new TestSuite("cluster"); + +const s7000 = await startRedis({ port: 7000, clusterEnabled: true }); +const s7001 = await startRedis({ port: 7001, clusterEnabled: true }); +const client = await newClient(7000); + +suite.afterAll(() => { + stopRedis(s7000); + stopRedis(s7001); + client.close(); +}); + +suite.test("addslots", async () => { + await client.cluster_flushslots(); + assertEquals(await client.cluster_addslots(1, 2, 3), "OK"); +}); + +suite.test("myid", async () => { + assert(!!(await client.cluster_myid())); +}); + +suite.test("countfailurereports", async () => { + const node_id = await client.cluster_myid(); + assertEquals(await client.cluster_countfailurereports(node_id), 0); +}); + +suite.test("countkeysinslot", async () => { + assertEquals(await client.cluster_countkeysinslot(1), 0); +}); + +suite.test("delslots", async () => { + await client.cluster_flushslots(); + assertEquals(await client.cluster_delslots(1, 2, 3), "OK"); +}); + +suite.test("getkeysinslot", async () => { + assertEquals(await client.cluster_getkeysinslot(1, 1), []); +}); + +suite.test("flushslots", async () => { + assertEquals(await client.cluster_flushslots(), "OK"); +}); + +suite.test("info", async () => { + assertStringContains(await client.cluster_info(), "cluster_state"); +}); + +suite.test("keyslot", async () => { + assertEquals(await client.cluster_keyslot("somekey"), 11058); +}); + +suite.test("meet", async () => { + assertEquals(await client.cluster_meet("127.0.0.1", 7001), "OK"); +}); + +suite.test("nodes", async () => { + const node_id = await client.cluster_myid(); + const nodes = await client.cluster_nodes(); + assertStringContains(nodes, node_id); +}); + +suite.test("replicas", async () => { + const node_id = await client.cluster_myid(); + assertEquals(await client.cluster_replicas(node_id), []); +}); + +suite.test("slaves", async () => { + const node_id = await client.cluster_myid(); + assertEquals(await client.cluster_slaves(node_id), []); +}); + +suite.test("forget", async () => { + const node_id = await client.cluster_myid(); + const other_node = (await client.cluster_nodes()) + .split("\n") + .find((n) => !n.startsWith(node_id)) + ?.split(" ")[0]; + if (other_node) { + assertEquals(await client.cluster_forget(other_node), "OK"); + } +}); + +suite.test("saveconfig", async () => { + assertEquals(await client.cluster_saveconfig(), "OK"); +}); + +suite.test("setslot", async () => { + const node_id = await client.cluster_myid(); + assertEquals(await client.cluster_setslot(1, "NODE", node_id), "OK"); + assertEquals(await client.cluster_setslot(1, "MIGRATING", node_id), "OK"); + assertEquals(await client.cluster_setslot(1, "STABLE"), "OK"); +}); + +suite.test("slots", async () => { + assert(Array.isArray(await client.cluster_slots())); +}); + +suite.test("replicate", async () => { + const node_id = await client.cluster_myid(); + const other_node = (await client.cluster_nodes()) + .split("\n") + .find((n) => !n.startsWith(node_id)) + ?.split(" ")[0]; + if (other_node) { + assertEquals(await client.cluster_replicate(other_node), "OK"); + } +}); + +suite.test("failover", async () => { + const node_id = await client.cluster_myid(); + const other_node = (await client.cluster_nodes()) + .split("\n") + .find((n) => !n.startsWith(node_id)) + ?.split(" ")[0]; + if (other_node) { + assertEquals(await client.cluster_failover(), "OK"); + } +}); + +suite.test("reset", async () => { + assertEquals(await client.cluster_reset(), "OK"); +}); + +await suite.runTests(); diff --git a/tests/test_util.ts b/tests/test_util.ts index 76b7d0af..db7084c6 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -1,6 +1,104 @@ -import { Redis, connect, RedisConnectOptions } from "../redis.ts"; -import { assert } from "../vendor/https/deno.land/std/testing/asserts.ts"; +import { connect, Redis, RedisConnectOptions } from "../redis.ts"; import { delay } from "../vendor/https/deno.land/std/async/mod.ts"; +import { assert } from "../vendor/https/deno.land/std/testing/asserts.ts"; + +type TestFunc = () => void | Promise; +type TestServer = { + path: string; + process: Deno.Process; +}; + +export class TestSuite { + private tests: { name: string; func: TestFunc }[] = []; + private beforeEachs: TestFunc[] = []; + private afterAlls: TestFunc[] = []; + + constructor(private prefix: string) {} + + beforeEach(func: TestFunc): void { + this.beforeEachs.push(func); + } + + afterAll(func: TestFunc): void { + this.afterAlls.push(func); + } + + test(name: string, func: TestFunc): void { + this.tests.push({ name, func }); + } + + runTests = async (): Promise => { + const promises: Promise[] = []; + + this.tests.forEach((test) => { + const promise = test.func(); + promises.push(Promise.resolve(promise)); + + Deno.test(`[${this.prefix}] ${test.name}`, () => { + this.beforeEachs.forEach(async (f) => await f()); + return promise; + }); + }); + + try { + await Promise.allSettled(promises); + } finally { + this.afterAlls.forEach(async (f) => await f()); + } + }; +} + +const encoder = new TextEncoder(); + +export async function startRedis({ + port = 6379, + clusterEnabled = false, +}): Promise { + const path = `testdata/${port}`; + + if (!(await exists(path))) { + Deno.mkdirSync(path); + Deno.copyFileSync(`testdata/redis.conf`, `${path}/redis.conf`); + + let config = `dir ${path}\nport ${port}\n`; + config += clusterEnabled ? "cluster-enabled yes" : ""; + + Deno.writeFileSync(`${path}/redis.conf`, encoder.encode(config), { + append: true, + }); + } + + const process = Deno.run({ + cmd: ["redis-server", `testdata/${port}/redis.conf`], + stdin: "null", + stdout: "null", + }); + + // Ample time for server to finish startup + await delay(500); + return { path, process }; +} + +export function stopRedis(server: TestServer): void { + Deno.removeSync(server.path, { recursive: true }); + server.process.close(); +} + +export function newClient(port: number): Promise { + return connect({ hostname: "127.0.0.1", port }); +} + +async function exists(path: string): Promise { + try { + await Deno.stat(path); + return true; + } catch (err) { + if (err instanceof Deno.errors.NotFound) { + return false; + } + throw err; + } +} function* dbIndex() { let i = 0; @@ -10,10 +108,12 @@ function* dbIndex() { if (i > 15) i = 0; } } + const it = dbIndex(); + function db(): number { const { value } = it.next(); - assert(value != undefined); + assert(value !== undefined); return value; } @@ -41,17 +141,16 @@ export async function makeTest( interface StartRedisServerOptions { port: number; } + export async function startRedisServer( options: StartRedisServerOptions, ): Promise { const { port } = options; - const process = Deno.run( - { - cmd: ["redis-server", "--port", port.toString()], - stdin: "null", - stdout: "null", - }, - ); + const process = Deno.run({ + cmd: ["redis-server", "--port", port.toString()], + stdin: "null", + stdout: "null", + }); await waitForPort(port); return process; }