Skip to content

Runlock telemetry #1974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ export class RunEngine {
},
}
);
this.runLock = new RunLocker({ redis: this.runLockRedis });
this.runLock = new RunLocker({
redis: this.runLockRedis,
logger: this.logger,
tracer: trace.getTracer("RunLocker"),
});

const keys = new RunQueueFullKeyProducer();

Expand Down Expand Up @@ -491,7 +495,7 @@ export class RunEngine {

span.setAttribute("runId", taskRun.id);

await this.runLock.lock([taskRun.id], 5000, async (signal) => {
await this.runLock.lock("trigger", [taskRun.id], 5000, async (signal) => {
//create associated waitpoint (this completes when the run completes)
const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint(
prisma,
Expand Down Expand Up @@ -1162,7 +1166,7 @@ export class RunEngine {
tx?: PrismaClientOrTransaction;
}) {
const prisma = tx ?? this.prisma;
return await this.runLock.lock([runId], 5_000, async () => {
return await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => {
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
Comment on lines +1169 to 1170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Long-running callback risks lock expiry; propagate signal checks

#handleStalledSnapshot performs multiple DB round-trips and conditional branches that can easily exceed the 5 s lock TTL. Unlike the trigger path, the callback never calls signal.throwIfAborted() nor refreshes the lock, so a competing worker could acquire the same lock mid-execution.

Add periodic checks or extend the lock inside the callback, e.g.:

await this.$.runLock.lock("handleStalledSnapshot", [runId], 10_000, async (signal) => {
  const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
  signal.throwIfAborted();              // ✅ after IO

  // … extra logic …

  if (complexCondition) {
    signal.throwIfAborted();            // ✅ before continuing
  }
});

Also consider a slightly larger TTL (e.g. 10–15 s) to cover worst-case DB latency.


🏁 Script executed:

#!/bin/bash
set -e

echo "----- Searching runLock usage in index.ts -----"
rg -n "runLock" internal-packages/run-engine/src/engine/index.ts

echo "----- Searching import of runLock -----"
rg -n "import.*runLock" internal-packages/run-engine/src/engine/index.ts

echo "----- Searching lock method signature usage -----"
rg -n "runLock.lock" -R internal-packages

echo "----- Searching class definitions for ‘Lock’ -----"
rg -n "class .*Lock" -R internal-packages

Length of output: 988


🏁 Script executed:

#!/bin/bash
set -e

echo "----- Locating RunLocker class -----"
FILE=$(rg -l "class RunLocker" -R internal-packages | head -n1)
if [[ -z "$FILE" ]]; then
  echo "RunLocker class not found."
  exit 1
fi
echo "Found RunLocker in $FILE"

echo "----- Displaying lock method signature -----"
rg -n "lock(" "$FILE" | head -n20

Length of output: 650


Ensure handleStalledSnapshot lock callback handles abort signal and extends TTL

The callback passed to
internal-packages/run-engine/src/engine/index.ts:1169
currently omits the signal parameter, preventing any signal.throwIfAborted() checks and risking lock expiry during its multiple DB round-trips. To prevent a competing worker from stealing the lock mid-execution and to handle timeouts:

  • Change the callback signature to accept signal: AbortSignal and bump the TTL:

    // before (lines 1169–1170)
    await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => {
      const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
    // after
    await this.runLock.lock("handleStalledSnapshot", [runId], 10_000, async (signal) => {
      const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
      signal.throwIfAborted();            // ✅ after DB read
    
      // … additional logic …
    
      if (complexCondition) {
        signal.throwIfAborted();          // ✅ before continuing
      }
    });
  • Insert signal.throwIfAborted() after each heavy I/O or before key branches.

  • Consider a TTL of 10 000–15 000 ms to cover worst-case DB latency.

if (latestSnapshot.id !== snapshotId) {
this.logger.log(
Expand Down
53 changes: 41 additions & 12 deletions internal-packages/run-engine/src/engine/locking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ const { default: Redlock } = require("redlock");
import { AsyncLocalStorage } from "async_hooks";
import { Redis } from "@internal/redis";
import * as redlock from "redlock";
import { tryCatch } from "@trigger.dev/core";
import { Logger } from "@trigger.dev/core/logger";
import { startSpan, Tracer } from "@internal/tracing";

interface LockContext {
resources: string;
Expand All @@ -12,8 +15,10 @@ interface LockContext {
export class RunLocker {
private redlock: InstanceType<typeof redlock.default>;
private asyncLocalStorage: AsyncLocalStorage<LockContext>;
private logger: Logger;
private tracer: Tracer;

constructor(options: { redis: Redis }) {
constructor(options: { redis: Redis; logger: Logger; tracer: Tracer }) {
this.redlock = new Redlock([options.redis], {
driftFactor: 0.01,
retryCount: 10,
Expand All @@ -22,30 +27,54 @@ export class RunLocker {
automaticExtensionThreshold: 500, // time in ms
});
this.asyncLocalStorage = new AsyncLocalStorage<LockContext>();
this.logger = options.logger;
this.tracer = options.tracer;
}

/** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. */
async lock<T>(
name: string,
resources: string[],
duration: number,
routine: (signal: redlock.RedlockAbortSignal) => Promise<T>
): Promise<T> {
const currentContext = this.asyncLocalStorage.getStore();
const joinedResources = resources.sort().join(",");

if (currentContext && currentContext.resources === joinedResources) {
// We're already inside a lock with the same resources, just run the routine
return routine(currentContext.signal);
}
return startSpan(
this.tracer,
"RunLocker.lock",
async (span) => {
if (currentContext && currentContext.resources === joinedResources) {
span.setAttribute("nested", true);
// We're already inside a lock with the same resources, just run the routine
return routine(currentContext.signal);
}

// Different resources or not in a lock, proceed with new lock
return this.redlock.using(resources, duration, async (signal) => {
const newContext: LockContext = { resources: joinedResources, signal };
span.setAttribute("nested", false);

return this.asyncLocalStorage.run(newContext, async () => {
return routine(signal);
});
});
// Different resources or not in a lock, proceed with new lock
const [error, result] = await tryCatch(
this.redlock.using(resources, duration, async (signal) => {
const newContext: LockContext = { resources: joinedResources, signal };

return this.asyncLocalStorage.run(newContext, async () => {
return routine(signal);
});
})
);

if (error) {
this.logger.error("[RunLocker] Error locking resources", { error, resources, duration });
throw error;
}

return result;
},
{
attributes: { name, resources, timeout: duration },
}
);
}

isInsideLock(): boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class CheckpointSystem {
}): Promise<CreateCheckpointResult> {
const prisma = tx ?? this.$.prisma;

return await this.$.runLock.lock([runId], 5_000, async () => {
return await this.$.runLock.lock("createCheckpoint", [runId], 5_000, async () => {
const snapshot = await getLatestExecutionSnapshot(prisma, runId);

const isValidSnapshot =
Expand Down Expand Up @@ -238,7 +238,7 @@ export class CheckpointSystem {
}): Promise<ExecutionResult> {
const prisma = tx ?? this.$.prisma;

return await this.$.runLock.lock([runId], 5_000, async () => {
return await this.$.runLock.lock("continueRunExecution", [runId], 5_000, async () => {
const snapshot = await getLatestExecutionSnapshot(prisma, runId);

if (snapshot.id !== snapshotId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class DelayedRunSystem {
this.$.tracer,
"rescheduleDelayedRun",
async () => {
return await this.$.runLock.lock([runId], 5_000, async () => {
return await this.$.runLock.lock("rescheduleDelayedRun", [runId], 5_000, async () => {
const snapshot = await getLatestExecutionSnapshot(prisma, runId);

//if the run isn't just created then we can't reschedule it
Expand Down
Loading
Loading