Skip to content

Commit

Permalink
feat: add experimental/web_streams_connection module (#405)
Browse files Browse the repository at this point in the history
`std/io` has been deprecated in [v0.203.0](https://github.com/denoland/deno_std/releases/tag/0.203.0). So, we need to migrate from `Deno.Reader`/`Deno.Writer` to Web Streams API in near future. In preparation for it, this commit experimentally adds support for RESP based on Web Streams API.
  • Loading branch information
uki00a authored Oct 26, 2023
1 parent d235d71 commit 07104b0
Show file tree
Hide file tree
Showing 54 changed files with 830 additions and 262 deletions.
3 changes: 2 additions & 1 deletion benchmark/benchmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { dirname, join } from "node:path";
export function run({
driver,
client,
outputFilename = driver,
}) {
const encoder = new TextEncoder();
return suite(
Expand Down Expand Up @@ -67,7 +68,7 @@ export function run({
await client.flushdb();
}),
save({
file: driver,
file: outputFilename,
details: true,
folder: join(
dirname(dirname(new URL(import.meta.url).pathname)),
Expand Down
33 changes: 25 additions & 8 deletions benchmark/deno-redis.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
import { run } from "./benchmark.js";
import { connect } from "../mod.ts";
import { connect as connectWebStreams } from "../experimental/web_streams_connection/mod.ts";

const redis = await connect({ hostname: "127.0.0.1" });
try {
await run({
client: redis,
driver: "deno-redis",
});
} finally {
await redis.quit();
{
const redis = await connect({ hostname: "127.0.0.1" });
try {
await run({
client: redis,
driver: "deno-redis",
});
} finally {
await redis.quit();
}
}

{
const redis = await connectWebStreams({ hostname: "127.0.0.1" });
try {
await run({
client: redis,
driver: "deno-redis (experimental/web_streams_connection)",
outputFilename:
"deno-redis-with-experimental-web-streams-based-connection",
});
} finally {
await redis.quit();
}
}
2 changes: 1 addition & 1 deletion command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type {
Raw,
RedisValue,
SimpleString,
} from "./protocol/mod.ts";
} from "./protocol/shared/types.ts";
import type { RedisPipeline } from "./pipeline.ts";
import type { RedisSubscription } from "./pubsub.ts";
import type {
Expand Down
48 changes: 23 additions & 25 deletions connection.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import { readReply, sendCommand, sendCommands } from "./protocol/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/mod.ts";
import type { Command } from "./protocol/command.ts";
import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
import type { Command, Protocol } from "./protocol/shared/protocol.ts";
import type { Backoff } from "./backoff.ts";
import { exponentialBackoff } from "./backoff.ts";
import { ErrorReplyError, isRetriableError } from "./errors.ts";
import { kUnstablePipeline, kUnstableReadReply } from "./internal/symbols.ts";
import { BufReader } from "./vendor/https/deno.land/std/io/buf_reader.ts";
import { BufWriter } from "./vendor/https/deno.land/std/io/buf_writer.ts";
import {
kUnstableCreateProtocol,
kUnstablePipeline,
kUnstableReadReply,
} from "./internal/symbols.ts";
import {
Deferred,
deferred,
} from "./vendor/https/deno.land/std/async/deferred.ts";
import { delay } from "./vendor/https/deno.land/std/async/delay.ts";
type Closer = Deno.Closer;

export interface SendCommandOptions {
/**
Expand Down Expand Up @@ -61,6 +62,11 @@ export interface RedisConnectionOptions {
* When this option is set, a `PING` command is sent every specified number of seconds.
*/
healthCheckInterval?: number;

/**
* @private
*/
[kUnstableCreateProtocol]?: (conn: Deno.Conn) => Protocol;
}

export const kEmptyRedisArgs: Array<RedisValue> = [];
Expand All @@ -74,9 +80,6 @@ interface PendingCommand {

export class RedisConnection implements Connection {
name: string | null = null;
private reader!: BufReader;
private writer!: BufWriter;
private closer!: Closer;
private maxRetryCount = 10;

private readonly hostname: string;
Expand All @@ -86,6 +89,8 @@ export class RedisConnection implements Connection {
private backoff: Backoff;

private commandQueue: PendingCommand[] = [];
#conn!: Deno.Conn;
#protocol!: Protocol;

get isClosed(): boolean {
return this._isClosed;
Expand Down Expand Up @@ -160,11 +165,11 @@ export class RedisConnection implements Connection {
}

[kUnstableReadReply](returnsUint8Arrays?: boolean): Promise<RedisReply> {
return readReply(this.reader, returnsUint8Arrays);
return this.#protocol.readReply(returnsUint8Arrays);
}

[kUnstablePipeline](commands: Array<Command>) {
return sendCommands(this.writer, this.reader, commands);
return this.#protocol.pipeline(commands);
}

/**
Expand All @@ -184,9 +189,9 @@ export class RedisConnection implements Connection {
? await Deno.connectTls(dialOpts)
: await Deno.connect(dialOpts);

this.closer = conn;
this.reader = new BufReader(conn);
this.writer = new BufWriter(conn);
this.#conn = conn;
this.#protocol = this.options?.[kUnstableCreateProtocol]?.(conn) ??
new DenoStreamsProtocol(conn);
this._isClosed = false;
this._isConnected = true;

Expand Down Expand Up @@ -222,16 +227,13 @@ export class RedisConnection implements Connection {
this._isClosed = true;
this._isConnected = false;
try {
this.closer!.close();
this.#conn!.close();
} catch (error) {
if (!(error instanceof Deno.errors.BadResource)) throw error;
}
}

async reconnect(): Promise<void> {
if (!this.reader.peek(1)) {
throw new Error("Client is closed.");
}
try {
await this.sendCommand("PING");
this._isConnected = true;
Expand All @@ -247,9 +249,7 @@ export class RedisConnection implements Connection {
if (!command) return;

try {
const reply = await sendCommand(
this.writer,
this.reader,
const reply = await this.#protocol.sendCommand(
command.name,
command.args,
command.returnUint8Arrays,
Expand All @@ -269,9 +269,7 @@ export class RedisConnection implements Connection {
try {
await this.connect();

const reply = await sendCommand(
this.writer,
this.reader,
const reply = await this.#protocol.sendCommand(
command.name,
command.args,
command.returnUint8Arrays,
Expand Down
5 changes: 4 additions & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
"benchmark/ioredis.js"
]
},
"test": {
"exclude": ["benchmark/", "tmp/", "vendor/"]
},
"tasks": {
"test": "deno test --allow-net --allow-read=tests --allow-write=tests/tmp --allow-run=redis-server,redis-cli --coverage=coverage --trace-ops ./tests",
"test": "deno test --allow-net --allow-read=tests --allow-write=tests/tmp --allow-run=redis-server,redis-cli --coverage=coverage --trace-ops",
"test:doc": "deno test --doc --no-run --import-map=import_map.test.json",
"coverage": "deno coverage ./coverage --lcov --output=coverage/lcov.info",
"make_mod": "deno run --allow-read --allow-write --allow-run --check tools/make_mod.ts",
Expand Down
6 changes: 6 additions & 0 deletions errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ export class ConnectionClosedError extends Error {}
export class SubscriptionClosedError extends Error {}

export class ErrorReplyError extends Error {}
export class NotImplementedError extends Error {
constructor(message?: string) {
super(message ? `Not implemented: ${message}` : "Not implemented");
}
}

export class InvalidStateError extends Error {
constructor(message?: string) {
Expand All @@ -19,5 +24,6 @@ export function isRetriableError(error: Error): boolean {
error instanceof Deno.errors.ConnectionAborted ||
error instanceof Deno.errors.ConnectionRefused ||
error instanceof Deno.errors.ConnectionReset ||
error instanceof Deno.errors.UnexpectedEof ||
error instanceof EOFError);
}
2 changes: 1 addition & 1 deletion executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Connection, SendCommandOptions } from "./connection.ts";
import type { RedisReply, RedisValue } from "./protocol/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";

export interface CommandExecutor {
readonly connection: Connection;
Expand Down
2 changes: 1 addition & 1 deletion experimental/cluster/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import type { RedisConnectOptions } from "../../redis.ts";
import type { CommandExecutor } from "../../executor.ts";
import type { Connection, SendCommandOptions } from "../../connection.ts";
import type { Redis } from "../../redis.ts";
import type { RedisReply, RedisValue } from "../../protocol/mod.ts";
import type { RedisReply, RedisValue } from "../../protocol/shared/types.ts";
import { ErrorReplyError } from "../../errors.ts";
import { delay } from "../../vendor/https/deno.land/std/async/delay.ts";
import calculateSlot from "../../vendor/https/esm.sh/cluster-key-slot/lib/index.js";
Expand Down
15 changes: 15 additions & 0 deletions experimental/web_streams_connection/mod.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { kUnstableCreateProtocol } from "../../internal/symbols.ts";
import type { RedisConnectOptions } from "../../redis.ts";
import { connect as _connect } from "../../redis.ts";
import { Protocol } from "../../protocol/web_streams/mod.ts";

function createProtocol(conn: Deno.Conn) {
return new Protocol(conn);
}

export function connect(options: RedisConnectOptions) {
return _connect({
...options,
[kUnstableCreateProtocol]: createProtocol,
});
}
56 changes: 56 additions & 0 deletions internal/buffered_readable_stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { concateBytes } from "./concate_bytes.ts";

const LF = "\n".charCodeAt(0);
/**
* Wraps `ReadableStream` to provide buffering. Heavily inspired by `deno_std/io/buf_reader.ts`.
*
* {@link https://github.com/denoland/deno_std/blob/0.204.0/io/buf_reader.ts}
*/
export class BufferedReadableStream {
#reader: ReadableStreamDefaultReader<Uint8Array>;
#buffer: Uint8Array;
constructor(readable: ReadableStream<Uint8Array>) {
// TODO: This class could probably be optimized with a BYOB reader.
this.#reader = readable.getReader();
this.#buffer = new Uint8Array(0);
}

async readLine(): Promise<Uint8Array> {
const i = this.#buffer.indexOf(LF);
if (i > -1) {
return this.#consume(i + 1);
}
for (;;) {
await this.#fill();
const i = this.#buffer.indexOf(LF);
if (i > -1) return this.#consume(i + 1);
}
}

async readFull(buffer: Uint8Array): Promise<void> {
if (buffer.length <= this.#buffer.length) {
buffer.set(this.#consume(buffer.length));
return;
}
for (;;) {
await this.#fill();
if (this.#buffer.length >= buffer.length) break;
}
return this.readFull(buffer);
}

#consume(n: number): Uint8Array {
const b = this.#buffer.subarray(0, n);
this.#buffer = this.#buffer.subarray(n);
return b;
}

async #fill() {
const chunk = await this.#reader.read();
if (chunk.done) {
throw new Deno.errors.BadResource();
}
const bytes = chunk.value;
this.#buffer = concateBytes(this.#buffer, bytes);
}
}
Loading

0 comments on commit 07104b0

Please sign in to comment.