Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: automatic reconnection #349

Merged
merged 22 commits into from
Dec 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 61 additions & 18 deletions connection.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { sendCommand } from "./protocol/mod.ts";
import type { Raw, RedisValue } from "./protocol/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/mod.ts";
import type { Backoff } from "./backoff.ts";
import { exponentialBackoff } from "./backoff.ts";
import { ErrorReplyError } from "./errors.ts";
import { ErrorReplyError, isRetriableError } from "./errors.ts";
import {
BufReader,
BufWriter,
Expand All @@ -21,6 +21,7 @@ export interface Connection {
close(): void;
connect(): Promise<void>;
reconnect(): Promise<void>;
sendCommand(command: string, args?: Array<RedisValue>): Promise<RedisReply>;
}

export interface RedisConnectionOptions {
Expand All @@ -36,6 +37,8 @@ export interface RedisConnectionOptions {
backoff?: Backoff;
}

const kEmptyRedisArgs: Array<RedisValue> = [];

export class RedisConnection implements Connection {
name: string | null = null;
closer!: Closer;
Expand All @@ -45,7 +48,6 @@ export class RedisConnection implements Connection {

private readonly hostname: string;
private readonly port: number | string;
private retryCount = 0;
private _isClosed = false;
private _isConnected = false;
private backoff: Backoff;
Expand Down Expand Up @@ -84,8 +86,8 @@ export class RedisConnection implements Connection {
): Promise<void> {
try {
password && username
? await this.sendCommand("AUTH", username, password)
: await this.sendCommand("AUTH", password);
? await this.sendCommand("AUTH", [username, password])
: await this.sendCommand("AUTH", [password]);
} catch (error) {
if (error instanceof ErrorReplyError) {
throw new AuthenticationError("Authentication failed", {
Expand All @@ -101,21 +103,61 @@ export class RedisConnection implements Connection {
db: number | undefined = this.options.db,
): Promise<void> {
if (!db) throw new Error("The database index is undefined.");
await this.sendCommand("SELECT", db);
await this.sendCommand("SELECT", [db]);
}

private async sendCommand(
async sendCommand(
command: string,
...args: Array<RedisValue>
): Promise<Raw> {
const reply = await sendCommand(this.writer, this.reader, command, ...args);
return reply.value();
args?: Array<RedisValue>,
): Promise<RedisReply> {
try {
const reply = await sendCommand(
this.writer,
this.reader,
command,
args ?? kEmptyRedisArgs,
);
return reply;
} catch (error) {
if (
!isRetriableError(error) ||
this.isManuallyClosedByUser()
) {
throw error;
}

for (let i = 0; i < this.maxRetryCount; i++) {
// Try to reconnect to the server and retry the command
this.close();
try {
await this.connect();

const reply = await sendCommand(
this.writer,
this.reader,
command,
args ?? kEmptyRedisArgs,
);

return reply;
} catch { // TODO: use `AggregateError`?
const backoff = this.backoff(i);
await delay(backoff);
}
}

throw error;
}
}

/**
* Connect to Redis server
*/
async connect(): Promise<void> {
await this.#connect(0);
}

async #connect(retryCount: number) {
try {
const dialOpts: Deno.ConnectOptions = {
hostname: this.hostname,
Expand All @@ -142,21 +184,18 @@ export class RedisConnection implements Connection {
this.close();
throw error;
}
this.retryCount = 0;
} catch (error) {
if (error instanceof AuthenticationError) {
this.retryCount = 0;
throw (error.cause ?? error);
}

if (this.retryCount++ >= this.maxRetryCount) {
this.retryCount = 0;
const backoff = this.backoff(retryCount);
retryCount++;
if (retryCount >= this.maxRetryCount) {
throw error;
}

const backoff = this.backoff(this.retryCount);
await delay(backoff);
await this.connect();
await this.#connect(retryCount);
}
}

Expand All @@ -183,6 +222,10 @@ export class RedisConnection implements Connection {
await this.sendCommand("PING");
}
}

private isManuallyClosedByUser(): boolean {
return this._isClosed && !this._isConnected;
}
}

class AuthenticationError extends Error {}
Expand Down
9 changes: 9 additions & 0 deletions errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,12 @@ export class InvalidStateError extends Error {
super(message ? `${base}: ${message}` : base);
}
}

export function isRetriableError(error: Error): boolean {
return (error instanceof Deno.errors.BadResource ||
error instanceof Deno.errors.BrokenPipe ||
error instanceof Deno.errors.ConnectionAborted ||
error instanceof Deno.errors.ConnectionRefused ||
error instanceof Deno.errors.ConnectionReset ||
error instanceof EOFError);
}
26 changes: 2 additions & 24 deletions executor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import type { Connection } from "./connection.ts";
import { EOFError } from "./errors.ts";
import {
Deferred,
deferred,
} from "./vendor/https/deno.land/std/async/deferred.ts";
import { sendCommand } from "./protocol/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/mod.ts";

export interface CommandExecutor {
Expand Down Expand Up @@ -48,29 +46,9 @@ export class MuxExecutor implements CommandExecutor {
private dequeue(): void {
const [e] = this.queue;
if (!e) return;
sendCommand(
this.connection.writer,
this.connection.reader,
e.command,
...e.args,
)
this.connection.sendCommand(e.command, e.args)
.then(e.d.resolve)
.catch(async (error) => {
if (
this.connection.maxRetryCount > 0 &&
// Error `BadResource` is thrown when an attempt is made to write to a closed connection,
// Make sure that the connection wasn't explicitly closed by the user before trying to reconnect.
((error instanceof Deno.errors.BadResource &&
!this.connection.isClosed) ||
error instanceof Deno.errors.BrokenPipe ||
error instanceof Deno.errors.ConnectionAborted ||
error instanceof Deno.errors.ConnectionRefused ||
error instanceof Deno.errors.ConnectionReset ||
error instanceof EOFError)
) {
await this.connection.reconnect();
} else e.d.reject(error);
})
.catch(e.d.reject)
.finally(() => {
this.queue.shift();
this.dequeue();
Expand Down
2 changes: 1 addition & 1 deletion protocol/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export async function sendCommand(
writer: BufWriter,
reader: BufReader,
command: string,
...args: RedisValue[]
args: RedisValue[],
): Promise<RedisReply> {
await writeRequest(writer, command, args);
await writer.flush();
Expand Down
34 changes: 34 additions & 0 deletions tests/commands/general.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,40 @@ export function generalTests(
});
});

describe("automatic reconnection", () => {
it("reconnects when the connection is lost", async () => {
const tempClient = await newClient(getOpts());
try {
const id = await tempClient.clientID();
await client.clientKill({ id });
const reply = await tempClient.ping();
assertEquals(reply, "PONG");
} finally {
tempClient.close();
}
});

it("fails when max retry count is exceeded", async () => {
const tempClient = await newClient({
...getOpts(),
maxRetryCount: 0,
});
try {
const id = await tempClient.clientID();
await client.clientKill({ id });
await assertRejects(() => tempClient.ping());
} finally {
tempClient.close();
}
});

it("does not reconnect when the connection is manually closed by the user", async () => {
const tempClient = await newClient(getOpts());
tempClient.close();
await assertRejects(() => tempClient.ping());
});
});

describe("createLazyClient", () => {
it("returns the lazily connected client", async () => {
const opts = getOpts();
Expand Down