From e5f918d9046f76896674dd39d241c4091c40efcc Mon Sep 17 00:00:00 2001 From: Sebastien Filion Date: Tue, 23 Jun 2020 13:01:17 -0400 Subject: [PATCH] Implement retriable connection At the moment, a client will die if the server is killed or the connection is lost. This is a problem especially with clients subscribed to a channel. Since the Writer/Reader buffers are passed around to I/O functions, it is impossible to reconnect a Redis client. Having a connection object allows to easily replace the buffers. Resolves: https://github.com/denolib/deno-redis/issues/83 --- README.md | 22 ++++++ command.ts | 1 + connection.ts | 181 +++++++++++++++++++++++++++++++++++++++++++++++++ io.ts | 30 ++++++-- pipeline.ts | 16 ++--- pubsub.ts | 107 ++++++++++++++++++----------- pubsub_test.ts | 121 +++++++++++++++++++++++++++------ redis.ts | 126 ++++++++++++++++------------------ 8 files changed, 463 insertions(+), 141 deletions(-) create mode 100644 connection.ts diff --git a/README.md b/README.md index 29dd247b..73ad65c7 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,28 @@ const sub = await redis.subscribe("channel"); ## Advanced Usage +### Retriable connection + +By default, a client's connection will throw an error if the server dies or the network becomes unavailable. +A connection can be made "retriable" by setting the value `maxRetryCount` when connecting a new client. + +```ts +const redis = await connect({ ...options, maxRetryCount: 10 }); + +// The client will try to connect to the server 10 times if the server dies or the network becomes unavailable. +``` + +The property is set automatically to `10` when creating a subscriber client. +After a reconnection succeeds, the client will subscribe again to all the channels and patterns. + +```ts +const redis = await connect(options); +const subscriberClient = await redis.subscribe("channel"); + +// The client's connection will now be forced to try to connect to the server 10 times if the server dies or the network +// becomes unavailable. +``` + ### Execute raw commands `redis.executor` is raw level [redis protocol](https://redis.io/topics/protocol) executor. diff --git a/command.ts b/command.ts index 72a37991..f7cdffa8 100644 --- a/command.ts +++ b/command.ts @@ -535,5 +535,6 @@ export type RedisCommands = { ): Promise<[BulkString, BulkString[]]>; readonly isClosed: boolean; + readonly isConnected: boolean; close(): void; }; diff --git a/connection.ts b/connection.ts new file mode 100644 index 00000000..e2d5d9da --- /dev/null +++ b/connection.ts @@ -0,0 +1,181 @@ +import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts"; +import { CommandExecutor, CommandFunc, RedisRawReply, muxExecutor, sendCommand } from "./io.ts"; + +// type Reader = Deno.Reader; +// type Writer = Deno.Writer; +type Closer = Deno.Closer; + +export type RedisConnectOptions = { + hostname?: string; + port?: number | string; + tls?: boolean; + db?: number; + password?: string; + name?: string; + maxRetryCount?: number; +}; + +export class RedisConnection { + + name: string | null = null; + closer: Closer | null = null; + reader: BufReader | null = null; + writer: BufWriter | null = null; + + executor: CommandExecutor | null = null; + + get exec(): CommandFunc { + + return this.executor!.exec; + } + + private _isConnected: boolean = false; + + get isConnected(): boolean { + return this._isConnected; + } + + private _isClosed: boolean = false; + + get isClosed(): boolean { + return this._isClosed; + } + + maxRetryCount = 0; + private retryCount = 0; + + private connectThunkified: () => Promise; + private thunkifyConnect( + hostname: string, + port: string | number, + options: RedisConnectOptions + ): () => Promise { + + return async () => { + const dialOpts: Deno.ConnectOptions = { + hostname, + port: parsePortLike(port), + }; + if (!Number.isSafeInteger(dialOpts.port)) { + throw new Error("deno-redis: opts.port is invalid"); + } + const conn: Deno.Conn = options?.tls + ? await Deno.connectTls(dialOpts) + : await Deno.connect(dialOpts); + + if (options.name) this.name = options.name; + if (options.maxRetryCount) this.maxRetryCount = options.maxRetryCount; + + this.closer = conn; + this.reader = new BufReader(conn); + this.writer = new BufWriter(conn); + this.executor = muxExecutor(this, this.maxRetryCount > 0); + + this._isClosed = false; + this._isConnected = true; + + if (options?.password) this.authenticate(options.password); + if (options?.db) this.selectDb(options.db); + + return this as RedisConnection; + } + } + + constructor(hostname: string, port: number | string, private options: RedisConnectOptions) { + this.connectThunkified = this.thunkifyConnect(hostname, port, options); + } + + authenticate(password: string | undefined = this.options.password): Promise { + if (!password) throw new Error("The password is undefined."); + + const readerAsBuffer = this.reader as BufReader; + const writerAsBuffer = this.writer as BufWriter; + + return sendCommand(writerAsBuffer, readerAsBuffer, "AUTH", password); + } + + selectDb(databaseIndex: number | undefined = this.options.db): Promise { + if (!databaseIndex) throw new Error("The database index is undefined."); + + const readerAsBuffer = this.reader as BufReader; + const writerAsBuffer = this.writer as BufWriter; + + return sendCommand(writerAsBuffer, readerAsBuffer, "SELECT", databaseIndex); + } + + close() { + this._isClosed = true; + this._isConnected = false; + try { + this.closer!.close(); + } catch (error) { + if (!(error instanceof Deno.errors.BadResource)) throw error; + } + } + + /** + * Connect to Redis server + * @param opts redis server's url http/https url with port number + * Examples: + * const conn = connect({hostname: "127.0.0.1", port: 6379})// -> tcp, 127.0.0.1:6379 + * const conn = connect({hostname: "redis.proxy", port: 443, tls: true}) // -> TLS, redis.proxy:443 + */ + async connect(): Promise { + + return this.connectThunkified(); + } + + async reconnect(): Promise { + const readerAsBuffer = this.reader as BufReader; + const writerAsBuffer = this.writer as BufWriter; + if (!readerAsBuffer.peek(1)) throw new Error("Client is closed."); + + try { + await sendCommand(writerAsBuffer, readerAsBuffer, "PING"); + this._isConnected = true; + + return Promise.resolve(this); + } catch (error) { + this._isConnected = false; + return new Promise( + (resolve, reject) => { + const interval = setInterval( + async () => { + if (this.retryCount > this.maxRetryCount) { + await this.close(); + reject(new Error("Could not reconnect")); + } + + try { + await this.close(); + await this.connect(); + + await sendCommand(this.writer as BufWriter, this.reader as BufReader, "PING"); + + this._isConnected = true; + this.retryCount = 0; + clearInterval(interval); + resolve(this); + } catch (error) {} finally { + this.retryCount++; + } + }, + 1200 + ); + } + ) + } + } +} + +function parsePortLike(port: string | number | undefined): number { + if (typeof port === "string") { + return parseInt(port); + } else if (typeof port === "number") { + return port; + } else if (port === undefined) { + return 6379; + } else { + throw new Error("port is invalid: typeof=" + typeof port); + } +} \ No newline at end of file diff --git a/io.ts b/io.ts index f8ff0323..b1d5ad99 100644 --- a/io.ts +++ b/io.ts @@ -8,6 +8,7 @@ import { Deferred, } from "./vendor/https/deno.land/std/async/mod.ts"; import { ConditionalArray, Bulk, Integer, Status, Raw } from "./command.ts"; +import { RedisConnection } from "./connection.ts"; export type StatusReply = ["status", Status]; export type IntegerReply = ["integer", Integer]; @@ -175,8 +176,8 @@ function tryParseErrorReply(line: string): never { } export function muxExecutor( - r: BufReader, - w: BufWriter, + connection: RedisConnection, + attemptReconnect: boolean = false ): CommandExecutor { let queue: { command: string; @@ -187,12 +188,31 @@ export function muxExecutor( function dequeue(): void { const [e] = queue; if (!e) return; - sendCommand(w, r, e.command, ...e.args) + sendCommand( + connection.writer as BufWriter, + connection.reader as BufReader, + e.command, + ...e.args + ) .then((v) => { - // console.log(e.command, e.args, v); e.d.resolve(v); }) - .catch((err) => e.d.reject(err)) + .catch(async (err) => { + if ( + ( + // Error `BadResource` is thrown when an attempt is made to write to a closed connection, + // Make sure that the connection wasn't explicitly closed by the user before trying to reconnect. + (err instanceof Deno.errors.BadResource && !connection.isClosed) + || err instanceof Deno.errors.BrokenPipe + || err instanceof Deno.errors.ConnectionAborted + || err instanceof Deno.errors.ConnectionRefused + || err instanceof Deno.errors.ConnectionReset + ) + && attemptReconnect + ) + await connection.reconnect(); + else e.d.reject(err); + }) .finally(() => { queue.shift(); dequeue(); diff --git a/pipeline.ts b/pipeline.ts index 5829dfcb..15666056 100644 --- a/pipeline.ts +++ b/pipeline.ts @@ -1,7 +1,4 @@ -import { - BufReader, - BufWriter, -} from "./vendor/https/deno.land/std/io/bufio.ts"; +import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts"; import { createRequest, readReply, @@ -16,6 +13,7 @@ import { Deferred, } from "./vendor/https/deno.land/std/async/mod.ts"; import { RedisCommands } from "./command.ts"; +import { RedisConnection } from "./connection.ts"; const encoder = new TextEncoder(); export type RawReplyOrError = RedisRawReply | ErrorReply; @@ -25,8 +23,7 @@ export type RedisPipeline = { } & RedisCommands; export function createRedisPipeline( - writer: BufWriter, - reader: BufReader, + connection: RedisConnection, opts?: { tx: true }, ): RedisPipeline { let commands: string[] = []; @@ -48,13 +45,14 @@ export function createRedisPipeline( } async function send(cmds: string[]): Promise { + const writerAsBuffer = connection.writer! as BufWriter; const msg = cmds.join(""); - await writer.write(encoder.encode(msg)); - await writer.flush(); + await writerAsBuffer.write(encoder.encode(msg)); + await writerAsBuffer.flush(); const ret: RawReplyOrError[] = []; for (let i = 0; i < cmds.length; i++) { try { - const rep = await readReply(reader); + const rep = await readReply(connection.reader! as BufReader); ret.push(rep); } catch (e) { if (e instanceof ErrorReplyError) { diff --git a/pubsub.ts b/pubsub.ts index 1ab3d106..2dd2a6de 100644 --- a/pubsub.ts +++ b/pubsub.ts @@ -1,7 +1,5 @@ -import { - BufReader, - BufWriter, -} from "./vendor/https/deno.land/std/io/bufio.ts"; +import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts"; +import { RedisConnection } from "./connection.ts"; import { readArrayReply, sendCommand } from "./io.ts"; export type RedisSubscription = { @@ -21,68 +19,101 @@ export type RedisPubSubMessage = { }; class RedisSubscriptionImpl implements RedisSubscription { - private _isClosed = false; + + get isConnected(): boolean { + return this.connection.isConnected; + } + get isClosed(): boolean { - return this._isClosed; + return this.connection.isClosed; } private channels = Object.create(null); private patterns = Object.create(null); - constructor(private writer: BufWriter, private reader: BufReader) {} + constructor(private connection: RedisConnection) { + // Force retriable connection for connection shared for pub/sub. + if (connection.maxRetryCount === 0) connection.maxRetryCount = 10; + } async psubscribe(...patterns: string[]) { - await sendCommand(this.writer, this.reader, "PSUBSCRIBE", ...patterns); + await sendCommand( + this.connection.writer! as BufWriter, + this.connection.reader! as BufReader, + "PSUBSCRIBE", + ...patterns + ); for (const pat of patterns) { this.patterns[pat] = true; } } async punsubscribe(...patterns: string[]) { - await sendCommand(this.writer, this.reader, "PUNSUBSCRIBE", ...patterns); + await sendCommand( + this.connection.writer! as BufWriter, + this.connection.reader! as BufReader, + "PUNSUBSCRIBE", + ...patterns + ); for (const pat of patterns) { delete this.patterns[pat]; } } async subscribe(...channels: string[]) { - await sendCommand(this.writer, this.reader, "SUBSCRIBE", ...channels); + await sendCommand( + this.connection.writer! as BufWriter, + this.connection.reader! as BufReader, + "SUBSCRIBE", + ...channels + ); for (const chan of channels) { this.channels[chan] = true; } } async unsubscribe(...channels: string[]) { - await sendCommand(this.writer, this.reader, "UNSUBSCRIBE", ...channels); + await sendCommand( + this.connection.writer! as BufWriter, + this.connection.reader! as BufReader, + "UNSUBSCRIBE", + ...channels + ); for (const chan of channels) { delete this.channels[chan]; } } async *receive(): AsyncIterableIterator { - while (!this._isClosed) { - let rep: string[]; + let forceReconnect = false; + while (this.isConnected) { try { - rep = (await readArrayReply(this.reader)) as string[]; - } catch (err) { - if (err instanceof Deno.errors.BadResource) { // Connection already closed. - this._isClosed = true; - break; + const rep = (await readArrayReply(this.connection.reader as BufReader)) as string[]; + const ev = rep[0]; + + if (ev === "message" && rep.length === 3) { + yield { + channel: rep[1], + message: rep[2], + }; + } else if (ev === "pmessage" && rep.length === 4) { + yield { + pattern: rep[1], + channel: rep[2], + message: rep[3], + }; + } + } catch (error) { + if (error.message === "Invalid state" || error instanceof Deno.errors.BadResource) forceReconnect = true; + else throw error; + } finally { + if ((!this.isClosed && !this.isConnected) || forceReconnect) { + await this.connection.reconnect(); + forceReconnect = false; + + if (Object.keys(this.channels).length > 0) await this.subscribe(...Object.keys(this.channels)); + if (Object.keys(this.patterns).length > 0) await this.psubscribe(...Object.keys(this.patterns)); } - throw err; - } - const ev = rep[0]; - if (ev === "message" && rep.length === 3) { - yield { - channel: rep[1], - message: rep[2], - }; - } else if (ev === "pmessage" && rep.length === 4) { - yield { - pattern: rep[1], - channel: rep[2], - message: rep[3], - }; } } } @@ -92,27 +123,25 @@ class RedisSubscriptionImpl implements RedisSubscription { await this.unsubscribe(...Object.keys(this.channels)); await this.punsubscribe(...Object.keys(this.patterns)); } finally { - this._isClosed = true; + this.connection.close(); } } } export async function subscribe( - writer: BufWriter, - reader: BufReader, + connection: RedisConnection, ...channels: string[] ): Promise { - const sub = new RedisSubscriptionImpl(writer, reader); + const sub = new RedisSubscriptionImpl(connection); await sub.subscribe(...channels); return sub; } export async function psubscribe( - writer: BufWriter, - reader: BufReader, + connection: RedisConnection, ...patterns: string[] ): Promise { - const sub = new RedisSubscriptionImpl(writer, reader); + const sub = new RedisSubscriptionImpl(connection); await sub.psubscribe(...patterns); return sub; } diff --git a/pubsub_test.ts b/pubsub_test.ts index c3b42073..30999aff 100644 --- a/pubsub_test.ts +++ b/pubsub_test.ts @@ -1,7 +1,6 @@ -import { - assertEquals, - assert, -} from "./vendor/https/deno.land/std/testing/asserts.ts"; +import { assert, assertEquals, assertThrowsAsync } from "./vendor/https/deno.land/std/testing/asserts.ts"; +import { delay } from "./vendor/https/deno.land/std/async/mod.ts"; + import { connect } from "./redis.ts"; const { test } = Deno; const addr = { @@ -39,10 +38,14 @@ test({ message: "wayway", }); await sub.close(); - const a = await redis.get("aaa"); - assertEquals(a, undefined); - pub.close(); - redis.close(); + + assertEquals(sub.isClosed, true); + assertEquals(redis.isClosed, true); + + await pub.close(); + await assertThrowsAsync(async () => { + await redis.get("aaa"); + }, Deno.errors.BadResource); }, }); @@ -79,17 +82,95 @@ test({ }); test({ - name: - "testSubscriptionShouldNotThrowBadResourceErrorWhenConnectionIsClosed (#89)", - async fn() { - const redis = await connect(addr); - const sub = await redis.subscribe("test"); - const subscriptionPromise = (async () => { - // deno-lint-ignore no-empty - for await (const _ of sub.receive()) {} - })(); - redis.close(); - await subscriptionPromise; - assert(sub.isClosed); + name: "testSubscribe4 (#83)", + // sanitizeResources: false, + // sanitizeOps: false, + async fn(): Promise { + let parallelPromiseList: Promise[] = []; + + const throwawayRedisServerPort = 6464; + let promiseList; + let throwawayRedisServerChildProcess = createThrowawayRedisServer(throwawayRedisServerPort); + + await delay(500); + + const redisClient = await connect({ ...addr, name: "Main", port: throwawayRedisServerPort }); + const publisherRedisClient = await connect( + { ...addr, maxRetryCount: 10, name: "Publisher", port: throwawayRedisServerPort } + ); + const subscriberRedisClient = await redisClient.psubscribe("ps*"); + + const messageIterator = subscriberRedisClient.receive(); + + const interval = setInterval( + () => { + try { + parallelPromiseList.push(publisherRedisClient.publish("psub", "wayway")); + } catch (error) {} + }, + 900 + ); + + setTimeout( + async () => { + // console.debug("Kill the Redis server..."); + throwawayRedisServerChildProcess.close(); + + // console.debug(" ...done!"); + }, + 1000 + ) + + setTimeout(async () => { + assertEquals(redisClient.isConnected, false, "The main client still thinks it is connected."); + assertEquals(publisherRedisClient.isConnected, false, "The publisher client still thinks it is connected."); + assert(parallelPromiseList.length < 5, "Too many messages were published."); + + // console.debug("Reconnect Redis client..."); + + throwawayRedisServerChildProcess = createThrowawayRedisServer(throwawayRedisServerPort); + + await delay(500); + const temporaryRedisClient = await connect({ ...addr, port: throwawayRedisServerPort }); + await temporaryRedisClient.ping(); + temporaryRedisClient.close(); + + // console.debug(" ...done!"); + + await delay(1000); + + assert(redisClient.isConnected, "The main client is not connected."); + assert(publisherRedisClient.isConnected, "The publisher client is not connected."); + }, 2000); + + promiseList = Promise.all([ + messageIterator.next(), + messageIterator.next(), + messageIterator.next(), + messageIterator.next(), + messageIterator.next() + ]); + + await promiseList; + + // console.debug("All done... Cleaning up."); + + clearInterval(interval); + + throwawayRedisServerChildProcess.close(); + publisherRedisClient.close(); + redisClient.close(); }, }); + +function createThrowawayRedisServer (port: number) { + + return Deno.run( + { + cmd: [ "redis-server", "--port", port.toString() ], + stdin: "null", + stdout: "null" + } + ); +} + diff --git a/redis.ts b/redis.ts index 6591bb48..8eb6cd27 100644 --- a/redis.ts +++ b/redis.ts @@ -1,17 +1,11 @@ type Reader = Deno.Reader; type Writer = Deno.Writer; type Closer = Deno.Closer; -import { - BufReader, - BufWriter, -} from "./vendor/https/deno.land/std/io/bufio.ts"; -import { psubscribe, RedisSubscription, subscribe } from "./pubsub.ts"; -import { - muxExecutor, - CommandExecutor, - RedisRawReply, -} from "./io.ts"; -import { createRedisPipeline, RedisPipeline } from "./pipeline.ts"; + +import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts"; +import { psubscribe, subscribe } from "./pubsub.ts"; +import { CommandExecutor, CommandFunc, RedisRawReply } from "./io.ts"; +import { createRedisPipeline } from "./pipeline.ts"; import { RedisCommands, Status, @@ -22,30 +16,40 @@ import { Raw, BulkNil, } from "./command.ts"; +import { RedisConnection } from "./connection.ts"; export type Redis = RedisCommands & { executor: CommandExecutor; }; class RedisImpl implements RedisCommands { - _isClosed = false; + get isClosed() { - return this._isClosed; + return this.connection?.isClosed; + } + + get isConnected() { + return this.connection?.isConnected; + } + + get executor() { + return this.connection?.executor!; } constructor( - private closer: Closer, - private writer: BufWriter, - private reader: BufReader, - readonly executor: CommandExecutor, + private connection: RedisConnection, ) { } + close() { + return this.connection?.close(); + } + async execStatusReply( command: string, ...args: (string | number)[] ): Promise { - const [_, reply] = await this.executor.exec(command, ...args); + const [_, reply] = await this.connection?.executor!.exec(command, ...args); return reply as Status; } @@ -53,7 +57,7 @@ class RedisImpl implements RedisCommands { command: string, ...args: (string | number)[] ): Promise { - const [_, reply] = await this.executor.exec(command, ...args); + const [_, reply] = await this.connection!.exec(command, ...args); return reply as number; } @@ -61,7 +65,7 @@ class RedisImpl implements RedisCommands { command: string, ...args: (string | number)[] ): Promise { - const [_, reply] = await this.executor.exec(command, ...args); + const [_, reply] = await this.connection!.exec(command, ...args); return reply as T; } @@ -69,7 +73,7 @@ class RedisImpl implements RedisCommands { command: string, ...args: (string | number)[] ): Promise { - const [_, reply] = await this.executor.exec(command, ...args); + const [_, reply] = await this.connection!.exec(command, ...args); return reply as T[]; } @@ -77,7 +81,7 @@ class RedisImpl implements RedisCommands { command: string, ...args: (string | number)[] ): Promise { - const [_, reply] = await this.executor.exec(command, ...args); + const [_, reply] = await this.connection!.exec(command, ...args); return reply as Integer | BulkNil; } @@ -85,7 +89,7 @@ class RedisImpl implements RedisCommands { command: string, ...args: (string | number)[] ): Promise { - const [_, reply] = await this.executor.exec(command, ...args); + const [_, reply] = await this.connection!.exec(command, ...args); return reply as Status | BulkNil; } @@ -374,7 +378,7 @@ class RedisImpl implements RedisCommands { } else { _args.push(...args); } - const [_, raw] = await this.executor.exec(cmd, ..._args); + const [_, raw] = await this.connection!.exec(cmd, ..._args); return raw; } @@ -820,11 +824,11 @@ class RedisImpl implements RedisCommands { } subscribe(...channels: string[]) { - return subscribe(this.writer, this.reader, ...channels); + return subscribe(this.connection, ...channels); } psubscribe(...patterns: string[]) { - return psubscribe(this.writer, this.reader, ...patterns); + return psubscribe(this.connection, ...patterns); } pubsub_channels(pattern: string) { @@ -857,7 +861,7 @@ class RedisImpl implements RedisCommands { try { return this.execStatusReply("QUIT"); } finally { - this._isClosed = true; + this.connection?.close() } } @@ -1047,7 +1051,7 @@ class RedisImpl implements RedisCommands { } slowlog(subcommand: string, ...argument: string[]) { - return this.executor.exec("SLOWLOG", subcommand, ...argument); + return this.connection!.exec("SLOWLOG", subcommand, ...argument); } smembers(key: string) { @@ -1519,26 +1523,23 @@ class RedisImpl implements RedisCommands { // pipeline tx() { - return createRedisPipeline(this.writer, this.reader, { tx: true }); + return createRedisPipeline(this.connection, { tx: true }); } pipeline() { - return createRedisPipeline(this.writer, this.reader); + return createRedisPipeline(this.connection); } - // Stream - - close() { - this.closer.close(); - } } -export type RedisConnectOptions = { +export type RedisConnectOptions = { hostname: string; port?: number | string; tls?: boolean; db?: number; password?: string; + name?: string; + maxRetryCount?: number; }; function parsePortLike(port: string | number | undefined): number { @@ -1562,38 +1563,18 @@ function parsePortLike(port: string | number | undefined): number { */ export async function connect({ hostname, - port, + port = 6379, tls, db, password, + name, + maxRetryCount }: RedisConnectOptions): Promise { - const dialOpts: Deno.ConnectOptions = { - hostname, - port: parsePortLike(port), - }; - if (!Number.isSafeInteger(dialOpts.port)) { - throw new Error("deno-redis: opts.port is invalid"); - } - const conn: Deno.Conn = tls - ? await Deno.connectTls(dialOpts) - : await Deno.connect(dialOpts); - - const bufr = new BufReader(conn); - const bufw = new BufWriter(conn); - const exec = muxExecutor(bufr, bufw); - const client = await create(conn, conn, conn, exec); - if (password != null) { - try { - await client.auth(password); - } catch (err) { - client.close(); - throw err; - } - } - if (db) { - await client.select(db); - } - return client; + const connection = new RedisConnection(hostname, port, { tls, db, maxRetryCount, name, password }); + + await connection.connect(); + + return new RedisImpl(connection); } export function create( @@ -1602,10 +1583,19 @@ export function create( reader: Reader, executor: CommandExecutor, ): Redis { - return new RedisImpl( + return new RedisImpl({ closer, - new BufWriter(writer), - new BufReader(reader), executor, - ); + reader, + writer, + get exec(): CommandFunc { + return executor.exec; + }, + get isConnected(): boolean { + return true; + }, + get isClosed(): boolean { + return false; + } + } as RedisConnection); }