diff --git a/.changeset/tiny-buckets-teach.md b/.changeset/tiny-buckets-teach.md new file mode 100644 index 0000000000..fdf3ae3a94 --- /dev/null +++ b/.changeset/tiny-buckets-teach.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Fix stalled run detection diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index b9a085809d..19c8718cce 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -576,7 +576,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10); for await (const _ of setInterval(heartbeatInterval)) { if (_isRunning && _execution) { try { - await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id }); + await zodIpc.send("TASK_HEARTBEAT", { id: _execution.run.id }); } catch (err) { console.error("Failed to send HEARTBEAT message", err); } diff --git a/packages/cli-v3/src/entryPoints/managed/env.ts b/packages/cli-v3/src/entryPoints/managed/env.ts index 1355f68d82..8f03968084 100644 --- a/packages/cli-v3/src/entryPoints/managed/env.ts +++ b/packages/cli-v3/src/entryPoints/managed/env.ts @@ -43,7 +43,7 @@ const Env = z.object({ TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), TRIGGER_WORKER_INSTANCE_NAME: z.string(), - TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), + TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(20), TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5), TRIGGER_SUCCESS_EXIT_CODE: z.coerce.number().default(0), TRIGGER_FAILURE_EXIT_CODE: z.coerce.number().default(1), diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts index bfefeee27f..4b8a11866f 100644 --- a/packages/cli-v3/src/entryPoints/managed/execution.ts +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -14,7 +14,6 @@ import { RunLogger, SendDebugLogOptions } from "./logger.js"; import { RunnerEnv } from "./env.js"; import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers"; import { setTimeout as sleep } from "timers/promises"; -import { RunExecutionHeartbeat } from "./heartbeat.js"; import { RunExecutionSnapshotPoller } from "./poller.js"; import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils"; import { MetadataClient } from "./overrides.js"; @@ -63,9 +62,10 @@ export class RunExecution { private restoreCount: number; private taskRunProcess?: TaskRunProcess; - private runHeartbeat?: RunExecutionHeartbeat; private snapshotPoller?: RunExecutionSnapshotPoller; + private lastHeartbeat?: Date; + constructor(opts: RunExecutionOptions) { this.id = randomBytes(4).toString("hex"); this.workerManifest = opts.workerManifest; @@ -105,11 +105,12 @@ export class RunExecution { envVars: Record; isWarmStart?: boolean; }) { - return new TaskRunProcess({ + const taskRunProcess = new TaskRunProcess({ workerManifest: this.workerManifest, env: { ...envVars, ...this.env.gatherProcessEnv(), + HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000), }, serverWorker: { id: "managed", @@ -123,6 +124,29 @@ export class RunExecution { }, isWarmStart, }).initialize(); + + taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => { + if (!this.runFriendlyId) { + this.sendDebugLog("onTaskRunHeartbeat: missing run ID", { heartbeatRunId: runId }); + return; + } + + if (runId !== this.runFriendlyId) { + this.sendDebugLog("onTaskRunHeartbeat: mismatched run ID", { + heartbeatRunId: runId, + expectedRunId: this.runFriendlyId, + }); + return; + } + + const [error] = await tryCatch(this.onHeartbeat()); + + if (error) { + this.sendDebugLog("onTaskRunHeartbeat: failed", { error: error.message }); + } + }); + + return taskRunProcess; } /** @@ -229,7 +253,6 @@ export class RunExecution { this.currentSnapshotId = snapshot.friendlyId; // Update services - this.runHeartbeat?.updateSnapshotId(snapshot.friendlyId); this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId); switch (snapshot.executionStatus) { @@ -450,13 +473,6 @@ export class RunExecution { this.podScheduledAt = runOpts.podScheduledAt; // Create and start services - this.runHeartbeat = new RunExecutionHeartbeat({ - runFriendlyId: this.runFriendlyId, - snapshotFriendlyId: this.currentSnapshotId, - httpClient: this.httpClient, - logger: this.logger, - heartbeatIntervalSeconds: this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS, - }); this.snapshotPoller = new RunExecutionSnapshotPoller({ runFriendlyId: this.runFriendlyId, snapshotFriendlyId: this.currentSnapshotId, @@ -466,7 +482,6 @@ export class RunExecution { handleSnapshotChange: this.handleSnapshotChange.bind(this), }); - this.runHeartbeat.start(); this.snapshotPoller.start(); const [startError, start] = await tryCatch( @@ -839,9 +854,6 @@ export class RunExecution { this.env.override(overrides); // Update services with new values - if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) { - this.runHeartbeat?.updateInterval(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000); - } if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) { this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000); } @@ -857,6 +869,28 @@ export class RunExecution { } } + private async onHeartbeat() { + if (!this.runFriendlyId) { + this.sendDebugLog("Heartbeat: missing run ID"); + return; + } + + if (!this.currentSnapshotId) { + this.sendDebugLog("Heartbeat: missing snapshot ID"); + return; + } + + this.sendDebugLog("Heartbeat: started"); + + const response = await this.httpClient.heartbeatRun(this.runFriendlyId, this.currentSnapshotId); + + if (!response.success) { + this.sendDebugLog("Heartbeat: failed", { error: response.error }); + } + + this.lastHeartbeat = new Date(); + } + sendDebugLog( message: string, properties?: SendDebugLogOptions["properties"], @@ -871,6 +905,7 @@ export class RunExecution { snapshotId: this.currentSnapshotId, executionId: this.id, executionRestoreCount: this.restoreCount, + lastHeartbeat: this.lastHeartbeat?.toISOString(), }, }); } @@ -917,7 +952,7 @@ export class RunExecution { } private stopServices() { - this.runHeartbeat?.stop(); this.snapshotPoller?.stop(); + this.taskRunProcess?.onTaskRunHeartbeat.detach(); } } diff --git a/packages/cli-v3/src/entryPoints/managed/heartbeat.ts b/packages/cli-v3/src/entryPoints/managed/heartbeat.ts deleted file mode 100644 index 3b3c820c91..0000000000 --- a/packages/cli-v3/src/entryPoints/managed/heartbeat.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { IntervalService } from "@trigger.dev/core/v3"; -import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker"; -import { RunLogger } from "./logger.js"; - -export type RunExecutionHeartbeatOptions = { - runFriendlyId: string; - snapshotFriendlyId: string; - httpClient: WorkloadHttpClient; - logger: RunLogger; - heartbeatIntervalSeconds: number; -}; - -export class RunExecutionHeartbeat { - private readonly runFriendlyId: string; - private snapshotFriendlyId: string; - - private readonly httpClient: WorkloadHttpClient; - private readonly logger: RunLogger; - private readonly heartbeatIntervalMs: number; - private readonly heartbeat: IntervalService; - - constructor(opts: RunExecutionHeartbeatOptions) { - this.runFriendlyId = opts.runFriendlyId; - this.snapshotFriendlyId = opts.snapshotFriendlyId; - this.httpClient = opts.httpClient; - this.logger = opts.logger; - this.heartbeatIntervalMs = opts.heartbeatIntervalSeconds * 1000; - - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "RunExecutionHeartbeat", - properties: { - runFriendlyId: this.runFriendlyId, - snapshotFriendlyId: this.snapshotFriendlyId, - heartbeatIntervalSeconds: opts.heartbeatIntervalSeconds, - }, - }); - - this.heartbeat = new IntervalService({ - onInterval: async () => { - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "heartbeat: started", - }); - - const response = await this.httpClient.heartbeatRun( - this.runFriendlyId, - this.snapshotFriendlyId - ); - - if (!response.success) { - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "heartbeat: failed", - properties: { - error: response.error, - }, - }); - } - }, - intervalMs: this.heartbeatIntervalMs, - leadingEdge: false, - onError: async (error) => { - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: "Failed to send heartbeat", - properties: { error: error instanceof Error ? error.message : String(error) }, - }); - }, - }); - } - - resetCurrentInterval() { - this.heartbeat.resetCurrentInterval(); - } - - updateSnapshotId(snapshotFriendlyId: string) { - this.snapshotFriendlyId = snapshotFriendlyId; - } - - updateInterval(intervalMs: number) { - this.heartbeat.updateInterval(intervalMs); - } - - start() { - this.heartbeat.start(); - } - - stop() { - this.heartbeat.stop(); - } -}