Skip to content

Commit 8b182ce

Browse files
authored
Fix stalled run detection (#1934)
* drive run heartbeats from child process * track last heartbeat * add changeset * use v4-specific heartbeat interval env var and decrease to 20s * delete old heartbeat service * remove heartbeat handler during execution cleanup
1 parent 1f6a283 commit 8b182ce

File tree

5 files changed

+58
-110
lines changed

5 files changed

+58
-110
lines changed

.changeset/tiny-buckets-teach.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
Fix stalled run detection

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
576576
for await (const _ of setInterval(heartbeatInterval)) {
577577
if (_isRunning && _execution) {
578578
try {
579-
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id });
579+
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.run.id });
580580
} catch (err) {
581581
console.error("Failed to send HEARTBEAT message", err);
582582
}

packages/cli-v3/src/entryPoints/managed/env.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const Env = z.object({
4343
TRIGGER_SUPERVISOR_API_DOMAIN: z.string(),
4444
TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(),
4545
TRIGGER_WORKER_INSTANCE_NAME: z.string(),
46-
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),
46+
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(20),
4747
TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5),
4848
TRIGGER_SUCCESS_EXIT_CODE: z.coerce.number().default(0),
4949
TRIGGER_FAILURE_EXIT_CODE: z.coerce.number().default(1),

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import { RunLogger, SendDebugLogOptions } from "./logger.js";
1414
import { RunnerEnv } from "./env.js";
1515
import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers";
1616
import { setTimeout as sleep } from "timers/promises";
17-
import { RunExecutionHeartbeat } from "./heartbeat.js";
1817
import { RunExecutionSnapshotPoller } from "./poller.js";
1918
import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils";
2019
import { MetadataClient } from "./overrides.js";
@@ -63,9 +62,10 @@ export class RunExecution {
6362
private restoreCount: number;
6463

6564
private taskRunProcess?: TaskRunProcess;
66-
private runHeartbeat?: RunExecutionHeartbeat;
6765
private snapshotPoller?: RunExecutionSnapshotPoller;
6866

67+
private lastHeartbeat?: Date;
68+
6969
constructor(opts: RunExecutionOptions) {
7070
this.id = randomBytes(4).toString("hex");
7171
this.workerManifest = opts.workerManifest;
@@ -105,11 +105,12 @@ export class RunExecution {
105105
envVars: Record<string, string>;
106106
isWarmStart?: boolean;
107107
}) {
108-
return new TaskRunProcess({
108+
const taskRunProcess = new TaskRunProcess({
109109
workerManifest: this.workerManifest,
110110
env: {
111111
...envVars,
112112
...this.env.gatherProcessEnv(),
113+
HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000),
113114
},
114115
serverWorker: {
115116
id: "managed",
@@ -123,6 +124,29 @@ export class RunExecution {
123124
},
124125
isWarmStart,
125126
}).initialize();
127+
128+
taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => {
129+
if (!this.runFriendlyId) {
130+
this.sendDebugLog("onTaskRunHeartbeat: missing run ID", { heartbeatRunId: runId });
131+
return;
132+
}
133+
134+
if (runId !== this.runFriendlyId) {
135+
this.sendDebugLog("onTaskRunHeartbeat: mismatched run ID", {
136+
heartbeatRunId: runId,
137+
expectedRunId: this.runFriendlyId,
138+
});
139+
return;
140+
}
141+
142+
const [error] = await tryCatch(this.onHeartbeat());
143+
144+
if (error) {
145+
this.sendDebugLog("onTaskRunHeartbeat: failed", { error: error.message });
146+
}
147+
});
148+
149+
return taskRunProcess;
126150
}
127151

128152
/**
@@ -229,7 +253,6 @@ export class RunExecution {
229253
this.currentSnapshotId = snapshot.friendlyId;
230254

231255
// Update services
232-
this.runHeartbeat?.updateSnapshotId(snapshot.friendlyId);
233256
this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId);
234257

235258
switch (snapshot.executionStatus) {
@@ -450,13 +473,6 @@ export class RunExecution {
450473
this.podScheduledAt = runOpts.podScheduledAt;
451474

452475
// Create and start services
453-
this.runHeartbeat = new RunExecutionHeartbeat({
454-
runFriendlyId: this.runFriendlyId,
455-
snapshotFriendlyId: this.currentSnapshotId,
456-
httpClient: this.httpClient,
457-
logger: this.logger,
458-
heartbeatIntervalSeconds: this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS,
459-
});
460476
this.snapshotPoller = new RunExecutionSnapshotPoller({
461477
runFriendlyId: this.runFriendlyId,
462478
snapshotFriendlyId: this.currentSnapshotId,
@@ -466,7 +482,6 @@ export class RunExecution {
466482
handleSnapshotChange: this.handleSnapshotChange.bind(this),
467483
});
468484

469-
this.runHeartbeat.start();
470485
this.snapshotPoller.start();
471486

472487
const [startError, start] = await tryCatch(
@@ -839,9 +854,6 @@ export class RunExecution {
839854
this.env.override(overrides);
840855

841856
// Update services with new values
842-
if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) {
843-
this.runHeartbeat?.updateInterval(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000);
844-
}
845857
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
846858
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
847859
}
@@ -857,6 +869,28 @@ export class RunExecution {
857869
}
858870
}
859871

872+
private async onHeartbeat() {
873+
if (!this.runFriendlyId) {
874+
this.sendDebugLog("Heartbeat: missing run ID");
875+
return;
876+
}
877+
878+
if (!this.currentSnapshotId) {
879+
this.sendDebugLog("Heartbeat: missing snapshot ID");
880+
return;
881+
}
882+
883+
this.sendDebugLog("Heartbeat: started");
884+
885+
const response = await this.httpClient.heartbeatRun(this.runFriendlyId, this.currentSnapshotId);
886+
887+
if (!response.success) {
888+
this.sendDebugLog("Heartbeat: failed", { error: response.error });
889+
}
890+
891+
this.lastHeartbeat = new Date();
892+
}
893+
860894
sendDebugLog(
861895
message: string,
862896
properties?: SendDebugLogOptions["properties"],
@@ -871,6 +905,7 @@ export class RunExecution {
871905
snapshotId: this.currentSnapshotId,
872906
executionId: this.id,
873907
executionRestoreCount: this.restoreCount,
908+
lastHeartbeat: this.lastHeartbeat?.toISOString(),
874909
},
875910
});
876911
}
@@ -917,7 +952,7 @@ export class RunExecution {
917952
}
918953

919954
private stopServices() {
920-
this.runHeartbeat?.stop();
921955
this.snapshotPoller?.stop();
956+
this.taskRunProcess?.onTaskRunHeartbeat.detach();
922957
}
923958
}

packages/cli-v3/src/entryPoints/managed/heartbeat.ts

Lines changed: 0 additions & 92 deletions
This file was deleted.

0 commit comments

Comments
 (0)