Skip to content

Redis client factory (error handling and good defaults) #1761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/webapp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"@heroicons/react": "^2.0.12",
"@internal/run-engine": "workspace:*",
"@internal/zod-worker": "workspace:*",
"@internal/redis": "workspace:*",
"@internal/redis-worker": "workspace:*",
"@internationalized/date": "^3.5.1",
"@lezer/highlight": "^1.1.6",
Expand Down Expand Up @@ -258,4 +259,4 @@
"engines": {
"node": ">=16.0.0"
}
}
}
4 changes: 3 additions & 1 deletion apps/webapp/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
"@internal/run-engine": ["../../internal-packages/run-engine/src/index"],
"@internal/run-engine/*": ["../../internal-packages/run-engine/src/*"],
"@internal/redis-worker": ["../../internal-packages/redis-worker/src/index"],
"@internal/redis-worker/*": ["../../internal-packages/redis-worker/src/*"]
"@internal/redis-worker/*": ["../../internal-packages/redis-worker/src/*"],
"@internal/redis": ["../../internal-packages/redis/src/index"],
"@internal/redis/*": ["../../internal-packages/redis/src/*"]
},
"noEmit": true
}
Expand Down
3 changes: 2 additions & 1 deletion internal-packages/redis-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"type": "module",
"dependencies": {
"@opentelemetry/api": "^1.9.0",
"@internal/redis": "workspace:*",
"@trigger.dev/core": "workspace:*",
"ioredis": "^5.3.2",
"lodash.omit": "^4.5.0",
Expand All @@ -23,4 +24,4 @@
"typecheck": "tsc --noEmit",
"test": "vitest --no-file-parallelism"
}
}
}
47 changes: 21 additions & 26 deletions internal-packages/redis-worker/src/queue.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createRedisClient } from "@internal/redis";
import { Logger } from "@trigger.dev/core/logger";
import Redis, { type Callback, type RedisOptions, type Result } from "ioredis";
import { nanoid } from "nanoid";
Expand Down Expand Up @@ -50,35 +51,29 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
logger?: Logger;
}) {
this.name = name;
this.redis = new Redis({
...redisOptions,
keyPrefix: `${redisOptions.keyPrefix ?? ""}{queue:${name}:}`,
retryStrategy(times) {
const delay = Math.min(times * 50, 1000);
return delay;
this.logger = logger ?? new Logger("SimpleQueue", "debug");

this.redis = createRedisClient(
{
...redisOptions,
keyPrefix: `${redisOptions.keyPrefix ?? ""}{queue:${name}:}`,
retryStrategy(times) {
const delay = Math.min(times * 50, 1000);
return delay;
},
maxRetriesPerRequest: 20,
},
maxRetriesPerRequest: 20,
});
{
onError: (error) => {
this.logger.error(`RedisWorker queue redis client error:`, {
error,
keyPrefix: redisOptions.keyPrefix,
});
},
}
);
this.#registerCommands();
this.schema = schema;

this.logger = logger ?? new Logger("SimpleQueue", "debug");

this.redis.on("error", (error) => {
this.logger.error(`Redis Error for queue ${this.name}:`, { queue: this.name, error });
});

this.redis.on("connect", () => {
this.logger.log(`Redis connected for queue ${this.name}`);
});

this.redis.on("reconnecting", () => {
this.logger.warn(`Redis reconnecting for queue ${this.name}`);
});

this.redis.on("close", () => {
this.logger.warn(`Redis connection closed for queue ${this.name}`);
});
}

async enqueue({
Expand Down
3 changes: 2 additions & 1 deletion internal-packages/redis-worker/src/worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { expect } from "vitest";
import { z } from "zod";
import { Worker } from "./worker.js";
import Redis from "ioredis";
import { createRedisClient } from "@internal/redis";

describe("Worker", () => {
redisTest("Process items that don't throw", { timeout: 30_000 }, async ({ redisContainer }) => {
Expand Down Expand Up @@ -241,7 +242,7 @@ describe("Worker", () => {
expect(dlqSize).toBe(1);

// Create a Redis client to publish the redrive message
const redisClient = new Redis({
const redisClient = createRedisClient({
host: redisContainer.getHost(),
port: redisContainer.getPort(),
password: redisContainer.getPassword(),
Expand Down
10 changes: 9 additions & 1 deletion internal-packages/redis-worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Redis from "ioredis";
import { nanoid } from "nanoid";
import { startSpan } from "./telemetry.js";
import pLimit from "p-limit";
import { createRedisClient } from "@internal/redis";

export type WorkerCatalog = {
[key: string]: {
Expand Down Expand Up @@ -108,7 +109,14 @@ class Worker<TCatalog extends WorkerCatalog> {

this.setupShutdownHandlers();

this.subscriber = new Redis(this.options.redisOptions);
this.subscriber = createRedisClient(this.options.redisOptions, {
onError: (error) => {
this.logger.error(`RedisWorker subscriber redis client error:`, {
error,
keyPrefix: this.options.redisOptions.keyPrefix,
});
},
});
this.setupSubscriber();

return this;
Expand Down
4 changes: 3 additions & 1 deletion internal-packages/redis-worker/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
"@internal/testcontainers": ["../../internal-packages/testcontainers/src/index"],
"@internal/testcontainers/*": ["../../internal-packages/testcontainers/src/*"],
"@trigger.dev/core": ["../../packages/core/src/index"],
"@trigger.dev/core/*": ["../../packages/core/src/*"]
"@trigger.dev/core/*": ["../../packages/core/src/*"],
"@internal/redis": ["../../internal-packages/redis/src/index"],
"@internal/redis/*": ["../../internal-packages/redis/src/*"]
}
},
"exclude": ["node_modules"]
Expand Down
3 changes: 3 additions & 0 deletions internal-packages/redis/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Redis

This is a simple package that is used to return a valid Redis client and provides an error callback. It will log and swallow errors if they're not handled.
18 changes: 18 additions & 0 deletions internal-packages/redis/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "@internal/redis",
"private": true,
"version": "0.0.1",
"main": "./src/index.ts",
"types": "./src/index.ts",
"type": "module",
"dependencies": {
"ioredis": "^5.3.2",
"@trigger.dev/core": "workspace:*"
},
"devDependencies": {
"vitest": "^1.4.0"
},
"scripts": {
"typecheck": "tsc --noEmit"
}
}
40 changes: 40 additions & 0 deletions internal-packages/redis/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Redis, RedisOptions } from "ioredis";
import { Logger } from "@trigger.dev/core/logger";

const defaultOptions: Partial<RedisOptions> = {
retryStrategy: (times: number) => {
const delay = Math.min(times * 50, 1000);
return delay;
},
maxRetriesPerRequest: 20,
};

const logger = new Logger("Redis", "debug");

export function createRedisClient(
options: RedisOptions,
handlers?: { onError?: (err: Error) => void }
): Redis {
const client = new Redis({
...defaultOptions,
...options,
});

// Skip error handling setup if running in Vitest
if (process.env.VITEST) {
client.on("error", (error) => {
// swallow errors
});
return client;
}

client.on("error", (error) => {
if (handlers?.onError) {
handlers.onError(error);
} else {
logger.error(`Redis client error:`, { error, keyPrefix: options.keyPrefix });
}
});

return client;
}
23 changes: 23 additions & 0 deletions internal-packages/redis/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"compilerOptions": {
"target": "ES2019",
"lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"],
"module": "CommonJS",
"moduleResolution": "Node",
"moduleDetection": "force",
"verbatimModuleSyntax": false,
"types": ["vitest/globals"],
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
"isolatedModules": true,
"preserveWatchOutput": true,
"skipLibCheck": true,
"noEmit": true,
"strict": true,
"paths": {
"@trigger.dev/core": ["../../packages/core/src/index"],
"@trigger.dev/core/*": ["../../packages/core/src/*"]
}
},
"exclude": ["node_modules"]
}
1 change: 1 addition & 0 deletions internal-packages/run-engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"main": "./src/index.ts",
"types": "./src/index.ts",
"dependencies": {
"@internal/redis": "workspace:*",
"@internal/redis-worker": "workspace:*",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/semantic-conventions": "^1.27.0",
Expand Down
19 changes: 15 additions & 4 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createRedisClient } from "@internal/redis";
import { Worker } from "@internal/redis-worker";
import { Attributes, Span, SpanKind, trace, Tracer } from "@opentelemetry/api";
import { assertExhaustive } from "@trigger.dev/core";
Expand Down Expand Up @@ -135,10 +136,20 @@ export class RunEngine {

constructor(private readonly options: RunEngineOptions) {
this.prisma = options.prisma;
this.runLockRedis = new Redis({
...options.runLock.redis,
keyPrefix: `${options.runLock.redis.keyPrefix}runlock:`,
});
this.runLockRedis = createRedisClient(
{
...options.runLock.redis,
keyPrefix: `${options.runLock.redis.keyPrefix}runlock:`,
},
{
onError: (error) => {
this.logger.error(`RunLock redis client error:`, {
error,
keyPrefix: options.runLock.redis.keyPrefix,
});
},
}
);
this.runLock = new RunLocker({ redis: this.runLockRedis });

this.runQueue = new RunQueue({
Expand Down
6 changes: 3 additions & 3 deletions internal-packages/run-engine/src/engine/locking.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { createRedisClient } from "@internal/redis";
import { redisTest } from "@internal/testcontainers";
import { expect } from "vitest";
import { RunLocker } from "./locking.js";
import Redis from "ioredis";

describe("RunLocker", () => {
redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => {
const redis = new Redis(redisOptions);
const redis = createRedisClient(redisOptions);
try {
const runLock = new RunLocker({ redis });

Expand All @@ -23,7 +23,7 @@ describe("RunLocker", () => {
});

redisTest("Test double locking works", { timeout: 15_000 }, async ({ redisOptions }) => {
const redis = new Redis(redisOptions);
const redis = createRedisClient(redisOptions);
try {
const runLock = new RunLocker({ redis });

Expand Down
11 changes: 6 additions & 5 deletions internal-packages/run-engine/src/run-queue/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { setTimeout } from "node:timers/promises";
import { RunQueue } from "./index.js";
import { SimpleWeightedChoiceStrategy } from "./simpleWeightedPriorityStrategy.js";
import { InputPayload } from "./types.js";
import { createRedisClient } from "@internal/redis";

const testOptions = {
name: "rq",
Expand Down Expand Up @@ -468,7 +469,7 @@ describe("RunQueue", () => {
},
});

const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });

try {
await queue.enqueueMessage({
Expand Down Expand Up @@ -598,7 +599,7 @@ describe("RunQueue", () => {
},
});

const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });

try {
await queue.enqueueMessage({
Expand Down Expand Up @@ -689,7 +690,7 @@ describe("RunQueue", () => {
},
});

const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });

try {
await queue.enqueueMessage({
Expand Down Expand Up @@ -803,7 +804,7 @@ describe("RunQueue", () => {
},
});

const redis = new Redis({ ...redisOptions, keyPrefix: "runqueue:test:" });
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });

try {
await queue.enqueueMessage({
Expand Down Expand Up @@ -858,7 +859,7 @@ describe("RunQueue", () => {
expect(dlqMembers).toContain(messageProd.runId);

//redrive
const redisClient = new Redis({
const redisClient = createRedisClient({
host: redisContainer.getHost(),
port: redisContainer.getPort(),
password: redisContainer.getPassword(),
Expand Down
19 changes: 17 additions & 2 deletions internal-packages/run-engine/src/run-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
RunQueueKeyProducer,
RunQueuePriorityStrategy,
} from "./types.js";
import { createRedisClient } from "@internal/redis";

const SemanticAttributes = {
QUEUE: "runqueue.queue",
Expand Down Expand Up @@ -71,13 +72,27 @@ export class RunQueue {

constructor(private readonly options: RunQueueOptions) {
this.retryOptions = options.retryOptions ?? defaultRetrySettings;
this.redis = new Redis(options.redis);
this.redis = createRedisClient(options.redis, {
onError: (error) => {
this.logger.error(`RunQueue redis client error:`, {
error,
keyPrefix: options.redis.keyPrefix,
});
},
});
this.logger = options.logger;

this.keys = new RunQueueShortKeyProducer("rq:");
this.queuePriorityStrategy = options.queuePriorityStrategy;

this.subscriber = new Redis(options.redis);
this.subscriber = createRedisClient(options.redis, {
onError: (error) => {
this.logger.error(`RunQueue subscriber redis client error:`, {
error,
keyPrefix: options.redis.keyPrefix,
});
},
});
this.#setupSubscriber();

this.#registerCommands();
Expand Down
Loading
Loading