Skip to content

Commit

Permalink
Implement retriable connection
Browse files Browse the repository at this point in the history
At the moment, a client will die if the server is killed or the connection is lost. This is a problem especially with clients subscribed to a channel.
Since the Writer/Reader buffers are passed around to I/O functions, it is impossible to reconnect a Redis client.
Having a connection object allows to easily replace the buffers.

Resolves: denodrivers#83
  • Loading branch information
sebastienfilion committed Jun 24, 2020
1 parent dd1d035 commit abd0516
Show file tree
Hide file tree
Showing 7 changed files with 446 additions and 148 deletions.
1 change: 1 addition & 0 deletions command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,5 +535,6 @@ export type RedisCommands = {
): Promise<[BulkString, BulkString[]]>;

readonly isClosed: boolean;
readonly isConnected: boolean;
close(): void;
};
174 changes: 174 additions & 0 deletions connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts";
import { CommandExecutor, RedisRawReply, muxExecutor, sendCommand } from "./io.ts";

type Reader = Deno.Reader;
type Writer = Deno.Writer;
type Closer = Deno.Closer;

export type RedisConnectOptions = {
hostname?: string;
port?: number | string;
tls?: boolean;
db?: number;
password?: string;
name?: string;
maxRetryCount?: number;
};

export class RedisConnection {

name: string | null = null
closer: Closer | null = null
executor: CommandExecutor | null = null;
reader: Reader | null = null;
writer: Writer | null = null;

private _isConnected: boolean = false;

get isConnected(): boolean {
return this._isConnected;
}

private _isClosed: boolean = false;

get isClosed(): boolean {
return this._isClosed;
}

maxRetryCount = 0;
private retryCount = 0;

private connectThunkified: () => Promise<RedisConnection>;
private thunkifyConnect(
hostname: string,
port: string | number,
options: RedisConnectOptions
): () => Promise<RedisConnection> {

return async () => {
const dialOpts: Deno.ConnectOptions = {
hostname,
port: parsePortLike(port),
};
if (!Number.isSafeInteger(dialOpts.port)) {
throw new Error("deno-redis: opts.port is invalid");
}
const conn: Deno.Conn = options?.tls
? await Deno.connectTls(dialOpts)
: await Deno.connect(dialOpts);

if (options.name) this.name = options.name;
if (options.maxRetryCount) this.maxRetryCount = options.maxRetryCount;

this.closer = conn;
this.reader = new BufReader(conn);
this.writer = new BufWriter(conn);
this.executor = muxExecutor(this, this.maxRetryCount > 0);

this._isClosed = false;
this._isConnected = true;

if (options?.password) this.authenticate(options.password);
if (options?.db) this.selectDb(options.db);

return this as RedisConnection;
}
}

constructor(hostname: string, port: number | string, private options: RedisConnectOptions) {
this.connectThunkified = this.thunkifyConnect(hostname, port, options);
}

authenticate(password: string | undefined = this.options.password): Promise<RedisRawReply> {
if (!password) throw new Error("The password is undefined.");

const readerAsBuffer = this.reader as BufReader;
const writerAsBuffer = this.writer as BufWriter;

return sendCommand(writerAsBuffer, readerAsBuffer, "AUTH", password);
}

selectDb(databaseIndex: number | undefined = this.options.db): Promise<RedisRawReply> {
if (!databaseIndex) throw new Error("The database index is undefined.");

const readerAsBuffer = this.reader as BufReader;
const writerAsBuffer = this.writer as BufWriter;

return sendCommand(writerAsBuffer, readerAsBuffer, "SELECT", databaseIndex);
}

close() {
this._isClosed = true;
this._isConnected = false;
try {
this.closer!.close();
} catch (error) {
if (!(error instanceof Deno.errors.BadResource)) throw error;
}
}

/**
* Connect to Redis server
* @param opts redis server's url http/https url with port number
* Examples:
* const conn = connect({hostname: "127.0.0.1", port: 6379})// -> tcp, 127.0.0.1:6379
* const conn = connect({hostname: "redis.proxy", port: 443, tls: true}) // -> TLS, redis.proxy:443
*/
async connect(): Promise<RedisConnection> {

return this.connectThunkified();
}

async reconnect(): Promise<RedisConnection> {
const readerAsBuffer = this.reader as BufReader;
const writerAsBuffer = this.writer as BufWriter;
if (!readerAsBuffer.peek(1)) throw new Error("Client is closed.");

try {
await sendCommand(writerAsBuffer, readerAsBuffer, "PING");
this._isConnected = true;

return Promise.resolve(this);
} catch (error) {
this._isConnected = false;
return new Promise(
(resolve, reject) => {
const interval = setInterval(
async () => {
if (this.retryCount > this.maxRetryCount) {
await this.close();
reject(new Error("Could not reconnect"));
}

try {
await this.close();
await this.connect();

await sendCommand(this.writer as BufWriter, this.reader as BufReader, "PING");

this._isConnected = true;
clearInterval(interval);
resolve(this);
} catch (error) {} finally {
this.retryCount++;
}
},
1200
);
}
)
}
}
}

