@@ -20,7 +20,6 @@ import {
2020 unflattenAttributes ,
2121} from "@trigger.dev/core/v3" ;
2222import { Prisma , TaskEvent , TaskEventStatus , type TaskEventKind } from "@trigger.dev/database" ;
23- import Redis , { RedisOptions } from "ioredis" ;
2423import { createHash } from "node:crypto" ;
2524import { EventEmitter } from "node:stream" ;
2625import { Gauge } from "prom-client" ;
@@ -32,6 +31,7 @@ import { logger } from "~/services/logger.server";
3231import { singleton } from "~/utils/singleton" ;
3332import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server" ;
3433import { startActiveSpan } from "./tracer.server" ;
34+ import { createRedisClient , RedisClient , RedisWithClusterOptions } from "~/redis.server" ;
3535
3636const MAX_FLUSH_DEPTH = 5 ;
3737
@@ -97,7 +97,7 @@ export type EventBuilder = {
9797export type EventRepoConfig = {
9898 batchSize : number ;
9999 batchInterval : number ;
100- redis : RedisOptions ;
100+ redis : RedisWithClusterOptions ;
101101 retentionInDays : number ;
102102} ;
103103
@@ -200,7 +200,7 @@ type TaskEventSummary = Pick<
200200export class EventRepository {
201201 private readonly _flushScheduler : DynamicFlushScheduler < CreatableEvent > ;
202202 private _randomIdGenerator = new RandomIdGenerator ( ) ;
203- private _redisPublishClient : Redis ;
203+ private _redisPublishClient : RedisClient ;
204204 private _subscriberCount = 0 ;
205205
206206 get subscriberCount ( ) {
@@ -218,7 +218,7 @@ export class EventRepository {
218218 callback : this . #flushBatch. bind ( this ) ,
219219 } ) ;
220220
221- this . _redisPublishClient = new Redis ( this . _config . redis ) ;
221+ this . _redisPublishClient = createRedisClient ( "trigger:eventRepoPublisher" , this . _config . redis ) ;
222222 }
223223
224224 async insert ( event : CreatableEvent ) {
@@ -989,7 +989,7 @@ export class EventRepository {
989989 }
990990
991991 async subscribeToTrace ( traceId : string ) {
992- const redis = new Redis ( this . _config . redis ) ;
992+ const redis = createRedisClient ( "trigger:eventRepoSubscriber" , this . _config . redis ) ;
993993
994994 const channel = `events:${ traceId } ` ;
995995
@@ -1147,8 +1147,8 @@ function initializeEventRepo() {
11471147 host : env . PUBSUB_REDIS_HOST ,
11481148 username : env . PUBSUB_REDIS_USERNAME ,
11491149 password : env . PUBSUB_REDIS_PASSWORD ,
1150- enableAutoPipelining : true ,
1151- ... ( env . PUBSUB_REDIS_TLS_DISABLED === "true" ? { } : { tls : { } } ) ,
1150+ tlsDisabled : env . PUBSUB_REDIS_TLS_DISABLED === " true" ,
1151+ clusterMode : env . PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1" ,
11521152 } ,
11531153 } ) ;
11541154
0 commit comments