Skip to content

Only allow a single dev queue consumer to dequeue at once #1737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/webapp/app/v3/authenticatedSocketConnection.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ export class AuthenticatedSocketConnection {
});
});
},
canSendMessage: () => ws.readyState === WebSocket.OPEN,
});

this._consumer = new DevQueueConsumer(authenticatedEnv, this._sender, {
this._consumer = new DevQueueConsumer(this.id, authenticatedEnv, this._sender, {
ipAddress: Array.isArray(this.ipAddress) ? this.ipAddress.join(", ") : this.ipAddress,
});

Expand Down
42 changes: 42 additions & 0 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
import { getMaxDuration } from "../utils/maxDuration";
import { DevSubscriber, devPubSub } from "./devPubSub.server";
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
import { createRedisClient, RedisClient } from "~/redis.server";

const MessageBody = z.discriminatedUnion("type", [
z.object({
Expand Down Expand Up @@ -53,14 +54,21 @@ export class DevQueueConsumer {
private _currentSpan: Span | undefined;
private _endSpanInNextIteration = false;
private _inProgressRuns: Map<string, string> = new Map(); // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids
private _connectionLostAt?: Date;
private _redisClient: RedisClient;

constructor(
public id: string,
public env: AuthenticatedEnvironment,
private _sender: ZodMessageSender<typeof serverWebsocketMessages>,
private _options: DevQueueConsumerOptions = {}
) {
this._traceTimeoutSeconds = _options.traceTimeoutSeconds ?? 60;
this._maximumItemsPerTrace = _options.maximumItemsPerTrace ?? 1_000;
this._redisClient = createRedisClient("tr:devQueueConsumer", {
keyPrefix: "tr:devQueueConsumer:",
...devPubSub.redisOptions,
});
}

// This method is called when a background worker is deprecated and will no longer be used unless a run is locked to it
Expand Down Expand Up @@ -235,6 +243,8 @@ export class DevQueueConsumer {
return;
}

await this._redisClient.set(`connection:${this.env.id}`, this.id, "EX", 60 * 60 * 24); // 24 hours
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add cleanup in the stop method.

When the consumer is explicitly stopped (in the stop() method), you should delete the Redis key to immediately release the connection lock.

public async stop(reason: string = "CLI disconnected") {
  if (!this._enabled) {
    return;
  }

  logger.debug("[DevQueueConsumer] Stopping dev queue consumer", { env: this.env });

  this._enabled = false;
+  
+  // Release the connection lock
+  await this._redisClient.del(`connection:${this.env.id}`);

  // Create the session
  const session = await disconnectSession(this.env.id);
  
  // Rest of the method...
}

Also applies to: 283-295


this._enabled = true;
// Create the session
await createNewSession(this.env, this._options.ipAddress ?? "unknown");
Expand All @@ -252,6 +262,38 @@ export class DevQueueConsumer {
return;
}

const canSendMessage = await this._sender.validateCanSendMessage();

if (!canSendMessage) {
this._connectionLostAt ??= new Date();

if (Date.now() - this._connectionLostAt.getTime() > 60 * 1000) {
logger.debug("Connection lost for more than 60 seconds, stopping the consumer", {
env: this.env,
});

await this.stop("Connection lost for more than 60 seconds");
return;
}

setTimeout(() => this.#doWork(), 1000);
return;
}

this._connectionLostAt = undefined;

const currentConnection = await this._redisClient.get(`connection:${this.env.id}`);

if (currentConnection && currentConnection !== this.id) {
logger.debug("Another connection is active, stopping the consumer", {
currentConnection,
env: this.env,
});

await this.stop("Another connection is active");
return;
}

// Check if the trace has expired
if (
this._perTraceCountdown === 0 ||
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/utils/zodPubSub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ export class ZodPubSub<TMessageCatalog extends ZodMessageCatalogSchema> {
this._publisher = createRedisClient("trigger:zodSubscriber", _options.redis);
}

get redisOptions() {
return this._options.redis;
}

public async publish<K extends keyof TMessageCatalog>(
channel: string,
type: K,
Expand Down