From cdedf0ed65e69ebe076f96a7d54336b2a35d7c5c Mon Sep 17 00:00:00 2001 From: uki00a Date: Fri, 25 Jun 2021 01:10:30 +0900 Subject: [PATCH 01/64] WIP: Basic implementation for Redis Cluster client --- command.ts | 1 + experimental/cluster/mod.ts | 278 ++++++++++++++++++++++++++++++++++++ redis.ts | 4 + 3 files changed, 283 insertions(+) create mode 100644 experimental/cluster/mod.ts diff --git a/command.ts b/command.ts index cd306b6f..8e55ff8f 100644 --- a/command.ts +++ b/command.ts @@ -1101,6 +1101,7 @@ XRANGE somestream - + clientUnpause(): Promise; // Cluster + asking(): Promise; clusterAddSlots(...slots: number[]): Promise; clusterCountFailureReports(node_id: string): Promise; clusterCountKeysInSlot(slot: number): Promise; diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts new file mode 100644 index 00000000..a8190a06 --- /dev/null +++ b/experimental/cluster/mod.ts @@ -0,0 +1,278 @@ +/** + * Base on https://github.com/antirez/redis-rb-cluster which is licensed as follows: + * + * Copyright (C) 2013 Salvatore Sanfilippo + * + * Permission is hereby granted, free of charge, to any person obtaining + * a copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so, subject to + * the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +import { connect, RedisImpl } from "../../redis.ts"; +import { Connection } from "../../connection.ts"; +import type { Redis } from "../../redis.ts"; +import uniqBy from "https://cdn.skypack.dev/lodash.uniqby@4.5.0"; // TODO: Import `lodash.uniqby` from `vendor` directory +import type { RedisReply, RedisValue } from "../../protocol/mod.ts"; +import calculateSlot from "https://cdn.skypack.dev/cluster-key-slot@1.1.0"; +import shuffle from "https://cdn.skypack.dev/lodash.shuffle@4.2.0"; +import sample from "https://cdn.skypack.dev/lodash.sample@4.2.1"; +import { ErrorReplyError } from "../../errors.ts"; + +export interface ClusterConnectOptions { + nodes: Array; + maxConnections: number; +} + +export interface NodeOptions { + hostname: string; + port?: number; +} + +interface SlotMap { + [slot: number]: ClusterNode; +} + +class ClusterNode { + readonly name: string; + + constructor(readonly hostname: string, readonly port: number) { + this.name = `${hostname}:${port}`; + } +} + +const kRedisClusterRequestTTL = 16; + +class ClusterError extends Error {} + +// TODO: This class should implement CommandExecutor interface. +class ClusterExecutor { + #nodes!: ClusterNode[]; + #slots!: SlotMap; + #startupNodes: ClusterNode[]; + #refreshTableASAP?: boolean; + #maxConnections: number; + #connectionByNodeName: { [name: string]: Redis } = {}; + + constructor(opts: ClusterConnectOptions) { + this.#startupNodes = opts.nodes.map((node) => + new ClusterNode(node.hostname, node.port ?? 6379) + ); + this.#maxConnections = opts.maxConnections; + } + + get connection(): Connection { + throw new Error("Not implemented yet"); + } + + async exec(command: string, ...args: RedisValue[]): Promise { + if (this.#refreshTableASAP) { + await this.initializeSlotsCache(); + } + const key = getKeyFromCommand(command, args); + if (key == null) { + throw new ClusterError( + "No way to dispatch this command to Redis Cluster.", + ); + } + const slot = calculateSlot(key); + let asking = false; + let tryRandomNode = false; + let ttl = kRedisClusterRequestTTL; + let lastError: null | Error; + while (ttl > 0) { + ttl -= 1; + let r: Redis; + if (tryRandomNode) { + r = await this.#getRandomConnection(); + tryRandomNode = false; + } else { + r = await this.#getConnectionBySlot(slot); + } + + try { + if (asking) { + await r.asking(); + } + asking = false; + const reply = await r.executor.exec(command, ...args); + return reply; + } catch (err) { + lastError = err; + if (err instanceof Deno.errors.BadResource) { + tryRandomNode = true; + continue; + } else if (err instanceof ErrorReplyError) { + const [code, newSlot, ipAndPort] = err.message.split(/\s+/); + if (code === "MOVED" || code === "ASK") { + if (code === "ASK") { + asking = true; + } else { + // Serve replied with MOVED. It's better for us to + // ask for CLUSTER NODES the next time. + this.#refreshTableASAP = true; + } + if (!asking) { + const [ip, port] = ipAndPort.split(":"); + this.#slots[parseInt(newSlot)] = new ClusterNode( + ip, + parseInt(port), + ); + } + } else { + throw err; + } + } else { + throw err; // An unexpected error occurred. + } + } + } + throw new ClusterError( + `Too many Cluster redirections? (last error: ${lastError!.message ?? + ""})`, + ); + } + + async initializeSlotsCache(): Promise { + for (const node of this.#startupNodes) { + try { + const redis = await getRedisLink(node); + const clusterSlots = await redis.clusterSlots() as Array< + [number, number, [string, number]] + >; + const nodes = [] as ClusterNode[]; + const slotMap = {} as SlotMap; + for (const [from, to, master] of clusterSlots) { + for (let slot = from; slot <= to; slot++) { + const [ip, port] = master; + const name = `${ip}:${port}`; + const node = { + name, + hostname: ip, + port, + }; + nodes.push(node); + slotMap[slot] = node; + } + } + this.#nodes = nodes; + this.#slots = slotMap; + await this.#populateStartupNodes(); + this.#refreshTableASAP = false; + return; + } catch (_err) { + // TODO: Consider logging `_err` here + continue; + } + } + } + + #populateStartupNodes() { + for (const node of this.#nodes) { + this.#startupNodes.push(node); + } + + this.#startupNodes = uniqBy( + this.#startupNodes, + (node: ClusterNode) => node.name, + ); + } + + async #getRandomConnection(): Promise { + for (const node of shuffle(this.#startupNodes)) { + try { + let conn = this.#connectionByNodeName[node.name]; + if (conn) { + const message = await conn.ping(); + if (message === "PONG") { + return conn; + } + } else { + conn = await getRedisLink(node); + try { + const message = await conn.ping(); + if (message === "PONG") { + await this.#closeExistingConnection(); + this.#connectionByNodeName[node.name] = conn; + return conn; + } + } catch { + conn.close(); + } + } + } catch { + // Just try with the nest node. + } + } + throw new ClusterError("Can't reach a single startup node."); + } + + async #getConnectionBySlot(slot: number): Promise { + const node = this.#slots[slot]; + if (!node) { + return this.#getRandomConnection(); + } + const conn = this.#connectionByNodeName[node.name]; + if (!conn) { + try { + await this.#closeExistingConnection(); + this.#connectionByNodeName[node.name] = await getRedisLink(node); + } catch { + return this.#getRandomConnection(); + } + } + return conn; + } + + async #closeExistingConnection() { + const nodeNames = Object.keys(this.#connectionByNodeName); + while (nodeNames.length >= this.#maxConnections) { + const nodeName = sample(nodeNames); + const conn = this.#connectionByNodeName[nodeName]; + delete this.#connectionByNodeName[nodeName]; + try { + await conn.quit(); + } catch {} + } + } +} + +function getRedisLink(node: ClusterNode): Promise { + return connect(node); +} + +function getKeyFromCommand(command: string, args: RedisValue[]): string | null { + switch (command.toLowerCase()) { + case "info": + case "multi": + case "exec": + case "slaveof": + case "config": + case "shutdown": + return null; + default: + return args[1] as string; + } +} + +async function connectCluster(opts: ClusterConnectOptions) { + const executor = new ClusterExecutor(opts); + await executor.initializeSlotsCache(); + return new RedisImpl(executor); +} + +export { connectCluster as connect }; diff --git a/redis.ts b/redis.ts index 03d227e3..47f3d2bd 100644 --- a/redis.ts +++ b/redis.ts @@ -448,6 +448,10 @@ export class RedisImpl implements Redis { return this.execStatusReply("CLIENT", "UNPAUSE"); } + asking() { + return this.execStatusReply("ASKING"); + } + clusterAddSlots(...slots: number[]) { return this.execStatusReply("CLUSTER", "ADDSLOTS", ...slots); } From 9f2132cef63202c3fe214521c2858e64ee4e5e27 Mon Sep 17 00:00:00 2001 From: uki00a Date: Fri, 25 Jun 2021 01:10:39 +0900 Subject: [PATCH 02/64] chore: Fix typo --- experimental/cluster/mod.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index a8190a06..c101841c 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -215,7 +215,7 @@ class ClusterExecutor { } } } catch { - // Just try with the nest node. + // Just try with the next node. } } throw new ClusterError("Can't reach a single startup node."); From 05baa00a238320623ff6bcd1e0fa1abb17c1b015 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 26 Jun 2021 21:34:03 +0900 Subject: [PATCH 03/64] fix: Update --- experimental/cluster/mod.ts | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index c101841c..c1214ae8 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -1,5 +1,5 @@ /** - * Base on https://github.com/antirez/redis-rb-cluster which is licensed as follows: + * Based on https://github.com/antirez/redis-rb-cluster which is licensed as follows: * * Copyright (C) 2013 Salvatore Sanfilippo * @@ -24,7 +24,8 @@ */ import { connect, RedisImpl } from "../../redis.ts"; -import { Connection } from "../../connection.ts"; +import type { CommandExecutor } from "../../executor.ts"; +import type { Connection } from "../../connection.ts"; import type { Redis } from "../../redis.ts"; import uniqBy from "https://cdn.skypack.dev/lodash.uniqby@4.5.0"; // TODO: Import `lodash.uniqby` from `vendor` directory import type { RedisReply, RedisValue } from "../../protocol/mod.ts"; @@ -32,6 +33,7 @@ import calculateSlot from "https://cdn.skypack.dev/cluster-key-slot@1.1.0"; import shuffle from "https://cdn.skypack.dev/lodash.shuffle@4.2.0"; import sample from "https://cdn.skypack.dev/lodash.sample@4.2.1"; import { ErrorReplyError } from "../../errors.ts"; +import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; export interface ClusterConnectOptions { nodes: Array; @@ -60,7 +62,7 @@ const kRedisClusterRequestTTL = 16; class ClusterError extends Error {} // TODO: This class should implement CommandExecutor interface. -class ClusterExecutor { +class ClusterExecutor implements CommandExecutor { #nodes!: ClusterNode[]; #slots!: SlotMap; #startupNodes: ClusterNode[]; @@ -115,6 +117,9 @@ class ClusterExecutor { lastError = err; if (err instanceof Deno.errors.BadResource) { tryRandomNode = true; + if (ttl < kRedisClusterRequestTTL / 2) { + await delay(100); + } continue; } else if (err instanceof ErrorReplyError) { const [code, newSlot, ipAndPort] = err.message.split(/\s+/); @@ -226,11 +231,12 @@ class ClusterExecutor { if (!node) { return this.#getRandomConnection(); } - const conn = this.#connectionByNodeName[node.name]; + let conn = this.#connectionByNodeName[node.name]; if (!conn) { try { await this.#closeExistingConnection(); - this.#connectionByNodeName[node.name] = await getRedisLink(node); + conn = await getRedisLink(node); + this.#connectionByNodeName[node.name] = conn; } catch { return this.#getRandomConnection(); } From 2aebae731e37f588a1ee371ed7f5b3cec61325c2 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 00:15:31 +0900 Subject: [PATCH 04/64] test: Try to add tests for DEL command --- experimental/cluster/mod.ts | 33 +++++++++++++++++++++++++++------ redis_test.ts | 1 + tests/cluster/key_test.ts | 37 +++++++++++++++++++++++++++++++++++++ tests/cluster/test.ts | 1 + tests/cluster/test_util.ts | 26 ++++++++++++++++++++++++++ tests/test_util.ts | 8 ++++---- 6 files changed, 96 insertions(+), 10 deletions(-) create mode 100644 tests/cluster/key_test.ts create mode 100644 tests/cluster/test.ts create mode 100644 tests/cluster/test_util.ts diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index c1214ae8..db3fc218 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -24,6 +24,7 @@ */ import { connect, RedisImpl } from "../../redis.ts"; +import type { RedisConnectOptions } from "../../redis.ts"; import type { CommandExecutor } from "../../executor.ts"; import type { Connection } from "../../connection.ts"; import type { Redis } from "../../redis.ts"; @@ -38,6 +39,7 @@ import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; export interface ClusterConnectOptions { nodes: Array; maxConnections: number; + redisOptions?: RedisConnectOptions; } export interface NodeOptions { @@ -69,12 +71,14 @@ class ClusterExecutor implements CommandExecutor { #refreshTableASAP?: boolean; #maxConnections: number; #connectionByNodeName: { [name: string]: Redis } = {}; + #redisOpts: RedisConnectOptions | undefined; constructor(opts: ClusterConnectOptions) { this.#startupNodes = opts.nodes.map((node) => new ClusterNode(node.hostname, node.port ?? 6379) ); this.#maxConnections = opts.maxConnections; + this.#redisOpts = opts.redisOptions; } get connection(): Connection { @@ -155,7 +159,7 @@ class ClusterExecutor implements CommandExecutor { async initializeSlotsCache(): Promise { for (const node of this.#startupNodes) { try { - const redis = await getRedisLink(node); + const redis = await getRedisLink(node, this.#redisOpts); const clusterSlots = await redis.clusterSlots() as Array< [number, number, [string, number]] >; @@ -207,7 +211,7 @@ class ClusterExecutor implements CommandExecutor { return conn; } } else { - conn = await getRedisLink(node); + conn = await getRedisLink(node, this.#redisOpts); try { const message = await conn.ping(); if (message === "PONG") { @@ -235,7 +239,7 @@ class ClusterExecutor implements CommandExecutor { if (!conn) { try { await this.#closeExistingConnection(); - conn = await getRedisLink(node); + conn = await getRedisLink(node, this.#redisOpts); this.#connectionByNodeName[node.name] = conn; } catch { return this.#getRandomConnection(); @@ -252,13 +256,26 @@ class ClusterExecutor implements CommandExecutor { delete this.#connectionByNodeName[nodeName]; try { await conn.quit(); - } catch {} + } catch { + // deno-lint-ignore no-empty + } } } } -function getRedisLink(node: ClusterNode): Promise { - return connect(node); +function getRedisLink( + node: ClusterNode, + opts: RedisConnectOptions | undefined, +): Promise { + if (opts) { + return connect({ + ...opts, + hostname: node.hostname, + port: node.port, + }); + } else { + return connect(node); + } } function getKeyFromCommand(command: string, args: RedisValue[]): string | null { @@ -275,6 +292,10 @@ function getKeyFromCommand(command: string, args: RedisValue[]): string | null { } } +/** + * @see https://redis.io/topics/cluster-tutorial + * @see https://redis.io/topics/cluster-spec + */ async function connectCluster(opts: ClusterConnectOptions) { const executor = new ClusterExecutor(opts); await executor.initializeSlotsCache(); diff --git a/redis_test.ts b/redis_test.ts index ab19c022..9873f0e8 100644 --- a/redis_test.ts +++ b/redis_test.ts @@ -14,3 +14,4 @@ import "./tests/set_test.ts"; import "./tests/sorted_set_test.ts"; import "./tests/stream_test.ts"; import "./tests/string_test.ts"; +import "./tests/cluster/test.ts"; diff --git a/tests/cluster/key_test.ts b/tests/cluster/key_test.ts new file mode 100644 index 00000000..69082db8 --- /dev/null +++ b/tests/cluster/key_test.ts @@ -0,0 +1,37 @@ +import { nextPorts, startRedisCluster, stopRedisCluster } from "./test_util.ts"; +import type { TestCluster } from "./test_util.ts"; +import { TestSuite } from "../test_util.ts"; +import { connect } from "../../experimental/cluster/mod.ts"; +import type { Redis } from "../../redis.ts"; +import { assertEquals } from "../../vendor/https/deno.land/std/testing/asserts.ts"; + +const suite = new TestSuite("cluster/key"); +let cluster!: TestCluster; +let client!: Redis; +const ports = nextPorts(3); + +suite.beforeEach(async () => { + if (cluster == null) { + cluster = await startRedisCluster(ports); + client = await connect({ + nodes: ports.map((port) => ({ + hostname: "127.0.0.1", + port, + })), + maxConnections: ports.length, + }); + } +}); + +suite.afterAll(() => { + stopRedisCluster(cluster); +}); + +suite.test("del", async () => { + await client.set("{hoge}foo", "a"); + await client.set("{hoge}bar", "b"); + const r = await client.del("{hoge}foo", "{hoge}bar"); + assertEquals(r, 2); +}); + +suite.runTests(); diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts new file mode 100644 index 00000000..46bf2400 --- /dev/null +++ b/tests/cluster/test.ts @@ -0,0 +1 @@ +import "./key_test.ts"; diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts new file mode 100644 index 00000000..00de98ff --- /dev/null +++ b/tests/cluster/test_util.ts @@ -0,0 +1,26 @@ +import { nextPort, startRedis, stopRedis } from "../test_util.ts"; +import type { TestServer } from "../test_util.ts"; + +export interface TestCluster { + servers: TestServer[]; +} + +export async function startRedisCluster(ports: number[]): Promise { + const servers = await Promise.all(ports.map((port) => + startRedis({ + port, + clusterEnabled: true, + }) + )); + return { servers }; +} + +export function stopRedisCluster(cluster: TestCluster): void { + for (const server of cluster.servers) { + stopRedis(server); + } +} + +export function nextPorts(n: number): Array { + return Array(n).fill(0).map(() => nextPort()); +} diff --git a/tests/test_util.ts b/tests/test_util.ts index 4e085611..8e696d0d 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -2,10 +2,10 @@ import { connect, Redis, RedisConnectOptions } from "../mod.ts"; import { delay } from "../vendor/https/deno.land/std/async/delay.ts"; type TestFunc = () => void | Promise; -type TestServer = { +export interface TestServer { path: string; process: Deno.Process; -}; +} export class TestSuite { private tests: { name: string; func: TestFunc }[] = []; @@ -74,7 +74,7 @@ export async function startRedis({ let config = `dir ${path}\nport ${port}\n`; config += clusterEnabled ? "cluster-enabled yes" : ""; - Deno.writeFileSync(`${path}/redis.conf`, encoder.encode(config), { + await Deno.writeFile(`${path}/redis.conf`, encoder.encode(config), { append: true, }); } @@ -91,7 +91,7 @@ export async function startRedis({ } export function stopRedis(server: TestServer): void { - Deno.removeSync(server.path, { recursive: true }); + Deno.remove(server.path, { recursive: true }); server.process.close(); } From d160c8aa0b7a613fe267b072e30120a580f649d3 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 00:15:41 +0900 Subject: [PATCH 05/64] chore: Ignore tests/server directory --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 21a9126e..29802cbc 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ commands testdata/*/ benchmark/node_modules benchmark/benchmark.js +tests/server \ No newline at end of file From db2b2d5beb413386cc6690d9e50c8885781b5154 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 00:17:30 +0900 Subject: [PATCH 06/64] chore: Fix --- tests/test_util.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_util.ts b/tests/test_util.ts index 8e696d0d..63d8ffb1 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -91,7 +91,7 @@ export async function startRedis({ } export function stopRedis(server: TestServer): void { - Deno.remove(server.path, { recursive: true }); + Deno.removeSync(server.path, { recursive: true }); server.process.close(); } From 004070a023a4543355098101d5631271eb9605fb Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 00:18:43 +0900 Subject: [PATCH 07/64] chore: Fix comment --- experimental/cluster/mod.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index db3fc218..3bcfedcd 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -256,8 +256,8 @@ class ClusterExecutor implements CommandExecutor { delete this.#connectionByNodeName[nodeName]; try { await conn.quit(); - } catch { - // deno-lint-ignore no-empty + } catch (err) { + console.error(err); // TODO: Improve logging } } } From 7586b069f2dabda3dee67c76b39b1800a0a80573 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 21:55:09 +0900 Subject: [PATCH 08/64] test: Try to setup Redis Cluster using redis-cli --- tests/cluster/key_test.ts | 25 ++++++++----------------- tests/cluster/test_util.ts | 26 +++++++++++++++++++++++++- tests/test_util.ts | 2 ++ 3 files changed, 35 insertions(+), 18 deletions(-) diff --git a/tests/cluster/key_test.ts b/tests/cluster/key_test.ts index 69082db8..07afa86b 100644 --- a/tests/cluster/key_test.ts +++ b/tests/cluster/key_test.ts @@ -1,26 +1,17 @@ import { nextPorts, startRedisCluster, stopRedisCluster } from "./test_util.ts"; -import type { TestCluster } from "./test_util.ts"; import { TestSuite } from "../test_util.ts"; import { connect } from "../../experimental/cluster/mod.ts"; -import type { Redis } from "../../redis.ts"; import { assertEquals } from "../../vendor/https/deno.land/std/testing/asserts.ts"; const suite = new TestSuite("cluster/key"); -let cluster!: TestCluster; -let client!: Redis; -const ports = nextPorts(3); - -suite.beforeEach(async () => { - if (cluster == null) { - cluster = await startRedisCluster(ports); - client = await connect({ - nodes: ports.map((port) => ({ - hostname: "127.0.0.1", - port, - })), - maxConnections: ports.length, - }); - } +const ports = nextPorts(5); +const cluster = await startRedisCluster(ports); +const client = await connect({ + nodes: ports.map((port) => ({ + hostname: "127.0.0.1", + port, + })), + maxConnections: ports.length, }); suite.afterAll(() => { diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 00de98ff..c479de0a 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,5 +1,6 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; +import { connect } from "../../redis.ts"; export interface TestCluster { servers: TestServer[]; @@ -10,9 +11,32 @@ export async function startRedisCluster(ports: number[]): Promise { startRedis({ port, clusterEnabled: true, + additionalConfigurations: [ + `cluster-config-file tests/server/redis_cluster_${port}.conf`, + ], }) )); - return { servers }; + + const redisCLI = Deno.run({ + cmd: [ + "redis-cli", + "--cluster", + "create", + ...ports.map((port) => `127.0.0.1:${port}`), + "--cluster-replicas", + "1", + ], + }); + try { + const status = await redisCLI.status(); + if (!status.success) { + throw new Error("Failed to create the cluster"); + } + + return { servers }; + } finally { + redisCLI.close(); + } } export function stopRedisCluster(cluster: TestCluster): void { diff --git a/tests/test_util.ts b/tests/test_util.ts index 63d8ffb1..833c6ebd 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -64,6 +64,7 @@ const encoder = new TextEncoder(); export async function startRedis({ port = 6379, clusterEnabled = false, + additionalConfigurations = [] as string[], }): Promise { const path = `tests/server/${port}`; @@ -73,6 +74,7 @@ export async function startRedis({ let config = `dir ${path}\nport ${port}\n`; config += clusterEnabled ? "cluster-enabled yes" : ""; + config += additionalConfigurations.join("\n"); await Deno.writeFile(`${path}/redis.conf`, encoder.encode(config), { append: true, From 25386e88d64afd394b9b2bab8b1c9e44ed17de83 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 21:58:08 +0900 Subject: [PATCH 09/64] chore: Fixed lint --- tests/cluster/test_util.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index c479de0a..4c1a25d8 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,6 +1,5 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; -import { connect } from "../../redis.ts"; export interface TestCluster { servers: TestServer[]; From 29ec72107d7deb049bece040177849df351d7257 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 22:00:12 +0900 Subject: [PATCH 10/64] ci: Allow Deno to run 'redis-cli' --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 46776bea..01872cac 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -50,7 +50,7 @@ jobs: - name: Run tests run: | - deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server redis_test.ts + deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server,redis-cli redis_test.ts - uses: bahmutov/npm-install@v1 with: working-directory: benchmark From ecaafbbb6850308d8aa60a6a90c32f4449ca347f Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 22:03:25 +0900 Subject: [PATCH 11/64] test: Failed to setup redis cluster... --- tests/test_util.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_util.ts b/tests/test_util.ts index 833c6ebd..06f07060 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -74,7 +74,9 @@ export async function startRedis({ let config = `dir ${path}\nport ${port}\n`; config += clusterEnabled ? "cluster-enabled yes" : ""; - config += additionalConfigurations.join("\n"); + if (additionalConfigurations.length > 0) { + config += "\n" + additionalConfigurations.join("\n"); + } await Deno.writeFile(`${path}/redis.conf`, encoder.encode(config), { append: true, From ae84469cd82d21906c5253f66d71917a0355c5f1 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 22:09:21 +0900 Subject: [PATCH 12/64] test: Improve error message --- tests/cluster/test_util.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 4c1a25d8..74e66b9d 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,5 +1,6 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; +import { readAll } from "../../vendor/https/deno.land/std/io/util.ts"; export interface TestCluster { servers: TestServer[]; @@ -25,15 +26,19 @@ export async function startRedisCluster(ports: number[]): Promise { "--cluster-replicas", "1", ], + stderr: "piped", }); try { const status = await redisCLI.status(); if (!status.success) { - throw new Error("Failed to create the cluster"); + const output = await readAll(redisCLI.stderr); + const decoder = new TextDecoder(); + throw new Error(decoder.decode(output)); } return { servers }; } finally { + redisCLI.stderr.close(); redisCLI.close(); } } From 30b95e220c4d90de727e78a7ef2f14f2016de940 Mon Sep 17 00:00:00 2001 From: uki00a Date: Mon, 5 Jul 2021 22:13:43 +0900 Subject: [PATCH 13/64] test: Try to wait for servers to start up --- tests/cluster/test_util.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 74e66b9d..2cc1f111 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,6 +1,7 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; import { readAll } from "../../vendor/https/deno.land/std/io/util.ts"; +import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; export interface TestCluster { servers: TestServer[]; @@ -17,6 +18,8 @@ export async function startRedisCluster(ports: number[]): Promise { }) )); + await delay(5000); + const redisCLI = Deno.run({ cmd: [ "redis-cli", From d23e2ecdd5408905f26753d45e32e1b493ca250f Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 00:46:23 +0900 Subject: [PATCH 14/64] fix: Update .gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 29802cbc..33fbe5ef 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ commands testdata/*/ benchmark/node_modules benchmark/benchmark.js -tests/server \ No newline at end of file +tests/server/*/ \ No newline at end of file From b22b839be76002f42051f158bc84e301fa482704 Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 00:56:05 +0900 Subject: [PATCH 15/64] test: Update cluster client tests --- tests/cluster/key_test.ts | 2 +- tests/cluster/test_util.ts | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/cluster/key_test.ts b/tests/cluster/key_test.ts index 07afa86b..52fd3e8a 100644 --- a/tests/cluster/key_test.ts +++ b/tests/cluster/key_test.ts @@ -4,7 +4,7 @@ import { connect } from "../../experimental/cluster/mod.ts"; import { assertEquals } from "../../vendor/https/deno.land/std/testing/asserts.ts"; const suite = new TestSuite("cluster/key"); -const ports = nextPorts(5); +const ports = nextPorts(6); const cluster = await startRedisCluster(ports); const client = await connect({ nodes: ports.map((port) => ({ diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 2cc1f111..cded3563 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,7 +1,6 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; import { readAll } from "../../vendor/https/deno.land/std/io/util.ts"; -import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; export interface TestCluster { servers: TestServer[]; @@ -13,12 +12,11 @@ export async function startRedisCluster(ports: number[]): Promise { port, clusterEnabled: true, additionalConfigurations: [ - `cluster-config-file tests/server/redis_cluster_${port}.conf`, + `cluster-config-file tests/server/cluster/${port}_nodes.conf`, ], }) )); - - await delay(5000); + const cluster = { servers }; const redisCLI = Deno.run({ cmd: [ @@ -34,12 +32,13 @@ export async function startRedisCluster(ports: number[]): Promise { try { const status = await redisCLI.status(); if (!status.success) { + stopRedisCluster(cluster); const output = await readAll(redisCLI.stderr); const decoder = new TextDecoder(); throw new Error(decoder.decode(output)); } - return { servers }; + return cluster; } finally { redisCLI.stderr.close(); redisCLI.close(); From 3c1125e92ecc572d0f0c684a7d62a336841aebb9 Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 00:57:33 +0900 Subject: [PATCH 16/64] chore: Debug --- tests/cluster/test_util.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index cded3563..a5753262 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,6 +1,7 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; import { readAll } from "../../vendor/https/deno.land/std/io/util.ts"; +import { connect } from "../../redis.ts"; export interface TestCluster { servers: TestServer[]; @@ -18,6 +19,12 @@ export async function startRedisCluster(ports: number[]): Promise { )); const cluster = { servers }; + { + const redis = await connect({ hostname: "127.0.0.1", port: ports[0] }); + await redis.ping(); + await redis.quit(); + } + const redisCLI = Deno.run({ cmd: [ "redis-cli", From 39f510ac974159484fa7322b9de46585216d827c Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 01:04:03 +0900 Subject: [PATCH 17/64] chore: Debug (part 2) --- tests/cluster/test_util.ts | 1 + tests/test_util.ts | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index a5753262..e0cef0b2 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -15,6 +15,7 @@ export async function startRedisCluster(ports: number[]): Promise { additionalConfigurations: [ `cluster-config-file tests/server/cluster/${port}_nodes.conf`, ], + debug: true, }) )); const cluster = { servers }; diff --git a/tests/test_util.ts b/tests/test_util.ts index 06f07060..869922c8 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -65,6 +65,7 @@ export async function startRedis({ port = 6379, clusterEnabled = false, additionalConfigurations = [] as string[], + debug = false, }): Promise { const path = `tests/server/${port}`; @@ -86,7 +87,7 @@ export async function startRedis({ const process = Deno.run({ cmd: ["redis-server", `${path}/redis.conf`], stdin: "null", - stdout: "null", + stdout: debug ? "inherit" : "null", }); // Ample time for server to finish startup From dbeb56ffdaf17cce260ecc316cb866d8565c101c Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 01:05:44 +0900 Subject: [PATCH 18/64] chore: Lint --- tests/test_util.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_util.ts b/tests/test_util.ts index 869922c8..0db45bb1 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -65,6 +65,7 @@ export async function startRedis({ port = 6379, clusterEnabled = false, additionalConfigurations = [] as string[], + // deno-lint-ignore no-unused-vars debug = false, }): Promise { const path = `tests/server/${port}`; From 8ead7c53295a103eeb8a21f7b133cf16eeffe147 Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 01:09:21 +0900 Subject: [PATCH 19/64] test: Update cluster-config-file --- tests/cluster/test_util.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index e0cef0b2..108496d3 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -13,7 +13,7 @@ export async function startRedisCluster(ports: number[]): Promise { port, clusterEnabled: true, additionalConfigurations: [ - `cluster-config-file tests/server/cluster/${port}_nodes.conf`, + `cluster-config-file tests/server/${port}/nodes.conf`, ], debug: true, }) From b4e1174161f9b877f594a1249eecf61bda7d248a Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 01:14:57 +0900 Subject: [PATCH 20/64] test: Update cluster-conf-file (part 2) --- tests/cluster/test_util.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 108496d3..28911a8d 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -13,7 +13,8 @@ export async function startRedisCluster(ports: number[]): Promise { port, clusterEnabled: true, additionalConfigurations: [ - `cluster-config-file tests/server/${port}/nodes.conf`, + // `cluster-config-file tests/server/${port}/nodes.conf`, + `cluster-config-file ${port}_nodes.conf`, ], debug: true, }) From 0850c5eed52a7adff463be3fff8155277063db6d Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 01:18:32 +0900 Subject: [PATCH 21/64] test: Remove debugging related stuff --- tests/cluster/test_util.ts | 10 +--------- tests/test_util.ts | 4 +--- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 28911a8d..85c8f017 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -13,20 +13,12 @@ export async function startRedisCluster(ports: number[]): Promise { port, clusterEnabled: true, additionalConfigurations: [ - // `cluster-config-file tests/server/${port}/nodes.conf`, + // TODO: The configuration file should be created in `test/server/` directory. `cluster-config-file ${port}_nodes.conf`, ], - debug: true, }) )); const cluster = { servers }; - - { - const redis = await connect({ hostname: "127.0.0.1", port: ports[0] }); - await redis.ping(); - await redis.quit(); - } - const redisCLI = Deno.run({ cmd: [ "redis-cli", diff --git a/tests/test_util.ts b/tests/test_util.ts index 0db45bb1..06f07060 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -65,8 +65,6 @@ export async function startRedis({ port = 6379, clusterEnabled = false, additionalConfigurations = [] as string[], - // deno-lint-ignore no-unused-vars - debug = false, }): Promise { const path = `tests/server/${port}`; @@ -88,7 +86,7 @@ export async function startRedis({ const process = Deno.run({ cmd: ["redis-server", `${path}/redis.conf`], stdin: "null", - stdout: debug ? "inherit" : "null", + stdout: "null", }); // Ample time for server to finish startup From 2107ec65abfa58771ad2030f2df75944d0365094 Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 01:23:39 +0900 Subject: [PATCH 22/64] test: Temporary disable all tests except those for redis cluster --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 01872cac..7b9aeb2d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -50,7 +50,7 @@ jobs: - name: Run tests run: | - deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server,redis-cli redis_test.ts + deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server,redis-cli tests/cluster/test.ts # deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server,redis-cli redis_test.ts - uses: bahmutov/npm-install@v1 with: working-directory: benchmark From cfcd206bf3c78cc52ead2ccf1d0064bc595cdfd8 Mon Sep 17 00:00:00 2001 From: uki00a Date: Tue, 6 Jul 2021 01:25:04 +0900 Subject: [PATCH 23/64] chore: Lint --- tests/cluster/test_util.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 85c8f017..614b75f2 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,7 +1,6 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; import { readAll } from "../../vendor/https/deno.land/std/io/util.ts"; -import { connect } from "../../redis.ts"; export interface TestCluster { servers: TestServer[]; From c0353f10c799b8da321b86d9e84177a0306a5f96 Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 00:21:52 +0900 Subject: [PATCH 24/64] test: Add delay --- tests/cluster/test_util.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 614b75f2..f063f980 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,6 +1,7 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; import { readAll } from "../../vendor/https/deno.land/std/io/util.ts"; +import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; export interface TestCluster { servers: TestServer[]; @@ -38,6 +39,8 @@ export async function startRedisCluster(ports: number[]): Promise { throw new Error(decoder.decode(output)); } + await delay(5000); + return cluster; } finally { redisCLI.stderr.close(); From 8fb8d4a4dd855980b0791fecacfbccc5e8bcfbe1 Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 00:30:14 +0900 Subject: [PATCH 25/64] test: Add "--cluster-yes" option --- tests/cluster/test_util.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index f063f980..a363ad98 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -27,6 +27,7 @@ export async function startRedisCluster(ports: number[]): Promise { ...ports.map((port) => `127.0.0.1:${port}`), "--cluster-replicas", "1", + "--cluster-yes", ], stderr: "piped", }); @@ -39,8 +40,6 @@ export async function startRedisCluster(ports: number[]): Promise { throw new Error(decoder.decode(output)); } - await delay(5000); - return cluster; } finally { redisCLI.stderr.close(); From 37fbc2608820808af1abbf6e45b57072a97750b4 Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 00:31:24 +0900 Subject: [PATCH 26/64] chore: Lint --- tests/cluster/test_util.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index a363ad98..2e193340 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,7 +1,6 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; import { readAll } from "../../vendor/https/deno.land/std/io/util.ts"; -import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; export interface TestCluster { servers: TestServer[]; From fac2172387d5842b5131c11a886617eaf1a3c109 Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 00:38:34 +0900 Subject: [PATCH 27/64] test: Re-add delay --- tests/cluster/test_util.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 2e193340..12bd1df4 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -1,6 +1,7 @@ import { nextPort, startRedis, stopRedis } from "../test_util.ts"; import type { TestServer } from "../test_util.ts"; import { readAll } from "../../vendor/https/deno.land/std/io/util.ts"; +import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; export interface TestCluster { servers: TestServer[]; @@ -39,6 +40,8 @@ export async function startRedisCluster(ports: number[]): Promise { throw new Error(decoder.decode(output)); } + await delay(5000); + return cluster; } finally { redisCLI.stderr.close(); From b33c357f30dd17b840ae0865f422ca9da036cb94 Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 00:41:24 +0900 Subject: [PATCH 28/64] fix: Fix cluster error handling --- experimental/cluster/mod.ts | 4 ++-- tests/cluster/test_util.ts | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 3bcfedcd..d503d44a 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -127,8 +127,8 @@ class ClusterExecutor implements CommandExecutor { continue; } else if (err instanceof ErrorReplyError) { const [code, newSlot, ipAndPort] = err.message.split(/\s+/); - if (code === "MOVED" || code === "ASK") { - if (code === "ASK") { + if (code === "-MOVED" || code === "-ASK") { + if (code === "-ASK") { asking = true; } else { // Serve replied with MOVED. It's better for us to diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 12bd1df4..608fd5a1 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -40,6 +40,7 @@ export async function startRedisCluster(ports: number[]): Promise { throw new Error(decoder.decode(output)); } + // Ample time for cluster to finish startup await delay(5000); return cluster; From 6ea18070019ded569a805961862fd9ff4592ebc4 Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 00:44:39 +0900 Subject: [PATCH 29/64] chore: Debug --- experimental/cluster/mod.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index d503d44a..ac5f875b 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -127,6 +127,7 @@ class ClusterExecutor implements CommandExecutor { continue; } else if (err instanceof ErrorReplyError) { const [code, newSlot, ipAndPort] = err.message.split(/\s+/); + console.log([code, newSlot, ipAndPort]); if (code === "-MOVED" || code === "-ASK") { if (code === "-ASK") { asking = true; From 9523561acc6f146c1ddd0d5df25c5cece7e92cb6 Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 00:58:40 +0900 Subject: [PATCH 30/64] chore: Debug --- experimental/cluster/mod.ts | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index ac5f875b..dad3cc6f 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -63,6 +63,11 @@ const kRedisClusterRequestTTL = 16; class ClusterError extends Error {} +// TODO: Remove this! +function debug(x: unknown): void { + console.log(x); +} + // TODO: This class should implement CommandExecutor interface. class ClusterExecutor implements CommandExecutor { #nodes!: ClusterNode[]; @@ -89,19 +94,20 @@ class ClusterExecutor implements CommandExecutor { if (this.#refreshTableASAP) { await this.initializeSlotsCache(); } - const key = getKeyFromCommand(command, args); - if (key == null) { - throw new ClusterError( - "No way to dispatch this command to Redis Cluster.", - ); - } - const slot = calculateSlot(key); let asking = false; let tryRandomNode = false; let ttl = kRedisClusterRequestTTL; let lastError: null | Error; while (ttl > 0) { ttl -= 1; + const key = getKeyFromCommand(command, args); + if (key == null) { + throw new ClusterError( + "No way to dispatch this command to Redis Cluster.", + ); + } + const slot = calculateSlot(key); + debug(`Slot is ${slot}`); let r: Redis; if (tryRandomNode) { r = await this.#getRandomConnection(); @@ -127,7 +133,7 @@ class ClusterExecutor implements CommandExecutor { continue; } else if (err instanceof ErrorReplyError) { const [code, newSlot, ipAndPort] = err.message.split(/\s+/); - console.log([code, newSlot, ipAndPort]); + debug([code, newSlot, ipAndPort]); if (code === "-MOVED" || code === "-ASK") { if (code === "-ASK") { asking = true; From 4722814fe43be2675844eb19fa6e3e8d326f6830 Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 01:00:42 +0900 Subject: [PATCH 31/64] chore: Log redirection message --- experimental/cluster/mod.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index dad3cc6f..7252f886 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -132,6 +132,7 @@ class ClusterExecutor implements CommandExecutor { } continue; } else if (err instanceof ErrorReplyError) { + debug(err); const [code, newSlot, ipAndPort] = err.message.split(/\s+/); debug([code, newSlot, ipAndPort]); if (code === "-MOVED" || code === "-ASK") { From 889d276b21d3f591216a2b1d452b1607c4512dfd Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 01:22:59 +0900 Subject: [PATCH 32/64] fix: Fix key selecting --- experimental/cluster/mod.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 7252f886..a3617ad5 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -226,6 +226,8 @@ class ClusterExecutor implements CommandExecutor { await this.#closeExistingConnection(); this.#connectionByNodeName[node.name] = conn; return conn; + } else { + await conn.quit(); } } catch { conn.close(); @@ -296,7 +298,7 @@ function getKeyFromCommand(command: string, args: RedisValue[]): string | null { case "shutdown": return null; default: - return args[1] as string; + return args[0] as string; } } From 9dc5ea69139364423cd513e9a814adcf61d0c01a Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 01:34:16 +0900 Subject: [PATCH 33/64] feat: Add ClusterExecutor#close --- experimental/cluster/mod.ts | 21 ++++++++++++++++++++- tests/cluster/key_test.ts | 4 ++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index a3617ad5..ddd055c1 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -164,6 +164,18 @@ class ClusterExecutor implements CommandExecutor { ); } + close(): void { + const nodeNames = Object.keys(this.#connectionByNodeName); + for (const nodeName of nodeNames) { + const conn = this.#connectionByNodeName[nodeName]; + if (conn) { + conn.close(); + delete this.#connectionByNodeName[nodeName]; + } + } + this.#refreshTableASAP = true; + } + async initializeSlotsCache(): Promise { for (const node of this.#startupNodes) { try { @@ -309,7 +321,14 @@ function getKeyFromCommand(command: string, args: RedisValue[]): string | null { async function connectCluster(opts: ClusterConnectOptions) { const executor = new ClusterExecutor(opts); await executor.initializeSlotsCache(); - return new RedisImpl(executor); + const redis = new RedisImpl(executor); + + // TODO: This is not ideal. We should refactor this! + function close(): void { + executor.close(); + } + + return Object.assign(redis, { close }); } export { connectCluster as connect }; diff --git a/tests/cluster/key_test.ts b/tests/cluster/key_test.ts index 52fd3e8a..559d5102 100644 --- a/tests/cluster/key_test.ts +++ b/tests/cluster/key_test.ts @@ -18,6 +18,10 @@ suite.afterAll(() => { stopRedisCluster(cluster); }); +suite.afterEach(() => { + client.close(); +}); + suite.test("del", async () => { await client.set("{hoge}foo", "a"); await client.set("{hoge}bar", "b"); From fdaa503d0681ad8bc36a696a89e8309e7bcb79dc Mon Sep 17 00:00:00 2001 From: uki00a Date: Thu, 8 Jul 2021 01:47:20 +0900 Subject: [PATCH 34/64] test(del): multiple keys in different hash slots --- experimental/cluster/mod.ts | 1 + tests/cluster/key_test.ts | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index ddd055c1..8139b4ee 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -151,6 +151,7 @@ class ClusterExecutor implements CommandExecutor { ); } } else { + debug(err); throw err; } } else { diff --git a/tests/cluster/key_test.ts b/tests/cluster/key_test.ts index 559d5102..e5a5258b 100644 --- a/tests/cluster/key_test.ts +++ b/tests/cluster/key_test.ts @@ -1,7 +1,10 @@ import { nextPorts, startRedisCluster, stopRedisCluster } from "./test_util.ts"; import { TestSuite } from "../test_util.ts"; import { connect } from "../../experimental/cluster/mod.ts"; -import { assertEquals } from "../../vendor/https/deno.land/std/testing/asserts.ts"; +import { + assertEquals, + assertThrowsAsync, +} from "../../vendor/https/deno.land/std/testing/asserts.ts"; const suite = new TestSuite("cluster/key"); const ports = nextPorts(6); @@ -22,11 +25,19 @@ suite.afterEach(() => { client.close(); }); -suite.test("del", async () => { +suite.test("del multiple keys in the same hash slot", async () => { await client.set("{hoge}foo", "a"); await client.set("{hoge}bar", "b"); const r = await client.del("{hoge}foo", "{hoge}bar"); assertEquals(r, 2); }); +suite.test("del multiple keys in different hash slots", async () => { + await client.set("foo", "a"); + await client.set("bar", "b"); + await assertThrowsAsync(async () => { + await client.del("foo", "bar"); + }); +}); + suite.runTests(); From 6c9cacfc6c1d8dfac4281e744587f66b1ffaf985 Mon Sep 17 00:00:00 2001 From: uki00a Date: Fri, 9 Jul 2021 00:09:45 +0900 Subject: [PATCH 35/64] fix: Close redis connections --- experimental/cluster/mod.ts | 46 ++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 8139b4ee..6681a086 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -181,29 +181,33 @@ class ClusterExecutor implements CommandExecutor { for (const node of this.#startupNodes) { try { const redis = await getRedisLink(node, this.#redisOpts); - const clusterSlots = await redis.clusterSlots() as Array< - [number, number, [string, number]] - >; - const nodes = [] as ClusterNode[]; - const slotMap = {} as SlotMap; - for (const [from, to, master] of clusterSlots) { - for (let slot = from; slot <= to; slot++) { - const [ip, port] = master; - const name = `${ip}:${port}`; - const node = { - name, - hostname: ip, - port, - }; - nodes.push(node); - slotMap[slot] = node; + try { + const clusterSlots = await redis.clusterSlots() as Array< + [number, number, [string, number]] + >; + const nodes = [] as ClusterNode[]; + const slotMap = {} as SlotMap; + for (const [from, to, master] of clusterSlots) { + for (let slot = from; slot <= to; slot++) { + const [ip, port] = master; + const name = `${ip}:${port}`; + const node = { + name, + hostname: ip, + port, + }; + nodes.push(node); + slotMap[slot] = node; + } } + this.#nodes = nodes; + this.#slots = slotMap; + await this.#populateStartupNodes(); + this.#refreshTableASAP = false; + return; + } finally { + await redis.quit(); } - this.#nodes = nodes; - this.#slots = slotMap; - await this.#populateStartupNodes(); - this.#refreshTableASAP = false; - return; } catch (_err) { // TODO: Consider logging `_err` here continue; From a83975601898848894def554373118d09bf0f04b Mon Sep 17 00:00:00 2001 From: uki00a Date: Fri, 9 Jul 2021 00:10:18 +0900 Subject: [PATCH 36/64] chore: Tweak a test --- tests/cluster/key_test.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/cluster/key_test.ts b/tests/cluster/key_test.ts index e5a5258b..da1f3132 100644 --- a/tests/cluster/key_test.ts +++ b/tests/cluster/key_test.ts @@ -5,6 +5,7 @@ import { assertEquals, assertThrowsAsync, } from "../../vendor/https/deno.land/std/testing/asserts.ts"; +import { ErrorReplyError } from "../../errors.ts"; const suite = new TestSuite("cluster/key"); const ports = nextPorts(6); @@ -35,9 +36,13 @@ suite.test("del multiple keys in the same hash slot", async () => { suite.test("del multiple keys in different hash slots", async () => { await client.set("foo", "a"); await client.set("bar", "b"); - await assertThrowsAsync(async () => { - await client.del("foo", "bar"); - }); + await assertThrowsAsync( + async () => { + await client.del("foo", "bar"); + }, + ErrorReplyError, + "-CROSSSLOT Keys in request don't hash to the same slot", + ); }); suite.runTests(); From 58356c732d721d16c76a94efe7435a7c2cdd97c6 Mon Sep 17 00:00:00 2001 From: uki00a Date: Fri, 9 Jul 2021 02:16:18 +0900 Subject: [PATCH 37/64] chore: Use lodash-es --- experimental/cluster/mod.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 6681a086..a4e68a41 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -28,11 +28,11 @@ import type { RedisConnectOptions } from "../../redis.ts"; import type { CommandExecutor } from "../../executor.ts"; import type { Connection } from "../../connection.ts"; import type { Redis } from "../../redis.ts"; -import uniqBy from "https://cdn.skypack.dev/lodash.uniqby@4.5.0"; // TODO: Import `lodash.uniqby` from `vendor` directory +import uniqBy from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; import type { RedisReply, RedisValue } from "../../protocol/mod.ts"; import calculateSlot from "https://cdn.skypack.dev/cluster-key-slot@1.1.0"; -import shuffle from "https://cdn.skypack.dev/lodash.shuffle@4.2.0"; -import sample from "https://cdn.skypack.dev/lodash.sample@4.2.1"; +import shuffle from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; +import sample from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; import { ErrorReplyError } from "../../errors.ts"; import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; @@ -68,7 +68,6 @@ function debug(x: unknown): void { console.log(x); } -// TODO: This class should implement CommandExecutor interface. class ClusterExecutor implements CommandExecutor { #nodes!: ClusterNode[]; #slots!: SlotMap; From 61bf0b86f3192aa9620cd68e5874bea045a228d8 Mon Sep 17 00:00:00 2001 From: uki00a Date: Fri, 9 Jul 2021 02:19:19 +0900 Subject: [PATCH 38/64] chore: Manage deps using deps.ts --- experimental/cluster/deps.ts | 5 +++++ experimental/cluster/mod.ts | 5 +---- 2 files changed, 6 insertions(+), 4 deletions(-) create mode 100644 experimental/cluster/deps.ts diff --git a/experimental/cluster/deps.ts b/experimental/cluster/deps.ts new file mode 100644 index 00000000..b8c4ebaf --- /dev/null +++ b/experimental/cluster/deps.ts @@ -0,0 +1,5 @@ +// TODO: Manage dependencies using dlink +export { default as calculateSlot } from "https://cdn.skypack.dev/cluster-key-slot@1.1.0"; +export { default as shuffle } from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; +export { default as sample } from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; +export { default as uniqBy } from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index a4e68a41..52c49727 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -28,13 +28,10 @@ import type { RedisConnectOptions } from "../../redis.ts"; import type { CommandExecutor } from "../../executor.ts"; import type { Connection } from "../../connection.ts"; import type { Redis } from "../../redis.ts"; -import uniqBy from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; import type { RedisReply, RedisValue } from "../../protocol/mod.ts"; -import calculateSlot from "https://cdn.skypack.dev/cluster-key-slot@1.1.0"; -import shuffle from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; -import sample from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; import { ErrorReplyError } from "../../errors.ts"; import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; +import { calculateSlot, sample, shuffle, uniqBy } from "./deps.ts"; export interface ClusterConnectOptions { nodes: Array; From fdba9b787f3c80ed157cc45d32b98a15bf1b442d Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 15:30:17 +0900 Subject: [PATCH 39/64] chore: Rename key_test.ts to test.ts --- tests/cluster/key_test.ts | 48 -------------------------------------- tests/cluster/test.ts | 49 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 48 insertions(+), 49 deletions(-) delete mode 100644 tests/cluster/key_test.ts diff --git a/tests/cluster/key_test.ts b/tests/cluster/key_test.ts deleted file mode 100644 index da1f3132..00000000 --- a/tests/cluster/key_test.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { nextPorts, startRedisCluster, stopRedisCluster } from "./test_util.ts"; -import { TestSuite } from "../test_util.ts"; -import { connect } from "../../experimental/cluster/mod.ts"; -import { - assertEquals, - assertThrowsAsync, -} from "../../vendor/https/deno.land/std/testing/asserts.ts"; -import { ErrorReplyError } from "../../errors.ts"; - -const suite = new TestSuite("cluster/key"); -const ports = nextPorts(6); -const cluster = await startRedisCluster(ports); -const client = await connect({ - nodes: ports.map((port) => ({ - hostname: "127.0.0.1", - port, - })), - maxConnections: ports.length, -}); - -suite.afterAll(() => { - stopRedisCluster(cluster); -}); - -suite.afterEach(() => { - client.close(); -}); - -suite.test("del multiple keys in the same hash slot", async () => { - await client.set("{hoge}foo", "a"); - await client.set("{hoge}bar", "b"); - const r = await client.del("{hoge}foo", "{hoge}bar"); - assertEquals(r, 2); -}); - -suite.test("del multiple keys in different hash slots", async () => { - await client.set("foo", "a"); - await client.set("bar", "b"); - await assertThrowsAsync( - async () => { - await client.del("foo", "bar"); - }, - ErrorReplyError, - "-CROSSSLOT Keys in request don't hash to the same slot", - ); -}); - -suite.runTests(); diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts index 46bf2400..da1f3132 100644 --- a/tests/cluster/test.ts +++ b/tests/cluster/test.ts @@ -1 +1,48 @@ -import "./key_test.ts"; +import { nextPorts, startRedisCluster, stopRedisCluster } from "./test_util.ts"; +import { TestSuite } from "../test_util.ts"; +import { connect } from "../../experimental/cluster/mod.ts"; +import { + assertEquals, + assertThrowsAsync, +} from "../../vendor/https/deno.land/std/testing/asserts.ts"; +import { ErrorReplyError } from "../../errors.ts"; + +const suite = new TestSuite("cluster/key"); +const ports = nextPorts(6); +const cluster = await startRedisCluster(ports); +const client = await connect({ + nodes: ports.map((port) => ({ + hostname: "127.0.0.1", + port, + })), + maxConnections: ports.length, +}); + +suite.afterAll(() => { + stopRedisCluster(cluster); +}); + +suite.afterEach(() => { + client.close(); +}); + +suite.test("del multiple keys in the same hash slot", async () => { + await client.set("{hoge}foo", "a"); + await client.set("{hoge}bar", "b"); + const r = await client.del("{hoge}foo", "{hoge}bar"); + assertEquals(r, 2); +}); + +suite.test("del multiple keys in different hash slots", async () => { + await client.set("foo", "a"); + await client.set("bar", "b"); + await assertThrowsAsync( + async () => { + await client.del("foo", "bar"); + }, + ErrorReplyError, + "-CROSSSLOT Keys in request don't hash to the same slot", + ); +}); + +suite.runTests(); From 0654dd8c317a8bd327849db40b4d6e9c9fb7564e Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 17:00:53 +0900 Subject: [PATCH 40/64] docs: Add README files --- README.md | 6 ++++++ experimental/README.md | 8 ++++++++ experimental/cluster/README.md | 4 ++++ 3 files changed, 18 insertions(+) create mode 100644 experimental/README.md create mode 100644 experimental/cluster/README.md diff --git a/README.md b/README.md index 0630a807..b2f3c4be 100644 --- a/README.md +++ b/README.md @@ -176,6 +176,12 @@ const sub = await cacheClient.subscribe("__redis__:invalidate"); })(); ``` +### Experimental features + +deno-redis provides some experimental features. + +See [experimental/README.md](experimental/README.md) for details. + ## Roadmap for v1 - See https://github.com/denodrivers/redis/issues/78 diff --git a/experimental/README.md b/experimental/README.md new file mode 100644 index 00000000..69bf7fd7 --- /dev/null +++ b/experimental/README.md @@ -0,0 +1,8 @@ +# Experimental features + +deno-redis has some experimental features: + +- [Redis Cluster client](cluster/README.md) + +**These experimental features may be subject to breaking changes even after a +major release.** diff --git a/experimental/cluster/README.md b/experimental/cluster/README.md new file mode 100644 index 00000000..873f5239 --- /dev/null +++ b/experimental/cluster/README.md @@ -0,0 +1,4 @@ +# experimental/cluster + +This module implements a client impelementation for +[the Redis Cluster](https://redis.io/topics/cluster-tutorial). From a5a49b53e2f0af8f62c82c2397a97fdee15df0ab Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 18:36:19 +0900 Subject: [PATCH 41/64] chore: Manage deps with dlink --- experimental/cluster/deps.ts | 5 ----- experimental/cluster/mod.ts | 5 ++++- modules-lock.json | 14 ++++++++++++++ modules.json | 14 ++++++++++++++ .../cdn.skypack.dev/cluster-key-slot/lib/index.js | 3 +++ vendor/https/cdn.skypack.dev/lodash-es/sample.js | 3 +++ vendor/https/cdn.skypack.dev/lodash-es/shuffle.js | 3 +++ vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js | 3 +++ 8 files changed, 44 insertions(+), 6 deletions(-) delete mode 100644 experimental/cluster/deps.ts create mode 100644 vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js create mode 100644 vendor/https/cdn.skypack.dev/lodash-es/sample.js create mode 100644 vendor/https/cdn.skypack.dev/lodash-es/shuffle.js create mode 100644 vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js diff --git a/experimental/cluster/deps.ts b/experimental/cluster/deps.ts deleted file mode 100644 index b8c4ebaf..00000000 --- a/experimental/cluster/deps.ts +++ /dev/null @@ -1,5 +0,0 @@ -// TODO: Manage dependencies using dlink -export { default as calculateSlot } from "https://cdn.skypack.dev/cluster-key-slot@1.1.0"; -export { default as shuffle } from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; -export { default as sample } from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; -export { default as uniqBy } from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 52c49727..d8895fdb 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -31,7 +31,10 @@ import type { Redis } from "../../redis.ts"; import type { RedisReply, RedisValue } from "../../protocol/mod.ts"; import { ErrorReplyError } from "../../errors.ts"; import { delay } from "../../vendor/https/deno.land/std/async/delay.ts"; -import { calculateSlot, sample, shuffle, uniqBy } from "./deps.ts"; +import calculateSlot from "../../vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js"; +import sample from "../../vendor/https/cdn.skypack.dev/lodash-es/sample.js"; +import shuffle from "../../vendor/https/cdn.skypack.dev/lodash-es/shuffle.js"; +import uniqBy from "../../vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js"; export interface ClusterConnectOptions { nodes: Array; diff --git a/modules-lock.json b/modules-lock.json index ef6d5ca1..d0839f54 100644 --- a/modules-lock.json +++ b/modules-lock.json @@ -11,5 +11,19 @@ "/io/util.ts", "/fmt/colors.ts" ] + }, + "https://cdn.skypack.dev/lodash-es": { + "version": "@4.17.21", + "modules": [ + "/sample.js", + "/shuffle.js", + "/uniqBy.js" + ] + }, + "https://cdn.skypack.dev/cluster-key-slot": { + "version": "@1.1.0", + "modules": [ + "/lib/index.js" + ] } } diff --git a/modules.json b/modules.json index ef6d5ca1..d0839f54 100644 --- a/modules.json +++ b/modules.json @@ -11,5 +11,19 @@ "/io/util.ts", "/fmt/colors.ts" ] + }, + "https://cdn.skypack.dev/lodash-es": { + "version": "@4.17.21", + "modules": [ + "/sample.js", + "/shuffle.js", + "/uniqBy.js" + ] + }, + "https://cdn.skypack.dev/cluster-key-slot": { + "version": "@1.1.0", + "modules": [ + "/lib/index.js" + ] } } diff --git a/vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js b/vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js new file mode 100644 index 00000000..e8c35431 --- /dev/null +++ b/vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js @@ -0,0 +1,3 @@ +export * from "https://cdn.skypack.dev/cluster-key-slot@1.1.0/lib/index.js"; +import {default as dew} from "https://cdn.skypack.dev/cluster-key-slot@1.1.0/lib/index.js"; +export default dew; diff --git a/vendor/https/cdn.skypack.dev/lodash-es/sample.js b/vendor/https/cdn.skypack.dev/lodash-es/sample.js new file mode 100644 index 00000000..c6af7032 --- /dev/null +++ b/vendor/https/cdn.skypack.dev/lodash-es/sample.js @@ -0,0 +1,3 @@ +export * from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; +import {default as dew} from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; +export default dew; diff --git a/vendor/https/cdn.skypack.dev/lodash-es/shuffle.js b/vendor/https/cdn.skypack.dev/lodash-es/shuffle.js new file mode 100644 index 00000000..99cc5cea --- /dev/null +++ b/vendor/https/cdn.skypack.dev/lodash-es/shuffle.js @@ -0,0 +1,3 @@ +export * from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; +import {default as dew} from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; +export default dew; diff --git a/vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js b/vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js new file mode 100644 index 00000000..38e2d966 --- /dev/null +++ b/vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js @@ -0,0 +1,3 @@ +export * from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; +import {default as dew} from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; +export default dew; From 59adefafcc13ad2b76e47cfd4e121ede1fb39515 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 18:37:46 +0900 Subject: [PATCH 42/64] chore: Run "deno fmt" --- vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js | 2 +- vendor/https/cdn.skypack.dev/lodash-es/sample.js | 2 +- vendor/https/cdn.skypack.dev/lodash-es/shuffle.js | 2 +- vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js b/vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js index e8c35431..5390950d 100644 --- a/vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js +++ b/vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js @@ -1,3 +1,3 @@ export * from "https://cdn.skypack.dev/cluster-key-slot@1.1.0/lib/index.js"; -import {default as dew} from "https://cdn.skypack.dev/cluster-key-slot@1.1.0/lib/index.js"; +import { default as dew } from "https://cdn.skypack.dev/cluster-key-slot@1.1.0/lib/index.js"; export default dew; diff --git a/vendor/https/cdn.skypack.dev/lodash-es/sample.js b/vendor/https/cdn.skypack.dev/lodash-es/sample.js index c6af7032..5e80f752 100644 --- a/vendor/https/cdn.skypack.dev/lodash-es/sample.js +++ b/vendor/https/cdn.skypack.dev/lodash-es/sample.js @@ -1,3 +1,3 @@ export * from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; -import {default as dew} from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; +import { default as dew } from "https://cdn.skypack.dev/lodash-es@4.17.21/sample.js"; export default dew; diff --git a/vendor/https/cdn.skypack.dev/lodash-es/shuffle.js b/vendor/https/cdn.skypack.dev/lodash-es/shuffle.js index 99cc5cea..81123f20 100644 --- a/vendor/https/cdn.skypack.dev/lodash-es/shuffle.js +++ b/vendor/https/cdn.skypack.dev/lodash-es/shuffle.js @@ -1,3 +1,3 @@ export * from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; -import {default as dew} from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; +import { default as dew } from "https://cdn.skypack.dev/lodash-es@4.17.21/shuffle.js"; export default dew; diff --git a/vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js b/vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js index 38e2d966..b577f435 100644 --- a/vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js +++ b/vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js @@ -1,3 +1,3 @@ export * from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; -import {default as dew} from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; +import { default as dew } from "https://cdn.skypack.dev/lodash-es@4.17.21/uniqBy.js"; export default dew; From cf3abcf4b624e02709560271b0f0515ba4323e1e Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 22:08:03 +0900 Subject: [PATCH 43/64] fix: Remove redisOptions and add newRedis instead --- experimental/cluster/mod.ts | 27 ++++++--------------------- tests/cluster/test.ts | 2 +- 2 files changed, 7 insertions(+), 22 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index d8895fdb..124c6ba1 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -39,7 +39,7 @@ import uniqBy from "../../vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js"; export interface ClusterConnectOptions { nodes: Array; maxConnections: number; - redisOptions?: RedisConnectOptions; + newRedis?: (opts: RedisConnectOptions) => Promise; } export interface NodeOptions { @@ -75,14 +75,14 @@ class ClusterExecutor implements CommandExecutor { #refreshTableASAP?: boolean; #maxConnections: number; #connectionByNodeName: { [name: string]: Redis } = {}; - #redisOpts: RedisConnectOptions | undefined; + #newRedis: (opts: RedisConnectOptions) => Promise; constructor(opts: ClusterConnectOptions) { this.#startupNodes = opts.nodes.map((node) => new ClusterNode(node.hostname, node.port ?? 6379) ); this.#maxConnections = opts.maxConnections; - this.#redisOpts = opts.redisOptions; + this.#newRedis = opts.newRedis ?? connect; } get connection(): Connection { @@ -179,7 +179,7 @@ class ClusterExecutor implements CommandExecutor { async initializeSlotsCache(): Promise { for (const node of this.#startupNodes) { try { - const redis = await getRedisLink(node, this.#redisOpts); + const redis = await this.#newRedis(node); try { const clusterSlots = await redis.clusterSlots() as Array< [number, number, [string, number]] @@ -235,7 +235,7 @@ class ClusterExecutor implements CommandExecutor { return conn; } } else { - conn = await getRedisLink(node, this.#redisOpts); + conn = await this.#newRedis(node); try { const message = await conn.ping(); if (message === "PONG") { @@ -265,7 +265,7 @@ class ClusterExecutor implements CommandExecutor { if (!conn) { try { await this.#closeExistingConnection(); - conn = await getRedisLink(node, this.#redisOpts); + conn = await this.#newRedis(node); this.#connectionByNodeName[node.name] = conn; } catch { return this.#getRandomConnection(); @@ -289,21 +289,6 @@ class ClusterExecutor implements CommandExecutor { } } -function getRedisLink( - node: ClusterNode, - opts: RedisConnectOptions | undefined, -): Promise { - if (opts) { - return connect({ - ...opts, - hostname: node.hostname, - port: node.port, - }); - } else { - return connect(node); - } -} - function getKeyFromCommand(command: string, args: RedisValue[]): string | null { switch (command.toLowerCase()) { case "info": diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts index da1f3132..ed52039f 100644 --- a/tests/cluster/test.ts +++ b/tests/cluster/test.ts @@ -7,7 +7,7 @@ import { } from "../../vendor/https/deno.land/std/testing/asserts.ts"; import { ErrorReplyError } from "../../errors.ts"; -const suite = new TestSuite("cluster/key"); +const suite = new TestSuite("cluster/client"); const ports = nextPorts(6); const cluster = await startRedisCluster(ports); const client = await connect({ From 03568fccab43f996e9668cefc30b98598b3def46 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 22:28:26 +0900 Subject: [PATCH 44/64] test: Verifies a -MOVED error is correctly handled --- experimental/cluster/mod.ts | 4 +-- tests/cluster/test.ts | 65 +++++++++++++++++++++++++++++++++---- 2 files changed, 60 insertions(+), 9 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 124c6ba1..4d6b74af 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -307,7 +307,7 @@ function getKeyFromCommand(command: string, args: RedisValue[]): string | null { * @see https://redis.io/topics/cluster-tutorial * @see https://redis.io/topics/cluster-spec */ -async function connectCluster(opts: ClusterConnectOptions) { +async function connectToCluster(opts: ClusterConnectOptions) { const executor = new ClusterExecutor(opts); await executor.initializeSlotsCache(); const redis = new RedisImpl(executor); @@ -320,4 +320,4 @@ async function connectCluster(opts: ClusterConnectOptions) { return Object.assign(redis, { close }); } -export { connectCluster as connect }; +export { connectToCluster as connect }; diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts index ed52039f..a762b793 100644 --- a/tests/cluster/test.ts +++ b/tests/cluster/test.ts @@ -1,21 +1,28 @@ import { nextPorts, startRedisCluster, stopRedisCluster } from "./test_util.ts"; import { TestSuite } from "../test_util.ts"; -import { connect } from "../../experimental/cluster/mod.ts"; +import { connect as connectToCluster } from "../../experimental/cluster/mod.ts"; import { + assert, assertEquals, assertThrowsAsync, } from "../../vendor/https/deno.land/std/testing/asserts.ts"; +import sample from "../../vendor/https/cdn.skypack.dev/lodash-es/shuffle.js"; +import calculateSlot from "../../vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js"; import { ErrorReplyError } from "../../errors.ts"; +import { connect, RedisImpl } from "../../redis.ts"; +import type { CommandExecutor } from "../../executor.ts"; const suite = new TestSuite("cluster/client"); const ports = nextPorts(6); const cluster = await startRedisCluster(ports); -const client = await connect({ - nodes: ports.map((port) => ({ - hostname: "127.0.0.1", - port, - })), - maxConnections: ports.length, +const nodes = ports.map((port) => ({ + hostname: "127.0.0.1", + port, +})); +const maxConnections = nodes.length; +const client = await connectToCluster({ + nodes, + maxConnections, }); suite.afterAll(() => { @@ -45,4 +52,48 @@ suite.test("del multiple keys in different hash slots", async () => { ); }); +suite.test("handle a -MOVED redirection error", async () => { + let redirected = false; + const client = await connectToCluster({ + nodes, + maxConnections, + async newRedis(opts) { + const redis = await connect(opts); + const { hostname, port } = opts; + const proxyExecutor = { + get connection() { + return redis.executor.connection; + }, + async exec(cmd, ...args) { + if (cmd === "GET" && !redirected) { + // Manually cause a -MOVED redirection error + const [key] = args; + assert(typeof key === "string"); + const slot = calculateSlot(key); + const randomPort = sample(ports.filter((x) => x !== port)); + const error = new ErrorReplyError( + `-MOVED ${slot} ${hostname}:${randomPort}`, + ); + redirected = true; + throw error; + } else { + const reply = await redis.executor.exec(cmd, ...args); + return reply; + } + }, + } as CommandExecutor; + return new RedisImpl(proxyExecutor); + }, + }); + + try { + await client.set("foo", "bar"); + const r = await client.get("foo"); + assertEquals(r, "bar"); + assert(redirected); + } finally { + client.close(); + } +}); + suite.runTests(); From d1c7d8ed930d4d9ae5fefefaab24d77b7d868fd7 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 22:31:04 +0900 Subject: [PATCH 45/64] chore: Remove debugging things --- experimental/cluster/mod.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 4d6b74af..3ebc3e28 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -63,11 +63,6 @@ const kRedisClusterRequestTTL = 16; class ClusterError extends Error {} -// TODO: Remove this! -function debug(x: unknown): void { - console.log(x); -} - class ClusterExecutor implements CommandExecutor { #nodes!: ClusterNode[]; #slots!: SlotMap; @@ -106,7 +101,6 @@ class ClusterExecutor implements CommandExecutor { ); } const slot = calculateSlot(key); - debug(`Slot is ${slot}`); let r: Redis; if (tryRandomNode) { r = await this.#getRandomConnection(); @@ -131,9 +125,7 @@ class ClusterExecutor implements CommandExecutor { } continue; } else if (err instanceof ErrorReplyError) { - debug(err); const [code, newSlot, ipAndPort] = err.message.split(/\s+/); - debug([code, newSlot, ipAndPort]); if (code === "-MOVED" || code === "-ASK") { if (code === "-ASK") { asking = true; @@ -150,7 +142,6 @@ class ClusterExecutor implements CommandExecutor { ); } } else { - debug(err); throw err; } } else { From 0189a0cd81fc28a4babce94c86985a6929c2081f Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 22:59:15 +0900 Subject: [PATCH 46/64] test: Improve a test for a -MOVED error --- tests/cluster/test.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts index a762b793..bbad0e87 100644 --- a/tests/cluster/test.ts +++ b/tests/cluster/test.ts @@ -3,6 +3,7 @@ import { TestSuite } from "../test_util.ts"; import { connect as connectToCluster } from "../../experimental/cluster/mod.ts"; import { assert, + assertArrayIncludes, assertEquals, assertThrowsAsync, } from "../../vendor/https/deno.land/std/testing/asserts.ts"; @@ -54,12 +55,15 @@ suite.test("del multiple keys in different hash slots", async () => { suite.test("handle a -MOVED redirection error", async () => { let redirected = false; + let manuallyRedirectedPort!: number; + const portsSent = new Set(); const client = await connectToCluster({ nodes, maxConnections, async newRedis(opts) { const redis = await connect(opts); const { hostname, port } = opts; + assert(port != null); const proxyExecutor = { get connection() { return redis.executor.connection; @@ -70,13 +74,14 @@ suite.test("handle a -MOVED redirection error", async () => { const [key] = args; assert(typeof key === "string"); const slot = calculateSlot(key); - const randomPort = sample(ports.filter((x) => x !== port)); + manuallyRedirectedPort = sample(ports.filter((x) => x !== port)); const error = new ErrorReplyError( - `-MOVED ${slot} ${hostname}:${randomPort}`, + `-MOVED ${slot} ${hostname}:${manuallyRedirectedPort}`, ); redirected = true; throw error; } else { + portsSent.add(Number(port)); const reply = await redis.executor.exec(cmd, ...args); return reply; } @@ -91,6 +96,8 @@ suite.test("handle a -MOVED redirection error", async () => { const r = await client.get("foo"); assertEquals(r, "bar"); assert(redirected); + // Check if a cluster client correctly handles a -MOVED error + assertArrayIncludes(Array.from(portsSent), [manuallyRedirectedPort]); } finally { client.close(); } From 91fdbba1d72d0c761c2f275c769b92830707a878 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 23:09:48 +0900 Subject: [PATCH 47/64] fix: Update a test for a -MOVED error --- tests/cluster/test.ts | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts index bbad0e87..1825f73a 100644 --- a/tests/cluster/test.ts +++ b/tests/cluster/test.ts @@ -62,8 +62,7 @@ suite.test("handle a -MOVED redirection error", async () => { maxConnections, async newRedis(opts) { const redis = await connect(opts); - const { hostname, port } = opts; - assert(port != null); + assert(opts.port != null); const proxyExecutor = { get connection() { return redis.executor.connection; @@ -74,14 +73,17 @@ suite.test("handle a -MOVED redirection error", async () => { const [key] = args; assert(typeof key === "string"); const slot = calculateSlot(key); - manuallyRedirectedPort = sample(ports.filter((x) => x !== port)); + manuallyRedirectedPort = sample( + ports.filter((x) => x !== opts.port), + ); const error = new ErrorReplyError( - `-MOVED ${slot} ${hostname}:${manuallyRedirectedPort}`, + `-MOVED ${slot} ${opts.hostname}:${manuallyRedirectedPort}`, ); redirected = true; throw error; } else { - portsSent.add(Number(port)); + assert(opts.port); + portsSent.add(Number(opts.port)); const reply = await redis.executor.exec(cmd, ...args); return reply; } @@ -97,7 +99,7 @@ suite.test("handle a -MOVED redirection error", async () => { assertEquals(r, "bar"); assert(redirected); // Check if a cluster client correctly handles a -MOVED error - assertArrayIncludes(Array.from(portsSent), [manuallyRedirectedPort]); + assertArrayIncludes([...portsSent], [manuallyRedirectedPort]); } finally { client.close(); } From 45bdfdcf9ec2cf6398851d625058243b9086f0b7 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 23:13:27 +0900 Subject: [PATCH 48/64] fix: Mistakenly imported shuffle.js instead of sample.js... --- tests/cluster/test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts index 1825f73a..72cf1b5c 100644 --- a/tests/cluster/test.ts +++ b/tests/cluster/test.ts @@ -7,7 +7,7 @@ import { assertEquals, assertThrowsAsync, } from "../../vendor/https/deno.land/std/testing/asserts.ts"; -import sample from "../../vendor/https/cdn.skypack.dev/lodash-es/shuffle.js"; +import sample from "../../vendor/https/cdn.skypack.dev/lodash-es/sample.js"; import calculateSlot from "../../vendor/https/cdn.skypack.dev/cluster-key-slot/lib/index.js"; import { ErrorReplyError } from "../../errors.ts"; import { connect, RedisImpl } from "../../redis.ts"; From c746bc4a6cff457db28c3e802b8dd9dc23f405d0 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sat, 10 Jul 2021 23:22:56 +0900 Subject: [PATCH 49/64] docs: Update experimental/cluster/README.md --- experimental/cluster/README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/experimental/cluster/README.md b/experimental/cluster/README.md index 873f5239..15b8eda7 100644 --- a/experimental/cluster/README.md +++ b/experimental/cluster/README.md @@ -1,4 +1,30 @@ # experimental/cluster +[![deno doc](https://doc.deno.land/badge.svg)](https://doc.deno.land/https/deno.land/x/redis/experimental/cluster/mod.ts) + This module implements a client impelementation for [the Redis Cluster](https://redis.io/topics/cluster-tutorial). + +The implementation is based on the +[antirez/redis-rb-cluster](https://github.com/antirez/redis-rb-cluster). + +## Usage + +```typescript +import { connect } from "https://deno.land/x/redis/experimental/cluster/mod.ts"; + +const cluster = await connect({ + nodes: [ + { + hostname: "127.0.0.1", + port: 7000, + }, + { + hostname: "127.0.0.1", + port: 7001, + }, + ], +}); + +await cluster.get("{foo}bar"); +``` From 76c9ce14f96869d90d6af169ec8eff56709fdffd Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 00:29:21 +0900 Subject: [PATCH 50/64] chore: Remove duplicated code --- experimental/cluster/mod.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 3ebc3e28..d6816199 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -180,12 +180,7 @@ class ClusterExecutor implements CommandExecutor { for (const [from, to, master] of clusterSlots) { for (let slot = from; slot <= to; slot++) { const [ip, port] = master; - const name = `${ip}:${port}`; - const node = { - name, - hostname: ip, - port, - }; + const node = new ClusterNode(ip, port); nodes.push(node); slotMap[slot] = node; } From 3f4fa7e1e41c55c153e39e3763a8f0415903434d Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 00:30:28 +0900 Subject: [PATCH 51/64] fix: Make maxConnections optional --- experimental/cluster/mod.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index d6816199..41f096a3 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -38,7 +38,7 @@ import uniqBy from "../../vendor/https/cdn.skypack.dev/lodash-es/uniqBy.js"; export interface ClusterConnectOptions { nodes: Array; - maxConnections: number; + maxConnections?: number; newRedis?: (opts: RedisConnectOptions) => Promise; } @@ -76,7 +76,7 @@ class ClusterExecutor implements CommandExecutor { this.#startupNodes = opts.nodes.map((node) => new ClusterNode(node.hostname, node.port ?? 6379) ); - this.#maxConnections = opts.maxConnections; + this.#maxConnections = opts.maxConnections ?? 50; // TODO(uki00a): To be honest, I'm not sure if this default value is appropriate... this.#newRedis = opts.newRedis ?? connect; } From d7928b5134ea31dabc9d8f275abe7792a9867c25 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 00:45:01 +0900 Subject: [PATCH 52/64] chore: Add #getRedisLink method --- experimental/cluster/mod.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 41f096a3..744c75d7 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -170,7 +170,7 @@ class ClusterExecutor implements CommandExecutor { async initializeSlotsCache(): Promise { for (const node of this.#startupNodes) { try { - const redis = await this.#newRedis(node); + const redis = await this.#getRedisLink(node); try { const clusterSlots = await redis.clusterSlots() as Array< [number, number, [string, number]] @@ -221,7 +221,7 @@ class ClusterExecutor implements CommandExecutor { return conn; } } else { - conn = await this.#newRedis(node); + conn = await this.#getRedisLink(node); try { const message = await conn.ping(); if (message === "PONG") { @@ -251,7 +251,7 @@ class ClusterExecutor implements CommandExecutor { if (!conn) { try { await this.#closeExistingConnection(); - conn = await this.#newRedis(node); + conn = await this.#getRedisLink(node); this.#connectionByNodeName[node.name] = conn; } catch { return this.#getRandomConnection(); @@ -273,6 +273,11 @@ class ClusterExecutor implements CommandExecutor { } } } + + #getRedisLink(node: ClusterNode): Promise { + const { hostname, port } = node; + return this.#newRedis({ hostname, port }); + } } function getKeyFromCommand(command: string, args: RedisValue[]): string | null { From e7aed829e269864bdf7068dbf799b6db9be26221 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:21:07 +0900 Subject: [PATCH 53/64] test: Verify that a -ASK redirection error is correctly handled --- tests/cluster/test.ts | 60 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 54 insertions(+), 6 deletions(-) diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts index 72cf1b5c..659ba2a9 100644 --- a/tests/cluster/test.ts +++ b/tests/cluster/test.ts @@ -20,11 +20,7 @@ const nodes = ports.map((port) => ({ hostname: "127.0.0.1", port, })); -const maxConnections = nodes.length; -const client = await connectToCluster({ - nodes, - maxConnections, -}); +const client = await connectToCluster({ nodes }); suite.afterAll(() => { stopRedisCluster(cluster); @@ -59,7 +55,6 @@ suite.test("handle a -MOVED redirection error", async () => { const portsSent = new Set(); const client = await connectToCluster({ nodes, - maxConnections, async newRedis(opts) { const redis = await connect(opts); assert(opts.port != null); @@ -97,9 +92,62 @@ suite.test("handle a -MOVED redirection error", async () => { await client.set("foo", "bar"); const r = await client.get("foo"); assertEquals(r, "bar"); + // Check if a cluster client correctly handles a -MOVED error assert(redirected); + assertArrayIncludes([...portsSent], [manuallyRedirectedPort]); + } finally { + client.close(); + } +}); + +suite.test("handle a -ASK redirection error", async () => { + let redirected = false; + let manuallyRedirectedPort!: number; + const portsSent = new Set(); + const commandsSent = new Set(); + const client = await connectToCluster({ + nodes, + async newRedis(opts) { + const redis = await connect(opts); + assert(opts.port != null); + const proxyExecutor = { + get connection() { + return redis.executor.connection; + }, + async exec(cmd, ...args) { + commandsSent.add(cmd); + if (cmd === "GET" && !redirected) { + // Manually cause a -ASK redirection error + const [key] = args; + assert(typeof key === "string"); + const slot = calculateSlot(key); + manuallyRedirectedPort = sample( + ports.filter((x) => x !== opts.port), + ); + const error = new ErrorReplyError( + `-ASK ${slot} ${opts.hostname}:${manuallyRedirectedPort}`, + ); + redirected = true; + throw error; + } else { + assert(opts.port); + portsSent.add(Number(opts.port)); + const reply = await redis.executor.exec(cmd, ...args); + return reply; + } + }, + } as CommandExecutor; + return new RedisImpl(proxyExecutor); + }, + }); + try { + await client.set("hoge", "piyo"); + const r = await client.get("hoge"); + assertEquals(r, "piyo"); // Check if a cluster client correctly handles a -MOVED error + assert(redirected); assertArrayIncludes([...portsSent], [manuallyRedirectedPort]); + assertArrayIncludes([...commandsSent], ["ASKING"]); } finally { client.close(); } From fdfbb90f03d9e2ddc8f4a79eb4dd7fb97627191e Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:31:50 +0900 Subject: [PATCH 54/64] fix: Try to correctly handle -ASK errors --- experimental/cluster/mod.ts | 38 +++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 744c75d7..5831aaa8 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -65,7 +65,7 @@ class ClusterError extends Error {} class ClusterExecutor implements CommandExecutor { #nodes!: ClusterNode[]; - #slots!: SlotMap; + #nodeBySlot!: SlotMap; #startupNodes: ClusterNode[]; #refreshTableASAP?: boolean; #maxConnections: number; @@ -89,6 +89,7 @@ class ClusterExecutor implements CommandExecutor { await this.initializeSlotsCache(); } let asking = false; + let askingNode: ClusterNode | null = null; let tryRandomNode = false; let ttl = kRedisClusterRequestTTL; let lastError: null | Error; @@ -102,7 +103,10 @@ class ClusterExecutor implements CommandExecutor { } const slot = calculateSlot(key); let r: Redis; - if (tryRandomNode) { + if (asking && askingNode) { + r = await this.#getConnectionByNode(askingNode); + askingNode = null; + } else if (tryRandomNode) { r = await this.#getRandomConnection(); tryRandomNode = false; } else { @@ -134,12 +138,16 @@ class ClusterExecutor implements CommandExecutor { // ask for CLUSTER NODES the next time. this.#refreshTableASAP = true; } - if (!asking) { - const [ip, port] = ipAndPort.split(":"); - this.#slots[parseInt(newSlot)] = new ClusterNode( - ip, - parseInt(port), - ); + const [ip, port] = ipAndPort.split(":"); + const node = new ClusterNode( + ip, + parseInt(port), + ); + if (asking) { + // Server replied with -ASK. We should send the next query to the redirected node. + askingNode = node; + } else { + this.#nodeBySlot[parseInt(newSlot)] = node; } } else { throw err; @@ -186,7 +194,7 @@ class ClusterExecutor implements CommandExecutor { } } this.#nodes = nodes; - this.#slots = slotMap; + this.#nodeBySlot = slotMap; await this.#populateStartupNodes(); this.#refreshTableASAP = false; return; @@ -243,21 +251,27 @@ class ClusterExecutor implements CommandExecutor { } async #getConnectionBySlot(slot: number): Promise { - const node = this.#slots[slot]; + const node = this.#nodeBySlot[slot]; if (!node) { return this.#getRandomConnection(); } + return this.#getConnectionByNode(node); + } + + async #getConnectionByNode(node: ClusterNode): Promise { let conn = this.#connectionByNodeName[node.name]; - if (!conn) { + if (conn) { + return conn; + } else { try { await this.#closeExistingConnection(); conn = await this.#getRedisLink(node); this.#connectionByNodeName[node.name] = conn; + return conn; } catch { return this.#getRandomConnection(); } } - return conn; } async #closeExistingConnection() { From e3f07263b3485ddb956ceb7f75b3fd03f605ead5 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:33:56 +0900 Subject: [PATCH 55/64] chore: Lint --- experimental/cluster/mod.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 5831aaa8..7ae0e1c9 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -250,12 +250,13 @@ class ClusterExecutor implements CommandExecutor { throw new ClusterError("Can't reach a single startup node."); } - async #getConnectionBySlot(slot: number): Promise { + #getConnectionBySlot(slot: number): Promise { const node = this.#nodeBySlot[slot]; - if (!node) { + if (node) { + return this.#getConnectionByNode(node); + } else { return this.#getRandomConnection(); } - return this.#getConnectionByNode(node); } async #getConnectionByNode(node: ClusterNode): Promise { From e7dbe953a8fcbf6c08139ef949997d98d0e478c6 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:37:03 +0900 Subject: [PATCH 56/64] docs: Improve docs --- command.ts | 3 +++ experimental/cluster/README.md | 2 +- experimental/cluster/mod.ts | 4 +++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/command.ts b/command.ts index 8e55ff8f..83584a1e 100644 --- a/command.ts +++ b/command.ts @@ -1101,6 +1101,9 @@ XRANGE somestream - + clientUnpause(): Promise; // Cluster + /** + * @see https://redis.io/topics/cluster-spec + */ asking(): Promise; clusterAddSlots(...slots: number[]): Promise; clusterCountFailureReports(node_id: string): Promise; diff --git a/experimental/cluster/README.md b/experimental/cluster/README.md index 15b8eda7..edc707e5 100644 --- a/experimental/cluster/README.md +++ b/experimental/cluster/README.md @@ -2,7 +2,7 @@ [![deno doc](https://doc.deno.land/badge.svg)](https://doc.deno.land/https/deno.land/x/redis/experimental/cluster/mod.ts) -This module implements a client impelementation for +This module provides a client impelementation for [the Redis Cluster](https://redis.io/topics/cluster-tutorial). The implementation is based on the diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 7ae0e1c9..44814085 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -310,10 +310,12 @@ function getKeyFromCommand(command: string, args: RedisValue[]): string | null { } /** + * Connects to the Redis Cluster. + * * @see https://redis.io/topics/cluster-tutorial * @see https://redis.io/topics/cluster-spec */ -async function connectToCluster(opts: ClusterConnectOptions) { +async function connectToCluster(opts: ClusterConnectOptions): Promise { const executor = new ClusterExecutor(opts); await executor.initializeSlotsCache(); const redis = new RedisImpl(executor); From 84e61967384084f1dffb6dc6830eb9d9e8d98beb Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:38:44 +0900 Subject: [PATCH 57/64] chore: Move pasing logig to ClusterNode --- experimental/cluster/mod.ts | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 44814085..637a587f 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -57,6 +57,15 @@ class ClusterNode { constructor(readonly hostname: string, readonly port: number) { this.name = `${hostname}:${port}`; } + + static parseIPAndPort(ipAndPort: string): ClusterNode { + const [ip, port] = ipAndPort.split(":"); + const node = new ClusterNode( + ip, + parseInt(port), + ); + return node; + } } const kRedisClusterRequestTTL = 16; @@ -138,11 +147,7 @@ class ClusterExecutor implements CommandExecutor { // ask for CLUSTER NODES the next time. this.#refreshTableASAP = true; } - const [ip, port] = ipAndPort.split(":"); - const node = new ClusterNode( - ip, - parseInt(port), - ); + const node = ClusterNode.parseIPAndPort(ipAndPort); if (asking) { // Server replied with -ASK. We should send the next query to the redirected node. askingNode = node; From 3a0d5ad5dc1c818fda2f43a2954381e55c67d7c9 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:41:00 +0900 Subject: [PATCH 58/64] chore: Make config files in tests/tmp directory --- .gitignore | 2 +- tests/cluster/test_util.ts | 2 +- tests/test_util.ts | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 33fbe5ef..86f2aa32 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ commands testdata/*/ benchmark/node_modules benchmark/benchmark.js -tests/server/*/ \ No newline at end of file +tests/tmp \ No newline at end of file diff --git a/tests/cluster/test_util.ts b/tests/cluster/test_util.ts index 608fd5a1..ca7f10cf 100644 --- a/tests/cluster/test_util.ts +++ b/tests/cluster/test_util.ts @@ -13,7 +13,7 @@ export async function startRedisCluster(ports: number[]): Promise { port, clusterEnabled: true, additionalConfigurations: [ - // TODO: The configuration file should be created in `test/server/` directory. + // TODO: The configuration file should be created in `test/tmp/` directory. `cluster-config-file ${port}_nodes.conf`, ], }) diff --git a/tests/test_util.ts b/tests/test_util.ts index 06f07060..735c38c2 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -66,11 +66,11 @@ export async function startRedis({ clusterEnabled = false, additionalConfigurations = [] as string[], }): Promise { - const path = `tests/server/${port}`; + const path = `tests/tmp/${port}`; if (!(await exists(path))) { Deno.mkdirSync(path); - Deno.copyFileSync(`tests/server/redis.conf`, `${path}/redis.conf`); + Deno.copyFileSync(`tests/tmp/redis.conf`, `${path}/redis.conf`); let config = `dir ${path}\nport ${port}\n`; config += clusterEnabled ? "cluster-enabled yes" : ""; From 526722a555d68e14a7066de26704ea3f76ec28fc Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:41:16 +0900 Subject: [PATCH 59/64] Revert "test: Temporary disable all tests except those for redis cluster" This reverts commit 2107ec65abfa58771ad2030f2df75944d0365094. --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7b9aeb2d..01872cac 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -50,7 +50,7 @@ jobs: - name: Run tests run: | - deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server,redis-cli tests/cluster/test.ts # deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server,redis-cli redis_test.ts + deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server,redis-cli redis_test.ts - uses: bahmutov/npm-install@v1 with: working-directory: benchmark From 005b63edb403a34500d9e6bbfa6cf5e10846497d Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:42:17 +0900 Subject: [PATCH 60/64] chore: Run "deno fmt" --- experimental/cluster/mod.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index 637a587f..dbeaf9a7 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -316,7 +316,7 @@ function getKeyFromCommand(command: string, args: RedisValue[]): string | null { /** * Connects to the Redis Cluster. - * + * * @see https://redis.io/topics/cluster-tutorial * @see https://redis.io/topics/cluster-spec */ From dc07df6bdf9a504d3dc89740d1c4e536e484d0c3 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:44:39 +0900 Subject: [PATCH 61/64] ci: Fix build.yml --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 01872cac..a30240c3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -50,7 +50,7 @@ jobs: - name: Run tests run: | - deno test --allow-net --allow-read=tests --allow-write=tests/server --allow-run=redis-server,redis-cli redis_test.ts + deno test --allow-net --allow-read=tests --allow-write=tests/tmp --allow-run=redis-server,redis-cli redis_test.ts - uses: bahmutov/npm-install@v1 with: working-directory: benchmark From 3f7b9ec19b8f59fc21af8a3f3dd1e94db4c65a85 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:47:09 +0900 Subject: [PATCH 62/64] fix: Add tests/tmp/.gitkeep --- .gitignore | 2 +- tests/tmp/.gitkeep | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 tests/tmp/.gitkeep diff --git a/.gitignore b/.gitignore index 86f2aa32..b65c471c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ commands testdata/*/ benchmark/node_modules benchmark/benchmark.js -tests/tmp \ No newline at end of file +tests/tmp/*/ \ No newline at end of file diff --git a/tests/tmp/.gitkeep b/tests/tmp/.gitkeep new file mode 100644 index 00000000..e69de29b From 43b5406881cb3dac1af49c3502db94f9f52c7f17 Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 01:48:41 +0900 Subject: [PATCH 63/64] fix: redis.conf was not correctly copied... --- tests/test_util.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_util.ts b/tests/test_util.ts index 735c38c2..39cebdb0 100644 --- a/tests/test_util.ts +++ b/tests/test_util.ts @@ -70,7 +70,7 @@ export async function startRedis({ if (!(await exists(path))) { Deno.mkdirSync(path); - Deno.copyFileSync(`tests/tmp/redis.conf`, `${path}/redis.conf`); + Deno.copyFileSync(`tests/server/redis.conf`, `${path}/redis.conf`); let config = `dir ${path}\nport ${port}\n`; config += clusterEnabled ? "cluster-enabled yes" : ""; From e111313c4d1d6f8fe99294b8af4d11c7ae86c3cd Mon Sep 17 00:00:00 2001 From: uki00a Date: Sun, 11 Jul 2021 02:08:43 +0900 Subject: [PATCH 64/64] chore: Minor fixes --- experimental/cluster/mod.ts | 2 +- tests/cluster/test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/experimental/cluster/mod.ts b/experimental/cluster/mod.ts index dbeaf9a7..504b301f 100644 --- a/experimental/cluster/mod.ts +++ b/experimental/cluster/mod.ts @@ -143,7 +143,7 @@ class ClusterExecutor implements CommandExecutor { if (code === "-ASK") { asking = true; } else { - // Serve replied with MOVED. It's better for us to + // Server replied with MOVED. It's better for us to // ask for CLUSTER NODES the next time. this.#refreshTableASAP = true; } diff --git a/tests/cluster/test.ts b/tests/cluster/test.ts index 659ba2a9..0b52804e 100644 --- a/tests/cluster/test.ts +++ b/tests/cluster/test.ts @@ -144,7 +144,7 @@ suite.test("handle a -ASK redirection error", async () => { await client.set("hoge", "piyo"); const r = await client.get("hoge"); assertEquals(r, "piyo"); - // Check if a cluster client correctly handles a -MOVED error + // Check if a cluster client correctly handles a -ASK error assert(redirected); assertArrayIncludes([...portsSent], [manuallyRedirectedPort]); assertArrayIncludes([...commandsSent], ["ASKING"]);