diff --git a/apps/webapp/app/v3/authenticatedSocketConnection.server.ts b/apps/webapp/app/v3/authenticatedSocketConnection.server.ts index ce98438784..15cc16c72c 100644 --- a/apps/webapp/app/v3/authenticatedSocketConnection.server.ts +++ b/apps/webapp/app/v3/authenticatedSocketConnection.server.ts @@ -43,9 +43,10 @@ export class AuthenticatedSocketConnection { }); }); }, + canSendMessage: () => ws.readyState === WebSocket.OPEN, }); - this._consumer = new DevQueueConsumer(authenticatedEnv, this._sender, { + this._consumer = new DevQueueConsumer(this.id, authenticatedEnv, this._sender, { ipAddress: Array.isArray(this.ipAddress) ? this.ipAddress.join(", ") : this.ipAddress, }); diff --git a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts index 948850410b..6a8ca101d6 100644 --- a/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/devQueueConsumer.server.ts @@ -22,6 +22,7 @@ import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server"; import { getMaxDuration } from "../utils/maxDuration"; import { DevSubscriber, devPubSub } from "./devPubSub.server"; import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server"; +import { createRedisClient, RedisClient } from "~/redis.server"; const MessageBody = z.discriminatedUnion("type", [ z.object({ @@ -53,14 +54,21 @@ export class DevQueueConsumer { private _currentSpan: Span | undefined; private _endSpanInNextIteration = false; private _inProgressRuns: Map = new Map(); // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids + private _connectionLostAt?: Date; + private _redisClient: RedisClient; constructor( + public id: string, public env: AuthenticatedEnvironment, private _sender: ZodMessageSender, private _options: DevQueueConsumerOptions = {} ) { this._traceTimeoutSeconds = _options.traceTimeoutSeconds ?? 60; this._maximumItemsPerTrace = _options.maximumItemsPerTrace ?? 1_000; + this._redisClient = createRedisClient("tr:devQueueConsumer", { + keyPrefix: "tr:devQueueConsumer:", + ...devPubSub.redisOptions, + }); } // This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it @@ -235,6 +243,8 @@ export class DevQueueConsumer { return; } + await this._redisClient.set(`connection:${this.env.id}`, this.id, "EX", 60 * 60 * 24); // 24 hours + this._enabled = true; // Create the session await createNewSession(this.env, this._options.ipAddress ?? "unknown"); @@ -252,6 +262,38 @@ export class DevQueueConsumer { return; } + const canSendMessage = await this._sender.validateCanSendMessage(); + + if (!canSendMessage) { + this._connectionLostAt ??= new Date(); + + if (Date.now() - this._connectionLostAt.getTime() > 60 * 1000) { + logger.debug("Connection lost for more than 60 seconds, stopping the consumer", { + env: this.env, + }); + + await this.stop("Connection lost for more than 60 seconds"); + return; + } + + setTimeout(() => this.#doWork(), 1000); + return; + } + + this._connectionLostAt = undefined; + + const currentConnection = await this._redisClient.get(`connection:${this.env.id}`); + + if (currentConnection && currentConnection !== this.id) { + logger.debug("Another connection is active, stopping the consumer", { + currentConnection, + env: this.env, + }); + + await this.stop("Another connection is active"); + return; + } + // Check if the trace has expired if ( this._perTraceCountdown === 0 || diff --git a/apps/webapp/app/v3/utils/zodPubSub.server.ts b/apps/webapp/app/v3/utils/zodPubSub.server.ts index e1af35ca28..2c081bc368 100644 --- a/apps/webapp/app/v3/utils/zodPubSub.server.ts +++ b/apps/webapp/app/v3/utils/zodPubSub.server.ts @@ -116,6 +116,10 @@ export class ZodPubSub { this._publisher = createRedisClient("trigger:zodSubscriber", _options.redis); } + get redisOptions() { + return this._options.redis; + } + public async publish( channel: string, type: K,