Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve type-safety for events #476

Merged
merged 1 commit into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 56 additions & 97 deletions connection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type { Backoff } from "./backoff.ts";
import { exponentialBackoff } from "./backoff.ts";
import { ErrorReplyError, isRetriableError } from "./errors.ts";
import type {
ConnectionEventMap,
ConnectionEventType,
TypedEventTarget,
} from "./events.ts";
import {
kUnstableCreateProtocol,
kUnstablePipeline,
Expand All @@ -12,74 +17,6 @@ import type { Command, Protocol } from "./protocol/shared/protocol.ts";
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
import { delay } from "./deps/std/async.ts";

type TypedEventTarget<EventMap extends object> = {
new (): IntermediateEventTarget<EventMap>;
};

interface IntermediateEventTarget<EventMap> extends EventTarget {
addEventListener<K extends keyof EventMap>(
type: K,
callback: (
event: EventMap[K] extends Event ? EventMap[K] : never,
) => EventMap[K] extends Event ? void : never,
options?: AddEventListenerOptions | boolean,
): void;

addEventListener(
type: string,
callback: EventListenerOrEventListenerObject | null,
options?: AddEventListenerOptions | boolean,
): void;

removeEventListener<K extends keyof EventMap>(
type: K,
callback: (
event: EventMap[K] extends Event ? EventMap[K] : never,
) => EventMap[K] extends Event ? void : never,
options?: EventListenerOptions | boolean,
): void;

removeEventListener(
type: string,
callback: EventListenerOrEventListenerObject | null,
options?: EventListenerOptions | boolean,
): void;
}

export type ConnectionEvent = Record<string, unknown>;

export type ConnectionErrorEvent = {
error: Error;
};

export type ConnectionReconnectingEvent = {
delay: number;
};

export type ConnectionEventMap = {
error: CustomEvent<ConnectionErrorEvent>;
connect: CustomEvent<ConnectionEvent>;
reconnecting: CustomEvent<ConnectionReconnectingEvent>;
ready: CustomEvent<ConnectionEvent>;
close: CustomEvent<ConnectionEvent>;
end: CustomEvent<ConnectionEvent>;
};

export type ConnectionEventTarget = TypedEventTarget<ConnectionEventMap>;

export type ConnectionEventType =
| "error"
| "connect"
| "reconnecting"
| "ready"
| "close"
| "end";

export type ConnectionEventArg<T extends ConnectionEventType> = T extends
"error" ? Error
: T extends "reconnecting" ? number
: undefined;

export interface SendCommandOptions {
/**
* When this option is set, simple or bulk string replies are returned as `Uint8Array` type.
Expand All @@ -96,7 +33,7 @@ export interface SendCommandOptions {
inline?: boolean;
}

export interface Connection extends EventTarget {
export interface Connection extends TypedEventTarget<ConnectionEventMap> {
name: string | null;
isClosed: boolean;
isConnected: boolean;
Expand Down Expand Up @@ -158,8 +95,8 @@ interface PendingCommand {
reject: (error: unknown) => void;
}

export class RedisConnection extends (EventTarget as ConnectionEventTarget)
implements Connection {
export class RedisConnection
implements Connection, TypedEventTarget<ConnectionEventMap> {
name: string | null = null;
private maxRetryCount = 10;

Expand All @@ -172,6 +109,7 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
private commandQueue: PendingCommand[] = [];
#conn!: Deno.Conn;
#protocol!: Protocol;
#eventTarget = new EventTarget();

get isClosed(): boolean {
return this._isClosed;
Expand All @@ -190,8 +128,6 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
port: number | string,
private options: RedisConnectionOptions,
) {
super();

this.hostname = hostname;
this.port = port;
if (options.name) {
Expand All @@ -216,10 +152,10 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
const authError = new AuthenticationError("Authentication failed", {
cause: error,
});
this.fireEvent("error", authError);
this.#dispatchEvent("error", { error: authError });
throw authError;
} else {
this.fireEvent("error", error as Error);
this.#dispatchEvent("error", { error });
throw error;
}
}
Expand Down Expand Up @@ -261,6 +197,37 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
return promise;
}

addEventListener<K extends keyof ConnectionEventMap>(
type: K,
callback: (event: CustomEvent<ConnectionEventMap[K]>) => void,
options?: AddEventListenerOptions | boolean,
): void {
return this.#eventTarget.addEventListener(
type,
callback as (event: Event) => void,
options,
);
}

removeEventListener<K extends keyof ConnectionEventMap>(
type: K,
callback: (event: CustomEvent<ConnectionEventMap[K]>) => void,
options?: EventListenerOptions | boolean,
): void {
return this.#eventTarget.removeEventListener(
type,
callback as (event: Event) => void,
options,
);
}

#dispatchEvent<K extends ConnectionEventType>(
type: K,
detail: ConnectionEventMap[K],
): boolean {
return this.#eventTarget.dispatchEvent(new CustomEvent(type, { detail }));
}

[kUnstableReadReply](returnsUint8Arrays?: boolean): Promise<RedisReply> {
return this.#protocol.readReply(returnsUint8Arrays);
}
Expand Down Expand Up @@ -301,7 +268,7 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)

this._isClosed = false;
this._isConnected = true;
this.fireEvent("connect", undefined);
this.#dispatchEvent("connect", undefined);

try {
if (this.options.password != null) {
Expand All @@ -315,24 +282,24 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
throw error;
}

this.fireEvent("ready", undefined);
this.#dispatchEvent("ready", undefined);

this.#enableHealthCheckIfNeeded();
} catch (error) {
if (error instanceof AuthenticationError) {
this.fireEvent("error", error);
this.fireEvent("end", undefined);
this.#dispatchEvent("error", { error });
this.#dispatchEvent("end", undefined);
throw (error.cause ?? error);
}

const backoff = this.backoff(retryCount);
retryCount++;
if (retryCount >= this.maxRetryCount) {
this.fireEvent("error", error as Error);
this.fireEvent("end", undefined);
this.#dispatchEvent("error", { error: error as Error });
this.#dispatchEvent("end", undefined);
throw error;
}
this.fireEvent("reconnecting", backoff);
this.#dispatchEvent("reconnecting", { delay: backoff });
await delay(backoff);
await this.#connect(retryCount);
}
Expand All @@ -351,15 +318,15 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
this.#conn!.close();
} catch (error) {
if (!(error instanceof Deno.errors.BadResource)) {
this.fireEvent("error", error as Error);
this.#dispatchEvent("error", { error: error as Error });
throw error;
}
} finally {
if (!isClosedAlready) {
this.fireEvent("close", undefined);
this.#dispatchEvent("close", undefined);

if (!canReconnect) {
this.fireEvent("end", undefined);
this.#dispatchEvent("end", undefined);
}
}
}
Expand All @@ -369,8 +336,8 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
try {
await this.sendCommand("PING");
this._isConnected = true;
} catch (error) { // TODO: Maybe we should log this error.
this.fireEvent("error", error as Error);
} catch (error) {
this.#dispatchEvent("error", { error });
this.#close(true);
await this.connect();
await this.sendCommand("PING");
Expand All @@ -389,7 +356,7 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
!isRetriableError(error) ||
this.isManuallyClosedByUser()
) {
this.fireEvent("error", error as Error);
this.#dispatchEvent("error", { error });
return command.reject(error);
}

Expand All @@ -406,7 +373,7 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)
}
}

this.fireEvent("error", error as Error);
this.#dispatchEvent("error", { error });
command.reject(error);
} finally {
this.commandQueue.shift();
Expand Down Expand Up @@ -442,14 +409,6 @@ export class RedisConnection extends (EventTarget as ConnectionEventTarget)

setTimeout(ping, healthCheckInterval);
}

private fireEvent<T extends ConnectionEventType>(
eventType: T,
eventArg: ConnectionEventArg<T>,
): boolean {
const event = new CustomEvent(eventType, { detail: eventArg });
return this.dispatchEvent(event);
}
}

class AuthenticationError extends Error {}
Expand Down
49 changes: 49 additions & 0 deletions events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
export interface TypedEventTarget<TEventMap extends Record<string, unknown>>
extends
Omit<
EventTarget,
"addEventListener" | "removeEventListener" | "dispatchEvent"
> {
addEventListener<K extends keyof TEventMap>(
type: K,
callback: (
event: CustomEvent<TEventMap[K]>,
) => void,
options?: AddEventListenerOptions | boolean,
): void;

removeEventListener<K extends keyof TEventMap>(
type: K,
callback: (
event: CustomEvent<TEventMap[K]>,
) => void,
options?: EventListenerOptions | boolean,
): void;
}

export type ConnectionEvent = Record<string, unknown>;

export type ConnectionErrorEventDetails = {
error: unknown;
};

export type ConnectionReconnectingEventDetails = {
delay: number;
};

export type ConnectionEventMap = {
error: ConnectionErrorEventDetails;
connect: unknown;
reconnecting: ConnectionReconnectingEventDetails;
ready: unknown;
close: unknown;
end: unknown;
};

export type ConnectionEventType =
| "error"
| "connect"
| "reconnecting"
| "ready"
| "close"
| "end";
13 changes: 6 additions & 7 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,15 @@ export type {
} from "./command.ts";
export type {
Connection,
ConnectionErrorEvent,
ConnectionEvent,
ConnectionEventArg,
ConnectionEventMap,
ConnectionEventTarget,
ConnectionEventType,
ConnectionReconnectingEvent,
RedisConnectionOptions,
SendCommandOptions,
} from "./connection.ts";
export type {
ConnectionErrorEventDetails,
ConnectionEvent,
ConnectionEventType,
ConnectionReconnectingEventDetails,
} from "./events.ts";
export type { CommandExecutor } from "./executor.ts";
export type { RedisPipeline } from "./pipeline.ts";
export type {
Expand Down
Loading
Loading