Skip to content

Only allow a single dev queue consumer to dequeue at once #1737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 27, 2025

Conversation

ericallam
Copy link
Member

@ericallam ericallam commented Feb 26, 2025

Summary by CodeRabbit

  • New Features
    • Improved message reliability with a new check that ensures messages are only sent when the connection is active.
    • Enhanced consumer handling by refining connection status tracking and managing multiple connection states for increased stability.
    • Added an accessible configuration option that allows for external review of connection settings.

Copy link

changeset-bot bot commented Feb 26, 2025

⚠️ No Changeset found

Latest commit: 70edf42

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented Feb 26, 2025

Walkthrough

This update adds a method to check the WebSocket’s ready state before sending messages and adjusts the parameters used when instantiating a queue consumer by passing a unique connection identifier. In addition, a Redis client is now integrated into the queue consumer to track connection state changes and enforce active connection consistency. A new getter for Redis options has also been provided in the Pub/Sub utility module.

Changes

File(s) Change Summary
apps/.../authenticatedSocketConnection.server.ts & apps/.../devQueueConsumer.server.ts Added canSendMessage to verify WebSocket state; updated DevQueueConsumer instantiation to include this.id; integrated Redis client, added connection lost tracking, and modified work logic to monitor active connection state.
apps/.../zodPubSub.server.ts Introduced a new getter method redisOptions to expose Redis options from the internal configuration.

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant A as AuthenticatedSocketConnection
    participant D as DevQueueConsumer
    participant R as Redis

    C->>A: Initiate message send
    A->>A: Execute canSendMessage() check
    alt WebSocket is OPEN
        A->>D: Instantiate DevQueueConsumer (passing id)
        D->>R: Set active connection key
        D->>D: Check message-sending capability in doWork()
        alt Redis key mismatch or timeout
            D->>D: Stop consumer
        else
            D->>A: Process message tasks
        end
    else
        A-->>C: Block sending (connection not open)
    end
Loading

Possibly related PRs

Poem

In bytes I hop with gentle grace,
Checking webs to keep a steady pace,
My ears twitch at Redis’ call,
Monitoring queues, I oversee all,
Code carrots aplenty for this techy space!
🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (3)
apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (3)

57-58: Consider renaming for clarity
_connectionLostAt can be renamed (e.g., _lastConnectionLossTime) to more clearly express how it's used.


68-71: Confirm keyPrefix usage
Merging devPubSub.redisOptions under the same key prefix could collide across multiple dev queue consumers. Consider incorporating the environment-specific identifier into the prefix to safeguard against collisions.


265-280: Consider backoff for repeated checks
Retrying every second can be noisy if the connection remains unavailable for a long period. Implementing an exponential (or configurable) backoff could reduce resource usage while waiting for reconnection.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 376a0c5 and f9796b2.