function parsePortLike(port: string | number | undefined): number {
if (typeof port === "string") {
return parseInt(port);
} else if (typeof port === "number") {
return port;
} else if (port === undefined) {
return 6379;
} else {
throw new Error("port is invalid: typeof=" + typeof port);
}
}
28 changes: 23 additions & 5 deletions io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
Deferred,
} from "./vendor/https/deno.land/std/async/mod.ts";
import { ConditionalArray, Bulk, Integer, Status, Raw } from "./command.ts";
import { RedisConnection } from "./connection.ts";

export type StatusReply = ["status", Status];
export type IntegerReply = ["integer", Integer];
Expand Down Expand Up @@ -175,8 +176,8 @@ function tryParseErrorReply(line: string): never {
}

export function muxExecutor(
r: BufReader,
w: BufWriter,
connection: RedisConnection,
attemptReconnect: boolean = false
): CommandExecutor {
let queue: {
command: string;
Expand All @@ -187,12 +188,29 @@ export function muxExecutor(
function dequeue(): void {
const [e] = queue;
if (!e) return;
sendCommand(w, r, e.command, ...e.args)
sendCommand(
connection.writer as BufWriter,
connection.reader as BufReader,
e.command,
...e.args
)
.then((v) => {
// console.log(e.command, e.args, v);
e.d.resolve(v);
})
.catch((err) => e.d.reject(err))
.catch(async (err) => {
if (
(
err instanceof Deno.errors.BadResource
|| err instanceof Deno.errors.BrokenPipe
|| err instanceof Deno.errors.ConnectionAborted
|| err instanceof Deno.errors.ConnectionRefused
|| err instanceof Deno.errors.ConnectionReset
)
&& attemptReconnect
)
await connection.reconnect();
else e.d.reject(err);
})
.finally(() => {
queue.shift();
dequeue();
Expand Down
16 changes: 7 additions & 9 deletions pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
BufReader,
BufWriter,
} from "./vendor/https/deno.land/std/io/bufio.ts";
import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts";
import {
createRequest,
readReply,
Expand All @@ -16,6 +13,7 @@ import {
Deferred,
} from "./vendor/https/deno.land/std/async/mod.ts";
import { RedisCommands } from "./command.ts";
import { RedisConnection } from "./connection.ts";

const encoder = new TextEncoder();
export type RawReplyOrError = RedisRawReply | ErrorReply;
Expand All @@ -25,8 +23,7 @@ export type RedisPipeline = {
} & RedisCommands;

export function createRedisPipeline(
writer: BufWriter,
reader: BufReader,
connection: RedisConnection,
opts?: { tx: true },
): RedisPipeline {
let commands: string[] = [];
Expand All @@ -48,13 +45,14 @@ export function createRedisPipeline(
}

async function send(cmds: string[]): Promise<RawReplyOrError[]> {
const writerAsBuffer = connection.writer! as BufWriter;
const msg = cmds.join("");
await writer.write(encoder.encode(msg));
await writer.flush();
await writerAsBuffer.write(encoder.encode(msg));
await writerAsBuffer.flush();
const ret: RawReplyOrError[] = [];
for (let i = 0; i < cmds.length; i++) {
try {
const rep = await readReply(reader);
const rep = await readReply(connection.reader! as BufReader);
ret.push(rep);
} catch (e) {
if (e instanceof ErrorReplyError) {
Expand Down
Loading

0 comments on commit abd0516

Please sign in to comment.