-
-
Couldn't load subscription status.
- Fork 870
Run engine: Using root queue timestamp to prioritize completing runs #1818
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
WalkthroughThis pull request refactors several batch and task triggering services. The main change replaces the Changes
Sequence Diagram(s)Batch Processing FlowsequenceDiagram
participant Client as API Client
participant API as Batch API Endpoint
participant BatchService as RunEngineBatchTriggerService
participant Worker as Worker Server
Client->>API: Sends batch task request
API->>BatchService: Instantiate and initiate batch processing
Worker->>BatchService: Process batch task run (using updated job key)
BatchService-->>Worker: Return batch processing result
Triggered Task FlowsequenceDiagram
participant Client as API Client
participant API as Trigger Endpoint
participant TriggerService as RunEngineTriggerTaskService
participant Engine as RunEngine
participant Queue as Job Queue
Client->>API: Request task trigger
API->>TriggerService: Instantiate service and call trigger
TriggerService->>Engine: Call trigger method with queueTimestamp
Engine->>Engine: Determine effective timestamp (from parameter or default)
Engine->>Queue: Enqueue task run with new timestamp logic
Queue-->>Engine: Acknowledge task run enqueued
Engine-->>TriggerService: Return task run details
TriggerService-->>API: Respond with execution status
Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
apps/webapp/app/runEngine/services/batchTrigger.server.ts (1)
646-649: Consider updating the jobKey to match the new queue name.The queue name is
"runengine.processBatchTaskRun", but the jobKey retainsBatchTriggerV3Service.process:.... Renaming it to something likeRunEngineBatchTriggerService.process:...would help maintain naming consistency.- jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`, + jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,internal-packages/run-engine/src/engine/tests/priority.test.ts (1)
13-26: Refactor time-based logic to minimize test flakinessThese test lines rely on real-time delays and sleeps. While it appears consistent overall, consider using time mocking or a faster approach to reduce potential flakiness on slower or loaded CI environments.
internal-packages/run-engine/src/engine/index.ts (1)
337-338: NewqueueTimestamppropertyBy introducing the
queueTimestampparameter, you’ve improved scheduling flexibility. However, ifpriorityMsandqueueTimestampare both supplied, ensure that the system’s ordering logic is clear to avoid confusion.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
apps/webapp/app/routes/api.v2.tasks.batch.ts(2 hunks)apps/webapp/app/runEngine/services/batchTrigger.server.ts(3 hunks)apps/webapp/app/runEngine/services/triggerTask.server.ts(1 hunks)apps/webapp/app/services/worker.server.ts(3 hunks)apps/webapp/app/v3/services/triggerTask.server.ts(2 hunks)internal-packages/run-engine/src/engine/index.ts(2 hunks)internal-packages/run-engine/src/engine/systems/checkpointSystem.ts(0 hunks)internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts(0 hunks)internal-packages/run-engine/src/engine/systems/enqueueSystem.ts(1 hunks)internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts(0 hunks)internal-packages/run-engine/src/engine/systems/waitpointSystem.ts(0 hunks)internal-packages/run-engine/src/engine/tests/priority.test.ts(1 hunks)internal-packages/run-engine/src/engine/types.ts(1 hunks)references/test-tasks/src/trigger/helpers.ts(1 hunks)
💤 Files with no reviewable changes (4)
- internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts
- internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
- internal-packages/run-engine/src/engine/systems/checkpointSystem.ts
- internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts
🧰 Additional context used
🧬 Code Definitions (2)
apps/webapp/app/routes/api.v2.tasks.batch.ts (1)
apps/webapp/app/runEngine/services/batchTrigger.server.ts (1)
RunEngineBatchTriggerService(52-683)
apps/webapp/app/v3/services/triggerTask.server.ts (1)
apps/webapp/app/runEngine/services/triggerTask.server.ts (1)
RunEngineTriggerTaskService(43-567)
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (28)
internal-packages/run-engine/src/engine/types.ts (1)
90-90: Added newqueueTimestampparameter to support run prioritizationThe addition of an optional
queueTimestampparameter to theTriggerParamstype aligns with the PR objective of propagating the original timestamp of the root run to all descendant runs. This will allow the system to prioritize run completion based on this timestamp.references/test-tasks/src/trigger/helpers.ts (1)
79-79: Function parameter structure improved for better readabilityThe
handleErrorfunction signature has been updated to use object destructuring instead of separate parameters. This improves readability and makes the code more maintainable, especially when adding more parameters in the future.apps/webapp/app/routes/api.v2.tasks.batch.ts (1)
15-15:❓ Verification inconclusive
Migrating to
RunEngineBatchTriggerServicefor better run prioritizationThis change replaces the old
BatchTriggerV4Servicewith the newRunEngineBatchTriggerService, which will use the newly addedqueueTimestampparameter to properly prioritize runs. This migration is a critical part of implementing the PR's objective of using root queue timestamp for run prioritization.Also applies to: 77-77
🏁 Script executed:
#!/bin/bash # Check if there are any remaining references to BatchTriggerV4Service that might need updating rg "BatchTriggerV4Service" --type tsLength of output: 36
Action Required: Verify Complete Migration to RunEngineBatchTriggerService
The updated code now imports and uses
RunEngineBatchTriggerService(with the newqueueTimestampparameter) in place ofBatchTriggerV4Service. An automated search forBatchTriggerV4Serviceproduced no results; however, this outcome is slightly inconclusive. Please manually verify that all legacy references have been removed, especially in the primary context (line 15) and the related area (line 77).internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1)
82-82: EnsurepriorityMsis non-negative or handled gracefully.Subtracting
priorityMsfrom(run.queueTimestamp ?? run.createdAt).getTime()is valid to prioritize runs. However, ifpriorityMswere uninitialized or unexpectedly large, the resulting timestamp might become negative or otherwise skewed. Ensure thatpriorityMsis guaranteed a safe range or defaulted to 0 where appropriate.apps/webapp/app/runEngine/services/batchTrigger.server.ts (2)
18-21: Imports look consistent with the new RunEngine structure.The updated imports align with the refactoring to RunEngine-based services.
52-52:✅ Verification successful
Class renaming aligns with the new naming scheme.
Renaming from a previous “BatchTriggerV4Service” to
RunEngineBatchTriggerServiceis clear and consistent with the run engine approach. To ensure no stale references exist, please verify any references toBatchTriggerV4Serviceare fully removed.
🏁 Script executed:
#!/bin/bash ast-grep --pattern $'class BatchTriggerV4Service { $$$ }'Length of output: 60
I'll run an additional verification using a ripgrep search to double‐check that there are no lingering references to
BatchTriggerV4Servicein the repository:
🏁 Script executed:
#!/bin/bash rg "BatchTriggerV4Service" .Length of output: 28
Review Approved: Class Renaming Confirmed
The verification using bothast-grepandripgrepdid not detect any lingering references toBatchTriggerV4Service. This confirms that the renaming toRunEngineBatchTriggerServicealigns with the new naming scheme and that stale references have been fully removed.apps/webapp/app/services/worker.server.ts (3)
32-34: New imports for RunEngine batch processing.Using the renamed
BatchProcessingOptionsandRunEngineBatchTriggerServiceensures a clear separation from older v3 services.
102-102: New queue configuration for RunEngine batch processing.Declaring
"runengine.processBatchTaskRun": RunEngineBatchProcessingOptionsin the worker catalog properly integrates the run engine batch flow into the existing queue system.
348-348: Instantiate new RunEngine-based batch trigger service.Creating the
RunEngineBatchTriggerServicewithpayload.strategyhere is consistent with the architectural shift away from the older batch service.apps/webapp/app/runEngine/services/triggerTask.server.ts (15)
1-42: Imports and preliminary constants.All newly introduced imports and constants align with the run-engine approach.
43-56: Well-definedRunEngineTriggerTaskServiceconstructor andcallsignature.The class extends
WithRunEngine, and the method signature is descriptive with environment, options, and an optionalattempt. No immediate issues.
57-79: Max attempts and delay logic.The
attemptcheck againstMAX_ATTEMPTSplus theparseDelayusage are a solid approach for fail-fast and delaying runs when requested. This logic looks good.
80-179: Idempotency key handling and existing run checks.Robust logic for detecting existing runs, handling expired idempotency keys, and blocking the parent run if needed. The approach is coherent and ensures no duplicate runs under the same key.
181-206: Entitlement and queue size checks.Verifying entitlement for non-development environments and enforcing queue size limits through
guardQueueSizeLimitsForEnvhelps keep resource usage in check.
208-216: Limiting the number of tags per run.Ensuring no more than
MAX_TAGS_PER_RUNfosters data cleanliness and prevents excessive tagging overhead.
218-248: Payload packet creation and parent run validation.Creating a
payloadPacketand rejecting triggers for terminal parent runs is a good approach to maintain safe run lineages.
251-411: Main event trace block with auto-increment logic.The event-based tracing with a locked counter ensures unique run numbering. The usage of
traceEventprovides good observability, and storing tags in the DB is well-executed.
414-448: Error handling with retries and unique constraint detection.Catching
RunDuplicateIdempotencyKeyErrorto retry is practical, and handling Prisma unique constraints for one-time use tokens prevents repeated usage.
452-471:#getMasterQueueForEnvironmentmethod.Ensures a default worker group queue is found for production environments. Throwing a
ServiceValidationErrorif absent is appropriate to fail fast.
473-521:#getQueueNamemethod.Looks up a queue name from the environment’s worker/task records if available; otherwise defaults to
task/<slug>. Balances flexibility with a fallback.
523-554:#handlePayloadPacketfor offloading large payloads.Nicely leverages
packetRequiresOffloadingto store oversized payloads in object storage. Minimizes DB overhead and keeps runs efficient.
556-566:#createPayloadPacketfor type-based payload handling.This method correctly handles JSON vs. string payload types, defaulting to an empty packet if data is missing. Straightforward and robust.
569-575:getMaximumSizeForEnvironment.This utility returns a consistent maximum queue size for dev vs. deployed environments. Straightforward approach.
577-596:guardQueueSizeLimitsForEnvensures environment queue constraints.The function calculates projected queue size properly, returning a helpful object with limit info. Good usage in
call()for pre-trigger checks.internal-packages/run-engine/src/engine/tests/priority.test.ts (3)
58-100: Validate the negative priority flowThe test sequence correctly verifies that negative priorities cause a delayed dequeue. Just ensure that in environments under heavier load, a 2-second delay is always sufficient to demonstrate the intended behavior.
106-194: Thorough coverage of queueTimestamp orderingThis test sequence effectively checks run ordering by timestamp. It offers solid coverage for the newly introduced
queueTimestamp. Great job validating multiple distinct timestamps.
200-241: Validate combined usage ofpriorityMsandqueueTimestampThe updated
triggerRunsfunction now supports runs with bothpriorityMsandqueueTimestamp. Ensure edge cases are tested if a single run object specifies both fields, and confirm the desired ordering logic is preserved in that scenario.Do you have any test that exercises both fields together in a single run?
internal-packages/run-engine/src/engine/index.ts (1)
418-418: Defaulting todelayUntil ?? new Date()This fallback logic is sensible. Just verify that when a negative or small
priorityMsis used, we don’t unintentionally overshadow it due to a laterqueueTimestampfallback.
| import { WithRunEngine } from "./baseService.server"; | ||
| import { TriggerTaskServiceV1 } from "./triggerTaskV1.server"; | ||
| import { TriggerTaskServiceV2 } from "./triggerTaskV2.server"; | ||
| import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.server"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Updated task service to support queue timestamp prioritization
The code has been updated to use the new RunEngineTriggerTaskService instead of the previous TriggerTaskServiceV2, incorporating the system for using queue timestamps to prioritize runs. The service is now initialized with both Prisma and the run engine instance, providing better integration with the run engine.
Also applies to: 81-84
🏁 Script executed:
#!/bin/bash
# Check to ensure there are no references to the old TriggerTaskServiceV2 that might need updating
rg "TriggerTaskServiceV2" --type tsLength of output: 145
Action Required: Resolve Remaining Reference to Deprecated Service
- The service update in
apps/webapp/app/v3/services/triggerTask.server.tscorrectly imports and usesRunEngineTriggerTaskServicefor queue timestamp prioritization. - However, the search output shows that
TriggerTaskServiceV2still exists inapps/webapp/app/v3/services/triggerTaskV2.server.ts. Please verify whether this file should be updated or removed. - Also, ensure that any usages of
TriggerTaskServiceV2in the referenced code (e.g., lines 81-84) are also updated to the new service implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
apps/webapp/app/models/taskRunTag.server.ts (1)
33-68: Consider using Promise.all for better performanceThe implementation of
createTagsis correct, but it processes tags sequentially. For better performance, especially with multiple tags, consider usingPromise.allto create tags in parallel.- const tagRecords: TagRecord[] = []; - for (const tag of tagsArray) { - const tagRecord = await createTag( - { - tag, - projectId, - }, - prismaClient - ); - if (tagRecord) { - tagRecords.push({ id: tagRecord.id, name: tagRecord.name }); - } - } + const tagRecordsPromises = tagsArray.map(tag => + createTag( + { + tag, + projectId, + }, + prismaClient + ) + ); + + const completedTagRecords = await Promise.all(tagRecordsPromises); + const tagRecords: TagRecord[] = completedTagRecords + .filter(Boolean) + .map(tagRecord => ({ id: tagRecord.id, name: tagRecord.name }));references/hello-world/src/trigger/tags.ts (1)
1-1: Remove unused importThe
waitfunction is imported but not used in the code. Consider removing it to keep imports clean.-import { logger, task, wait } from "@trigger.dev/sdk"; +import { logger, task } from "@trigger.dev/sdk";
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
apps/webapp/app/models/taskRunTag.server.ts(2 hunks)apps/webapp/app/runEngine/services/triggerTask.server.ts(4 hunks)references/hello-world/src/trigger/tags.ts(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/webapp/app/runEngine/services/triggerTask.server.ts
🧰 Additional context used
🧬 Code Definitions (1)
references/hello-world/src/trigger/tags.ts (1)
apps/webapp/app/runEngine/services/triggerTask.server.ts (2)
payload(516-547)payload(549-559)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
🔇 Additional comments (6)
apps/webapp/app/models/taskRunTag.server.ts (4)
3-3: Good addition of Prisma transaction supportAdding
PrismaClientOrTransactionimport is a good addition that will enable transactional tag creation, which is important for maintaining data consistency.
7-10: LGTM: Function signature improvementModifying the
createTagfunction to accept a custom Prisma client or transaction improves flexibility and allows for atomic operations when creating tags within a larger transaction.
12-12: Good update to use the provided Prisma clientUsing the passed
prismaClientparameter correctly implements the transaction support.
28-31: Clean type definitionThe
TagRecordtype is well-defined and clearly represents the structure of tag records returned by the functions.references/hello-world/src/trigger/tags.ts (2)
3-15: LGTM: Well-defined task for testing tagsThe
tagsTestertask is properly defined and demonstrates how to trigger a child task with different tag configurations.
17-22: LGTM: Child task implementationThe
tagsChildTaskis well-defined and correctly logs the payload.
This PR uses the
queueTimestampfiled on TaskRun to propagate the original timestamp of the root run across all descendent runs. This queueTimestamp is then used whenever enqueuing a run to executing in the RunQueue, for all initial, resumes, and retry enqueuings. This means that run hierarchies will complete quicker and not be blocked by new runs.Summary by CodeRabbit
New Features
Refactor
Tests