📒 Files selected for processing (3)
  • apps/webapp/app/v3/authenticatedSocketConnection.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (4 hunks)
  • apps/webapp/app/v3/utils/zodPubSub.server.ts (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (5)
apps/webapp/app/v3/utils/zodPubSub.server.ts (1)

119-121: Ensure sensitive data is not inadvertently exposed
Returning the entire redis configuration may include connection secrets if present. Please review whether exposing these options is necessary or if a redacted or read-only object is more suitable.

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (2)

25-25: Import usage looks good
No issues spotted with importing createRedisClient and RedisClient here.


61-61: Constructor parameter addition
Introducing a separate id parameter is a helpful way to uniquely identify each consumer. Ensure callers consistently provide meaningful or globally unique IDs.

apps/webapp/app/v3/authenticatedSocketConnection.server.ts (2)

46-46: Clever readiness check
Using ws.readyState === WebSocket.OPEN for canSendMessage is straightforward. Double-check edge cases around CLOSING or CLOSED states to avoid unexpected failures.


49-49: Validate uniqueness of connection ID
Passing this.id is convenient; confirm if collisions are possible in multi-tenant or multi-connection scenarios. If collisions are a concern, consider prefixing with relevant environment information.

Comment on lines 246 to 247
await this._redisClient.set(`connection:${this.env.id}`, this.id);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add TTL or cleanup routine for the Redis key
Setting connection:${this.env.id} may lead to stale entries if the consumer crashes. Consider using an expiration or a cleanup process, so inactive connections won't block new consumers.

Comment on lines 282 to 294
this._connectionLostAt = undefined;

const currentConnection = await this._redisClient.get(`connection:${this.env.id}`);

if (currentConnection !== this.id) {
logger.debug("Another connection is active, stopping the consumer", {
currentConnection,
env: this.env,
});

await this.stop("Another connection is active");
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Assess concurrency handling
Using Redis to store the active connection ID is a good start, but there may be a race condition if multiple consumers try to claim the same environment concurrently. For robust exclusivity, consider using distributed locks or atomic check-and-set operations.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently, please check "Code review limits" under "Moderation" settings.

Actionable comments posted: 4

🧹 Nitpick comments (3)
packages/core/src/v3/logger/taskLogger.ts (1)

112-114: Method signature should match the interface declaration.

While functionally correct, the method signature in NoopTaskLogger doesn't match the interface declaration. For better code clarity and maintainability, it should include the same parameters as defined in the interface.

-  startSpan(): Span {
+  startSpan(name: string, options?: SpanOptions): Span {
     return {} as Span;
   }
packages/core/src/v3/streams/asyncIterableStream.ts (2)

1-1: Consider clarifying the hybrid type signature.

AsyncIterableStream<T> merges AsyncIterable<T> with ReadableStream<T>. While this approach allows flexible usage, be mindful that some consumers might expect standard ReadableStream methods without implementing async iteration themselves, or vice versa. Consider adding documentation clarifying usage and any limitations.


7-7: Avoid using the any type for transformedStream.

Using any defeats the benefits of static typing. You can leverage the known type structure of pipeThrough to provide more precise type definitions:

- const transformedStream: any = source.pipeThrough(new TransformStream(transformer));
+ const transformedStream = source.pipeThrough(new TransformStream<S, T>(transformer));
🛑 Comments failed to post (4)
packages/python/README.md (2)

58-73: ⚠️ Potential issue

Undefined variable usage in streaming loop.

It appears that result is assigned from python.stream.runScript(...), but the subsequent for await (const chunk of streamingResult) references a different variable, streamingResult, which is never defined. This will cause a runtime error. A fix is to use the same variable identifier consistently:

-    const result = python.stream.runScript("my_script.py", ["hello", "world"]);
+    const streamingResult = python.stream.runScript("my_script.py", ["hello", "world"]);

     // result is an async iterable/readable stream
-    for await (const chunk of streamingResult) {
+    for await (const chunk of streamingResult) {
       logger.debug("convert-url-to-markdown", {
         url: payload.url,
         chunk,
       });
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

export const myStreamingScript = task({
  id: "my-streaming-python-script",
  run: async () => {
    // You can also stream the output of the script
-    const result = python.stream.runScript("my_script.py", ["hello", "world"]);
+    const streamingResult = python.stream.runScript("my_script.py", ["hello", "world"]);

    // result is an async iterable/readable stream
-    for await (const chunk of streamingResult) {
+    for await (const chunk of streamingResult) {
      logger.debug("convert-url-to-markdown", {
        url: payload.url,
        chunk,
      });
    }
  },
});

66-71: ⚠️ Potential issue

Missing definition of 'payload'.

payload.url is accessed inside the loop, but no definition of payload is visible within this snippet. This will lead to a ReferenceError. Verify or define payload before it’s used:

 for await (const chunk of streamingResult) {
-  logger.debug("convert-url-to-markdown", {
-    url: payload.url,
-    chunk,
-  });
+  const url = "some_url_value_or_variable";
+  logger.debug("convert-url-to-markdown", {
+    url,
+    chunk,
+  });
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

    for await (const chunk of streamingResult) {
      const url = "some_url_value_or_variable";
      logger.debug("convert-url-to-markdown", {
        url,
        chunk,
      });
    }
packages/core/src/v3/streams/asyncIterableStream.ts (2)

88-90: 🛠️ Refactor suggestion

Selective cancellation approach might be too specific.

The code checks if ("kill" in asyncIterable) { (asyncIterable as any).kill(); }, but not all async iterables will have a kill method. This can introduce fragile assumptions. Consider a more generic abort or cleanup mechanism or properly type-guard to handle process-like objects separately.


12-14: ⚠️ Potential issue

Review the hardcoded IteratorResult<string> return type.

The iterator’s next method specifies Promise<IteratorResult<string>>, but the value read from reader.read() likely matches the generic type T (or a transformed version). Consider using a type parameter instead of string:

- async next(): Promise<IteratorResult<string>> {
+ async next(): Promise<IteratorResult<T>> {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

      async next(): Promise<IteratorResult<T>> {
        const { done, value } = await reader.read();
        return done ? { done: true, value: undefined } : { done: false, value };

Copy link
Member

@matt-aitken matt-aitken left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing a return

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (1)

265-295: Add periodic Redis key refresh.

To prevent the Redis key from expiring while the connection is still active, implement a periodic refresh mechanism.

async #enable() {
  if (this._enabled) {
    return;
  }

  await this._redisClient.set(`connection:${this.env.id}`, this.id, "EX", 60 * 60 * 24); // 24 hours

+  // Set up periodic refresh of the connection lock
+  this._refreshInterval = setInterval(async () => {
+    if (this._enabled) {
+      await this._redisClient.expire(`connection:${this.env.id}`, 60 * 60 * 24);
+    }
+  }, 60 * 60 * 1000); // Refresh every hour

  this._enabled = true;
  // Rest of the method...
}

public async stop(reason: string = "CLI disconnected") {
  if (!this._enabled) {
    return;
  }

  logger.debug("[DevQueueConsumer] Stopping dev queue consumer", { env: this.env });

  this._enabled = false;
+  
+  // Clear the refresh interval
+  if (this._refreshInterval) {
+    clearInterval(this._refreshInterval);
+    this._refreshInterval = undefined;
+  }
+
+  // Release the connection lock
+  await this._redisClient.del(`connection:${this.env.id}`);

  // Rest of the method...
}

Don't forget to add the corresponding property to the class:

private _refreshInterval?: NodeJS.Timeout;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fa6acc6 and 70edf42.

📒 Files selected for processing (1)
  • apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (7)
apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (7)

25-25: Good import addition.

Adding the Redis client import is necessary for the new connection tracking functionality.


57-58: Well-structured property definitions.

The private properties for connection tracking are appropriately defined. The optional _connectionLostAt property will help detect long disconnections.


61-61: Constructor parameter appropriately added.

Adding the ID parameter is essential for uniquely identifying each consumer connection.


68-71: Initialize Redis client with proper configuration.

The Redis client initialization looks good, leveraging the existing Redis options from the pubsub module.


246-246: Add TTL or cleanup routine for the Redis key.

Setting connection:${this.env.id} may lead to stale entries if the consumer crashes. Consider using an expiration or a cleanup process, so inactive connections won't block new consumers.

The 24-hour expiration is a good start, but consider implementing a periodic refresh of this key while the connection is active, or a cleanup routine when the consumer is explicitly stopped.


265-281: Good connection validation implementation.

The implementation correctly checks if messages can be sent and handles connection losses gracefully:

  1. Tracks connection loss time
  2. Stops the consumer if disconnected for more than 60 seconds
  3. Returns early to prevent further processing
  4. Schedules next check appropriately

Regarding line 281, the return statement is indeed necessary to prevent execution from continuing, addressing Matt's comment.


283-295: Assess concurrency handling.

Using Redis to store the active connection ID is a good start, but there may be a race condition if multiple consumers try to claim the same environment concurrently. For robust exclusivity, consider using distributed locks or atomic check-and-set operations.

Additionally, consider implementing a heartbeat mechanism to periodically refresh the Redis key, ensuring that if a consumer becomes unresponsive without proper cleanup, the lock will eventually expire allowing new consumers to connect.

@@ -235,6 +243,8 @@ export class DevQueueConsumer {
return;
}

await this._redisClient.set(`connection:${this.env.id}`, this.id, "EX", 60 * 60 * 24); // 24 hours
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add cleanup in the stop method.

When the consumer is explicitly stopped (in the stop() method), you should delete the Redis key to immediately release the connection lock.

public async stop(reason: string = "CLI disconnected") {
  if (!this._enabled) {
    return;
  }

  logger.debug("[DevQueueConsumer] Stopping dev queue consumer", { env: this.env });

  this._enabled = false;
+  
+  // Release the connection lock
+  await this._redisClient.del(`connection:${this.env.id}`);

  // Create the session
  const session = await disconnectSession(this.env.id);
  
  // Rest of the method...
}

Also applies to: 283-295

@matt-aitken matt-aitken merged commit 0e5ec8b into main Feb 27, 2025
11 checks passed
@matt-aitken matt-aitken deleted the ea-branch-18 branch February 27, 2025 15:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants