Skip to content

Fix managed run controller edge cases #1987

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/tidy-books-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

- Fix polling interval reset bug that could create duplicate intervals
- Protect against unexpected attempt number changes
- Prevent run execution zombies after warm starts
39 changes: 36 additions & 3 deletions packages/cli-v3/src/entryPoints/managed/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,17 @@ export class ManagedRunController {
}

const execution = async () => {
if (!this.currentExecution || !this.currentExecution.isPreparedForNextRun) {
// If we have an existing execution that isn't prepared for the next run, kill it
if (this.currentExecution && !this.currentExecution.canExecute) {
this.sendDebugLog({
runId: runFriendlyId,
message: "killing existing execution before starting new run",
});
await this.currentExecution.kill().catch(() => {});
this.currentExecution = null;
}

if (!this.currentExecution || !this.currentExecution.canExecute) {
this.currentExecution = new RunExecution({
workerManifest: this.workerManifest,
env: this.env,
Expand Down Expand Up @@ -267,11 +277,12 @@ export class ManagedRunController {
if (this.currentExecution?.taskRunEnv) {
this.sendDebugLog({
runId: this.runFriendlyId,
message: "waitForNextRun: eagerly recreating task run process",
message: "waitForNextRun: eagerly creating fresh execution for next run",
});

const previousTaskRunEnv = this.currentExecution.taskRunEnv;

// Create a fresh execution for the next run
this.currentExecution = new RunExecution({
workerManifest: this.workerManifest,
env: this.env,
Expand Down Expand Up @@ -486,10 +497,32 @@ export class ManagedRunController {
});

socket.on("disconnect", (reason, description) => {
const parseDescription = ():
| {
description: string;
context?: string;
}
| undefined => {
if (!description) {
return undefined;
}

if (description instanceof Error) {
return {
description: description.toString(),
};
}

return {
description: description.description,
context: description.context ? String(description.context) : undefined,
};
};

this.sendDebugLog({
runId: this.runFriendlyId,
message: "Socket disconnected from supervisor",
properties: { reason, description: description?.toString() },
properties: { reason, ...parseDescription() },
});
});

Expand Down
66 changes: 53 additions & 13 deletions packages/cli-v3/src/entryPoints/managed/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class RunExecution {

private _runFriendlyId?: string;
private currentSnapshotId?: string;
private currentAttemptNumber?: number;
private currentTaskRunEnv?: Record<string, string>;

private dequeuedAt?: Date;
Expand All @@ -65,6 +66,7 @@ export class RunExecution {
private snapshotPoller?: RunExecutionSnapshotPoller;

private lastHeartbeat?: Date;
private isShuttingDown = false;

constructor(opts: RunExecutionOptions) {
this.id = randomBytes(4).toString("hex");
Expand All @@ -86,10 +88,6 @@ export class RunExecution {
throw new Error("prepareForExecution called after process was already created");
}

if (this.isPreparedForNextRun) {
throw new Error("prepareForExecution called after execution was already prepared");
}

this.taskRunProcess = this.createTaskRunProcess({
envVars: opts.taskRunEnv,
isWarmStart: true,
Expand Down Expand Up @@ -150,9 +148,14 @@ export class RunExecution {
}

/**
* Returns true if the execution has been prepared with task run env.
* Returns true if no run has been started yet and the process is prepared for the next run.
*/
get isPreparedForNextRun(): boolean {
get canExecute(): boolean {
// If we've ever had a run ID, this execution can't be reused
if (this._runFriendlyId) {
return false;
}

return !!this.taskRunProcess?.isPreparedForNextRun;
}

Expand All @@ -161,6 +164,11 @@ export class RunExecution {
* or when the snapshot poller detects a change
*/
public async handleSnapshotChange(runData: RunExecutionData): Promise<void> {
if (this.isShuttingDown) {
this.sendDebugLog("handleSnapshotChange: shutting down, skipping");
return;
}

const { run, snapshot, completedWaitpoints } = runData;

const snapshotMetadata = {
Expand Down Expand Up @@ -191,8 +199,6 @@ export class RunExecution {
return;
}

this.sendDebugLog(`enqueued snapshot change: ${snapshot.executionStatus}`, snapshotMetadata);

this.snapshotChangeQueue.push(runData);
await this.processSnapshotChangeQueue();
}
Expand Down Expand Up @@ -240,11 +246,16 @@ export class RunExecution {
}

if (snapshot.friendlyId === this.currentSnapshotId) {
this.sendDebugLog("handleSnapshotChange: snapshot not changed", snapshotMetadata);
return;
}

this.sendDebugLog(`snapshot change: ${snapshot.executionStatus}`, snapshotMetadata);
if (this.currentAttemptNumber && this.currentAttemptNumber !== run.attemptNumber) {
this.sendDebugLog("ERROR: attempt number mismatch", snapshotMetadata);
await this.taskRunProcess?.suspend();
return;
}

this.sendDebugLog(`snapshot has changed to: ${snapshot.executionStatus}`, snapshotMetadata);

// Reset the snapshot poll interval so we don't do unnecessary work
this.snapshotPoller?.resetCurrentInterval();
Expand Down Expand Up @@ -456,6 +467,16 @@ export class RunExecution {
// A snapshot was just created, so update the snapshot ID
this.currentSnapshotId = start.data.snapshot.friendlyId;

// Also set or update the attempt number - we do this to detect illegal attempt number changes, e.g. from stalled runners coming back online
const attemptNumber = start.data.run.attemptNumber;
if (attemptNumber && attemptNumber > 0) {
this.currentAttemptNumber = attemptNumber;
} else {
this.sendDebugLog("ERROR: invalid attempt number returned from start attempt", {
attemptNumber: String(attemptNumber),
});
}

const metrics = this.measureExecutionMetrics({
attemptCreatedAt: attemptStartedAt,
dequeuedAt: this.dequeuedAt?.getTime(),
Expand Down Expand Up @@ -597,8 +618,18 @@ export class RunExecution {
metrics: TaskRunExecutionMetrics;
isWarmStart?: boolean;
}) {
// For immediate retries, we need to ensure the task run process is prepared for the next attempt
if (
this.runFriendlyId &&
this.taskRunProcess &&
!this.taskRunProcess.isPreparedForNextAttempt
) {
this.sendDebugLog("killing existing task run process before executing next attempt");
await this.kill().catch(() => {});
}

// To skip this step and eagerly create the task run process, run prepareForExecution first
if (!this.taskRunProcess || !this.isPreparedForNextRun) {
if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) {
this.taskRunProcess = this.createTaskRunProcess({ envVars, isWarmStart });
}

Expand Down Expand Up @@ -655,11 +686,15 @@ export class RunExecution {
}

public exit() {
if (this.isPreparedForNextRun) {
if (this.taskRunProcess?.isPreparedForNextRun) {
this.taskRunProcess?.forceExit();
}
}

public async kill() {
await this.taskRunProcess?.kill("SIGKILL");
}

private async complete({ completion }: { completion: TaskRunExecutionResult }): Promise<void> {
if (!this.runFriendlyId || !this.currentSnapshotId) {
throw new Error("Cannot complete run: missing run or snapshot ID");
Expand Down Expand Up @@ -897,7 +932,7 @@ export class RunExecution {
this.lastHeartbeat = new Date();
}

sendDebugLog(
private sendDebugLog(
message: string,
properties?: SendDebugLogOptions["properties"],
runIdOverride?: string
Expand Down Expand Up @@ -958,6 +993,11 @@ export class RunExecution {
}

private stopServices() {
if (this.isShuttingDown) {
return;
}

this.isShuttingDown = true;
this.snapshotPoller?.stop();
this.taskRunProcess?.onTaskRunHeartbeat.detach();
}
Expand Down
93 changes: 43 additions & 50 deletions packages/cli-v3/src/entryPoints/managed/poller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker";
import { RunLogger } from "./logger.js";
import { RunLogger, SendDebugLogOptions } from "./logger.js";
import { IntervalService, RunExecutionData } from "@trigger.dev/core/v3";

export type RunExecutionSnapshotPollerOptions = {
Expand All @@ -14,89 +14,70 @@ export type RunExecutionSnapshotPollerOptions = {
export class RunExecutionSnapshotPoller {
private runFriendlyId: string;
private snapshotFriendlyId: string;
private enabled: boolean;

private readonly httpClient: WorkloadHttpClient;
private readonly logger: RunLogger;
private readonly snapshotPollIntervalMs: number;
private readonly handleSnapshotChange: (runData: RunExecutionData) => Promise<void>;
private readonly poller: IntervalService;

constructor(opts: RunExecutionSnapshotPollerOptions) {
this.enabled = false;

this.runFriendlyId = opts.runFriendlyId;
this.snapshotFriendlyId = opts.snapshotFriendlyId;
this.httpClient = opts.httpClient;
this.logger = opts.logger;
this.snapshotPollIntervalMs = opts.snapshotPollIntervalSeconds * 1000;
this.handleSnapshotChange = opts.handleSnapshotChange;

this.logger.sendDebugLog({
runId: this.runFriendlyId,
message: "RunExecutionSnapshotPoller",
properties: {
runFriendlyId: this.runFriendlyId,
snapshotFriendlyId: this.snapshotFriendlyId,
snapshotPollIntervalSeconds: opts.snapshotPollIntervalSeconds,
},
});
const intervalMs = opts.snapshotPollIntervalSeconds * 1000;

this.poller = new IntervalService({
onInterval: async () => {
if (!this.runFriendlyId) {
this.logger.sendDebugLog({
runId: this.runFriendlyId,
message: "Skipping snapshot poll, no run ID",
});
if (!this.enabled) {
this.sendDebugLog("poller disabled, skipping snapshot change handler (pre)");
return;
}

this.logger.sendDebugLog({
runId: this.runFriendlyId,
message: "Polling for latest snapshot",
});

this.logger.sendDebugLog({
runId: this.runFriendlyId,
message: `snapshot poll: started`,
properties: {
snapshotId: this.snapshotFriendlyId,
},
});
this.sendDebugLog("polling for latest snapshot");

const response = await this.httpClient.getRunExecutionData(this.runFriendlyId);

if (!response.success) {
this.logger.sendDebugLog({
runId: this.runFriendlyId,
message: "Snapshot poll failed",
properties: {
error: response.error,
},
});

this.logger.sendDebugLog({
runId: this.runFriendlyId,
message: `snapshot poll: failed`,
properties: {
snapshotId: this.snapshotFriendlyId,
error: response.error,
},
});
this.sendDebugLog("failed to get run execution data", { error: response.error });
return;
}

if (!this.enabled) {
this.sendDebugLog("poller disabled, skipping snapshot change handler (post)");
return;
}

await this.handleSnapshotChange(response.data.execution);
},
intervalMs: this.snapshotPollIntervalMs,
intervalMs,
leadingEdge: false,
onError: async (error) => {
this.logger.sendDebugLog({
runId: this.runFriendlyId,
message: "Failed to poll for snapshot",
properties: { error: error instanceof Error ? error.message : String(error) },
this.sendDebugLog("failed to poll for snapshot", {
error: error instanceof Error ? error.message : String(error),
});
},
});

this.sendDebugLog("created");
}

private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) {
this.logger.sendDebugLog({
runId: this.runFriendlyId,
message: `[poller] ${message}`,
properties: {
...properties,
runId: this.runFriendlyId,
snapshotId: this.snapshotFriendlyId,
pollIntervalMs: this.poller.intervalMs,
},
});
}

resetCurrentInterval() {
Expand All @@ -112,10 +93,22 @@ export class RunExecutionSnapshotPoller {
}

start() {
if (this.enabled) {
this.sendDebugLog("already started");
return;
}

this.enabled = true;
this.poller.start();
}

stop() {
if (!this.enabled) {
this.sendDebugLog("already stopped");
return;
}

this.enabled = false;
this.poller.stop();
}
}
Loading
Loading