Skip to content

Commit

Permalink
Fix Redis <=6.2 throwing errors when trying to clear the command queu…
Browse files Browse the repository at this point in the history
…e in pooling mode. (#1763)

* Try to unify queue implementations.

* Switch pool client and connection pool to command reader

* Document version requirement.

* changelog

* A newline

* Move types

* Tidy up version check
  • Loading branch information
Half-Shot authored Aug 2, 2023
1 parent c557cbe commit 8953058
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 106 deletions.
2 changes: 2 additions & 0 deletions changelog.d/1763.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix Redis <=6.2 failing to clear the command queue in pooling mode.

4 changes: 3 additions & 1 deletion docs/connection_pooling.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ The IRC bridge can be configured to run it's IRC connections through a seperate
allowing you to restart and update (in most cases) the main process while keeping connections alive. This in
effect allows you to have a bridge that *appears* to not restart (sometimes nicknamed eternal bridges).

To configure the bridge in this mode you will need to setup a [Redis](https://redis.io/) instance.
To configure the bridge in this mode you will need to setup a [Redis](https://redis.io/) instance. Ideally, you
**should** run the bridge with Redis `6.2.0` or greater as it is more efficent when used with streams. The bridge
requires Redis `5.0.0` or greater to run.

In your bridge, configure the following:

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
"react": "^18.2.0",
"react-dom": "^18.2.0",
"sanitize-html": "^2.7.2",
"semver": "^7.5.4",
"typed-emitter": "^2.1.0",
"typescript": "^5.0.4",
"url-join": "^5.0.0",
Expand All @@ -77,6 +78,7 @@
"@types/react": "^18.0.26",
"@types/react-dom": "^18.0.9",
"@types/sanitize-html": "^2.6.2",
"@types/semver": "^7.5.0",
"@typescript-eslint/eslint-plugin": "^5.38.0",
"@typescript-eslint/parser": "^5.38.0",
"@vitejs/plugin-react": "^3.0.1",
Expand Down
130 changes: 130 additions & 0 deletions src/pool-service/CommandReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { Redis } from "ioredis";
import semver from "semver";
import { Logger } from "matrix-appservice-bridge";

const TRIM_EVERY_MS = 30000;
const COMMAND_BLOCK_TIMEOUT = 10000;
const TRIM_MAXLEN_COUNT = 100_000;

const log = new Logger('RedisCommandReader');

export class RedisCommandReader {
private shouldRun = true;
private commandStreamId = "$"
private supportsMinId = false;
private trimInterval?: NodeJS.Timer;

constructor(
private readonly redis: Redis,
private readonly streamName: string,
private readonly onCommand: (cmdType: string, cmdPayload: string) => Promise<void>) {

}

private updateLastRead(lastRead: string) {
this.commandStreamId = lastRead;
}

public stop() {
this.shouldRun = false;
clearInterval(this.trimInterval);
}

public async readQueue() {
const newCmds = await this.redis.xread(
"BLOCK", COMMAND_BLOCK_TIMEOUT, "STREAMS", this.streamName, this.commandStreamId
).catch(ex => {
log.warn(`Failed to read new command:`, ex);
return null;
});
if (newCmds === null) {
// This means we've waited for some time and seen no new commands, to be safe revert to the HEAD of the queue.
log.info(`Stream has been idle for ${COMMAND_BLOCK_TIMEOUT}ms, listening for messages at $`);
this.commandStreamId = '$';
return;
}
// This is a list of keys, containing a list of commands, hence needing to deeply extract the values.
for (const [msgId, [cmdType, payload]] of newCmds[0][1]) {
// If we crash, we don't want to get stuck on this msg.
this.updateLastRead(msgId);
setImmediate(
() => this.onCommand(cmdType, payload)
.catch(ex => log.warn(`Failed to handle msg ${msgId} (${cmdType}, ${payload})`, ex)
),
);
}

}

public async getSupported() {
let options: Map<string, string>;
try {
// Fetch the "Server" info block and parse out the various lines.
const serverLines = (
await this.redis.info("Server")
).split('\n').filter(v => !v.startsWith('#')).map(v => v.split(':', 2)) as [string, string][];
options = new Map(serverLines);
}
catch (ex) {
log.error("Failed to fetch server info from Redis", ex);
// Treat it as if we got zero useful options back.
options = new Map();
}
const version = options.get('redis_version');
if (!version) {
log.warn(`Unable to identify Redis version, assuming unsupported version.`);
this.supportsMinId = false;
return;
}
// We did get a server version back but we know it's unsupported.
if (semver.lt(version, '5.0.0')) {
throw new Error('Redis version is unsupported. The minimum required version is 5.0.0');
}
this.supportsMinId = !!semver.satisfies(version, '>=6.2');
}

private async trimCommandStream() {
if (this.commandStreamId === '$') {
// At the head of the queue, don't trim.
return;
}
try {
let trimCount;
if (this.supportsMinId) {
trimCount = await this.redis.xtrim(
this.streamName, "MINID", this.commandStreamId
);
}
else {
// If Redis doesn't support minid (requires >=6.2), we can fallback to
// trimming a large amount of messages instead.
trimCount = await this.redis.xtrim(
this.streamName, "MAXLEN", TRIM_MAXLEN_COUNT
);
}
log.debug(`Trimmed ${trimCount} commands from the stream`);
}
catch (ex) {
log.warn(`Failed to trim commands from the stream`, ex);
}
}

public async start() {
await this.getSupported();
this.trimInterval = setInterval(this.trimCommandStream.bind(this), TRIM_EVERY_MS);
log.info(`Listening for new commands`);
let loopCommandCheck: () => void;
// eslint-disable-next-line prefer-const
loopCommandCheck = () => {
if (!this.shouldRun) {
log.info(`Finished`);
return;
}
this.readQueue().finally(() => {
return loopCommandCheck();
});
}

loopCommandCheck();
}
}
47 changes: 14 additions & 33 deletions src/pool-service/IrcConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { OutCommandType,
import { parseMessage } from 'matrix-org-irc';
import { collectDefaultMetrics, register, Gauge } from 'prom-client';
import { createServer, Server } from 'http';
import { RedisCommandReader } from './CommandReader';

collectDefaultMetrics();

Expand Down Expand Up @@ -52,6 +53,7 @@ export class IrcConnectionPool {
private metricsServer?: Server;
private shouldRun = true;
private heartbeatTimer?: NodeJS.Timer;
private readonly commandReader: RedisCommandReader;

constructor(private readonly config: typeof Config) {
this.shouldRun = false;
Expand All @@ -60,10 +62,9 @@ export class IrcConnectionPool {
this.cmdWriter.on('connecting', () => {
log.debug('Connecting to', config.redisUri);
});
}

private updateLastRead(lastRead: string) {
this.commandStreamId = lastRead;
this.commandReader = new RedisCommandReader(
this.cmdReader, REDIS_IRC_POOL_COMMAND_IN_STREAM, this.handleStreamCommand.bind(this)
);
}

private async sendCommandOut<T extends OutCommandType>(type: T, payload: OutCommandPayload[T]) {
Expand Down Expand Up @@ -307,6 +308,12 @@ export class IrcConnectionPool {
}
}

private async handleStreamCommand(cmdType: string, payload: string) {
const commandType = cmdType as InCommandType;
const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn<InCommandType>;
return this.handleCommand(commandType, commandData);
}

public async handleInternalPing({ info }: IrcConnectionPoolCommandIn<InCommandType.ConnectionPing>) {
const { clientId } = info;
const conn = this.connections.get(clientId);
Expand Down Expand Up @@ -337,6 +344,7 @@ export class IrcConnectionPool {
return;
}
try {
log.debug(`Trimming up to ${this.commandStreamId}`);
const trimCount = await this.cmdWriter.xtrim(
REDIS_IRC_POOL_COMMAND_IN_STREAM, "MINID", this.commandStreamId
);
Expand Down Expand Up @@ -410,38 +418,11 @@ export class IrcConnectionPool {
void this.trimCommandStream();
}, HEARTBEAT_EVERY_MS);


log.info(`Listening for new commands`);
setImmediate(async () => {
while (this.shouldRun) {
const newCmds = await this.cmdReader.xread(
"BLOCK", 0, "STREAMS", REDIS_IRC_POOL_COMMAND_IN_STREAM, this.commandStreamId
).catch(ex => {
log.warn(`Failed to read new command:`, ex);
return null;
});
if (newCmds === null) {
// Unexpected, this is blocking.
continue;
}
// This is a list of keys, containing a list of commands, hence needing to deeply extract the values.
for (const [msgId, [cmdType, payload]] of newCmds[0][1]) {
const commandType = cmdType as InCommandType;

// If we crash, we don't want to get stuck on this msg.
await this.updateLastRead(msgId);
const commandData = JSON.parse(payload) as IrcConnectionPoolCommandIn<InCommandType>;
setImmediate(
() => this.handleCommand(commandType, commandData)
.catch(ex => log.warn(`Failed to handle msg ${msgId} (${commandType}, ${payload})`, ex)
),
);
}
}
});
return this.commandReader.start();
}

public async close() {
this.commandReader.stop();
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer)
}
Expand Down
Loading

0 comments on commit 8953058

Please sign in to comment.