Skip to content

Commit 09a859d

Browse files
authored
Fix managed run controller edge cases (#1987)
* parse disconnect description * decrease snapshot poller logs * remove another useless log * attempt number change detection * ensure no executions or child processes are being reused * prevent snapshot change handler from running after execution stopped * prevent creating zombie intervals on reset * add changeset * add enabled poller check before getting latest snapshot * make attempt number update more explicit, improve logs * remove deprecated heartbeat service * ..also remove the import
1 parent 7bdbbdc commit 09a859d

File tree

8 files changed

+168
-165
lines changed

8 files changed

+168
-165
lines changed

.changeset/tidy-books-smell.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
- Fix polling interval reset bug that could create duplicate intervals
7+
- Protect against unexpected attempt number changes
8+
- Prevent run execution zombies after warm starts

packages/cli-v3/src/entryPoints/managed/controller.ts

+36-3
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,17 @@ export class ManagedRunController {
178178
}
179179

180180
const execution = async () => {
181-
if (!this.currentExecution || !this.currentExecution.isPreparedForNextRun) {
181+
// If we have an existing execution that isn't prepared for the next run, kill it
182+
if (this.currentExecution && !this.currentExecution.canExecute) {
183+
this.sendDebugLog({
184+
runId: runFriendlyId,
185+
message: "killing existing execution before starting new run",
186+
});
187+
await this.currentExecution.kill().catch(() => {});
188+
this.currentExecution = null;
189+
}
190+
191+
if (!this.currentExecution || !this.currentExecution.canExecute) {
182192
this.currentExecution = new RunExecution({
183193
workerManifest: this.workerManifest,
184194
env: this.env,
@@ -267,11 +277,12 @@ export class ManagedRunController {
267277
if (this.currentExecution?.taskRunEnv) {
268278
this.sendDebugLog({
269279
runId: this.runFriendlyId,
270-
message: "waitForNextRun: eagerly recreating task run process",
280+
message: "waitForNextRun: eagerly creating fresh execution for next run",
271281
});
272282

273283
const previousTaskRunEnv = this.currentExecution.taskRunEnv;
274284

285+
// Create a fresh execution for the next run
275286
this.currentExecution = new RunExecution({
276287
workerManifest: this.workerManifest,
277288
env: this.env,
@@ -486,10 +497,32 @@ export class ManagedRunController {
486497
});
487498

488499
socket.on("disconnect", (reason, description) => {
500+
const parseDescription = ():
501+
| {
502+
description: string;
503+
context?: string;
504+
}
505+
| undefined => {
506+
if (!description) {
507+
return undefined;
508+
}
509+
510+
if (description instanceof Error) {
511+
return {
512+
description: description.toString(),
513+
};
514+
}
515+
516+
return {
517+
description: description.description,
518+
context: description.context ? String(description.context) : undefined,
519+
};
520+
};
521+
489522
this.sendDebugLog({
490523
runId: this.runFriendlyId,
491524
message: "Socket disconnected from supervisor",
492-
properties: { reason, description: description?.toString() },
525+
properties: { reason, ...parseDescription() },
493526
});
494527
});
495528

packages/cli-v3/src/entryPoints/managed/execution.ts

+53-13
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ export class RunExecution {
5151

5252
private _runFriendlyId?: string;
5353
private currentSnapshotId?: string;
54+
private currentAttemptNumber?: number;
5455
private currentTaskRunEnv?: Record<string, string>;
5556

5657
private dequeuedAt?: Date;
@@ -65,6 +66,7 @@ export class RunExecution {
6566
private snapshotPoller?: RunExecutionSnapshotPoller;
6667

6768
private lastHeartbeat?: Date;
69+
private isShuttingDown = false;
6870

6971
constructor(opts: RunExecutionOptions) {
7072
this.id = randomBytes(4).toString("hex");
@@ -86,10 +88,6 @@ export class RunExecution {
8688
throw new Error("prepareForExecution called after process was already created");
8789
}
8890

89-
if (this.isPreparedForNextRun) {
90-
throw new Error("prepareForExecution called after execution was already prepared");
91-
}
92-
9391
this.taskRunProcess = this.createTaskRunProcess({
9492
envVars: opts.taskRunEnv,
9593
isWarmStart: true,
@@ -150,9 +148,14 @@ export class RunExecution {
150148
}
151149

152150
/**
153-
* Returns true if the execution has been prepared with task run env.
151+
* Returns true if no run has been started yet and the process is prepared for the next run.
154152
*/
155-
get isPreparedForNextRun(): boolean {
153+
get canExecute(): boolean {
154+
// If we've ever had a run ID, this execution can't be reused
155+
if (this._runFriendlyId) {
156+
return false;
157+
}
158+
156159
return !!this.taskRunProcess?.isPreparedForNextRun;
157160
}
158161

@@ -161,6 +164,11 @@ export class RunExecution {
161164
* or when the snapshot poller detects a change
162165
*/
163166
public async handleSnapshotChange(runData: RunExecutionData): Promise<void> {
167+
if (this.isShuttingDown) {
168+
this.sendDebugLog("handleSnapshotChange: shutting down, skipping");
169+
return;
170+
}
171+
164172
const { run, snapshot, completedWaitpoints } = runData;
165173

166174
const snapshotMetadata = {
@@ -191,8 +199,6 @@ export class RunExecution {
191199
return;
192200
}
193201

194-
this.sendDebugLog(`enqueued snapshot change: ${snapshot.executionStatus}`, snapshotMetadata);
195-
196202
this.snapshotChangeQueue.push(runData);
197203
await this.processSnapshotChangeQueue();
198204
}
@@ -240,11 +246,16 @@ export class RunExecution {
240246
}
241247

242248
if (snapshot.friendlyId === this.currentSnapshotId) {
243-
this.sendDebugLog("handleSnapshotChange: snapshot not changed", snapshotMetadata);
244249
return;
245250
}
246251

247-
this.sendDebugLog(`snapshot change: ${snapshot.executionStatus}`, snapshotMetadata);
252+
if (this.currentAttemptNumber && this.currentAttemptNumber !== run.attemptNumber) {
253+
this.sendDebugLog("ERROR: attempt number mismatch", snapshotMetadata);
254+
await this.taskRunProcess?.suspend();
255+
return;
256+
}
257+
258+
this.sendDebugLog(`snapshot has changed to: ${snapshot.executionStatus}`, snapshotMetadata);
248259

249260
// Reset the snapshot poll interval so we don't do unnecessary work
250261
this.snapshotPoller?.resetCurrentInterval();
@@ -456,6 +467,16 @@ export class RunExecution {
456467
// A snapshot was just created, so update the snapshot ID
457468
this.currentSnapshotId = start.data.snapshot.friendlyId;
458469

470+
// Also set or update the attempt number - we do this to detect illegal attempt number changes, e.g. from stalled runners coming back online
471+
const attemptNumber = start.data.run.attemptNumber;
472+
if (attemptNumber && attemptNumber > 0) {
473+
this.currentAttemptNumber = attemptNumber;
474+
} else {
475+
this.sendDebugLog("ERROR: invalid attempt number returned from start attempt", {
476+
attemptNumber: String(attemptNumber),
477+
});
478+
}
479+
459480
const metrics = this.measureExecutionMetrics({
460481
attemptCreatedAt: attemptStartedAt,
461482
dequeuedAt: this.dequeuedAt?.getTime(),
@@ -597,8 +618,18 @@ export class RunExecution {
597618
metrics: TaskRunExecutionMetrics;
598619
isWarmStart?: boolean;
599620
}) {
621+
// For immediate retries, we need to ensure the task run process is prepared for the next attempt
622+
if (
623+
this.runFriendlyId &&
624+
this.taskRunProcess &&
625+
!this.taskRunProcess.isPreparedForNextAttempt
626+
) {
627+
this.sendDebugLog("killing existing task run process before executing next attempt");
628+
await this.kill().catch(() => {});
629+
}
630+
600631
// To skip this step and eagerly create the task run process, run prepareForExecution first
601-
if (!this.taskRunProcess || !this.isPreparedForNextRun) {
632+
if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) {
602633
this.taskRunProcess = this.createTaskRunProcess({ envVars, isWarmStart });
603634
}
604635

@@ -655,11 +686,15 @@ export class RunExecution {
655686
}
656687

657688
public exit() {
658-
if (this.isPreparedForNextRun) {
689+
if (this.taskRunProcess?.isPreparedForNextRun) {
659690
this.taskRunProcess?.forceExit();
660691
}
661692
}
662693

694+
public async kill() {
695+
await this.taskRunProcess?.kill("SIGKILL");
696+
}
697+
663698
private async complete({ completion }: { completion: TaskRunExecutionResult }): Promise<void> {
664699
if (!this.runFriendlyId || !this.currentSnapshotId) {
665700
throw new Error("Cannot complete run: missing run or snapshot ID");
@@ -897,7 +932,7 @@ export class RunExecution {
897932
this.lastHeartbeat = new Date();
898933
}
899934

900-
sendDebugLog(
935+
private sendDebugLog(
901936
message: string,
902937
properties?: SendDebugLogOptions["properties"],
903938
runIdOverride?: string
@@ -958,6 +993,11 @@ export class RunExecution {
958993
}
959994

960995
private stopServices() {
996+
if (this.isShuttingDown) {
997+
return;
998+
}
999+
1000+
this.isShuttingDown = true;
9611001
this.snapshotPoller?.stop();
9621002
this.taskRunProcess?.onTaskRunHeartbeat.detach();
9631003
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker";
2-
import { RunLogger } from "./logger.js";
2+
import { RunLogger, SendDebugLogOptions } from "./logger.js";
33
import { IntervalService, RunExecutionData } from "@trigger.dev/core/v3";
44

55
export type RunExecutionSnapshotPollerOptions = {
@@ -14,89 +14,70 @@ export type RunExecutionSnapshotPollerOptions = {
1414
export class RunExecutionSnapshotPoller {
1515
private runFriendlyId: string;
1616
private snapshotFriendlyId: string;
17+
private enabled: boolean;
1718

1819
private readonly httpClient: WorkloadHttpClient;
1920
private readonly logger: RunLogger;
20-
private readonly snapshotPollIntervalMs: number;
2121
private readonly handleSnapshotChange: (runData: RunExecutionData) => Promise<void>;
2222
private readonly poller: IntervalService;
2323

2424
constructor(opts: RunExecutionSnapshotPollerOptions) {
25+
this.enabled = false;
26+
2527
this.runFriendlyId = opts.runFriendlyId;
2628
this.snapshotFriendlyId = opts.snapshotFriendlyId;
2729
this.httpClient = opts.httpClient;
2830
this.logger = opts.logger;
29-
this.snapshotPollIntervalMs = opts.snapshotPollIntervalSeconds * 1000;
3031
this.handleSnapshotChange = opts.handleSnapshotChange;
3132

32-
this.logger.sendDebugLog({
33-
runId: this.runFriendlyId,
34-
message: "RunExecutionSnapshotPoller",
35-
properties: {
36-
runFriendlyId: this.runFriendlyId,
37-
snapshotFriendlyId: this.snapshotFriendlyId,
38-
snapshotPollIntervalSeconds: opts.snapshotPollIntervalSeconds,
39-
},
40-
});
33+
const intervalMs = opts.snapshotPollIntervalSeconds * 1000;
4134

4235
this.poller = new IntervalService({
4336
onInterval: async () => {
44-
if (!this.runFriendlyId) {
45-
this.logger.sendDebugLog({
46-
runId: this.runFriendlyId,
47-
message: "Skipping snapshot poll, no run ID",
48-
});
37+
if (!this.enabled) {
38+
this.sendDebugLog("poller disabled, skipping snapshot change handler (pre)");
4939
return;
5040
}
5141

52-
this.logger.sendDebugLog({
53-
runId: this.runFriendlyId,
54-
message: "Polling for latest snapshot",
55-
});
56-
57-
this.logger.sendDebugLog({
58-
runId: this.runFriendlyId,
59-
message: `snapshot poll: started`,
60-
properties: {
61-
snapshotId: this.snapshotFriendlyId,
62-
},
63-
});
42+
this.sendDebugLog("polling for latest snapshot");
6443

6544
const response = await this.httpClient.getRunExecutionData(this.runFriendlyId);
6645

6746
if (!response.success) {
68-
this.logger.sendDebugLog({
69-
runId: this.runFriendlyId,
70-
message: "Snapshot poll failed",
71-
properties: {
72-
error: response.error,
73-
},
74-
});
75-
76-
this.logger.sendDebugLog({
77-
runId: this.runFriendlyId,
78-
message: `snapshot poll: failed`,
79-
properties: {
80-
snapshotId: this.snapshotFriendlyId,
81-
error: response.error,
82-
},
83-
});
47+
this.sendDebugLog("failed to get run execution data", { error: response.error });
48+
return;
49+
}
8450

51+
if (!this.enabled) {
52+
this.sendDebugLog("poller disabled, skipping snapshot change handler (post)");
8553
return;
8654
}
8755

8856
await this.handleSnapshotChange(response.data.execution);
8957
},
90-
intervalMs: this.snapshotPollIntervalMs,
58+
intervalMs,
9159
leadingEdge: false,
9260
onError: async (error) => {
93-
this.logger.sendDebugLog({
94-
runId: this.runFriendlyId,
95-
message: "Failed to poll for snapshot",
96-
properties: { error: error instanceof Error ? error.message : String(error) },
61+
this.sendDebugLog("failed to poll for snapshot", {
62+
error: error instanceof Error ? error.message : String(error),
9763
});
9864
},
9965
});
66+
67+
this.sendDebugLog("created");
68+
}
69+
70+
private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) {
71+
this.logger.sendDebugLog({
72+
runId: this.runFriendlyId,
73+
message: `[poller] ${message}`,
74+
properties: {
75+
...properties,
76+
runId: this.runFriendlyId,
77+
snapshotId: this.snapshotFriendlyId,
78+
pollIntervalMs: this.poller.intervalMs,
79+
},
80+
});
10081
}
10182

10283
resetCurrentInterval() {
@@ -112,10 +93,22 @@ export class RunExecutionSnapshotPoller {
11293
}
11394

11495
start() {
96+
if (this.enabled) {
97+
this.sendDebugLog("already started");
98+
return;
99+
}
100+
101+
this.enabled = true;
115102
this.poller.start();
116103
}
117104

118105
stop() {
106+
if (!this.enabled) {
107+
this.sendDebugLog("already stopped");
108+
return;
109+
}
110+
111+
this.enabled = false;
119112
this.poller.stop();
120113
}
121114
}

0 commit comments

Comments
 (0)