Skip to content

Commit

Permalink
refactor(pubsub): Use CommandExecutor#exec instead of sendCommand (#252)
Browse files Browse the repository at this point in the history
  • Loading branch information
uki00a authored Aug 29, 2021
1 parent 8d6e959 commit 5ce5b4b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 40 deletions.
57 changes: 19 additions & 38 deletions pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Connection } from "./connection.ts";
import type { CommandExecutor } from "./executor.ts";
import { InvalidStateError } from "./errors.ts";
import { readArrayReply, sendCommand } from "./protocol/mod.ts";
import { readArrayReply } from "./protocol/mod.ts";

type DefaultMessageType = string;
type ValidMessageType = string | string[];
Expand All @@ -27,71 +27,52 @@ class RedisSubscriptionImpl<
TMessage extends ValidMessageType = DefaultMessageType,
> implements RedisSubscription<TMessage> {
get isConnected(): boolean {
return this.connection.isConnected;
return this.executor.connection.isConnected;
}

get isClosed(): boolean {
return this.connection.isClosed;
return this.executor.connection.isClosed;
}

private channels = Object.create(null);
private patterns = Object.create(null);

constructor(private connection: Connection) {
constructor(private executor: CommandExecutor) {
// Force retriable connection for connection shared for pub/sub.
if (!connection.isRetriable) connection.forceRetry();
if (!executor.connection.isRetriable) executor.connection.forceRetry();
}

async psubscribe(...patterns: string[]) {
await sendCommand(
this.connection.writer!,
this.connection.reader!,
"PSUBSCRIBE",
...patterns,
);
await this.executor.exec("PSUBSCRIBE", ...patterns);
for (const pat of patterns) {
this.patterns[pat] = true;
}
}

async punsubscribe(...patterns: string[]) {
await sendCommand(
this.connection.writer!,
this.connection.reader!,
"PUNSUBSCRIBE",
...patterns,
);
await this.executor.exec("PUNSUBSCRIBE", ...patterns);
for (const pat of patterns) {
delete this.patterns[pat];
}
}

async subscribe(...channels: string[]) {
await sendCommand(
this.connection.writer!,
this.connection.reader!,
"SUBSCRIBE",
...channels,
);
await this.executor.exec("SUBSCRIBE", ...channels);
for (const chan of channels) {
this.channels[chan] = true;
}
}

async unsubscribe(...channels: string[]) {
await sendCommand(
this.connection.writer!,
this.connection.reader!,
"UNSUBSCRIBE",
...channels,
);
await this.executor.exec("UNSUBSCRIBE", ...channels);
for (const chan of channels) {
delete this.channels[chan];
}
}

async *receive(): AsyncIterableIterator<RedisPubSubMessage<TMessage>> {
let forceReconnect = false;
const connection = this.executor.connection;
while (this.isConnected) {
try {
let rep: [string, string, TMessage] | [
Expand All @@ -101,15 +82,15 @@ class RedisSubscriptionImpl<
TMessage,
];
try {
rep = (await readArrayReply(this.connection.reader)).value() as [
rep = (await readArrayReply(connection.reader)).value() as [
string,
string,
TMessage,
] | [string, string, string, TMessage];
} catch (err) {
if (err instanceof Deno.errors.BadResource) {
// Connection already closed.
this.connection.close();
connection.close();
break;
}
throw err;
Expand Down Expand Up @@ -137,7 +118,7 @@ class RedisSubscriptionImpl<
} else throw error;
} finally {
if ((!this.isClosed && !this.isConnected) || forceReconnect) {
await this.connection.reconnect();
await connection.reconnect();
forceReconnect = false;

if (Object.keys(this.channels).length > 0) {
Expand All @@ -156,29 +137,29 @@ class RedisSubscriptionImpl<
await this.unsubscribe(...Object.keys(this.channels));
await this.punsubscribe(...Object.keys(this.patterns));
} finally {
this.connection.close();
this.executor.connection.close();
}
}
}

export async function subscribe<
TMessage extends ValidMessageType = DefaultMessageType,
>(
connection: Connection,
executor: CommandExecutor,
...channels: string[]
): Promise<RedisSubscription<TMessage>> {
const sub = new RedisSubscriptionImpl<TMessage>(connection);
const sub = new RedisSubscriptionImpl<TMessage>(executor);
await sub.subscribe(...channels);
return sub;
}

export async function psubscribe<
TMessage extends ValidMessageType = DefaultMessageType,
>(
connection: Connection,
executor: CommandExecutor,
...patterns: string[]
): Promise<RedisSubscription<TMessage>> {
const sub = new RedisSubscriptionImpl<TMessage>(connection);
const sub = new RedisSubscriptionImpl<TMessage>(executor);
await sub.psubscribe(...patterns);
return sub;
}
4 changes: 2 additions & 2 deletions redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1146,13 +1146,13 @@ class RedisImpl implements Redis {
subscribe<TMessage extends string | string[] = string>(
...channels: string[]
) {
return subscribe<TMessage>(this.executor.connection, ...channels);
return subscribe<TMessage>(this.executor, ...channels);
}

psubscribe<TMessage extends string | string[] = string>(
...patterns: string[]
) {
return psubscribe<TMessage>(this.executor.connection, ...patterns);
return psubscribe<TMessage>(this.executor, ...patterns);
}

pubsubChannels(pattern?: string) {
Expand Down

0 comments on commit 5ce5b4b

Please sign in to comment.