Skip to content

Fix stalled run detection #1934

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tiny-buckets-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Fix stalled run detection
2 changes: 1 addition & 1 deletion packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
for await (const _ of setInterval(heartbeatInterval)) {
if (_isRunning && _execution) {
try {
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id });
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.run.id });
} catch (err) {
console.error("Failed to send HEARTBEAT message", err);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-v3/src/entryPoints/managed/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const Env = z.object({
TRIGGER_SUPERVISOR_API_DOMAIN: z.string(),
TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(),
TRIGGER_WORKER_INSTANCE_NAME: z.string(),
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(20),
TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5),
TRIGGER_SUCCESS_EXIT_CODE: z.coerce.number().default(0),
TRIGGER_FAILURE_EXIT_CODE: z.coerce.number().default(1),
Expand Down
67 changes: 51 additions & 16 deletions packages/cli-v3/src/entryPoints/managed/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { RunLogger, SendDebugLogOptions } from "./logger.js";
import { RunnerEnv } from "./env.js";
import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers";
import { setTimeout as sleep } from "timers/promises";
import { RunExecutionHeartbeat } from "./heartbeat.js";
import { RunExecutionSnapshotPoller } from "./poller.js";
import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils";
import { MetadataClient } from "./overrides.js";
Expand Down Expand Up @@ -63,9 +62,10 @@ export class RunExecution {
private restoreCount: number;

private taskRunProcess?: TaskRunProcess;
private runHeartbeat?: RunExecutionHeartbeat;
private snapshotPoller?: RunExecutionSnapshotPoller;

private lastHeartbeat?: Date;

constructor(opts: RunExecutionOptions) {
this.id = randomBytes(4).toString("hex");
this.workerManifest = opts.workerManifest;
Expand Down Expand Up @@ -105,11 +105,12 @@ export class RunExecution {
envVars: Record<string, string>;
isWarmStart?: boolean;
}) {
return new TaskRunProcess({
const taskRunProcess = new TaskRunProcess({
workerManifest: this.workerManifest,
env: {
...envVars,
...this.env.gatherProcessEnv(),
HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000),
},
serverWorker: {
id: "managed",
Expand All @@ -123,6 +124,29 @@ export class RunExecution {
},
isWarmStart,
}).initialize();

taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => {
if (!this.runFriendlyId) {
this.sendDebugLog("onTaskRunHeartbeat: missing run ID", { heartbeatRunId: runId });
return;
}

if (runId !== this.runFriendlyId) {
this.sendDebugLog("onTaskRunHeartbeat: mismatched run ID", {
heartbeatRunId: runId,
expectedRunId: this.runFriendlyId,
});
return;
}

const [error] = await tryCatch(this.onHeartbeat());

if (error) {
this.sendDebugLog("onTaskRunHeartbeat: failed", { error: error.message });
}
});

return taskRunProcess;
}

/**
Expand Down Expand Up @@ -229,7 +253,6 @@ export class RunExecution {
this.currentSnapshotId = snapshot.friendlyId;

// Update services
this.runHeartbeat?.updateSnapshotId(snapshot.friendlyId);
this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId);

switch (snapshot.executionStatus) {
Expand Down Expand Up @@ -450,13 +473,6 @@ export class RunExecution {
this.podScheduledAt = runOpts.podScheduledAt;

// Create and start services
this.runHeartbeat = new RunExecutionHeartbeat({
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.currentSnapshotId,
httpClient: this.httpClient,
logger: this.logger,
heartbeatIntervalSeconds: this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS,
});
this.snapshotPoller = new RunExecutionSnapshotPoller({
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.currentSnapshotId,
Expand All @@ -466,7 +482,6 @@ export class RunExecution {
handleSnapshotChange: this.handleSnapshotChange.bind(this),
});

this.runHeartbeat.start();
this.snapshotPoller.start();

const [startError, start] = await tryCatch(
Expand Down Expand Up @@ -839,9 +854,6 @@ export class RunExecution {
this.env.override(overrides);

// Update services with new values
if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) {
this.runHeartbeat?.updateInterval(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000);
}
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
}
Expand All @@ -857,6 +869,28 @@ export class RunExecution {
}
}

private async onHeartbeat() {
if (!this.runFriendlyId) {
this.sendDebugLog("Heartbeat: missing run ID");
return;
}

if (!this.currentSnapshotId) {
this.sendDebugLog("Heartbeat: missing snapshot ID");
return;
}

this.sendDebugLog("Heartbeat: started");

const response = await this.httpClient.heartbeatRun(this.runFriendlyId, this.currentSnapshotId);

if (!response.success) {
this.sendDebugLog("Heartbeat: failed", { error: response.error });
}

this.lastHeartbeat = new Date();
}

sendDebugLog(
message: string,
properties?: SendDebugLogOptions["properties"],
Expand All @@ -871,6 +905,7 @@ export class RunExecution {
snapshotId: this.currentSnapshotId,
executionId: this.id,
executionRestoreCount: this.restoreCount,
lastHeartbeat: this.lastHeartbeat?.toISOString(),
},
});
}
Expand Down Expand Up @@ -917,7 +952,7 @@ export class RunExecution {
}

private stopServices() {
this.runHeartbeat?.stop();
this.snapshotPoller?.stop();
this.taskRunProcess?.onTaskRunHeartbeat.detach();
}
}
92 changes: 0 additions & 92 deletions packages/cli-v3/src/entryPoints/managed/heartbeat.ts

This file was deleted.

Loading