diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts
index 29354fe104..ef8b8f6af3 100644
--- a/internal-packages/run-engine/src/engine/index.ts
+++ b/internal-packages/run-engine/src/engine/index.ts
@@ -92,7 +92,11 @@ export class RunEngine {
        },
      }
    );
-    this.runLock = new RunLocker({ redis: this.runLockRedis });
+    this.runLock = new RunLocker({
+      redis: this.runLockRedis,
+      logger: this.logger,
+      tracer: trace.getTracer("RunLocker"),
+    });
 
    const keys = new RunQueueFullKeyProducer();
 
@@ -491,7 +495,7 @@ export class RunEngine {
 
      span.setAttribute("runId", taskRun.id);
 
-      await this.runLock.lock([taskRun.id], 5000, async (signal) => {
+      await this.runLock.lock("trigger", [taskRun.id], 5000, async (signal) => {
        //create associated waitpoint (this completes when the run completes)
        const associatedWaitpoint = await this.waitpointSystem.createRunAssociatedWaitpoint(
          prisma,
@@ -1162,7 +1166,7 @@ export class RunEngine {
    tx?: PrismaClientOrTransaction;
  }) {
    const prisma = tx ?? this.prisma;
-    return await this.runLock.lock([runId], 5_000, async () => {
+    return await this.runLock.lock("handleStalledSnapshot", [runId], 5_000, async () => {
      const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
      if (latestSnapshot.id !== snapshotId) {
        this.logger.log(
diff --git a/internal-packages/run-engine/src/engine/locking.ts b/internal-packages/run-engine/src/engine/locking.ts
index 1ffed51ba1..c8e790204f 100644
--- a/internal-packages/run-engine/src/engine/locking.ts
+++ b/internal-packages/run-engine/src/engine/locking.ts
@@ -3,6 +3,9 @@ const { default: Redlock } = require("redlock");
 import { AsyncLocalStorage } from "async_hooks";
 import { Redis } from "@internal/redis";
 import * as redlock from "redlock";
+import { tryCatch } from "@trigger.dev/core";
+import { Logger } from "@trigger.dev/core/logger";
+import { startSpan, Tracer } from "@internal/tracing";
 
 interface LockContext {
  resources: string;
@@ -12,8 +15,10 @@
 export class RunLocker {
  private redlock: InstanceType;
  private asyncLocalStorage: AsyncLocalStorage;
+  private logger: Logger;
+  private tracer: Tracer;
 
-  constructor(options: { redis: Redis }) {
+  constructor(options: { redis: Redis; logger: Logger; tracer: Tracer }) {
    this.redlock = new Redlock([options.redis], {
      driftFactor: 0.01,
      retryCount: 10,
@@ -22,10 +27,13 @@
      automaticExtensionThreshold: 500, // time in ms
    });
    this.asyncLocalStorage = new AsyncLocalStorage();
+    this.logger = options.logger;
+    this.tracer = options.tracer;
  }
 
  /** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. */
  async lock(
+    name: string,
    resources: string[],
    duration: number,
    routine: (signal: redlock.RedlockAbortSignal) => Promise
@@ -33,19 +41,40 @@ {
    const currentContext = this.asyncLocalStorage.getStore();
    const joinedResources = resources.sort().join(",");
 
-    if (currentContext && currentContext.resources === joinedResources) {
-      // We're already inside a lock with the same resources, just run the routine
-      return routine(currentContext.signal);
-    }
+    return startSpan(
+      this.tracer,
+      "RunLocker.lock",
+      async (span) => {
+        if (currentContext && currentContext.resources === joinedResources) {
+          span.setAttribute("nested", true);
+          // We're already inside a lock with the same resources, just run the routine
+          return routine(currentContext.signal);
+        }
 
-    // Different resources or not in a lock, proceed with new lock
-    return this.redlock.using(resources, duration, async (signal) => {
-      const newContext: LockContext = { resources: joinedResources, signal };
+        span.setAttribute("nested", false);
 
-      return this.asyncLocalStorage.run(newContext, async () => {
-        return routine(signal);
-      });
-    });
+        // Different resources or not in a lock, proceed with new lock
+        const [error, result] = await tryCatch(
+          this.redlock.using(resources, duration, async (signal) => {
+            const newContext: LockContext = { resources: joinedResources, signal };
+
+            return this.asyncLocalStorage.run(newContext, async () => {
+              return routine(signal);
+            });
+          })
+        );
+
+        if (error) {
+          this.logger.error("[RunLocker] Error locking resources", { error, resources, duration });
+          throw error;
+        }
+
+        return result;
+      },
+      {
+        attributes: { name, resources, timeout: duration },
+      }
+    );
  }
 
  isInsideLock(): boolean {
diff --git a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts
index 91a0ad0c9f..bcaf417756 100644
--- a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts
+++ b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts
@@ -53,7 +53,7 @@ export class CheckpointSystem {
  }): Promise {
    const prisma = tx ?? this.$.prisma;
 
-    return await this.$.runLock.lock([runId], 5_000, async () => {
+    return await this.$.runLock.lock("createCheckpoint", [runId], 5_000, async () => {
      const snapshot = await getLatestExecutionSnapshot(prisma, runId);
 
      const isValidSnapshot =
@@ -238,7 +238,7 @@
  }): Promise {
    const prisma = tx ??
this.$.prisma; - return await this.$.runLock.lock([runId], 5_000, async () => { + return await this.$.runLock.lock("continueRunExecution", [runId], 5_000, async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); if (snapshot.id !== snapshotId) { diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index 6c7e410bac..2511e86f26 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -37,7 +37,7 @@ export class DelayedRunSystem { this.$.tracer, "rescheduleDelayedRun", async () => { - return await this.$.runLock.lock([runId], 5_000, async () => { + return await this.$.runLock.lock("rescheduleDelayedRun", [runId], 5_000, async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); //if the run isn't just created then we can't reschedule it diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 9776a98518..9610c1accc 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -91,348 +91,359 @@ export class DequeueSystem { //lock the run so nothing else can modify it try { - const dequeuedRun = await this.$.runLock.lock([runId], 5000, async (signal) => { - const snapshot = await getLatestExecutionSnapshot(prisma, runId); - - if (!isDequeueableExecutionStatus(snapshot.executionStatus)) { - //create a failed snapshot - await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { - run: { - id: snapshot.runId, - status: snapshot.runStatus, - }, - snapshot: { - executionStatus: snapshot.executionStatus, - description: - "Tried to dequeue a run that is not in a valid state to be dequeued.", - }, - previousSnapshotId: snapshot.id, - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - checkpointId: snapshot.checkpointId ?? undefined, - completedWaitpoints: snapshot.completedWaitpoints, - error: `Tried to dequeue a run that is not in a valid state to be dequeued.`, - workerId, - runnerId, - }); - - //todo is there a way to recover this, so the run can be retried? - //for example should we update the status to a dequeuable status and nack it? 
- //then at least it has a chance of succeeding and we have the error log above - await this.runAttemptSystem.systemFailure({ - runId, - error: { - type: "INTERNAL_ERROR", - code: "TASK_DEQUEUED_INVALID_STATE", - message: `Task was in the ${snapshot.executionStatus} state when it was dequeued for execution.`, - }, - tx: prisma, - }); - this.$.logger.error( - `RunEngine.dequeueFromMasterQueue(): Run is not in a valid state to be dequeued: ${runId}\n ${snapshot.id}:${snapshot.executionStatus}` - ); - return null; - } - - if (snapshot.executionStatus === "QUEUED_EXECUTING") { - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( - prisma, - { + const dequeuedRun = await this.$.runLock.lock( + "dequeueFromMasterQueue", + [runId], + 5000, + async (signal) => { + const snapshot = await getLatestExecutionSnapshot(prisma, runId); + + if (!isDequeueableExecutionStatus(snapshot.executionStatus)) { + //create a failed snapshot + await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run: { - id: runId, + id: snapshot.runId, status: snapshot.runStatus, - attemptNumber: snapshot.attemptNumber, }, snapshot: { - executionStatus: "EXECUTING", - description: "Run was continued, whilst still executing.", + executionStatus: snapshot.executionStatus, + description: + "Tried to dequeue a run that is not in a valid state to be dequeued.", }, previousSnapshotId: snapshot.id, environmentId: snapshot.environmentId, environmentType: snapshot.environmentType, projectId: snapshot.projectId, organizationId: snapshot.organizationId, - batchId: snapshot.batchId ?? undefined, - completedWaitpoints: snapshot.completedWaitpoints.map((waitpoint) => ({ - id: waitpoint.id, - index: waitpoint.index, - })), - } - ); + checkpointId: snapshot.checkpointId ?? undefined, + completedWaitpoints: snapshot.completedWaitpoints, + error: `Tried to dequeue a run that is not in a valid state to be dequeued.`, + workerId, + runnerId, + }); - if (snapshot.previousSnapshotId) { - await this.releaseConcurrencySystem.refillTokensForSnapshot( - snapshot.previousSnapshotId + //todo is there a way to recover this, so the run can be retried? + //for example should we update the status to a dequeuable status and nack it? + //then at least it has a chance of succeeding and we have the error log above + await this.runAttemptSystem.systemFailure({ + runId, + error: { + type: "INTERNAL_ERROR", + code: "TASK_DEQUEUED_INVALID_STATE", + message: `Task was in the ${snapshot.executionStatus} state when it was dequeued for execution.`, + }, + tx: prisma, + }); + this.$.logger.error( + `RunEngine.dequeueFromMasterQueue(): Run is not in a valid state to be dequeued: ${runId}\n ${snapshot.id}:${snapshot.executionStatus}` ); + return null; } - await sendNotificationToWorker({ - runId, - snapshot: newSnapshot, - eventBus: this.$.eventBus, - }); + if (snapshot.executionStatus === "QUEUED_EXECUTING") { + const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( + prisma, + { + run: { + id: runId, + status: snapshot.runStatus, + attemptNumber: snapshot.attemptNumber, + }, + snapshot: { + executionStatus: "EXECUTING", + description: "Run was continued, whilst still executing.", + }, + previousSnapshotId: snapshot.id, + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, + batchId: snapshot.batchId ?? 
undefined, + completedWaitpoints: snapshot.completedWaitpoints.map((waitpoint) => ({ + id: waitpoint.id, + index: waitpoint.index, + })), + } + ); - return null; - } + if (snapshot.previousSnapshotId) { + await this.releaseConcurrencySystem.refillTokensForSnapshot( + snapshot.previousSnapshotId + ); + } - const result = await getRunWithBackgroundWorkerTasks( - prisma, - runId, - backgroundWorkerId - ); + await sendNotificationToWorker({ + runId, + snapshot: newSnapshot, + eventBus: this.$.eventBus, + }); - if (!result.success) { - switch (result.code) { - case "NO_RUN": { - //this should not happen, the run is unrecoverable so we'll ack it - this.$.logger.error("RunEngine.dequeueFromMasterQueue(): No run found", { - runId, - latestSnapshot: snapshot.id, - }); - await this.$.runQueue.acknowledgeMessage(orgId, runId); - return null; + return null; + } + + const result = await getRunWithBackgroundWorkerTasks( + prisma, + runId, + backgroundWorkerId + ); + + if (!result.success) { + switch (result.code) { + case "NO_RUN": { + //this should not happen, the run is unrecoverable so we'll ack it + this.$.logger.error("RunEngine.dequeueFromMasterQueue(): No run found", { + runId, + latestSnapshot: snapshot.id, + }); + await this.$.runQueue.acknowledgeMessage(orgId, runId); + return null; + } + case "NO_WORKER": + case "TASK_NEVER_REGISTERED": + case "QUEUE_NOT_FOUND": + case "TASK_NOT_IN_LATEST": { + this.$.logger.warn(`RunEngine.dequeueFromMasterQueue(): ${result.code}`, { + runId, + latestSnapshot: snapshot.id, + result, + }); + + //not deployed yet, so we'll wait for the deploy + await this.#pendingVersion({ + orgId, + runId, + reason: result.message, + statusReason: result.code, + tx: prisma, + }); + return null; + } + case "BACKGROUND_WORKER_MISMATCH": { + this.$.logger.warn( + "RunEngine.dequeueFromMasterQueue(): Background worker mismatch", + { + runId, + latestSnapshot: snapshot.id, + result, + } + ); + + //worker mismatch so put it back in the queue + await this.$.runQueue.nackMessage({ orgId, messageId: runId }); + + return null; + } + default: { + assertExhaustive(result); + } } - case "NO_WORKER": - case "TASK_NEVER_REGISTERED": - case "QUEUE_NOT_FOUND": - case "TASK_NOT_IN_LATEST": { - this.$.logger.warn(`RunEngine.dequeueFromMasterQueue(): ${result.code}`, { + } + + //check for a valid deployment if it's not a development environment + if (result.run.runtimeEnvironment.type !== "DEVELOPMENT") { + if (!result.deployment || !result.deployment.imageReference) { + this.$.logger.warn("RunEngine.dequeueFromMasterQueue(): No deployment found", { runId, latestSnapshot: snapshot.id, result, }); - //not deployed yet, so we'll wait for the deploy await this.#pendingVersion({ orgId, runId, - reason: result.message, - statusReason: result.code, + reason: "No deployment or deployment image reference found for deployed run", + statusReason: "NO_DEPLOYMENT", tx: prisma, }); + return null; } - case "BACKGROUND_WORKER_MISMATCH": { - this.$.logger.warn( - "RunEngine.dequeueFromMasterQueue(): Background worker mismatch", + } + + const machinePreset = getMachinePreset({ + machines: this.options.machines.machines, + defaultMachine: this.options.machines.defaultMachine, + config: result.task.machineConfig ?? {}, + run: result.run, + }); + + //increment the consumed resources + consumedResources.cpu += machinePreset.cpu; + consumedResources.memory += machinePreset.memory; + + //are we under the limit? 
+ if (maxResources) { + if ( + consumedResources.cpu > maxResources.cpu || + consumedResources.memory > maxResources.memory + ) { + this.$.logger.debug( + "RunEngine.dequeueFromMasterQueue(): Consumed resources over limit, nacking", { runId, - latestSnapshot: snapshot.id, - result, + consumedResources, + maxResources, } ); - //worker mismatch so put it back in the queue - await this.$.runQueue.nackMessage({ orgId, messageId: runId }); - + //put it back in the queue where it was + await this.$.runQueue.nackMessage({ + orgId, + messageId: runId, + incrementAttemptCount: false, + retryAt: result.run.createdAt.getTime() - result.run.priorityMs, + }); return null; } - default: { - assertExhaustive(result); - } } - } - //check for a valid deployment if it's not a development environment - if (result.run.runtimeEnvironment.type !== "DEVELOPMENT") { - if (!result.deployment || !result.deployment.imageReference) { - this.$.logger.warn("RunEngine.dequeueFromMasterQueue(): No deployment found", { - runId, - latestSnapshot: snapshot.id, - result, - }); - //not deployed yet, so we'll wait for the deploy - await this.#pendingVersion({ - orgId, - runId, - reason: "No deployment or deployment image reference found for deployed run", - statusReason: "NO_DEPLOYMENT", - tx: prisma, - }); + // Check max attempts that can optionally be set when triggering a run + let maxAttempts: number | null | undefined = result.run.maxAttempts; - return null; - } - } + // If it's not set, we'll grab it from the task's retry config + if (!maxAttempts) { + const retryConfig = result.task.retryConfig; - const machinePreset = getMachinePreset({ - machines: this.options.machines.machines, - defaultMachine: this.options.machines.defaultMachine, - config: result.task.machineConfig ?? {}, - run: result.run, - }); - - //increment the consumed resources - consumedResources.cpu += machinePreset.cpu; - consumedResources.memory += machinePreset.memory; - - //are we under the limit? 
- if (maxResources) { - if ( - consumedResources.cpu > maxResources.cpu || - consumedResources.memory > maxResources.memory - ) { this.$.logger.debug( - "RunEngine.dequeueFromMasterQueue(): Consumed resources over limit, nacking", + "RunEngine.dequeueFromMasterQueue(): maxAttempts not set, using task's retry config", { runId, - consumedResources, - maxResources, + task: result.task.id, + rawRetryConfig: retryConfig, } ); - //put it back in the queue where it was - await this.$.runQueue.nackMessage({ - orgId, - messageId: runId, - incrementAttemptCount: false, - retryAt: result.run.createdAt.getTime() - result.run.priorityMs, - }); - return null; - } - } - - // Check max attempts that can optionally be set when triggering a run - let maxAttempts: number | null | undefined = result.run.maxAttempts; - - // If it's not set, we'll grab it from the task's retry config - if (!maxAttempts) { - const retryConfig = result.task.retryConfig; + const parsedConfig = RetryOptions.nullable().safeParse(retryConfig); - this.$.logger.debug( - "RunEngine.dequeueFromMasterQueue(): maxAttempts not set, using task's retry config", - { - runId, - task: result.task.id, - rawRetryConfig: retryConfig, + if (!parsedConfig.success) { + this.$.logger.error( + "RunEngine.dequeueFromMasterQueue(): Invalid retry config", + { + runId, + task: result.task.id, + rawRetryConfig: retryConfig, + } + ); } - ); - - const parsedConfig = RetryOptions.nullable().safeParse(retryConfig); - if (!parsedConfig.success) { - this.$.logger.error("RunEngine.dequeueFromMasterQueue(): Invalid retry config", { - runId, - task: result.task.id, - rawRetryConfig: retryConfig, - }); + maxAttempts = parsedConfig.data?.maxAttempts; } - - maxAttempts = parsedConfig.data?.maxAttempts; - } - //update the run - const lockedTaskRun = await prisma.taskRun.update({ - where: { - id: runId, - }, - data: { - lockedAt: new Date(), - lockedById: result.task.id, - lockedToVersionId: result.worker.id, - lockedQueueId: result.queue.id, - startedAt: result.run.startedAt ?? new Date(), - baseCostInCents: this.options.machines.baseCostInCents, - machinePreset: machinePreset.name, - taskVersion: result.worker.version, - sdkVersion: result.worker.sdkVersion, - cliVersion: result.worker.cliVersion, - maxDurationInSeconds: getMaxDuration( - result.run.maxDurationInSeconds, - result.task.maxDurationInSeconds - ), - maxAttempts: maxAttempts ?? undefined, - }, - include: { - runtimeEnvironment: true, - tags: true, - }, - }); - - if (!lockedTaskRun) { - this.$.logger.error("RunEngine.dequeueFromMasterQueue(): Failed to lock task run", { - taskRun: result.run.id, - taskIdentifier: result.run.taskIdentifier, - deployment: result.deployment?.id, - worker: result.worker.id, - task: result.task.id, - runId, + //update the run + const lockedTaskRun = await prisma.taskRun.update({ + where: { + id: runId, + }, + data: { + lockedAt: new Date(), + lockedById: result.task.id, + lockedToVersionId: result.worker.id, + lockedQueueId: result.queue.id, + startedAt: result.run.startedAt ?? new Date(), + baseCostInCents: this.options.machines.baseCostInCents, + machinePreset: machinePreset.name, + taskVersion: result.worker.version, + sdkVersion: result.worker.sdkVersion, + cliVersion: result.worker.cliVersion, + maxDurationInSeconds: getMaxDuration( + result.run.maxDurationInSeconds, + result.task.maxDurationInSeconds + ), + maxAttempts: maxAttempts ?? 
undefined, + }, + include: { + runtimeEnvironment: true, + tags: true, + }, }); - await this.$.runQueue.acknowledgeMessage(orgId, runId); - return null; - } + if (!lockedTaskRun) { + this.$.logger.error( + "RunEngine.dequeueFromMasterQueue(): Failed to lock task run", + { + taskRun: result.run.id, + taskIdentifier: result.run.taskIdentifier, + deployment: result.deployment?.id, + worker: result.worker.id, + task: result.task.id, + runId, + } + ); - const currentAttemptNumber = lockedTaskRun.attemptNumber ?? 0; - const nextAttemptNumber = currentAttemptNumber + 1; + await this.$.runQueue.acknowledgeMessage(orgId, runId); + return null; + } - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( - prisma, - { - run: { - id: runId, - status: snapshot.runStatus, - attemptNumber: lockedTaskRun.attemptNumber, - }, + const currentAttemptNumber = lockedTaskRun.attemptNumber ?? 0; + const nextAttemptNumber = currentAttemptNumber + 1; + + const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( + prisma, + { + run: { + id: runId, + status: snapshot.runStatus, + attemptNumber: lockedTaskRun.attemptNumber, + }, + snapshot: { + executionStatus: "PENDING_EXECUTING", + description: "Run was dequeued for execution", + }, + previousSnapshotId: snapshot.id, + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, + checkpointId: snapshot.checkpointId ?? undefined, + batchId: snapshot.batchId ?? undefined, + completedWaitpoints: snapshot.completedWaitpoints, + workerId, + runnerId, + } + ); + + return { + version: "1" as const, + dequeuedAt: new Date(), snapshot: { - executionStatus: "PENDING_EXECUTING", - description: "Run was dequeued for execution", + id: newSnapshot.id, + friendlyId: newSnapshot.friendlyId, + executionStatus: newSnapshot.executionStatus, + description: newSnapshot.description, }, - previousSnapshotId: snapshot.id, - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - checkpointId: snapshot.checkpointId ?? undefined, - batchId: snapshot.batchId ?? undefined, + image: result.deployment?.imageReference ?? undefined, + checkpoint: newSnapshot.checkpoint ?? undefined, completedWaitpoints: snapshot.completedWaitpoints, - workerId, - runnerId, - } - ); - - return { - version: "1" as const, - dequeuedAt: new Date(), - snapshot: { - id: newSnapshot.id, - friendlyId: newSnapshot.friendlyId, - executionStatus: newSnapshot.executionStatus, - description: newSnapshot.description, - }, - image: result.deployment?.imageReference ?? undefined, - checkpoint: newSnapshot.checkpoint ?? 
undefined, - completedWaitpoints: snapshot.completedWaitpoints, - backgroundWorker: { - id: result.worker.id, - friendlyId: result.worker.friendlyId, - version: result.worker.version, - }, - deployment: { - id: result.deployment?.id, - friendlyId: result.deployment?.friendlyId, - }, - run: { - id: lockedTaskRun.id, - friendlyId: lockedTaskRun.friendlyId, - isTest: lockedTaskRun.isTest, - machine: machinePreset, - attemptNumber: nextAttemptNumber, - masterQueue: lockedTaskRun.masterQueue, - traceContext: lockedTaskRun.traceContext as Record, - }, - environment: { - id: lockedTaskRun.runtimeEnvironment.id, - type: lockedTaskRun.runtimeEnvironment.type, - }, - organization: { - id: orgId, - }, - project: { - id: lockedTaskRun.projectId, - }, - } satisfies DequeuedMessage; - }); + backgroundWorker: { + id: result.worker.id, + friendlyId: result.worker.friendlyId, + version: result.worker.version, + }, + deployment: { + id: result.deployment?.id, + friendlyId: result.deployment?.friendlyId, + }, + run: { + id: lockedTaskRun.id, + friendlyId: lockedTaskRun.friendlyId, + isTest: lockedTaskRun.isTest, + machine: machinePreset, + attemptNumber: nextAttemptNumber, + masterQueue: lockedTaskRun.masterQueue, + traceContext: lockedTaskRun.traceContext as Record, + }, + environment: { + id: lockedTaskRun.runtimeEnvironment.id, + type: lockedTaskRun.runtimeEnvironment.type, + }, + organization: { + id: orgId, + }, + project: { + id: lockedTaskRun.projectId, + }, + } satisfies DequeuedMessage; + } + ); if (dequeuedRun !== null) { dequeuedRuns.push(dequeuedRun); @@ -515,7 +526,7 @@ export class DequeueSystem { this.$.tracer, "#pendingVersion", async (span) => { - return this.$.runLock.lock([runId], 5_000, async (signal) => { + return this.$.runLock.lock("pendingVersion", [runId], 5_000, async (signal) => { //mark run as waiting for deploy const run = await prisma.taskRun.update({ where: { id: runId }, diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index a702e516f3..086431821a 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -54,7 +54,7 @@ export class EnqueueSystem { }) { const prisma = tx ?? 
this.$.prisma; - return await this.$.runLock.lock([run.id], 5000, async () => { + return await this.$.runLock.lock("enqueueRun", [run.id], 5000, async () => { const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run: run, snapshot: { diff --git a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts index 5aa1fa6ffc..9cd721d1a1 100644 --- a/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts +++ b/internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts @@ -197,71 +197,76 @@ export class ReleaseConcurrencySystem { // - Get latest snapshot // - If the run is non suspended or going to be, then bail // - If the run is suspended or going to be, then release the concurrency - return await this.$.runLock.lock([snapshot.runId], 5_000, async () => { - const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId); - - const isValidSnapshot = - latestSnapshot.id === snapshot.id || - // Case 2: The provided snapshotId matches the previous snapshot - // AND we're in SUSPENDED state (which is valid) - (latestSnapshot.previousSnapshotId === snapshot.id && - latestSnapshot.executionStatus === "SUSPENDED"); - - if (!isValidSnapshot) { - this.$.logger.error("Tried to release concurrency on an invalid snapshot", { - latestSnapshot, - snapshot, - }); - - return false; - } + return await this.$.runLock.lock( + "executeReleaseConcurrencyForSnapshot", + [snapshot.runId], + 5_000, + async () => { + const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId); + + const isValidSnapshot = + latestSnapshot.id === snapshot.id || + // Case 2: The provided snapshotId matches the previous snapshot + // AND we're in SUSPENDED state (which is valid) + (latestSnapshot.previousSnapshotId === snapshot.id && + latestSnapshot.executionStatus === "SUSPENDED"); + + if (!isValidSnapshot) { + this.$.logger.error("Tried to release concurrency on an invalid snapshot", { + latestSnapshot, + snapshot, + }); + + return false; + } - if (!canReleaseConcurrency(latestSnapshot.executionStatus)) { - this.$.logger.debug("Run is not in a state to release concurrency", { - runId: snapshot.runId, - snapshot: latestSnapshot, - }); + if (!canReleaseConcurrency(latestSnapshot.executionStatus)) { + this.$.logger.debug("Run is not in a state to release concurrency", { + runId: snapshot.runId, + snapshot: latestSnapshot, + }); - return false; - } + return false; + } - const metadata = this.#parseMetadata(snapshot.metadata); + const metadata = this.#parseMetadata(snapshot.metadata); - if (typeof metadata.releaseConcurrency === "boolean") { - if (metadata.releaseConcurrency) { - await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId); + if (typeof metadata.releaseConcurrency === "boolean") { + if (metadata.releaseConcurrency) { + await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId); + + return true; + } + + await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId); return true; } - await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId); - - return true; - } + // Get the locked queue + const taskQueue = snapshot.run.lockedQueueId + ? 
await this.$.prisma.taskQueue.findFirst({ + where: { + id: snapshot.run.lockedQueueId, + }, + }) + : undefined; + + if ( + taskQueue && + (typeof taskQueue.concurrencyLimit === "undefined" || + taskQueue.releaseConcurrencyOnWaitpoint) + ) { + await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId); - // Get the locked queue - const taskQueue = snapshot.run.lockedQueueId - ? await this.$.prisma.taskQueue.findFirst({ - where: { - id: snapshot.run.lockedQueueId, - }, - }) - : undefined; + return true; + } - if ( - taskQueue && - (typeof taskQueue.concurrencyLimit === "undefined" || - taskQueue.releaseConcurrencyOnWaitpoint) - ) { - await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId); + await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId); return true; } - - await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId); - - return true; - }); + ); } #parseMetadata(metadata?: unknown): ReleaseConcurrencyMetadata { diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 76b97c7d60..dcb40d995d 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -76,7 +76,7 @@ export class RunAttemptSystem { this.$.tracer, "startRunAttempt", async (span) => { - return this.$.runLock.lock([runId], 5000, async () => { + return this.$.runLock.lock("startRunAttempt", [runId], 5000, async () => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -412,7 +412,7 @@ export class RunAttemptSystem { this.$.tracer, "#completeRunAttemptSuccess", async (span) => { - return this.$.runLock.lock([runId], 5_000, async (signal) => { + return this.$.runLock.lock("attemptSucceeded", [runId], 5_000, async (signal) => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -546,7 +546,7 @@ export class RunAttemptSystem { this.$.tracer, "completeRunAttemptFailure", async (span) => { - return this.$.runLock.lock([runId], 5_000, async (signal) => { + return this.$.runLock.lock("attemptFailed", [runId], 5_000, async (signal) => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); if (latestSnapshot.id !== snapshotId) { @@ -850,7 +850,7 @@ export class RunAttemptSystem { }): Promise<{ wasRequeued: boolean } & ExecutionResult> { const prisma = tx ?? this.$.prisma; - return await this.$.runLock.lock([run.id], 5000, async (signal) => { + return await this.$.runLock.lock("tryNackAndRequeue", [run.id], 5000, async (signal) => { //we nack the message, this allows another work to pick up the run const gotRequeued = await this.$.runQueue.nackMessage({ orgId, @@ -926,7 +926,7 @@ export class RunAttemptSystem { reason = reason ?? 
"Cancelled by user"; return startSpan(this.$.tracer, "cancelRun", async (span) => { - return this.$.runLock.lock([runId], 5_000, async (signal) => { + return this.$.runLock.lock("cancelRun", [runId], 5_000, async (signal) => { const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId); //already finished, do nothing diff --git a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts index f020fe2b3c..5b40277700 100644 --- a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts @@ -24,7 +24,7 @@ export class TtlSystem { async expireRun({ runId, tx }: { runId: string; tx?: PrismaClientOrTransaction }) { const prisma = tx ?? this.$.prisma; - await this.$.runLock.lock([runId], 5_000, async () => { + await this.$.runLock.lock("expireRun", [runId], 5_000, async () => { const snapshot = await getLatestExecutionSnapshot(prisma, runId); //if we're executing then we won't expire the run diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 425ae8262d..80075b04ca 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -358,7 +358,7 @@ export class WaitpointSystem { let $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints; - return await this.$.runLock.lock([runId], 5000, async () => { + return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], 5000, async () => { let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId); //block the run with the waitpoints, returning how many waitpoints are pending @@ -509,7 +509,7 @@ export class WaitpointSystem { } //4. 
Continue the run whether it's executing or not
-    await this.$.runLock.lock([runId], 5000, async () => {
+    await this.$.runLock.lock("continueRunIfUnblocked", [runId], 5000, async () => {
      const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);
 
      //run is still executing, send a message to the worker
diff --git a/internal-packages/run-engine/src/engine/tests/locking.test.ts b/internal-packages/run-engine/src/engine/tests/locking.test.ts
index 17831c2c38..bdb5f61242 100644
--- a/internal-packages/run-engine/src/engine/tests/locking.test.ts
+++ b/internal-packages/run-engine/src/engine/tests/locking.test.ts
@@ -2,16 +2,23 @@ import { createRedisClient } from "@internal/redis";
 import { redisTest } from "@internal/testcontainers";
 import { expect } from "vitest";
 import { RunLocker } from "../locking.js";
+import { trace } from "@internal/tracing";
+import { Logger } from "@trigger.dev/core/logger";
 
 describe("RunLocker", () => {
  redisTest("Test acquiring a lock works", { timeout: 15_000 }, async ({ redisOptions }) => {
    const redis = createRedisClient(redisOptions);
    try {
-      const runLock = new RunLocker({ redis });
+      const logger = new Logger("RunLockTest", "debug");
+      const runLock = new RunLocker({
+        redis,
+        logger,
+        tracer: trace.getTracer("RunLockTest"),
+      });
 
      expect(runLock.isInsideLock()).toBe(false);
 
-      await runLock.lock(["test-1"], 5000, async (signal) => {
+      await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => {
        expect(signal).toBeDefined();
        expect(runLock.isInsideLock()).toBe(true);
      });
@@ -25,16 +32,17 @@
  redisTest("Test double locking works", { timeout: 15_000 }, async ({ redisOptions }) => {
    const redis = createRedisClient(redisOptions);
    try {
-      const runLock = new RunLocker({ redis });
+      const logger = new Logger("RunLockTest", "debug");
+      const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") });
 
      expect(runLock.isInsideLock()).toBe(false);
 
-      await runLock.lock(["test-1"], 5000, async (signal) => {
+      await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => {
        expect(signal).toBeDefined();
        expect(runLock.isInsideLock()).toBe(true);
 
        //should be able to "lock it again"
-        await runLock.lock(["test-1"], 5000, async (signal) => {
+        await runLock.lock("test-lock", ["test-1"], 5000, async (signal) => {
          expect(signal).toBeDefined();
          expect(runLock.isInsideLock()).toBe(true);
        });
@@ -45,4 +53,180 @@
      await redis.quit();
    }
  });
+
+  redisTest(
+    "Test lock throws when callback throws",
+    { timeout: 15_000 },
+    async ({ redisOptions }) => {
+      const redis = createRedisClient(redisOptions);
+      try {
+        const logger = new Logger("RunLockTest", "debug");
+        const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") });
+
+        expect(runLock.isInsideLock()).toBe(false);
+
+        await expect(
+          runLock.lock("test-lock", ["test-1"], 5000, async () => {
+            throw new Error("Test error");
+          })
+        ).rejects.toThrow("Test error");
+
+        // Verify the lock was released
+        expect(runLock.isInsideLock()).toBe(false);
+      } finally {
+        await redis.quit();
+      }
+    }
+  );
+
+  redisTest(
+    "Test nested lock throws when inner callback throws",
+    { timeout: 15_000 },
+    async ({ redisOptions }) => {
+      const redis = createRedisClient(redisOptions);
+      try {
+        const logger = new Logger("RunLockTest", "debug");
+        const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") });
+
+        expect(runLock.isInsideLock()).toBe(false);
+
+        await expect(
+          runLock.lock("test-lock", ["test-1"], 5000, async () => {
+            expect(runLock.isInsideLock()).toBe(true);
+
+            // Nested lock with same resource
+            await runLock.lock("test-lock", ["test-1"], 5000, async () => {
+              expect(runLock.isInsideLock()).toBe(true);
+              throw new Error("Inner lock error");
+            });
+          })
+        ).rejects.toThrow("Inner lock error");
+
+        // Verify all locks were released
+        expect(runLock.isInsideLock()).toBe(false);
+      } finally {
+        await redis.quit();
+      }
+    }
+  );
+
+  redisTest("Test lock throws when it times out", { timeout: 15_000 }, async ({ redisOptions }) => {
+    const redis = createRedisClient(redisOptions);
+    try {
+      const logger = new Logger("RunLockTest", "debug");
+      const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") });
+
+      // First, ensure we can acquire the lock normally
+      let firstLockAcquired = false;
+      await runLock.lock("test-lock", ["test-1"], 5000, async () => {
+        firstLockAcquired = true;
+      });
+      //wait for 20ms
+      await new Promise((resolve) => setTimeout(resolve, 20));
+
+      expect(firstLockAcquired).toBe(true);
+
+      // Now create a long-running lock
+      const lockPromise1 = runLock.lock("test-lock", ["test-1"], 5000, async () => {
+        // Hold the lock longer than all possible retry attempts
+        // (10 retries * (200ms delay + 200ms max jitter) = ~4000ms max)
+        await new Promise((resolve) => setTimeout(resolve, 5000));
+      });
+
+      // Try to acquire same lock immediately
+      await expect(
+        runLock.lock("test-lock", ["test-1"], 5000, async () => {
+          // This should never execute
+          expect(true).toBe(false);
+        })
+      ).rejects.toThrow("unable to achieve a quorum");
+
+      // Complete the first lock
+      await lockPromise1;
+
+      // Verify final state
+      expect(runLock.isInsideLock()).toBe(false);
+    } finally {
+      await redis.quit();
+    }
+  });
+
+  redisTest(
+    "Test nested lock with same resources doesn't timeout",
+    { timeout: 15_000 },
+    async ({ redisOptions }) => {
+      const redis = createRedisClient(redisOptions);
+      try {
+        const logger = new Logger("RunLockTest", "debug");
+        const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") });
+
+        await runLock.lock("test-lock", ["test-1"], 5000, async () => {
+          // First lock acquired
+          expect(runLock.isInsideLock()).toBe(true);
+
+          // Try to acquire the same resource with a very short timeout
+          // This should work because we already hold the lock
+          await runLock.lock("test-lock", ["test-1"], 100, async () => {
+            expect(runLock.isInsideLock()).toBe(true);
+            // Wait longer than the timeout to prove it doesn't matter
+            await new Promise((resolve) => setTimeout(resolve, 500));
+          });
+        });
+
+        // Verify final state
+        expect(runLock.isInsideLock()).toBe(false);
+      } finally {
+        await redis.quit();
+      }
+    }
+  );
+
+  redisTest(
+    "Test nested lock with same resource works regardless of retries",
+    { timeout: 15_000 },
+    async ({ redisOptions }) => {
+      const redis = createRedisClient(redisOptions);
+      try {
+        const logger = new Logger("RunLockTest", "debug");
+        const runLock = new RunLocker({ redis, logger, tracer: trace.getTracer("RunLockTest") });
+
+        // First verify we can acquire the lock normally
+        let firstLockAcquired = false;
+        await runLock.lock("test-lock", ["test-1"], 5000, async () => {
+          firstLockAcquired = true;
+        });
+        expect(firstLockAcquired).toBe(true);
+
+        // Now test the nested lock behavior
+        let outerLockExecuted = false;
+        let innerLockExecuted = false;
+
+        await runLock.lock("test-lock", ["test-1"], 5000, async () => {
+          outerLockExecuted = true;
+          expect(runLock.isInsideLock()).toBe(true);
+          expect(runLock.getCurrentResources()).toBe("test-1");
+
+          // Try to acquire the same resource in a nested lock
+          // This should work immediately without any retries
+          // because we already hold the lock
+          await runLock.lock("test-lock", ["test-1"], 5000, async () => {
+            innerLockExecuted = true;
+            expect(runLock.isInsideLock()).toBe(true);
+            expect(runLock.getCurrentResources()).toBe("test-1");
+
+            // Sleep longer than retry attempts would take
+            // (10 retries * (200ms delay + 200ms max jitter) = ~4000ms max)
+            await new Promise((resolve) => setTimeout(resolve, 5000));
+          });
+        });
+
+        // Verify both locks executed
+        expect(outerLockExecuted).toBe(true);
+        expect(innerLockExecuted).toBe(true);
+        expect(runLock.isInsideLock()).toBe(false);
+      } finally {
+        await redis.quit();
+      }
+    }
+  );
 });
diff --git a/internal-packages/run-engine/tsconfig.test.json b/internal-packages/run-engine/tsconfig.test.json
index b68d234bd7..b4f627aff1 100644
--- a/internal-packages/run-engine/tsconfig.test.json
+++ b/internal-packages/run-engine/tsconfig.test.json
@@ -15,6 +15,7 @@
    "isolatedModules": true,
    "preserveWatchOutput": true,
    "skipLibCheck": true,
-    "strict": true
+    "strict": true,
+    "customConditions": ["@triggerdotdev/source"]
  }
 }
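
Reviewer note (not part of the patch): the sketch below shows how the instrumented RunLocker introduced above would be wired up and called. It is only a usage sketch inferred from the new constructor options and the lock(name, resources, duration, routine) signature in this diff; the import paths mirror those used by locking.test.ts, and the Redis connection options, function name "example", and operation name "exampleOperation" are illustrative assumptions.

// Usage sketch, assuming the APIs shown in this diff.
import { createRedisClient } from "@internal/redis";
import { trace } from "@internal/tracing";
import { Logger } from "@trigger.dev/core/logger";
import { RunLocker } from "./locking.js";

async function example(runId: string) {
  const runLock = new RunLocker({
    redis: createRedisClient({ host: "localhost", port: 6379 }), // connection options are illustrative
    logger: new Logger("RunLockExample", "debug"),
    tracer: trace.getTracer("RunLockExample"),
  });

  // Every call site now passes a name first; per this diff it is recorded as the
  // `name` attribute on the "RunLocker.lock" span, next to `resources` and `timeout`.
  return await runLock.lock("exampleOperation", [runId], 5_000, async (signal) => {
    // Re-locking the same resources inside the callback does not re-acquire:
    // the span is tagged nested=true and the routine runs under the existing lock.
    return await runLock.lock("exampleOperation", [runId], 5_000, async () => {
      return signal.aborted ? "aborted" : "done";
    });
  });
  // On failure, the locker logs "[RunLocker] Error locking resources" and rethrows.
}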