-
-
Notifications
You must be signed in to change notification settings - Fork 710
Runlock telemetry #1974
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Runlock telemetry #1974
Conversation
WalkthroughThis set of changes refactors the locking mechanism within the run-engine package by updating the Changes
Sequence Diagram(s)sequenceDiagram
participant System as Engine System (e.g., DequeueSystem)
participant RunLocker
participant Logger
participant Tracer
System->>RunLocker: lock("operationName", [resourceId], timeout, callback)
RunLocker->>Tracer: startSpan("RunLocker.lock", {name, resources, timeout})
alt Lock is nested
RunLocker->>Tracer: setAttribute("nested", true)
RunLocker->>callback: execute with existing lock signal
else New lock needed
RunLocker->>Tracer: setAttribute("nested", false)
RunLocker->>Logger: log lock attempt
RunLocker->>callback: execute with new lock signal
alt Error occurs
RunLocker->>Logger: log error details
RunLocker->>Tracer: record error
RunLocker->>System: throw error
end
end
RunLocker->>Tracer: endSpan
RunLocker->>System: return result
Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (3)
internal-packages/run-engine/src/engine/index.ts (2)
95-99
: Reuse the engine’s tracer instead of instantiating a separate one
RunEngine
receives an externally-configuredTracer
inoptions.tracer
(assigned later on line 179). Creating a second tracer here breaks span correlation and doubles the instrumentation overhead.- this.runLock = new RunLocker({ - redis: this.runLockRedis, - logger: this.logger, - tracer: trace.getTracer("RunLocker"), - }); + // Re-use the same tracer that the rest of the engine will use + this.runLock = new RunLocker({ + redis: this.runLockRedis, + logger: this.logger, + tracer: options.tracer ?? trace.getTracer("RunLocker"), + });If
options.tracer
isn’t available yet, consider moving thethis.tracer
assignment above this block so you can passthis.tracer
directly.
498-498
: Minor: consider extracting lock name strings to enums/constantsSeveral callers now pass literal strings (
"trigger"
,"dequeueFromMasterQueue"
, …). A centralenum
(e.g.RunLockSpan
) prevents typos and eases refactors/searchability.Not critical, just a maintainability suggestion.
internal-packages/run-engine/src/engine/tests/locking.test.ts (1)
130-143
: Make the timeout test assertion implementation-agnosticThe test expects the error message to literally contain “unable to achieve a quorum”, which couples the suite to the wording of the underlying redlock library. Any library upgrade or customisation will break the test even though functionality is intact. Assert on the error type instead:
-).rejects.toThrow("unable to achieve a quorum"); +).rejects.toThrowError(/quorum|Lock/i);or, better, check for a custom
LockTimeoutError
class thrown byRunLocker
.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
internal-packages/run-engine/src/engine/index.ts
(3 hunks)internal-packages/run-engine/src/engine/locking.ts
(3 hunks)internal-packages/run-engine/src/engine/systems/checkpointSystem.ts
(2 hunks)internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
(2 hunks)internal-packages/run-engine/src/engine/systems/enqueueSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts
(1 hunks)internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
(5 hunks)internal-packages/run-engine/src/engine/systems/ttlSystem.ts
(1 hunks)internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
(2 hunks)internal-packages/run-engine/src/engine/tests/locking.test.ts
(3 hunks)internal-packages/run-engine/tsconfig.test.json
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (1)
internal-packages/run-engine/src/engine/index.ts (1)
runId
(1159-1313)
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (2)
internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts (1)
getLatestExecutionSnapshot
(28-100)internal-packages/run-engine/src/engine/statuses.ts (1)
canReleaseConcurrency
(49-52)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: units / 🧪 Unit Tests
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (21)
internal-packages/run-engine/src/engine/locking.ts (6)
6-8
: Good addition of necessary importsThe imports for
tryCatch
,Logger
, and tracing utilities provide the foundation for enhanced observability of locking operations.
18-19
: Well-structured class property additionsAdding the logger and tracer as private properties with proper initialization in the constructor follows good OOP practices and enables the tracing enhancements.
Also applies to: 30-31
36-36
: API extension with descriptive lock name parameterAdding the name parameter is a good practice for better identification and debugging of locks. This makes lock usage more explicit and traceable.
44-77
: Excellent implementation of tracing spans for lock operationsThe lock method implementation has several well-designed features:
- Proper wrapping of locking logic in a tracing span
- Detailed span attributes for observability
- Efficient handling of nested locks with the same resources
- Robust error handling with logging
- Preservation of the original behavior while adding observability
This approach will significantly improve debugging and monitoring of lock-related issues.
57-70
: Well-implemented error handling with tryCatchThe use of tryCatch to capture and log errors before rethrowing them is an excellent pattern that enhances observability without changing behavior.
21-21
:✅ Verification successful
Breaking change to constructor signature handled appropriately
Expanding the constructor parameters to require logger and tracer instances is a breaking change, but necessary for the enhanced functionality. Make sure all instantiations of RunLocker across the codebase have been updated.
🏁 Script executed:
#!/bin/bash # Find all instances where RunLocker is instantiated to verify they include logger and tracer echo "Looking for RunLocker instantiations..." rg -A 3 "new RunLocker" --type tsLength of output: 3373
All RunLocker instances updated to include
logger
andtracer
– no further action required.Verification confirmed that every
new RunLocker({...})
call in
internal-packages/run-engine/src/engine/index.ts
internal-packages/run-engine/src/engine/tests/locking.test.ts
already supplies
redis
,logger
, andtracer
.internal-packages/run-engine/tsconfig.test.json (1)
19-19
: Compiler configuration updateAdding customConditions for @triggerdotdev/source supports the test environment requirements for the new features.
internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts (1)
40-40
: Consistent usage of the updated lock APIThe lock call has been properly updated to include the descriptive name "rescheduleDelayedRun" as the first parameter, following the new API signature.
internal-packages/run-engine/src/engine/systems/ttlSystem.ts (2)
27-27
: Consistent usage of the updated lock APIThe lock call has been correctly updated to include the descriptive name "expireRun" as the first parameter, maintaining consistency with the updated API signature.
25-118
:✅ Verification successful
Verify all lock calls across systems are updated
The changes to include a descriptive lock name are consistent in the files reviewed. However, we should verify all lock calls across the codebase have been updated to maintain consistency.
🏁 Script executed:
#!/bin/bash # Find all calls to runLock.lock to verify they include the name parameter echo "Looking for runLock.lock calls..." rg "runLock\.lock\(" --type tsLength of output: 4511
All runLock.lock calls include descriptive names
Verified that every call torunLock.lock
across the codebase specifies a descriptive lock name as the first argument.internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts (5)
79-79
: Updated lock method call with descriptive nameThe lock call now includes a descriptive string identifier "startRunAttempt" as the first parameter, which improves traceability.
415-415
: Consistent lock naming for improved observabilityThe lock call now includes a descriptive name "attemptSucceeded" that clearly identifies the operation context.
549-549
: Enhanced lock telemetryAdding the descriptive identifier "attemptFailed" to the lock operation provides better context for tracing and debugging.
853-853
: Clear lock operation labelingThe lock call now includes a meaningful operation name "tryNackAndRequeue" which will help with telemetry and error tracking.
929-929
: Improved lock traceabilityThe addition of "cancelRun" as a lock name provides explicit context for this critical operation in telemetry data.
internal-packages/run-engine/src/engine/systems/enqueueSystem.ts (1)
57-57
: Added descriptive lock name for telemetryThe lock method call now includes "enqueueRun" as the first parameter, consistent with the updated RunLocker API and improving observability across the system.
internal-packages/run-engine/src/engine/systems/checkpointSystem.ts (2)
56-56
: Enhanced lock telemetry with descriptive nameAdding "createCheckpoint" as the lock name provides clear context for this operation in traces and logs, consistent with the new locking API pattern.
241-241
: Named lock for better observabilityThe lock call now includes "continueRunExecution" as an identifier, which improves tracing and debugging capabilities.
internal-packages/run-engine/src/engine/systems/waitpointSystem.ts (2)
361-361
: Improved lock operation identificationThe lock method call now includes "blockRunWithWaitpoint" as a descriptive identifier, enhancing observability and fitting with the updated locking API.
512-512
: Consistent lock naming patternThe addition of "continueRunIfUnblocked" as a lock name maintains the consistent pattern of named locks across the codebase for better telemetry.
internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts (1)
200-205
:❓ Verification inconclusive
Callback may outlive 5 s lock TTL – use
signal
or increase TTLThe callback performs several network calls (DB fetch, queue ops). If latency spikes, the 5 000 ms TTL may lapse, causing a second worker to enter the same critical section.
Either
• raise the TTL to something less brittle (e.g. 15 000 ms), or
• callsignal.throwIfAborted()
(and/orextend()
) after every awaited statement to guarantee exclusive ownership.- [snapshot.runId], - 5_000, + [snapshot.runId], + 15_000, // or keep 5 000 and add periodic signal checks
🏁 Script executed:
#!/bin/bash # Show the top of releaseConcurrencySystem.ts to understand imports and context sed -n '1,100p' internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts # Find all occurrences of runLock.lock to see usage patterns rg -n "runLock\.lock" -n internal-packages # Locate the runLock implementation file to inspect its lock() signature RUNLOCK_FILE=$(fd runLock.ts internal-packages | head -n1) echo "Found runLock implementation at: $RUNLOCK_FILE" sed -n '1,200p' "$RUNLOCK_FILE"Length of output: 7621
Callback may outlive 5s lock TTL – verify
signal
support or increase TTL
The lock callback in releaseConcurrencySystem performs multiple awaited operations (DB fetch, queue ops). If its execution exceeds the 5 000 ms TTL, the lock will expire and a second worker can enter the same critical section.• Increase the TTL to a more robust value (e.g. 15 000 ms)
• Or leverage thesignal
passed into the callback—callsignal.throwIfAborted()
after each await (and/or uselock.extend()
if your implementation supports it)
• Confirm therunLock.lock
signature onSystemResources
actually passes anAbortSignal
and exposes an extension API before relying on signal checks
return await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => { | ||
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Long-running callback risks lock expiry; propagate signal
checks
#handleStalledSnapshot
performs multiple DB round-trips and conditional branches that can easily exceed the 5 s lock TTL. Unlike the trigger
path, the callback never calls signal.throwIfAborted()
nor refreshes the lock, so a competing worker could acquire the same lock mid-execution.
Add periodic checks or extend the lock inside the callback, e.g.:
await this.$.runLock.lock("handleStalledSnapshot", [runId], 10_000, async (signal) => {
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
signal.throwIfAborted(); // ✅ after IO
// … extra logic …
if (complexCondition) {
signal.throwIfAborted(); // ✅ before continuing
}
});
Also consider a slightly larger TTL (e.g. 10–15 s) to cover worst-case DB latency.
🏁 Script executed:
#!/bin/bash
set -e
echo "----- Searching runLock usage in index.ts -----"
rg -n "runLock" internal-packages/run-engine/src/engine/index.ts
echo "----- Searching import of runLock -----"
rg -n "import.*runLock" internal-packages/run-engine/src/engine/index.ts
echo "----- Searching lock method signature usage -----"
rg -n "runLock.lock" -R internal-packages
echo "----- Searching class definitions for ‘Lock’ -----"
rg -n "class .*Lock" -R internal-packages
Length of output: 988
🏁 Script executed:
#!/bin/bash
set -e
echo "----- Locating RunLocker class -----"
FILE=$(rg -l "class RunLocker" -R internal-packages | head -n1)
if [[ -z "$FILE" ]]; then
echo "RunLocker class not found."
exit 1
fi
echo "Found RunLocker in $FILE"
echo "----- Displaying lock method signature -----"
rg -n "lock(" "$FILE" | head -n20
Length of output: 650
Ensure handleStalledSnapshot
lock callback handles abort signal and extends TTL
The callback passed to
internal-packages/run-engine/src/engine/index.ts:1169
currently omits the signal
parameter, preventing any signal.throwIfAborted()
checks and risking lock expiry during its multiple DB round-trips. To prevent a competing worker from stealing the lock mid-execution and to handle timeouts:
-
Change the callback signature to accept
signal: AbortSignal
and bump the TTL:// before (lines 1169–1170) await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
// after await this.runLock.lock("handleStalledSnapshot", [runId], 10_000, async (signal) => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); signal.throwIfAborted(); // ✅ after DB read // … additional logic … if (complexCondition) { signal.throwIfAborted(); // ✅ before continuing } });
-
Insert
signal.throwIfAborted()
after each heavy I/O or before key branches. -
Consider a TTL of 10 000–15 000 ms to cover worst-case DB latency.
const parsedConfig = RetryOptions.nullable().safeParse(retryConfig); | ||
|
||
this.$.logger.debug( | ||
"RunEngine.dequeueFromMasterQueue(): maxAttempts not set, using task's retry config", | ||
{ | ||
runId, | ||
task: result.task.id, | ||
rawRetryConfig: retryConfig, | ||
if (!parsedConfig.success) { | ||
this.$.logger.error( | ||
"RunEngine.dequeueFromMasterQueue(): Invalid retry config", | ||
{ | ||
runId, | ||
task: result.task.id, | ||
rawRetryConfig: retryConfig, | ||
} | ||
); | ||
} | ||
); | ||
|
||
const parsedConfig = RetryOptions.nullable().safeParse(retryConfig); | ||
|
||
if (!parsedConfig.success) { | ||
this.$.logger.error("RunEngine.dequeueFromMasterQueue(): Invalid retry config", { | ||
runId, | ||
task: result.task.id, | ||
rawRetryConfig: retryConfig, | ||
}); | ||
maxAttempts = parsedConfig.data?.maxAttempts; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Type-safety bug: accessing .data
on failed Zod parse
When parsedConfig.success === false
, the union type no longer exposes .data
. Accessing it unconditionally causes a compile-time error and will break strict builds.
-const parsedConfig = RetryOptions.nullable().safeParse(retryConfig);
-
-if (!parsedConfig.success) {
- this.$.logger.error(/* … */);
-}
-
-maxAttempts = parsedConfig.data?.maxAttempts;
+const retryParse = RetryOptions.nullable().safeParse(retryConfig);
+
+if (retryParse.success) {
+ maxAttempts = retryParse.data?.maxAttempts;
+} else {
+ this.$.logger.error(/* … */);
+}
This guarantees both sound typings and correct runtime behaviour.
Committable suggestion skipped: line range outside the PR's diff.
consumedResources.cpu += machinePreset.cpu; | ||
consumedResources.memory += machinePreset.memory; | ||
|
||
//are we under the limit? | ||
if (maxResources) { | ||
if ( | ||
consumedResources.cpu > maxResources.cpu || | ||
consumedResources.memory > maxResources.memory | ||
) { | ||
this.$.logger.debug( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resource accounting can exceed limits by one message
consumedResources
is incremented before you validate the totals. If the next run tips the totals over maxResources
, the function nacks the message but leaves the over-counted values in the accumulator, potentially blocking subsequent (valid) runs in the same loop iteration.
Compute the prospective totals first:
-consumedResources.cpu += machinePreset.cpu;
-consumedResources.memory += machinePreset.memory;
-
-//are we under the limit?
-if (maxResources) {
- if (consumedResources.cpu > maxResources.cpu ||
- consumedResources.memory > maxResources.memory) {
+const prospectiveCpu = consumedResources.cpu + machinePreset.cpu;
+const prospectiveMem = consumedResources.memory + machinePreset.memory;
+
+if (maxResources &&
+ (prospectiveCpu > maxResources.cpu || prospectiveMem > maxResources.memory)) {
// …
return null;
}
}
+
+consumedResources.cpu = prospectiveCpu;
+consumedResources.memory = prospectiveMem;
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
consumedResources.cpu += machinePreset.cpu; | |
consumedResources.memory += machinePreset.memory; | |
//are we under the limit? | |
if (maxResources) { | |
if ( | |
consumedResources.cpu > maxResources.cpu || | |
consumedResources.memory > maxResources.memory | |
) { | |
this.$.logger.debug( | |
// compute what the totals would be *if* we accepted this message | |
const prospectiveCpu = consumedResources.cpu + machinePreset.cpu; | |
const prospectiveMem = consumedResources.memory + machinePreset.memory; | |
// reject early if we’d exceed our budget | |
if (maxResources && | |
(prospectiveCpu > maxResources.cpu || prospectiveMem > maxResources.memory)) { | |
// …preserve existing debug logging | |
this.$.logger.debug( | |
`Resource limit exceeded: CPU ${prospectiveCpu}/${maxResources.cpu}, ` + | |
`Memory ${prospectiveMem}/${maxResources.memory}` | |
); | |
return null; | |
} | |
// now that we know it's safe, commit the consumption | |
consumedResources.cpu = prospectiveCpu; | |
consumedResources.memory = prospectiveMem; |
Added spans to the RunLock
Summary by CodeRabbit