-
-
Notifications
You must be signed in to change notification settings - Fork 699
improve batch completion system for run engine v1 #1656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Warning There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure. 🔧 ESLint
apps/webapp/app/env.server.tsOops! Something went wrong! :( ESLint: 8.45.0 ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct. The config "custom" was referenced from the config file in "/.eslintrc.js". If you still have problems, please stop by https://eslint.org/chat/help to chat with the team. apps/webapp/app/routes/api.v1.tasks.batch.tsOops! Something went wrong! :( ESLint: 8.45.0 ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct. The config "custom" was referenced from the config file in "/.eslintrc.js". If you still have problems, please stop by https://eslint.org/chat/help to chat with the team. internal-packages/database/src/transaction.tsOops! Something went wrong! :( ESLint: 8.45.0 ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct. The config "custom" was referenced from the config file in "/.eslintrc.js". If you still have problems, please stop by https://eslint.org/chat/help to chat with the team.
WalkthroughThe changes update multiple services and routes to improve clarity and consistency by renaming variables from "run" to "result." Upgrades include transitioning from V2 to V3 batch triggering services, enhanced error handling, and additional response fields such as "isCached." Several services now incorporate version-specific logic for batch processing—including sealing, dynamic waits, and dependency management—while database migrations extend the schema to track batch progress and completion. New tasks in the catalog facilitate sequential triggering and checkpoint testing. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant TS as TriggerTaskService
participant BT3 as BatchTriggerV3Service
participant DB as Database
participant RBR as ResumeBatchRunService
participant FTR as FinalizeTaskRunService
User->>TS: Initiate batch trigger request
TS->>BT3: Call service with parameters
BT3->>DB: Query/insert batch run data (using sealing & idempotency)
DB-->>BT3: Return run data
BT3-->>TS: Return { run, isCached } result
TS-->>User: Return task run details
%% Optionally, for resuming batch runs
User->>RBR: Request resume of batch run
RBR->>DB: Retrieve batch run data
alt Batch version is "v3"
RBR->>FTR: Invoke completeBatchTaskRunItemV3
else Legacy batch
RBR->>DB: Update run item status as COMPLETED
end
RBR-->>User: Return resume confirmation
Possibly related PRs
Poem
✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (13)
apps/webapp/app/v3/services/resumeBatchRun.server.ts (3)
55-79
: Inconsistent log message within v3 check
Line 57 logs “Batch run is already completed”, yet the condition isbatchRun.status !== "COMPLETED"
. Consider clarifying or rewording for correctness.- logger.debug("ResumeBatchRunService: Batch run is already completed", { + logger.debug("ResumeBatchRunService: Batch run is not completed, cannot resume", {
242-242
: Consistent call for resumed state
Same concurrency note as above—consider verifying whether#setBatchToResumedOnce
can be called twice concurrently.
340-360
: Helper function for retrieving batch
Centralizing the fetch logic is tidy. This function may warrant test coverage for edge cases (e.g., missing batch).apps/webapp/app/v3/services/triggerTask.server.ts (1)
624-640
: Recursive retry for idempotency
This approach is effective for handling race conditions. Ensure that your logs or metrics can surface repeated collisions.apps/webapp/app/v3/services/batchTriggerV2.server.ts (3)
87-87
: Consider an index for faster lookups.
Retrieving by(runtimeEnvironmentId, idempotencyKey)
may benefit from an index on those fields for performance.Also applies to: 89-90
398-403
: Recommend exponential backoff on errors.
Currently, a single error logs and triggers a requeue. Consider exponential or limited retries.
745-745
: Preserve original stack using “cause.”
Wrapping errors in a newError(String(error))
loses stack details.Use the
cause
option if runtime supports it:- error: error instanceof Error ? error : new Error(String(error)) + error: error instanceof Error ? error : new Error(String(error), { cause: error })apps/webapp/app/v3/services/testTask.server.ts (1)
18-30
: Consider scoping the declaration in a block inside the switch.
Static analysis suggests wrappingresult
in a block to limit accessibility to this switch clause. Otherwise, the variable is accessible in subsequent cases.case "STANDARD": { + { const result = await triggerTaskService.call( ... ); return result?.run; + } }🧰 Tools
🪛 Biome (1.9.4)
[error] 18-28: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
apps/webapp/app/v3/services/batchTriggerTask.server.ts (1)
126-135
: Consider adding transaction for batch item creation.The batch item creation and status update should be wrapped in a transaction to ensure atomicity.
if (result) { + await $transaction(this._prisma, async (tx) => { await this._prisma.batchTaskRunItem.create({ data: { batchTaskRunId: batch.id, taskRunId: result.run.id, status: batchTaskRunItemStatusForRunStatus(result.run.status), }, }); runs.push(result.run.friendlyId); + }); }references/v3-catalog/src/trigger/checkpoints.ts (1)
20-21
: Consider documenting the reason for commenting out code.Instead of keeping commented code, either remove it if it's no longer needed or add a comment explaining why it's temporarily disabled.
apps/webapp/app/v3/services/createCheckpoint.server.ts (1)
97-102
: Consider removing or documenting test code.The commented sleep code appears to be for testing slow checkpoints. If this is needed for testing, consider moving it to a test helper function or removing it if no longer required.
apps/webapp/app/v3/services/finalizeTaskRun.server.ts (1)
230-244
: Plan to remove deprecated batch completion logic.This code block is marked as deprecated and only used for non-v3 batch versions. Consider creating a timeline for removing this code once all batches are migrated to v3.
internal-packages/database/prisma/migrations/20250131120251_add_batch_trigger_v3_columns/migration.sql (1)
1-7
: Review BatchTaskRun Alterations in SQL Migration.
The migration adds five new columns to the “BatchTaskRun” table:
• completedAt (TIMESTAMP(3))
• completedCount (INTEGER NOT NULL DEFAULT 0)
• expectedCount (INTEGER NOT NULL DEFAULT 0)
• sealed (BOOLEAN NOT NULL DEFAULT false)
• sealedAt (TIMESTAMP(3))These changes appear to align with the new tracking requirements for batch completion. Please ensure that these definitions match the intended business logic—especially that the default values for counters (completedCount and expectedCount) and the boolean “sealed” flag correctly reflect a new, open batch state. Also, consider if any of these fields will be used in frequent query filters. If so, adding indexes may improve performance.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
(2 hunks)apps/webapp/app/routes/api.v1.tasks.batch.ts
(1 hunks)apps/webapp/app/v3/services/batchTriggerTask.server.ts
(2 hunks)apps/webapp/app/v3/services/batchTriggerV2.server.ts
(16 hunks)apps/webapp/app/v3/services/createCheckpoint.server.ts
(3 hunks)apps/webapp/app/v3/services/finalizeTaskRun.server.ts
(3 hunks)apps/webapp/app/v3/services/replayTaskRun.server.ts
(2 hunks)apps/webapp/app/v3/services/resumeBatchRun.server.ts
(8 hunks)apps/webapp/app/v3/services/resumeDependentParents.server.ts
(3 hunks)apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts
(3 hunks)apps/webapp/app/v3/services/testTask.server.ts
(3 hunks)apps/webapp/app/v3/services/triggerScheduledTask.server.ts
(2 hunks)apps/webapp/app/v3/services/triggerTask.server.ts
(6 hunks)internal-packages/database/prisma/migrations/20250131120251_add_batch_trigger_v3_columns/migration.sql
(1 hunks)internal-packages/database/prisma/migrations/20250131145633_add_processing_columns_to_batch_task_run/migration.sql
(1 hunks)internal-packages/database/prisma/migrations/20250201072700_add_resumed_at_to_batch_task_run/migration.sql
(1 hunks)internal-packages/database/prisma/schema.prisma
(2 hunks)references/v3-catalog/src/trigger/batch.ts
(2 hunks)references/v3-catalog/src/trigger/checkpoints.ts
(1 hunks)references/v3-catalog/src/trigger/queues.ts
(1 hunks)
🧰 Additional context used
🪛 Biome (1.9.4)
apps/webapp/app/v3/services/testTask.server.ts
[error] 18-28: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (69)
apps/webapp/app/v3/services/resumeDependentParents.server.ts (2)
8-8
: Import usage is consistent
Good addition to support v3 logic.
49-49
: Aligned data retrieval for batch version
IncludingbatchVersion
in the selection aligns well with the new v3 path.apps/webapp/app/v3/services/resumeBatchRun.server.ts (7)
6-6
: Importing BatchTaskRun
This import ensures typed references to batch runs. No concerns here.
10-11
: Type helps keep the code self-documenting
DefiningRetrieveBatchRunResult
clarifies method returns.
48-53
: Conditional flow for batch versions
Splitting logic bybatchVersion
is clear. Good maintainability for future expansions.
138-139
: Single call to handle dependent attempt
This direct call fosters better separation of legacy vs. v3 logic. No issues.
184-184
: Set batch to resumed once
Check for concurrency conflicts if multiple processes may update the same batch simultaneously. Otherwise looks fine.
277-298
: Differentiating v3 from legacy
This code correctly updatesresumedAt
for v3 while preserving the legacy approach of marking runs as “COMPLETED”. Looks solid.
320-325
: Flexibility with jobKey
AllowingskipJobKey
is a good addition to handle deduplication or concurrency. No concerns.Also applies to: 334-334
apps/webapp/app/v3/services/triggerTask.server.ts (8)
29-29
: Expanded database imports
ImportingPrisma
andTaskRun
is standard. No issues.
52-55
: Enhanced return type
IncludingisCached
is helpful for upstream optimizations or client messaging.
57-57
: Limiting attempts
DefiningMAX_ATTEMPTS
is a clear safeguard to avoid infinite loops.
64-66
: Addedattempt
parameter
Makes the recursion approach explicit. Good for controlling re-tries.
69-75
: Guarding against excessive recursion
Throwing an error after the final attempt ensures we don’t overload the system.
92-92
:findFirst
usage
Switching tofindFirst
forexistingRun
is flexible; ensure it doesn’t return an unintended record if there are multiple matches.
119-119
: Returns existing run
Returning{ run, isCached: true }
is an elegant solution for idempotency.
588-588
: New run property
Returning{ run, isCached: false }
aligns with the newly created run path.apps/webapp/app/v3/services/batchTriggerV2.server.ts (27)
25-25
: No issues with the new import.
53-59
: Type definition appears clear and consistent.
TheRunItemData
type provides clarity on the shape of each run item.
82-84
: Good validation check for non-empty item array.
Throwing aServiceValidationError
ensures the caller supplies at least one batch item.
165-165
: Logic to prepare run data is invoked safely.
No immediate concerns here.
168-168
: Clear determination of new run count.
Filtering for uncached runs is straightforward.
363-363
: Argument signature looks clear.
No issues spotted.
371-371
: Inline processing threshold.
The conditional approach to inline vs. async batch processing is reasonable.
379-379
: Accurate reference to runs.length.
No problems identified.
384-384
: Upgrading to “v3.”
Good to see consistent version usage.
405-414
: Requeue is handled cleanly.
The flow is clear: upon error, enqueue the given range for reprocessing.
416-416
: Returning batch object post-error analysis.
This maintains consistency in the calling flow.
419-423
: Ensure partial runs are truly complete before sealing.
Sealing the batch might mask runs still in progress if additional concurrency is possible.
425-425
: Returning the batch.
End-of-processing return is consistent with the surrounding flow.
440-440
: Repeated specification of “v3.”
Assuming this is intentional for clarity.
447-456
: Sequential processing queue approach
Implementation is straightforward, no issues found.
468-474
: Parallel jobs logic
StoringprocessingJobsExpectedCount
is a clean approach to track partial completions.
614-636
: Handling errors by requeuing
Consider partial-complete scenarios: a subset might have succeeded, but the entire block re-queues. Validate that reprocessing won't create duplicate runs or data conflicts.
638-639
: Early return
Skipping further processing if payload is unavailable. No further concerns.
641-657
: Completing batch in “sequential” mode
Sealing after the final chunk is clean and consistent.
658-658
: Requeue next chunk
No further notes.
673-673
: Parallel case
Implementation aligns with incremental counting.
713-713
: Method signature & return structure
Returning{ workingIndex; error? }
is concise.
751-751
: Returning final working index
No issues found.
769-769
: Invoking TriggerTaskService
Implementation looks sensible and consistent.
790-790
: Error if result is missing
Appropriate defensive check.
794-795
: Ensuring fresh run logic
Skipping the batchTaskRunItem creation for cached runs is consistent.
852-913
: Potential concurrent completion increments
When updating from PENDING to COMPLETED, multiple parallel updates could lead to repeated increments ofcompletedCount
. Confirm that concurrency is limited or handled.apps/webapp/app/v3/services/testTask.server.ts (1)
Line range hint
43-53
: Analogous adjustment in “SCHEDULED” branch
Likewise, consider block scoping for consistency. No logic issues found otherwise.references/v3-catalog/src/trigger/queues.ts (4)
1-1
: New import usage
Importingauth
from@trigger.dev/sdk/v3
is fine.
5-16
: Extended signature with execution context
Introducing{ ctx }
is consistent with other task patterns.
17-26
: Review logging of public tokens
Be cautious about logging sensitive tokens, even for debugging. Confirm these tokens are safe to share.
31-31
: Reusing “ctx.run.id” as idempotencyKey
If invoked multiple times, collisions are unlikely unless the same run is triggered. Validate that is the desired effect.Also applies to: 37-37, 43-43, 49-49, 55-55
apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts (3)
1-6
: LGTM! Import statements are properly organized.The imports are well-structured, with related types grouped together.
Also applies to: 12-12
21-24
: LGTM! Query includes necessary related data.The query properly includes
batchTaskRun
data which is required for the enhanced batch version handling.
66-87
: Verify error handling in the batch completion logic.The method now handles different batch versions appropriately, but there's no explicit error handling in either branch. Consider adding try-catch blocks to handle potential errors during batch completion.
async #resumeBatchItem( batchItem: BatchTaskRunItem, batchTaskRun: BatchTaskRun, taskAttempt: TaskRunAttempt ) { + try { if (batchTaskRun.batchVersion === "v3") { await completeBatchTaskRunItemV3(batchItem.id, batchTaskRun.id, this._prisma, true); } else { await $transaction(this._prisma, async (tx) => { await tx.batchTaskRunItem.update({ where: { id: batchItem.id, }, data: { status: "COMPLETED", taskRunAttemptId: taskAttempt.id, }, }); await ResumeBatchRunService.enqueue(batchItem.batchTaskRunId, false, tx); }); } + } catch (error) { + logger.error("Failed to resume batch item", { + batchItemId: batchItem.id, + error, + }); + throw error; + } }apps/webapp/app/v3/services/replayTaskRun.server.ts (2)
Line range hint
90-116
: LGTM! Service call result is properly stored.The result from
triggerTaskService.call
is properly stored in a variable, improving code readability and reusability.
117-117
: LGTM! Optional chaining is used appropriately.The return statement correctly uses optional chaining to safely access the run property.
apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts (2)
Line range hint
77-85
: LGTM! Service call result is properly handled.The service call result is properly stored and checked, maintaining consistent error handling.
98-99
: Verify API response schema documentation.The response now includes an
isCached
field. Ensure this change is documented in the API schema/documentation.apps/webapp/app/v3/services/batchTriggerTask.server.ts (1)
Line range hint
107-125
: LGTM! Batch trigger options are properly configured.The service call includes appropriate batch-related options and maintains proper parent-child relationships.
apps/webapp/app/routes/api.v1.tasks.batch.ts (1)
121-121
: LGTM! Good security practice.Using a generic error message instead of exposing internal error details helps prevent potential security vulnerabilities.
references/v3-catalog/src/trigger/checkpoints.ts (1)
7-12
: LGTM! Good test coverage addition.The new scheduled task helps test the checkpoint batch resume functionality with a controlled count of 1.
apps/webapp/app/v3/services/triggerScheduledTask.server.ts (1)
Line range hint
134-150
: LGTM! Improved variable naming.The renaming from 'run' to 'result' enhances code clarity and maintains consistency with the naming pattern across the codebase.
apps/webapp/app/v3/services/createCheckpoint.server.ts (1)
Line range hint
254-289
: LGTM! Good version handling.The addition of batchVersion field and its usage in ResumeBatchRunService.enqueue ensures proper handling of v3 batch runs.
apps/webapp/app/v3/services/finalizeTaskRun.server.ts (2)
21-22
: LGTM!The new imports are correctly defined and are used in the updated batch finalization logic.
227-229
: LGTM!The batch version check is correctly implemented to determine which batch completion logic to use.
references/v3-catalog/src/trigger/batch.ts (2)
303-303
: LGTM!The
largeBatchSize
default value has been updated to 20, which aligns with the test requirements.
691-725
: LGTM!The new
batchTriggerSequentiallyTask
is well-implemented with:
- Flexible parameters for controlling batch behavior
- Proper use of existing batch trigger functions
- Clear retry configuration
internal-packages/database/prisma/migrations/20250201072700_add_resumed_at_to_batch_task_run/migration.sql (1)
1-2
: LGTM!The migration correctly adds the
resumedAt
timestamp column to theBatchTaskRun
table.internal-packages/database/prisma/migrations/20250131145633_add_processing_columns_to_batch_task_run/migration.sql (1)
1-3
: LGTM!The migration correctly adds two integer columns with appropriate defaults to track processing jobs in the
BatchTaskRun
table.internal-packages/database/prisma/migrations/20250131120251_add_batch_trigger_v3_columns/migration.sql (1)
8-9
: Review BatchTaskRunItem Alteration in SQL Migration.
The migration adds the completedAt column (TIMESTAMP(3)) to the “BatchTaskRunItem” table, which is useful for tracking the completion timestamp of individual batch items. Verify that the application code correctly updates and uses this new field.internal-packages/database/prisma/schema.prisma (1)
2169-2183
: Review New BatchTaskRun Fields in Prisma Schema.
This section of the Prisma schema introduces several new fields for the BatchTaskRun model to support v3 batch processing:
- sealed: A Boolean to indicate that no more items can be added.
- sealedAt: A DateTime to record when the batch was sealed.
- expectedCount: An Integer to track the anticipated number of items.
- completedCount: An Integer to track how many items have been completed.
- completedAt: A DateTime to mark when the batch completes.
- resumedAt: A DateTime to capture when a batch is resumed.
- processingJobsCount and processingJobsExpectedCount: Integers for monitoring asynchronous job triggering and processing under the “parallel” processing strategy.
The inline comments are clear and describe the intent behind each field. Ensure that your application logic and subsequent migrations are updated to account for the additional fields (such as resumedAt and the processing jobs counters) even if they are not covered by the current migration file. Additionally, consider whether any of these fields should be indexed for performance improvements if they will be used in frequent queries.
@trigger.dev/build
@trigger.dev/react-hooks
@trigger.dev/rsc
trigger.dev
@trigger.dev/sdk
@trigger.dev/core
commit: |
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (4)
apps/webapp/app/v3/services/batchTriggerV3.server.ts (3)
60-87
: Minor style fix for doc comments.
In the sentence “v3 BatchTaskRun's now must be ‘sealed,’” consider removing the apostrophe.Proposed diff:
- v3 BatchTaskRun's now must be "sealed" + v3 BatchTaskRuns now must be "sealed"
307-386
: Run data preparation logic looks solid.
Handling expired cached runs and generating new run IDs is well-structured. Consider chunking large queries if performance becomes a concern for very large batches.
426-444
: Graceful fallback to enqueue processing.
Requeueing on error is sensible to avoid blocking further processing, but consider logging failure if enqueue itself encounters issues.apps/webapp/app/routes/api.v1.tasks.batch.ts (1)
121-121
: Generic error response.
Returning"Something went wrong"
hides internal details, improving security but reducing traceability. Consider capturing more context in logs if needed.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
apps/webapp/app/presenters/v3/BatchListPresenter.server.ts
(3 hunks)apps/webapp/app/routes/api.v1.tasks.batch.ts
(3 hunks)apps/webapp/app/services/worker.server.ts
(2 hunks)apps/webapp/app/v3/services/batchTriggerV3.server.ts
(16 hunks)apps/webapp/app/v3/services/finalizeTaskRun.server.ts
(4 hunks)apps/webapp/app/v3/services/resumeDependentParents.server.ts
(3 hunks)apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts
(3 hunks)references/v3-catalog/src/trigger/batch.ts
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (27)
apps/webapp/app/v3/services/batchTriggerV3.server.ts (13)
25-25
: Import usage looks good.
No issues found with adding theResumeBatchRunService
import.
53-59
: Data structure is concise and clear.
DefiningRunItemData
clarifies what data is needed for processing each run.
109-112
: Validation check is correct.
Throwing an error when items are missing helps prevent invalid requests.
114-117
: Confirm use of findFirst vs. findUnique.
findFirst
may return the first matching record if there are multiple with the same idempotencyKey. Ensure only one record is expected.Would you like me to generate a script to scan for potential duplicates in the codebase?
192-192
: Sufficient approach for retrieving run data and counting new runs.
No immediate issues; logic is straightforward for preparing new runs.Also applies to: 195-195
390-390
: Well-typed parameter.
Addingruns: Array<RunItemData>
enhances clarity for subsequent processing.
398-398
: Threshold-based strategy is clear.
Inline vs. async processing based onASYNC_BATCH_PROCESS_SIZE_THRESHOLD
is a clean approach.
446-452
: Batch sealing logic is correct.
Marking the batch assealed
ensures subsequent completion logic can proceed safely.
641-666
: Error-handling flow re-queues unprocessed items.
This pattern prevents partial failures from stalling the entire batch, which is beneficial.
668-684
: Sequential strategy re-queue is well-structured.
Sealing the batch once the current index reachesrunCount
ensures no leftover items remain.Also applies to: 698-698
700-700
: Parallel strategy logic is clear.
IncrementingprocessingJobsCount
and sealing upon reachingprocessingJobsExpectedCount
aligns with concurrency best practices.Also applies to: 716-725
817-819
: Conditional creation of batch task run items is coherent.
IncrementingexpectedCount
only whenisCached
is false ensures batch metrics’ accuracy.Also applies to: 821-836
879-940
: Completion logic is comprehensive.
Transitioning item status from PENDING to COMPLETED and sealing the batch if conditions are met is robust. Be mindful of potential concurrency if multiple run items complete simultaneously, though transactions should mitigate race conditions.Would you like a script to check for parallel updates on the same BatchTaskRun ID to ensure no concurrency issues arise?
apps/webapp/app/routes/api.v1.tasks.batch.ts (2)
15-16
: Updated service import aligns with new version.
Switching toBatchTriggerV3Service
is consistent with the updated batch system.
88-88
: Service instantiation logic is appropriate.
Selecting a default strategy if none is provided is nicely handled with?? undefined
.apps/webapp/app/presenters/v3/BatchListPresenter.server.ts (2)
103-103
: IncludingcompletedAt
supports more accurate completion timestamps.
This field cleanly differentiates between partial completion and final completion times.
201-205
: Priority ofcompletedAt
forfinishedAt
is logical.
Falling back toupdatedAt
only when status is completed without acompletedAt
ensures integrity.apps/webapp/app/v3/services/resumeDependentParents.server.ts (3)
8-8
: LGTM!The import of
completeBatchTaskRunItemV3
aligns with the new v3 batch completion functionality.
49-49
: LGTM!Adding
batchVersion
to the selection enables version-specific batch handling.
247-288
: Add error handling for v3 completion path.The v3 completion path lacks error handling, which could lead to silent failures.
Apply this diff to add error handling:
if (dependency.dependentBatchRun!.batchVersion === "v3") { const batchTaskRunItem = await this._prisma.batchTaskRunItem.findFirst({ where: { batchTaskRunId: dependency.dependentBatchRun!.id, taskRunId: dependency.taskRunId, }, }); if (batchTaskRunItem) { - await completeBatchTaskRunItemV3( - batchTaskRunItem.id, - batchTaskRunItem.batchTaskRunId, - this._prisma, - true - ); + try { + await completeBatchTaskRunItemV3( + batchTaskRunItem.id, + batchTaskRunItem.batchTaskRunId, + this._prisma, + true + ); + } catch (err) { + logger.error("Failed to complete batchTaskRunItem v3", { error: err }); + throw err; // or handle gracefully + } }apps/webapp/app/v3/services/finalizeTaskRun.server.ts (2)
3-6
: LGTM!The new imports support the enhanced functionality for environment and queue management.
226-243
: LGTM!The conditional logic for v3 batch completion is well-structured and maintains backward compatibility.
references/v3-catalog/src/trigger/batch.ts (3)
303-303
: LGTM!Reducing the default
largeBatchSize
to 20 aligns with typical batch processing limits.
691-725
: LGTM!The
batchTriggerSequentiallyTask
implementation provides a clean interface for testing sequential vs concurrent batch processing.
796-832
: LGTM!The
batchTriggerIdempotencyKeyTest
provides comprehensive test coverage for idempotency key handling in batch operations.apps/webapp/app/services/worker.server.ts (2)
58-58
: LGTM!The import statement correctly updates to use the V3 batch trigger service.
736-736
: LGTM!The service instantiation is updated to use
BatchTriggerV3Service
, maintaining consistency with the V3 transition.
80a5c4d
to
ad4334c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts (1)
71-73
: 🛠️ Refactor suggestionAdd error handling for V3 batch completion.
The V3 batch completion path lacks error handling which could lead to silent failures.
if (batchTaskRun.batchVersion === "v3") { - await completeBatchTaskRunItemV3(batchItem.id, batchTaskRun.id, this._prisma, true); + try { + await completeBatchTaskRunItemV3(batchItem.id, batchTaskRun.id, this._prisma, true); + } catch (error) { + logger.error("Failed to complete batch task run item V3", { error, batchItemId: batchItem.id }); + throw error; + } }apps/webapp/app/v3/services/resumeDependentParents.server.ts (1)
247-271
: 🛠️ Refactor suggestionAdd error handling for V3 batch completion.
The V3 batch completion path should include error handling to catch and log potential failures.
if (dependency.dependentBatchRun!.batchVersion === "v3") { const batchTaskRunItem = await this._prisma.batchTaskRunItem.findFirst({ where: { batchTaskRunId: dependency.dependentBatchRun!.id, taskRunId: dependency.taskRunId, }, }); if (batchTaskRunItem) { - await completeBatchTaskRunItemV3( - batchTaskRunItem.id, - batchTaskRunItem.batchTaskRunId, - this._prisma, - true - ); + try { + await completeBatchTaskRunItemV3( + batchTaskRunItem.id, + batchTaskRunItem.batchTaskRunId, + this._prisma, + true + ); + } catch (error) { + logger.error("Failed to complete batch task run item V3", { + error, + batchTaskRunItemId: batchTaskRunItem.id, + }); + throw error; + } }
🧹 Nitpick comments (5)
apps/webapp/app/v3/services/testTask.server.ts (1)
18-30
: Wrap the case in a block to prevent variable leakage.The changes look good, with proper optional chaining and consistent variable naming. However, to prevent the
result
variable from being accessible in other switch cases, wrap this case in a block.Apply this diff to fix the switch clause declaration:
case "STANDARD": + { const result = await triggerTaskService.call( data.taskIdentifier, authenticatedEnvironment, { payload: data.payload, options: { test: true, metadata: data.metadata, }, } ); return result?.run; + }🧰 Tools
🪛 Biome (1.9.4)
[error] 18-28: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
references/v3-catalog/src/trigger/queues.ts (1)
17-23
: Consider logging token creation failures.The auth token creation should include error handling to catch and log potential failures.
- const publicToken = await auth.createPublicToken({ - scopes: { - read: { - runs: true, - }, - }, - }); + try { + const publicToken = await auth.createPublicToken({ + scopes: { + read: { + runs: true, + }, + }, + }); + logger.debug("Public token created successfully", { publicToken }); + } catch (error) { + logger.error("Failed to create public token", { error }); + throw error; + }references/v3-catalog/src/trigger/checkpoints.ts (1)
20-21
: Remove commented code.Instead of keeping commented code, consider documenting the reason for removal in a comment or commit message.
- // await noop.batchTriggerAndWait(Array.from({ length: count }, (_, i) => ({}))); - // logger.info(`Successfully 2/2 resumed after ${count} runs`);apps/webapp/app/v3/services/batchTriggerV3.server.ts (2)
673-721
: Consider using transactions for atomic batch updates.While the sealing mechanism is well-implemented, consider wrapping the status and seal updates in a transaction to ensure atomicity:
-await this._prisma.batchTaskRun.update({ +await $transaction(this._prisma, async (tx) => { + await tx.batchTaskRun.update({ where: { id: batch.id }, data: { sealed: true, sealedAt: new Date() }, }); +});
879-940
: Add error handling for item not found.Consider adding explicit error handling when the item is not found or already completed:
if (updated.count === 0) { + logger.warn("BatchTaskRunItem not found or already completed", { + itemId, + batchTaskRunId, + }); return; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
apps/webapp/app/presenters/v3/BatchListPresenter.server.ts
(3 hunks)apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
(2 hunks)apps/webapp/app/routes/api.v1.tasks.batch.ts
(3 hunks)apps/webapp/app/services/worker.server.ts
(2 hunks)apps/webapp/app/v3/services/batchTriggerTask.server.ts
(2 hunks)apps/webapp/app/v3/services/batchTriggerV3.server.ts
(16 hunks)apps/webapp/app/v3/services/createCheckpoint.server.ts
(3 hunks)apps/webapp/app/v3/services/finalizeTaskRun.server.ts
(4 hunks)apps/webapp/app/v3/services/replayTaskRun.server.ts
(2 hunks)apps/webapp/app/v3/services/resumeBatchRun.server.ts
(8 hunks)apps/webapp/app/v3/services/resumeDependentParents.server.ts
(3 hunks)apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts
(3 hunks)apps/webapp/app/v3/services/testTask.server.ts
(3 hunks)apps/webapp/app/v3/services/triggerScheduledTask.server.ts
(2 hunks)apps/webapp/app/v3/services/triggerTask.server.ts
(6 hunks)internal-packages/database/prisma/migrations/20250131120251_add_batch_trigger_v3_columns/migration.sql
(1 hunks)internal-packages/database/prisma/migrations/20250131145633_add_processing_columns_to_batch_task_run/migration.sql
(1 hunks)internal-packages/database/prisma/migrations/20250201072700_add_resumed_at_to_batch_task_run/migration.sql
(1 hunks)internal-packages/database/prisma/schema.prisma
(2 hunks)references/v3-catalog/src/trigger/batch.ts
(3 hunks)references/v3-catalog/src/trigger/checkpoints.ts
(1 hunks)references/v3-catalog/src/trigger/queues.ts
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (12)
- internal-packages/database/prisma/migrations/20250201072700_add_resumed_at_to_batch_task_run/migration.sql
- apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
- apps/webapp/app/v3/services/triggerScheduledTask.server.ts
- apps/webapp/app/v3/services/replayTaskRun.server.ts
- apps/webapp/app/services/worker.server.ts
- internal-packages/database/prisma/migrations/20250131145633_add_processing_columns_to_batch_task_run/migration.sql
- apps/webapp/app/presenters/v3/BatchListPresenter.server.ts
- apps/webapp/app/v3/services/createCheckpoint.server.ts
- internal-packages/database/prisma/migrations/20250131120251_add_batch_trigger_v3_columns/migration.sql
- apps/webapp/app/routes/api.v1.tasks.batch.ts
- apps/webapp/app/v3/services/batchTriggerTask.server.ts
- apps/webapp/app/v3/services/resumeBatchRun.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
apps/webapp/app/v3/services/testTask.server.ts
[error] 18-28: Other switch clauses can erroneously access this declaration.
Wrap the declaration in a block to restrict its access to the switch clause.
The declaration is defined in this switch clause:
Unsafe fix: Wrap the declaration in a block.
(lint/correctness/noSwitchDeclarations)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (15)
apps/webapp/app/v3/services/testTask.server.ts (2)
43-53
: LGTM!The changes look good, with proper optional chaining and consistent variable naming. The case is already wrapped in a block, which prevents variable leakage.
55-57
: LGTM!The error handling is appropriate, and the switch statement is exhaustive with a clear error message for invalid trigger sources.
references/v3-catalog/src/trigger/queues.ts (1)
31-31
: LGTM! Consistent idempotency key usage.Using
ctx.run.id
as the idempotency key across all triggers ensures consistent deduplication.Also applies to: 37-37, 43-43, 49-49, 55-55
apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts (1)
74-87
: LGTM! Proper transaction usage for non-V3 batches.The transaction ensures atomicity between updating the batch item and enqueueing the resume service.
references/v3-catalog/src/trigger/checkpoints.ts (1)
7-12
: LGTM! Well-structured test task.The scheduled test task provides a controlled way to verify batch resuming functionality.
apps/webapp/app/v3/services/resumeDependentParents.server.ts (1)
272-288
: LGTM! Proper transaction usage for non-V3 batches.The transaction ensures atomicity between updating the batch item and enqueueing the resume service.
apps/webapp/app/v3/services/finalizeTaskRun.server.ts (1)
200-243
: LGTM! Clear version-specific handling of batch task run items.The code correctly implements version-specific handling:
- For v3 batches: Uses the new completion mechanism via
completeBatchTaskRunItemV3
- For non-v3 batches: Maintains backward compatibility with the old completion logic
references/v3-catalog/src/trigger/batch.ts (3)
303-303
: Verify the reason for reducing the batch size limit.The default
largeBatchSize
has been reduced from 21 to 20. Please clarify if this change is related to any specific batch processing constraints or requirements.
691-725
: LGTM! Well-structured task for testing sequential batch triggering.The task provides a clean interface for testing batch triggering with configurable parameters:
- Supports both wait and non-wait modes
- Allows control over sequential vs concurrent processing
- Uses sensible defaults
796-832
: LGTM! Comprehensive test coverage for batch idempotency key behavior.The test task effectively covers key idempotency scenarios:
- Batch with duplicate idempotency keys
- Large batch with mixed idempotency keys
- Concurrent triggering with shared keys
apps/webapp/app/v3/services/triggerTask.server.ts (2)
52-58
: LGTM! Improved type safety and retry handling.Good additions:
TriggerTaskServiceResult
type provides clear structure for the trigger responseMAX_ATTEMPTS
constant establishes a clear retry limit
624-641
: LGTM! Enhanced error handling for idempotency key conflicts.Good improvements to error handling:
- Specific detection of idempotency key violations
- Automatic retry mechanism for resolving conflicts
- Clear logging for debugging purposes
apps/webapp/app/v3/services/batchTriggerV3.server.ts (2)
53-86
: LGTM! Clear documentation and type definitions.Excellent documentation of v3 changes:
- Clear explanation of batch sealing mechanism
- Well-documented completion tracking changes
- Good comparison with v2 behavior
307-386
: LGTM! Robust run data preparation with idempotency handling.Well-implemented method that:
- Properly handles cached and non-cached runs
- Manages expired idempotency keys
- Updates database records appropriately
internal-packages/database/prisma/schema.prisma (1)
2169-2183
: New Batch Tracking Fields for V3 BatchesThe new fields added to the
BatchTaskRun
model clearly document the intent to support batch sealing and processing logic for v3 operations. In particular:
sealed
&sealedAt
: These fields aptly indicate when a batch is no longer accepting new items.expectedCount
&completedCount
: Their defaults and inline comments explain how the system determines completion once the batch is sealed.completedAt
&resumedAt
: These provide necessary timestamps for tracking batch lifecycle events.processingJobsCount
&processingJobsExpectedCount
: These are useful for managing and monitoring asynchronous trigger operations, especially under a parallel processing strategy.Recommendations:
- Indexing Consideration: If these fields (particularly
sealed
,completedAt
, orprocessingJobsCount
) will be used frequently in query filters to determine batch status, you might consider adding indexes to improve query performance.- Data Integrity: Ensure that application-level checks or database constraints (if supported by your backend) are in place so that
completedCount
does not exceedexpectedCount
.Overall, these changes are well-documented, align with your v3 upgrade objectives, and integrate properly within the existing schema design.
… different limits for batchTrigger and batchTriggerAndWait
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🔭 Outside diff range comments (1)
apps/webapp/app/v3/services/batchTriggerV3.server.ts (1)
738-784
: Consider adding retry mechanism for failed items.The error handling in
#processBatchTaskRunItems
stops processing on the first error. Consider implementing a retry mechanism for individual items.async #processBatchTaskRunItems( batch: BatchTaskRun, environment: AuthenticatedEnvironment, currentIndex: number, batchSize: number, items: BatchTriggerTaskV2RequestBody["items"], options?: BatchTriggerTaskServiceOptions ): Promise<{ workingIndex: number; error?: Error }> { + const MAX_RETRIES = 3; + const failedItems: Array<{ index: number; error: Error }> = []; for (const item of itemsToProcess) { + let retries = 0; + while (retries < MAX_RETRIES) { try { await this.#processBatchTaskRunItem(batch, environment, item, workingIndex, options); workingIndex++; + break; } catch (error) { + retries++; + if (retries === MAX_RETRIES) { + failedItems.push({ index: workingIndex, error: error instanceof Error ? error : new Error(String(error)) }); + } + await new Promise(resolve => setTimeout(resolve, Math.pow(2, retries) * 1000)); } + } } + + if (failedItems.length > 0) { + return { + error: new Error(`Failed to process ${failedItems.length} items`), + workingIndex, + failedItems + }; + } return { workingIndex }; }
🧹 Nitpick comments (4)
internal-packages/database/src/transaction.ts (1)
60-89
: Add JSDoc documentation for better code maintainability.The implementation is correct and well-structured, but adding JSDoc documentation would improve maintainability by clearly documenting the purpose, parameters, and return value.
+/** + * Checks if an error is a Prisma unique constraint violation for specific columns. + * @param error - The error to check + * @param columns - Array of column names to verify against the constraint + * @returns true if the error is a unique constraint violation for the specified columns + * @example + * ```ts + * if (isUniqueConstraintError(error, ["email"])) { + * // Handle duplicate email error + * } + * ``` + */ export function isUniqueConstraintError<T extends readonly string[]>( error: unknown, columns: T ): boolean {apps/webapp/app/env.server.ts (1)
365-365
: Add descriptive comment for the environment variable.The implementation is correct, but adding a comment would help explain the purpose and impact of this limit.
+ /** Maximum number of items allowed in a batch when using batch trigger and wait in V2. Default: 500 */ MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
apps/webapp/app/v3/services/batchTriggerV3.server.ts (2)
312-391
: Consider caching optimization for batch processing.The
#prepareRunData
method efficiently handles cached runs, but consider adding a bulk database query optimization when checking for expired runs.Instead of updating expired runs one by one, consider:
if (expiredRunIds.size) { - await this._prisma.taskRun.updateMany({ + // Batch update all expired runs in a single query + await tx.taskRun.updateMany({ where: { friendlyId: { in: Array.from(expiredRunIds) } }, data: { idempotencyKey: null }, }); }
673-735
: Add logging for batch sealing events.The batch sealing logic is correct but could benefit from more detailed logging for debugging purposes.
switch (options.strategy) { case "sequential": { if (result.workingIndex >= batch.runCount) { + logger.debug("[BatchTriggerV2][processBatchTaskRun] Sealing batch", { + batchId: batch.friendlyId, + strategy: "sequential", + runCount: batch.runCount, + workingIndex: result.workingIndex + }); await this._prisma.batchTaskRun.update({ where: { id: batch.id }, data: { sealed: true, sealedAt: new Date() }, });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
apps/webapp/app/env.server.ts
(1 hunks)apps/webapp/app/routes/api.v1.tasks.batch.ts
(4 hunks)apps/webapp/app/v3/services/batchTriggerV3.server.ts
(17 hunks)internal-packages/database/src/transaction.ts
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- apps/webapp/app/routes/api.v1.tasks.batch.ts
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (1)
apps/webapp/app/v3/services/batchTriggerV3.server.ts (1)
58-63
: LGTM! Well-structured type definition.The
RunItemData
type clearly defines the structure for batch run items with appropriate fields.
Summary by CodeRabbit
New Features
Database Enhancements
User Experience
Testing Improvements