Engine v1 improvements #1627

Merged
merged 4 commits into from
Jan 21, 2025

Conversation

ericallam
Member

@ericallam ericallam commented Jan 21, 2025

  • Add a maximum nack count
  • Make sure to resolve the TaskQueue record correctly and don't just keep trying and failing to find it
  • Use far fewer redis commands when dequeuing a message
  • Make dequeuing a message more reliable by iterating through all the possible queues until we get a dequeued message
  • Improve the queue windowing logic
  • Handle coordinator emitWithAck timeout errors more gracefully, and decrease the timeout
  • Speed up building resume messages
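
The dequeue change listed above (iterating through the candidate queues until one yields a message) can be sketched roughly as follows. This is a minimal in-memory illustration, not the MarQS implementation: `InMemoryQueues`, `dequeueFirstAvailable`, and the message shape are hypothetical names, and the real system is backed by Redis.

```typescript
// Hypothetical in-memory stand-in for the Redis-backed queues.
type QueuedMessage = { messageId: string; queue: string };

class InMemoryQueues {
  private readonly queues = new Map<string, QueuedMessage[]>();

  enqueue(queue: string, messageId: string): void {
    const items = this.queues.get(queue) ?? [];
    items.push({ messageId, queue });
    this.queues.set(queue, items);
  }

  // Returns the oldest message in the queue, or undefined if it is empty.
  dequeue(queue: string): QueuedMessage | undefined {
    return this.queues.get(queue)?.shift();
  }
}

// Try each candidate queue in order and stop at the first one that yields a message.
function dequeueFirstAvailable(
  queueStore: InMemoryQueues,
  candidateQueues: string[]
): QueuedMessage | undefined {
  for (const queue of candidateQueues) {
    const message = queueStore.dequeue(queue);
    if (message) {
      return message;
    }
  }
  return undefined;
}
```

With this shape, an empty first-choice queue no longer ends the dequeue attempt; the caller only gets `undefined` when every candidate queue is empty.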

Summary by CodeRabbit

Release Notes

  • Environment Configuration

    • Added new environment variables for queue and task management.
    • Introduced configuration options for queue consumer and MARQS settings.
  • Queue Management

    • Enhanced queue selection and prioritization strategies.
    • Improved message handling with new negative acknowledgment (nack) tracking.
    • Added maximum nack count limit for message processing.
  • Task Processing

    • Refined task queue retrieval and management.
    • Improved error handling and logging for task runs.
    • Updated concurrency management for task execution.
  • System Improvements

    • Streamlined code structure for better maintainability.
    • Enhanced type safety for environment and queue-related operations.
    • Introduced more robust tracing and error tracking.
  • New Features

    • Added new tasks for managing and executing queues.

These changes focus on improving the reliability, performance, and flexibility of queue and task management within the system.

changeset-bot bot commented Jan 21, 2025

⚠️ No Changeset found

Latest commit: 55fb1f3

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Contributor

coderabbitai bot commented Jan 21, 2025

Warning

There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure.

🔧 eslint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

apps/webapp/app/env.server.ts

Oops! Something went wrong! :(

ESLint: 8.45.0

ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct.

The config "custom" was referenced from the config file in "/.eslintrc.js".

If you still have problems, please stop by https://eslint.org/chat/help to chat with the team.

Walkthrough

This pull request introduces significant enhancements to MarQS (Multitenant Asynchronous Reliable Queueing System) and related infrastructure, focusing on improving queue management, concurrency control, and error handling. The changes span multiple files across the application, introducing new configuration options, refactoring queue selection strategies, and adding more robust error tracking mechanisms. Key modifications include adding environment variables for queue consumer settings, updating queue selection logic, and implementing a maximum negative acknowledgment count for messages.

Changes

| File | Change Summary |
|---|---|
| apps/webapp/app/env.server.ts | Added 5 new environment configuration properties for queue consumer and MARQS settings |
| apps/webapp/app/models/taskQueue.server.ts | Introduced findQueueInEnvironment and sanitizeQueueName functions for improved queue retrieval |
| apps/webapp/app/routes/admin.api.v1.marqs.ts | Removed API route loader function for admin operations |
| apps/webapp/app/services/apiAuth.server.ts | Updated parameter type for generateJWTTokenForEnvironment function |
| apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts | Added new types and updated function signatures for environment variable resolution |
| apps/webapp/app/v3/marqs/devQueueConsumer.server.ts | Deprecated taskHeartbeat and added taskRunHeartbeat, updated task queue retrieval logic |
| apps/webapp/app/v3/marqs/index.server.ts | Significant updates to MarQS class, including concurrency management and error handling improvements |
| apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts | Refactored queue selection strategy, replacing chooseQueue with moveToNextRange and distributeQueues |
| apps/webapp/app/v3/marqs/types.ts | Removed chooseQueue method and added distributeQueues and moveToNextRange methods |
| apps/webapp/app/v3/marqs/v2.server.ts | Added maximumNackCount property to MarQS configuration |
| apps/webapp/app/v3/models/workerDeployment.server.ts | Updated WorkerDeploymentWithWorkerTasks type to include queueConfig |
| apps/webapp/app/v3/services/createBackgroundWorker.server.ts | Updated import for sanitizeQueueName function |
| apps/webapp/app/v3/services/createTaskRunAttempt.server.ts | Updated queue retrieval logic to use findQueueInEnvironment |
| apps/webapp/app/v3/services/finalizeTaskRun.server.ts | Updated queue retrieval logic to use findQueueInEnvironment |
| apps/webapp/app/v3/services/triggerTask.server.ts | Updated import for sanitizeQueueName function |
| packages/core/src/v3/schemas/messages.ts | Introduced new AckCallbackResult type and updated related callback structures |
| references/v3-catalog/src/trigger/queues.ts | Added new tasks for queue management |

Sequence Diagram

sequenceDiagram
    participant Client
    participant MarQS
    participant QueueConsumer
    participant MessageQueue
    
    Client->>MarQS: Enqueue Message
    MarQS->>MessageQueue: Store Message
    MarQS-->>Client: Message Enqueued
    
    QueueConsumer->>MarQS: Dequeue Message
    MarQS->>MessageQueue: Retrieve Message
    MarQS->>QueueConsumer: Message Details
    
    alt Message Processing Successful
        QueueConsumer->>MarQS: Acknowledge Message
        MarQS->>MessageQueue: Remove Message
    else Message Processing Failed
        QueueConsumer->>MarQS: Negative Acknowledge
        MarQS->>MessageQueue: Check Nack Count
        alt Nack Count Exceeded
            MarQS->>MessageQueue: Permanently Fail Message
        else Nack Count Within Limit
            MarQS->>MessageQueue: Requeue Message
        end
    end
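
The nack branch of the diagram above can be illustrated with a small in-memory sketch. The names `NackTracker` and `recordNack` are hypothetical, and two details are assumptions rather than facts from the PR: that a message fails once its nack count exceeds the maximum (rather than when it reaches it), and that an ack clears the counter. The real implementation keeps its counter in Redis.

```typescript
type NackOutcome = "requeue" | "fail";

// Hypothetical in-memory nack counter; the real counter lives in Redis.
class NackTracker {
  private readonly counts = new Map<string, number>();
  private readonly maximumNackCount: number;

  constructor(maximumNackCount: number) {
    this.maximumNackCount = maximumNackCount;
  }

  // Increment the nack count for a message and decide what happens next.
  // Assumption: the message is failed once the count exceeds the maximum.
  recordNack(messageId: string): NackOutcome {
    const count = (this.counts.get(messageId) ?? 0) + 1;
    if (count > this.maximumNackCount) {
      this.counts.delete(messageId);
      return "fail";
    }
    this.counts.set(messageId, count);
    return "requeue";
  }

  // Assumption: a successful ack clears any recorded nacks for the message.
  ack(messageId: string): void {
    this.counts.delete(messageId);
  }
}
```

A tracker created with `new NackTracker(2)` would requeue a message on its first two nacks and fail it on the third.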

Poem

🐰 Hop, hop, queue the code!
MarQS dances with grace and might
Messages flow, errors take flight
Concurrency bounds, now set just right
A rabbit's queue, a software delight! 🚀


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🔭 Outside diff range comments (1)
apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts (1)

Line range hint 100-105: Fix incorrect totalWeight assignment overwriting calculated value.

In the #calculateQueueWeights method, after calculating totalWeight based on size and age, the return statement assigns totalWeight: age, overwriting the calculated totalWeight. This causes the weighting logic to be ineffective.

Apply this diff to fix the issue:

       return {
         queue,
-        totalWeight: age,
+        totalWeight: totalWeight,
       };
🧹 Nitpick comments (6)
references/v3-catalog/src/trigger/queues.ts (1)

34-42: Consider making queue name configurable.

The hardcoded queue name "named-queue" might limit reusability. Consider making it configurable through environment variables or task parameters.

apps/webapp/app/models/taskQueue.server.ts (1)

5-58: Add error logging and improve type safety.

The function could benefit from error logging when queue lookups fail, and explicit return type annotations.

Consider these improvements:

 export async function findQueueInEnvironment(
   queueName: string,
   environmentId: string,
   backgroundWorkerTaskId?: string,
   backgroundTask?: { queueConfig?: unknown }
-): Promise<TaskQueue | undefined> {
+): Promise<TaskQueue | undefined> {
   const sanitizedQueueName = sanitizeQueueName(queueName);
 
   const queue = await prisma.taskQueue.findFirst({
     where: {
       runtimeEnvironmentId: environmentId,
       name: sanitizedQueueName,
     },
   });
 
   if (queue) {
     return queue;
   }
+
+  logger.debug("Queue not found with sanitized name", {
+    sanitizedQueueName,
+    environmentId,
+  });
apps/webapp/app/v3/marqs/v2.server.ts (1)

85-85: Consider making maximumNackCount configurable.

The hardcoded value of 10 for maximumNackCount might need to be adjusted based on different environments or use cases. Consider making this configurable through environment variables, similar to other MarQS settings.

-    maximumNackCount: 10,
+    maximumNackCount: env.V2_MARQS_MAXIMUM_NACK_COUNT ?? 10,
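
A minimal sketch of how such a setting could be read with a safe default is shown below. `V2_MARQS_MAXIMUM_NACK_COUNT` is the variable name proposed in the review, not a confirmed setting, and `positiveIntFromEnv` and `buildMarqsOptions` are hypothetical helpers; the actual project validates its environment in env.server.ts.

```typescript
// Parse a positive integer from an environment-style string, falling back
// to a default when the value is missing or not a positive integer.
function positiveIntFromEnv(raw: string | undefined, fallback: number): number {
  if (raw === undefined || raw.trim() === "") {
    return fallback;
  }
  const parsed = Number(raw);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Hypothetical options builder; V2_MARQS_MAXIMUM_NACK_COUNT is the
// reviewer's proposed variable name, and 10 is the value hardcoded in the PR.
function buildMarqsOptions(envVars: Record<string, string | undefined>): {
  maximumNackCount: number;
} {
  return {
    maximumNackCount: positiveIntFromEnv(envVars["V2_MARQS_MAXIMUM_NACK_COUNT"], 10),
  };
}
```

Parsing explicitly avoids passing a raw string (or `NaN`) into the queue configuration when the variable is set to something unexpected.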
apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts (1)

Line range hint 705-767: Consider refactoring duplicated OpenTelemetry configuration logic.

The OpenTelemetry configuration logic is duplicated between resolveBuiltInDevVariables and resolveBuiltInProdVariables. Consider extracting this into a shared helper function to improve maintainability.

+function getOtelBatchConfig(config: {
+  enabled: string;
+  spanMaxExportBatchSize: string;
+  spanScheduledDelayMillis: string;
+  spanExportTimeoutMillis: string;
+  spanMaxQueueSize: string;
+  logMaxExportBatchSize: string;
+  logScheduledDelayMillis: string;
+  logExportTimeoutMillis: string;
+  logMaxQueueSize: string;
+}): EnvironmentVariable[] {
+  if (config.enabled !== "1") return [];
+  return [
+    { key: "OTEL_BATCH_PROCESSING_ENABLED", value: "1" },
+    { key: "OTEL_SPAN_MAX_EXPORT_BATCH_SIZE", value: config.spanMaxExportBatchSize },
+    { key: "OTEL_SPAN_SCHEDULED_DELAY_MILLIS", value: config.spanScheduledDelayMillis },
+    { key: "OTEL_SPAN_EXPORT_TIMEOUT_MILLIS", value: config.spanExportTimeoutMillis },
+    { key: "OTEL_SPAN_MAX_QUEUE_SIZE", value: config.spanMaxQueueSize },
+    { key: "OTEL_LOG_MAX_EXPORT_BATCH_SIZE", value: config.logMaxExportBatchSize },
+    { key: "OTEL_LOG_SCHEDULED_DELAY_MILLIS", value: config.logScheduledDelayMillis },
+    { key: "OTEL_LOG_EXPORT_TIMEOUT_MILLIS", value: config.logExportTimeoutMillis },
+    { key: "OTEL_LOG_MAX_QUEUE_SIZE", value: config.logMaxQueueSize },
+  ];
+}

Also applies to: 767-860

apps/webapp/app/env.server.ts (1)

165-166: LGTM! Consider adding documentation for the new environment variables.

The new environment variables for queue management and message handling look good. However, it would be helpful to add JSDoc comments explaining:

  • The purpose and impact of each variable
  • The recommended values for different scenarios
  • The relationship between these variables (e.g., how MARQS_SHARED_QUEUE_SELECTION_COUNT relates to MARQS_DEV_QUEUE_SELECTION_COUNT)

Also applies to: 224-226

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (1)

436-441: LGTM! Consider adding error handling for edge cases.

The use of findQueueInEnvironment improves queue retrieval by encapsulating the logic and providing better context. However, consider adding error handling for cases where:

  • The background task is undefined
  • The queue name format is invalid
  • The environment ID is invalid
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c855e4a and 9affe8e.

📒 Files selected for processing (19)
  • apps/webapp/app/env.server.ts (2 hunks)
  • apps/webapp/app/models/taskQueue.server.ts (1 hunks)
  • apps/webapp/app/routes/admin.api.v1.marqs.ts (0 hunks)
  • apps/webapp/app/services/apiAuth.server.ts (2 hunks)
  • apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts (5 hunks)
  • apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (2 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (26 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (31 hunks)
  • apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts (2 hunks)
  • apps/webapp/app/v3/marqs/types.ts (2 hunks)
  • apps/webapp/app/v3/marqs/v2.server.ts (1 hunks)
  • apps/webapp/app/v3/models/workerDeployment.server.ts (1 hunks)
  • apps/webapp/app/v3/services/createBackgroundWorker.server.ts (2 hunks)
  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts (7 hunks)
  • apps/webapp/app/v3/services/finalizeTaskRun.server.ts (4 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (2 hunks)
  • packages/core/src/v3/schemas/messages.ts (4 hunks)
  • references/v3-catalog/src/trigger/queues.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • apps/webapp/app/routes/admin.api.v1.marqs.ts
✅ Files skipped from review due to trivial changes (2)
  • apps/webapp/app/v3/services/triggerTask.server.ts
  • apps/webapp/app/v3/services/createBackgroundWorker.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
apps/webapp/app/v3/marqs/index.server.ts

[error] 450-450: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (buildjet-8vcpu-ubuntu-2204 - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (buildjet-8vcpu-ubuntu-2204 - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (16)
apps/webapp/app/v3/marqs/index.server.ts (1)

450-450: Retain the continue statement for correct loop control flow.

Although the static analysis suggests that the continue statement is unnecessary, it is essential here. The continue ensures that after logging the error, the loop proceeds to the next iteration. Removing it would alter the loop behavior and potentially skip necessary steps.

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (2)

750-755: Avoid logging potentially sensitive data in debug logs.

In the debug log statement, queueMessage: message includes the entire message payload, which may contain sensitive information. To prevent potential PII leakage, avoid logging full message contents.

[security/PII leakage issue]

Apply this diff to fix the issue:

        if (!queue) {
          logger.debug("SharedQueueConsumer queue not found, so nacking message", {
-            queueMessage: message,
+            messageId: message.messageId,
            taskRunQueue: lockedTaskRun.queue,
            runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId,
          });

Line range hint 1049-1054: Prevent PII leakage in logs by avoiding sensitive data.

The debug log statement includes queueMessage: message.data, which may contain sensitive information. Logging such data can lead to PII leakage. It's advisable to log only necessary identifiers.

[security/PII leakage issue]

Apply this diff to fix the issue:

          logger.debug("SharedQueueConsumer queue not found, so nacking message", {
-            queueMessage: message.data,
+            messageId: message.messageId,
            queueName: sanitizeQueueName(resumableRun.queue),
            attempt: resumableAttempt,
          });
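
One way to apply this consistently is to route messages through a small helper before logging. The sketch below is illustrative only: `QueueMessage` is a simplified, assumed shape and `toLoggableMessage` is a hypothetical helper, not part of the codebase.

```typescript
// Simplified, assumed message shape for illustration.
type QueueMessage = { messageId: string; data: Record<string, unknown> };

// Keep only identifiers that are safe to log and drop the payload entirely.
function toLoggableMessage(message: QueueMessage): { messageId: string } {
  return { messageId: message.messageId };
}

const example: QueueMessage = {
  messageId: "msg-1",
  data: { email: "user@example.com" },
};

// The payload (including the email address) never reaches the log line.
console.log(JSON.stringify(toLoggableMessage(example))); // {"messageId":"msg-1"}
```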
references/v3-catalog/src/trigger/queues.ts (1)

3-25: Add input validation for numberOfQueues parameter.

The numberOfQueues parameter should be validated to ensure it is greater than 0. In JavaScript, i % 0 evaluates to NaN rather than throwing, so with numberOfQueues set to 0 every run would silently be routed to a queue named queue-NaN.

Consider adding validation:

 export const queuesController = task({
   id: "queues/controller",
   run: async ({
     numberOfQueues = 20,
     length = 20,
     waitSeconds = 3,
   }: {
     numberOfQueues?: number;
     length?: number;
     waitSeconds?: number;
   }) => {
+    if (numberOfQueues <= 0) {
+      throw new Error("numberOfQueues must be greater than 0");
+    }
     await queuesTest.batchTriggerAndWait(
apps/webapp/app/v3/models/workerDeployment.server.ts (1)

47-47: LGTM! Good addition for queue configuration.

The addition of queueConfig to task selection aligns well with the PR's objectives for improved queue management and TaskQueue record resolution.

apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts (1)

145-147: LGTM! Well-structured key generation method.

The nackCounterKey method follows the established key generation pattern and maintains consistency with other methods in the class.

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts (3)

99-100: LGTM! Good variable extraction.

Extracting lockedBy improves code readability and reduces repeated property access.

Also applies to: 101-104


105-110: LGTM! Improved queue lookup.

Using findQueueInEnvironment centralizes queue lookup logic and provides better error handling.


125-128: LGTM! Enhanced error messaging.

The error message now provides better guidance based on worker capabilities.

apps/webapp/app/v3/services/finalizeTaskRun.server.ts (2)

295-295: LGTM! Added queue configuration support.

Including queueConfig in the worker task selection enables enhanced queue configuration capabilities.


307-312: LGTM! Consistent queue lookup pattern.

Using findQueueInEnvironment maintains consistency with the centralized queue lookup pattern across the codebase.

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts (3)

657-670: Well-structured type definitions with improved type safety!

The new RuntimeEnvironmentForEnvRepoPayload and RuntimeEnvironmentForEnvRepo types provide better type safety by explicitly defining the required fields for runtime environment data.


674-676: LGTM! Clear separation of variable resolution logic.

The function effectively combines project secrets, overridable trigger variables, and environment-specific built-in variables.


Line range hint 705-724: Verify fallback values for optional environment variables.

The code uses the nullish coalescing operator (??) to fall back between several environment variables (e.g., env.DEV_OTEL_EXPORTER_OTLP_ENDPOINT ?? env.APP_ORIGIN). Ensure that the last value in each chain is always defined so the result can never be undefined.

✅ Verification successful

Fallback chains are properly implemented

The environment variables are correctly configured with APP_ORIGIN (defaulting to "http://localhost:3030") serving as the guaranteed fallback value for all chains. The optional variables (API_ORIGIN, STREAM_ORIGIN, DEV_OTEL_EXPORTER_OTLP_ENDPOINT) are properly handled with the nullish coalescing operator, ensuring no undefined values can occur.
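
For readers less familiar with `??`, the sketch below shows the fallback behaviour being verified. `resolveOrigin` is a hypothetical helper, not a function from the codebase. One detail worth noting is that `??` only falls back on `null` or `undefined`, so an environment variable set to an empty string is kept as-is.

```typescript
// Return the more specific value when it is defined, otherwise the guaranteed fallback.
function resolveOrigin(specific: string | undefined, appOrigin: string): string {
  return specific ?? appOrigin;
}

const appOrigin = "http://localhost:3030";

console.log(resolveOrigin(undefined, appOrigin)); // http://localhost:3030
console.log(resolveOrigin("https://otel.example.com", appOrigin)); // https://otel.example.com
console.log(resolveOrigin("", appOrigin) === ""); // true: an empty string does not trigger the fallback
```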

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for environment variable definitions to verify fallback values
rg -A 1 'APP_ORIGIN|API_ORIGIN|STREAM_ORIGIN' .env*

Length of output: 345


Script:

#!/bin/bash
# Search for environment variable validation and configuration
ast-grep --pattern 'const env = $$$' 
rg -A 5 'process\.env\.' apps/webapp/app/
rg -A 5 'env\.' apps/webapp/app/config/

Length of output: 25409


Script:

#!/bin/bash
# Check environment variable schema validation
rg -A 10 'APP_ORIGIN|API_ORIGIN|STREAM_ORIGIN|DEV_OTEL_EXPORTER_OTLP_ENDPOINT' apps/webapp/app/env.server.ts

Length of output: 1578

apps/webapp/app/services/apiAuth.server.ts (1)

414-414: Verify the impact of the type change on JWT token generation.

The parameter type change from RuntimeEnvironment to RuntimeEnvironmentForEnvRepo looks good, but let's ensure all required properties are available in the new type.

Run the following script to verify the type compatibility:

✅ Verification successful

Type change is safe for JWT token generation

The RuntimeEnvironmentForEnvRepo type includes all required properties (id, organizationId, and projectId) used in JWT token generation. The type is actually more precise as it explicitly declares the available fields through a Prisma payload selection.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if RuntimeEnvironmentForEnvRepo has all required properties used in JWT token generation

# Test: Search for properties accessed on the environment parameter
ast-grep --pattern 'environment.$_'

# Test: Compare with properties available in RuntimeEnvironmentForEnvRepo
ast-grep --pattern 'type RuntimeEnvironmentForEnvRepo = {
  $$$
}'

Length of output: 52426


Script:

#!/bin/bash
# Search for RuntimeEnvironmentForEnvRepo type definition with a more flexible pattern
ast-grep --pattern 'type RuntimeEnvironmentForEnvRepo'

# Get the content of the file where the type is used
cat apps/webapp/app/services/apiAuth.server.ts

Length of output: 14161


Script:

#!/bin/bash
# Get the content of the file containing the type definition
cat apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Length of output: 23576

packages/core/src/v3/schemas/messages.ts (1)

Line range hint 19-34: LGTM! Type safety improvements look good.

The changes improve type safety and maintainability by:

  1. Following TypeScript naming conventions with the AckCallbackResult export
  2. Making the type available for external use
  3. Ensuring consistent usage in PlatformToProviderMessages and PlatformToCoordinatorMessages

Also applies to: 292-292, 535-535

pkg-pr-new bot commented Jan 21, 2025

@trigger.dev/core

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/core@1627

@trigger.dev/build

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/build@1627

@trigger.dev/rsc

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/rsc@1627

@trigger.dev/sdk

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/sdk@1627

trigger.dev

npm i https://pkg.pr.new/triggerdotdev/trigger.dev@1627

@trigger.dev/react-hooks

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/react-hooks@1627

commit: 55fb1f3

@ericallam ericallam force-pushed the engine-v1-fixes-improvements branch from 9affe8e to 55fb1f3 Compare January 21, 2025 14:11
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
apps/webapp/app/models/taskQueue.server.ts (1)

61-63: 🛠️ Refactor suggestion

Add validation for empty queue names.

The sanitization function should handle empty string cases after sanitization.

 export function sanitizeQueueName(queueName: string) {
-  return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
+  const sanitized = queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
+  if (!sanitized) {
+    throw new Error("Queue name cannot be empty after sanitization");
+  }
+  return sanitized;
 }
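
To make the edge case concrete, the sketch below runs the quoted sanitizer on a few inputs. `sanitizeQueueNameStrict` is a hypothetical name for the reviewer's proposed variant; the regular expression is copied from the diff above.

```typescript
// Original behaviour, as quoted in the review: keep letters, digits,
// underscores, hyphens and slashes, then truncate to 128 characters.
function sanitizeQueueName(queueName: string): string {
  return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
}

// Hypothetical name for the reviewer's variant, which rejects empty results.
function sanitizeQueueNameStrict(queueName: string): string {
  const sanitized = sanitizeQueueName(queueName);
  if (!sanitized) {
    throw new Error("Queue name cannot be empty after sanitization");
  }
  return sanitized;
}

console.log(sanitizeQueueName("my queue!")); // myqueue
console.log(sanitizeQueueName("queue/one-two_3")); // queue/one-two_3
console.log(sanitizeQueueName("!!!") === ""); // true: nothing survives sanitization
```

Calling `sanitizeQueueNameStrict("!!!")` would throw instead of returning an empty name.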
🧹 Nitpick comments (5)
references/v3-catalog/src/trigger/queues.ts (1)

3-25: Add input validation for numeric parameters.

The function accepts numeric parameters without validation. Consider adding checks for negative numbers and zero values.

 export const queuesController = task({
   id: "queues/controller",
   run: async ({
     numberOfQueues = 20,
     length = 20,
     waitSeconds = 3,
   }: {
     numberOfQueues?: number;
     length?: number;
     waitSeconds?: number;
   }) => {
+    if (numberOfQueues <= 0) throw new Error("numberOfQueues must be positive");
+    if (length <= 0) throw new Error("length must be positive");
+    if (waitSeconds <= 0) throw new Error("waitSeconds must be positive");
     await queuesTest.batchTriggerAndWait(
       Array.from({ length }, (_, i) => ({
         payload: { waitSeconds },
         options: {
           queue: {
             name: `queue-${i % numberOfQueues}`,
           },
         },
       }))
     );
   },
 });
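
The effect of the modulo expression, and why the `numberOfQueues` check matters, can be seen in isolation. `queueNamesFor` below is a hypothetical helper that mirrors the `queue-${i % numberOfQueues}` expression from the task; it is a sketch, not code from the PR.

```typescript
// Build the queue name for each of `count` runs, spreading them round-robin
// over `numberOfQueues` queues.
function queueNamesFor(count: number, numberOfQueues: number): string[] {
  if (!Number.isInteger(numberOfQueues) || numberOfQueues <= 0) {
    throw new Error("numberOfQueues must be a positive integer");
  }
  return Array.from({ length: count }, (_, i) => `queue-${i % numberOfQueues}`);
}

console.log(queueNamesFor(5, 2).join(","));
// queue-0,queue-1,queue-0,queue-1,queue-0

// Without the check, a zero divisor does not throw in JavaScript:
console.log(`queue-${3 % 0}`); // queue-NaN
```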
apps/webapp/app/models/taskQueue.server.ts (1)

5-58: Add error handling and consider caching.

The function performs multiple database operations without error handling. Additionally, for frequently accessed queues, consider implementing a caching mechanism.

 export async function findQueueInEnvironment(
   queueName: string,
   environmentId: string,
   backgroundWorkerTaskId?: string,
   backgroundTask?: { queueConfig?: unknown }
 ): Promise<TaskQueue | undefined> {
   const sanitizedQueueName = sanitizeQueueName(queueName);
 
+  try {
     const queue = await prisma.taskQueue.findFirst({
       where: {
         runtimeEnvironmentId: environmentId,
         name: sanitizedQueueName,
       },
     });
 
     if (queue) {
       return queue;
     }
 
     const task = backgroundTask
       ? backgroundTask
       : backgroundWorkerTaskId
       ? await prisma.backgroundWorkerTask.findFirst({
           where: {
             id: backgroundWorkerTaskId,
           },
         })
       : undefined;
 
     if (!task) {
       return;
     }
 
     const queueConfig = QueueOptions.safeParse(task.queueConfig);
 
     if (queueConfig.success) {
       const taskQueueName = queueConfig.data.name
         ? sanitizeQueueName(queueConfig.data.name)
         : undefined;
 
       if (taskQueueName && taskQueueName !== sanitizedQueueName) {
         const queue = await prisma.taskQueue.findFirst({
           where: {
             runtimeEnvironmentId: environmentId,
             name: taskQueueName,
           },
         });
 
         if (queue) {
           return queue;
         }
       }
     }
+  } catch (error) {
+    throw new Error(`Failed to find queue: ${error instanceof Error ? error.message : String(error)}`);
+  }
 }
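
The two-step lookup in findQueueInEnvironment can be sketched without a database. The version below is a synchronous, in-memory illustration: `TaskQueueRecord`, `findQueueInMemory`, and the `queueName` field are hypothetical, and the real function queries Prisma and parses the task's queueConfig with QueueOptions.

```typescript
// Hypothetical, simplified record shape for illustration.
type TaskQueueRecord = { runtimeEnvironmentId: string; queueName: string };

function sanitize(queueName: string): string {
  return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
}

// Look up by the run's queue name first; if that fails, fall back to the
// queue name configured on the task (when it differs).
function findQueueInMemory(
  records: TaskQueueRecord[],
  environmentId: string,
  requestedName: string,
  taskConfigName?: string
): TaskQueueRecord | undefined {
  const findByName = (candidate: string): TaskQueueRecord | undefined =>
    records.find(
      (record) =>
        record.runtimeEnvironmentId === environmentId && record.queueName === candidate
    );

  const primaryName = sanitize(requestedName);
  const direct = findByName(primaryName);
  if (direct) {
    return direct;
  }

  const fallbackName = taskConfigName ? sanitize(taskConfigName) : undefined;
  if (fallbackName && fallbackName !== primaryName) {
    return findByName(fallbackName);
  }
  return undefined;
}
```

The fallback only runs when the task's configured name differs from the requested one, which avoids repeating a lookup that has already failed.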
apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts (1)

51-89: Extract magic numbers into named constants.

The randomization logic uses magic numbers that should be extracted into named constants for better maintainability.

+const PROBABILITY_THRESHOLD = 0.2;  // 20% threshold for maintaining weight order
+const RANDOM_FACTOR_RANGE = 0.2;    // ±10% random adjustment
+
 distributeQueues(queues: QueueWithScores[]): Array<string> {
   // ... existing code ...
   
   const shuffledWeightedQueues = weightedQueues
     .map((queueInfo, index) => ({
       ...queueInfo,
       // Add some controlled randomness while maintaining general weight order
-      randomFactor: Math.random() * 0.2 - 0.1, // ±10% random adjustment
+      randomFactor: Math.random() * RANDOM_FACTOR_RANGE - (RANDOM_FACTOR_RANGE / 2),
       originalIndex: index,
     }))
     .sort((a, b) => {
       // If probability difference is significant (>20%), maintain order
-      if (Math.abs(a.probability - b.probability) > 0.2) {
+      if (Math.abs(a.probability - b.probability) > PROBABILITY_THRESHOLD) {
         return b.probability - a.probability;
       }
       // Otherwise, allow some randomization while keeping similar weights roughly together
       return b.probability + b.randomFactor - (a.probability + a.randomFactor);
     })
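
A self-contained sketch of the ordering logic in the excerpt above, using the proposed constants, is shown below. `shuffleByWeight` and `WeightedQueue` are hypothetical names, and the random source is injected purely so the behaviour can be checked deterministically; the actual distributeQueues method does more than this.

```typescript
type WeightedQueue = { queue: string; probability: number };

const PROBABILITY_THRESHOLD = 0.2; // differences above this keep strict weight order
const RANDOM_FACTOR_RANGE = 0.2; // random adjustment spans ±(RANGE / 2)

function shuffleByWeight(
  queues: WeightedQueue[],
  random: () => number = Math.random
): string[] {
  return queues
    .map((queueInfo) => ({
      ...queueInfo,
      randomFactor: random() * RANDOM_FACTOR_RANGE - RANDOM_FACTOR_RANGE / 2,
    }))
    .sort((a, b) => {
      // Clearly different weights: keep the heavier queue first.
      if (Math.abs(a.probability - b.probability) > PROBABILITY_THRESHOLD) {
        return b.probability - a.probability;
      }
      // Similar weights: let the random adjustment decide.
      return b.probability + b.randomFactor - (a.probability + a.randomFactor);
    })
    .map((queueInfo) => queueInfo.queue);
}
```

With a fixed random source such as `() => 0.5` the adjustment is zero, so the result is simply the queues sorted by descending probability.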
apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (2)

1092-1120: Improve timeout handling for coordinator communication.

The coordinator emitWithAck timeout is read directly from an environment variable. Consider:

  1. Adding a fallback timeout value
  2. Implementing exponential backoff for retries
-              .timeout(env.SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS)
+              .timeout(env.SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS ?? 5000)
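
The exponential backoff suggested above could be computed along these lines. `backoffDelays` is a hypothetical helper and the numbers in the example are illustrative, not values from the PR; this sketch only covers the delay schedule, not the retry loop itself.

```typescript
// Compute the wait before each retry: baseMs * 2^attempt, capped at maxDelayMs.
function backoffDelays(baseMs: number, maxDelayMs: number, attempts: number): number[] {
  return Array.from({ length: attempts }, (_, attempt) =>
    Math.min(baseMs * 2 ** attempt, maxDelayMs)
  );
}

console.log(backoffDelays(500, 5000, 5).join(","));
// 500,1000,2000,4000,5000
```

Capping the delay keeps a long run of failures from pushing the wait time far beyond the timeout that the retries are meant to work around.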

1514-1518: Consider using a utility library for array chunking.

The custom chunk function could be replaced with a well-tested utility such as Lodash's _.chunk.

-function chunk<T>(arr: T[], chunkSize: number): T[][] {
-  return Array.from({ length: Math.ceil(arr.length / chunkSize) }, (_, i) =>
-    arr.slice(i * chunkSize, i * chunkSize + chunkSize)
-  );
-}
+import { chunk } from 'lodash';
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9affe8e and 55fb1f3.

📒 Files selected for processing (19)
  • apps/webapp/app/env.server.ts (2 hunks)
  • apps/webapp/app/models/taskQueue.server.ts (1 hunks)
  • apps/webapp/app/routes/admin.api.v1.marqs.ts (0 hunks)
  • apps/webapp/app/services/apiAuth.server.ts (2 hunks)
  • apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts (5 hunks)
  • apps/webapp/app/v3/marqs/devQueueConsumer.server.ts (2 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (26 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (31 hunks)
  • apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts (2 hunks)
  • apps/webapp/app/v3/marqs/types.ts (2 hunks)
  • apps/webapp/app/v3/marqs/v2.server.ts (1 hunks)
  • apps/webapp/app/v3/models/workerDeployment.server.ts (1 hunks)
  • apps/webapp/app/v3/services/createBackgroundWorker.server.ts (2 hunks)
  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts (7 hunks)
  • apps/webapp/app/v3/services/finalizeTaskRun.server.ts (4 hunks)
  • apps/webapp/app/v3/services/triggerTask.server.ts (2 hunks)
  • packages/core/src/v3/schemas/messages.ts (4 hunks)
  • references/v3-catalog/src/trigger/queues.ts (1 hunks)
💤 Files with no reviewable changes (1)
  • apps/webapp/app/routes/admin.api.v1.marqs.ts
🚧 Files skipped from review as they are similar to previous changes (8)
  • apps/webapp/app/v3/models/workerDeployment.server.ts
  • apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
  • apps/webapp/app/v3/marqs/v2.server.ts
  • apps/webapp/app/v3/services/triggerTask.server.ts
  • apps/webapp/app/services/apiAuth.server.ts
  • apps/webapp/app/v3/services/createBackgroundWorker.server.ts
  • apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
  • apps/webapp/app/env.server.ts
👮 Files not reviewed due to content moderation or server errors (5)
  • apps/webapp/app/v3/services/finalizeTaskRun.server.ts
  • apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
  • packages/core/src/v3/schemas/messages.ts
  • apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts
  • apps/webapp/app/v3/marqs/index.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
apps/webapp/app/v3/marqs/index.server.ts

[error] 450-450: Unnecessary continue statement

Unsafe fix: Delete the unnecessary continue statement

(lint/correctness/noUnnecessaryContinue)

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (buildjet-8vcpu-ubuntu-2204 - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (buildjet-8vcpu-ubuntu-2204 - npm)
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (11)
references/v3-catalog/src/trigger/queues.ts (2)

27-32: LGTM!

The implementation is clean and includes a sensible default for the wait duration.


34-42: LGTM!

The task is well-defined with a specific queue name and simple logging.

apps/webapp/app/v3/marqs/types.ts (2)

50-50: LGTM!

The method signature is clear and consistent with other key generation methods in the interface.


69-76: Add JSDoc documentation for new methods.

The new methods lack documentation explaining their purpose and parameters.

+  /**
+   * Distributes queues based on their scores and capacities
+   * @param queues Array of queues with their associated scores and capacities
+   * @returns Array of queue names in their distributed order
+   */
   distributeQueues(queues: Array<QueueWithScores>): Array<string>;

+  /**
+   * Calculates the next range of queues to process
+   * @param parentQueue The parent queue that holds the candidate queues
+   * @param consumerId The consumer requesting the next range
+   * @param currentRange Current processing range
+   * @param queueSize Total size of the queue
+   * @returns Next range to process
+   */
   moveToNextRange(
     parentQueue: string,
     consumerId: string,
     currentRange: QueueRange,
     queueSize: number
   ): QueueRange;
apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts (2)

29-49: LGTM!

The implementation handles range progression and reset correctly with clear comments explaining the logic.
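
For readers unfamiliar with the windowing idea, range progression with reset might look roughly like the sketch below. It is an assumption-laden illustration only: the `QueueRange` shape (`offset`, `count`) is a guess, the reset condition is one plausible choice, and the `parentQueue` and `consumerId` parameters from the interface are omitted for brevity.

```typescript
// Hypothetical shape; the PR's actual QueueRange type is not shown in this review.
type QueueRange = { offset: number; count: number };

// Advance the window by its own size; wrap back to the start once the
// next window would begin at or past the end of the queue.
function moveToNextRange(currentRange: QueueRange, queueSize: number): QueueRange {
  const nextOffset = currentRange.offset + currentRange.count;
  if (nextOffset >= queueSize) {
    return { offset: 0, count: currentRange.count };
  }
  return { offset: nextOffset, count: currentRange.count };
}

// Walking a queue of 25 entries with a window of 10:
const first: QueueRange = { offset: 0, count: 10 };
const second = moveToNextRange(first, 25); // { offset: 10, count: 10 }
const third = moveToNextRange(second, 25); // { offset: 20, count: 10 }
const wrapped = moveToNextRange(third, 25); // { offset: 0, count: 10 }
```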


133-149: LGTM!

The no-op implementation correctly provides default values and empty results.

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts (5)

12-16: LGTM! New types enhance message handling capabilities.

The addition of AckCallbackResult and QueueOptions types improves type safety for queue operations.


132-139: Enhanced error tracking with new outcome field.

The addition of the outcome field in DoWorkInternalResult improves error tracking and debugging capabilities by categorizing results into specific outcomes.


1416-1420: LGTM! Efficient batch processing of completed attempts.

The chunking of completed attempt IDs improves performance by processing them in batches.
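
As a small illustration of the batching described here, the sketch below applies the same `chunk` helper quoted in this review to a list of IDs. The IDs and the batch size of 2 are made up for the example; the batch size used in the PR is not shown in this review.

```typescript
// Same helper as the one quoted in the review: splits `arr` into
// consecutive slices of at most `chunkSize` elements.
function chunk<T>(arr: T[], chunkSize: number): T[][] {
  return Array.from({ length: Math.ceil(arr.length / chunkSize) }, (_, i) =>
    arr.slice(i * chunkSize, i * chunkSize + chunkSize)
  );
}

// Hypothetical attempt IDs and batch size, for illustration only.
const completedAttemptIds = ["a1", "a2", "a3", "a4", "a5"];
const batches = chunk(completedAttemptIds, 2);
// batches is [["a1", "a2"], ["a3", "a4"], ["a5"]]

for (const batch of batches) {
  // A real consumer would issue one query or request per batch here.
  console.log(`processing ${batch.length} attempt(s): ${batch.join(", ")}`);
}
```

The last batch may be shorter than the others, so downstream code should not assume a fixed batch length.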


1496-1512: LGTM! Well-structured error handling classes.

The new error classes provide clear error categorization and improve error handling capabilities.


750-755: Verify queue resolution edge cases.

The new queue resolution logic calls findQueueInEnvironment, which can return undefined when no matching queue is found. Consider adding retry logic or graceful degradation for transient failures.

✅ Verification successful

Existing implementation already handles queue resolution edge cases appropriately

The code already implements proper error handling for queue resolution failures:

  • Explicit null check with if (!queue)
  • Debug logging of the failure
  • Graceful degradation by nacking the message for retry

This pattern is consistently implemented across all usages of findQueueInEnvironment in the codebase.
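
The three-step pattern listed above (null check, debug log, nack for retry) can be sketched as follows. This is not the code from the PR: the `TaskQueue` and `QueueMessage` shapes, the `QueueResolutionDeps` interface, and `resolveQueueOrNack` are hypothetical stand-ins, with the lookup, nack, and logger injected so the sketch stays self-contained.

```typescript
// Hypothetical types and names; they stand in for the PR's real models.
type TaskQueue = { id: string; name: string };
type QueueMessage = { messageId: string; queueName: string };

interface QueueResolutionDeps {
  // Stand-in for a lookup such as findQueueInEnvironment.
  findQueue(name: string): Promise<TaskQueue | undefined>;
  // Stand-in for negatively acknowledging a message so it is retried later.
  nackMessage(messageId: string): Promise<void>;
  debug(message: string, meta: Record<string, unknown>): void;
}

// Returns the queue when found; otherwise logs, nacks the message,
// and returns undefined so the caller can stop processing it.
async function resolveQueueOrNack(
  message: QueueMessage,
  deps: QueueResolutionDeps
): Promise<TaskQueue | undefined> {
  const queue = await deps.findQueue(message.queueName);

  if (!queue) {
    deps.debug("Queue not found, nacking message", {
      messageId: message.messageId,
      queueName: message.queueName,
    });
    await deps.nackMessage(message.messageId);
    return undefined;
  }

  return queue;
}
```

Nacking instead of throwing keeps the consumer loop running, and a maximum nack count such as the one this PR introduces would bound how many times a message with a permanently missing queue is retried.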

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for potential queue resolution failures
rg -A 5 "findQueueInEnvironment.*undefined"

Length of output: 45


Script:

#!/bin/bash
# Find findQueueInEnvironment implementation
ast-grep --pattern 'function findQueueInEnvironment($_) {
  $$$
}'

# Check how queue is used after resolution
rg -B 2 -A 10 "const queue = await findQueueInEnvironment"

# Look for error handling patterns around queue resolution
rg -B 5 -A 5 "try.*findQueueInEnvironment"

Length of output: 5370

@ericallam ericallam merged commit f96bf72 into main Jan 21, 2025
13 checks passed
@ericallam ericallam deleted the engine-v1-fixes-improvements branch January 21, 2025 14:24