diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 49352f2a14..c41b50ad27 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -8,6 +8,7 @@ import { type CompleteRunAttemptResult, HeartbeatService, type RunExecutionData, + SuspendedProcessError, type TaskRunExecutionMetrics, type TaskRunExecutionResult, type TaskRunFailedExecutionResult, @@ -55,6 +56,7 @@ const Env = z.object({ TRIGGER_MACHINE_MEMORY: z.string().default("0"), TRIGGER_RUNNER_ID: z.string(), TRIGGER_METADATA_URL: z.string().optional(), + TRIGGER_PRE_SUSPEND_WAIT_MS: z.coerce.number().default(200), // Timeline metrics TRIGGER_POD_SCHEDULED_AT_MS: DateEnv, @@ -154,19 +156,10 @@ class ManagedRunController { } = { phase: "IDLE" }; constructor(opts: ManagedRunControllerOptions) { - logger.debug("[ManagedRunController] Creating controller", { env }); - this.workerManifest = opts.workerManifest; this.runnerId = env.TRIGGER_RUNNER_ID; - this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; - this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; - - if (env.TRIGGER_METADATA_URL) { - this.metadataClient = new MetadataClient(env.TRIGGER_METADATA_URL); - } - this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; this.workerInstanceName = env.TRIGGER_WORKER_INSTANCE_NAME; @@ -178,6 +171,25 @@ class ManagedRunController { projectRef: env.TRIGGER_PROJECT_REF, }); + const properties = { + ...env, + TRIGGER_POD_SCHEDULED_AT_MS: env.TRIGGER_POD_SCHEDULED_AT_MS.toISOString(), + TRIGGER_DEQUEUED_AT_MS: env.TRIGGER_DEQUEUED_AT_MS.toISOString(), + }; + + this.sendDebugLog({ + runId: env.TRIGGER_RUN_ID, + message: "Creating run controller", + properties, + }); + + this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; + this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; + + if (env.TRIGGER_METADATA_URL) { + this.metadataClient = new MetadataClient(env.TRIGGER_METADATA_URL); + } + if (env.TRIGGER_WARM_START_URL) { this.warmStartClient = new WarmStartClient({ apiUrl: new URL(env.TRIGGER_WARM_START_URL), @@ -192,11 +204,17 @@ class ManagedRunController { this.snapshotPoller = new HeartbeatService({ heartbeat: async () => { if (!this.runFriendlyId) { - logger.debug("[ManagedRunController] Skipping snapshot poll, no run ID"); + this.sendDebugLog({ + runId: env.TRIGGER_RUN_ID, + message: "Skipping snapshot poll, no run ID", + }); return; } - console.debug("[ManagedRunController] Polling for latest snapshot"); + this.sendDebugLog({ + runId: env.TRIGGER_RUN_ID, + message: "Polling for latest snapshot", + }); this.sendDebugLog({ runId: this.runFriendlyId, @@ -209,7 +227,13 @@ class ManagedRunController { const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); if (!response.success) { - console.error("[ManagedRunController] Snapshot poll failed", { error: response.error }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Snapshot poll failed", + properties: { + error: response.error, + }, + }); this.sendDebugLog({ runId: this.runFriendlyId, @@ -228,18 +252,28 @@ class ManagedRunController { intervalMs: this.snapshotPollIntervalSeconds * 1000, leadingEdge: false, onError: async (error) => { - console.error("[ManagedRunController] Failed to poll for snapshot", { error }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Failed to poll for snapshot", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); }, }); this.runHeartbeat = new HeartbeatService({ heartbeat: async () => { if (!this.runFriendlyId || !this.snapshotFriendlyId) { - logger.debug("[ManagedRunController] Skipping heartbeat, no run ID or snapshot ID"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Skipping heartbeat, no run ID or snapshot ID", + }); return; } - console.debug("[ManagedRunController] Sending heartbeat"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "heartbeat: started", + }); const response = await this.httpClient.heartbeatRun( this.runFriendlyId, @@ -247,8 +281,6 @@ class ManagedRunController { ); if (!response.success) { - console.error("[ManagedRunController] Heartbeat failed", { error: response.error }); - this.sendDebugLog({ runId: this.runFriendlyId, message: "heartbeat: failed", @@ -261,12 +293,19 @@ class ManagedRunController { intervalMs: this.heartbeatIntervalSeconds * 1000, leadingEdge: false, onError: async (error) => { - console.error("[ManagedRunController] Failed to send heartbeat", { error }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Failed to send heartbeat", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); }, }); process.on("SIGTERM", async () => { - logger.debug("[ManagedRunController] Received SIGTERM, stopping worker"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Received SIGTERM, stopping worker", + }); await this.stop(); }); } @@ -315,7 +354,11 @@ class ManagedRunController { } if (this.state.snapshot.friendlyId === snapshot.friendlyId) { - logger.debug("updateRunPhase: Snapshot not changed", { run, snapshot }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "updateRunPhase: Snapshot not changed", + properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, + }); this.sendDebugLog({ runId: run.friendlyId, @@ -355,17 +398,29 @@ class ManagedRunController { private onExitRunPhase(newRun: Run | undefined = undefined) { // We're not in a run phase, nothing to do if (this.state.phase !== "RUN") { - logger.debug("onExitRunPhase: Not in run phase, skipping", { phase: this.state.phase }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "onExitRunPhase: Not in run phase, skipping", + properties: { phase: this.state.phase }, + }); return; } // This is still the same run, so we're not exiting the phase if (newRun?.friendlyId === this.state.run.friendlyId) { - logger.debug("onExitRunPhase: Same run, skipping", { newRun }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "onExitRunPhase: Same run, skipping", + properties: { newRun: newRun?.friendlyId }, + }); return; } - logger.debug("onExitRunPhase: Exiting run phase", { newRun }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "onExitRunPhase: Exiting run phase", + properties: { newRun: newRun?.friendlyId }, + }); this.runHeartbeat.stop(); this.snapshotPoller.stop(); @@ -423,7 +478,10 @@ class ManagedRunController { completedWaitpoints, }: Pick) { if (this.handleSnapshotChangeLock) { - console.warn("handleSnapshotChange: already in progress"); + this.sendDebugLog({ + runId: run.friendlyId, + message: "handleSnapshotChange: already in progress", + }); return; } @@ -431,9 +489,13 @@ class ManagedRunController { try { if (!this.snapshotFriendlyId) { - console.error("handleSnapshotChange: Missing snapshot ID", { + this.sendDebugLog({ runId: run.friendlyId, - snapshotId: this.snapshotFriendlyId, + message: "handleSnapshotChange: Missing snapshot ID", + properties: { + newSnapshotId: snapshot.friendlyId, + newSnapshotStatus: snapshot.executionStatus, + }, }); this.sendDebugLog({ @@ -449,7 +511,11 @@ class ManagedRunController { } if (this.snapshotFriendlyId === snapshot.friendlyId) { - console.debug("handleSnapshotChange: snapshot not changed, skipping", { snapshot }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "handleSnapshotChange: snapshot not changed, skipping", + properties: { snapshot: snapshot.friendlyId }, + }); this.sendDebugLog({ runId: run.friendlyId, @@ -463,13 +529,6 @@ class ManagedRunController { return; } - console.log(`handleSnapshotChange: ${snapshot.executionStatus}`, { - run, - oldSnapshotId: this.snapshotFriendlyId, - newSnapshot: snapshot, - completedWaitpoints: completedWaitpoints.length, - }); - this.sendDebugLog({ runId: run.friendlyId, message: `snapshot change: ${snapshot.executionStatus}`, @@ -483,12 +542,6 @@ class ManagedRunController { try { this.updateRunPhase(run, snapshot); } catch (error) { - console.error("handleSnapshotChange: failed to update run phase", { - run, - snapshot, - error, - }); - this.sendDebugLog({ runId: run.friendlyId, message: "snapshot change: failed to update run phase", @@ -507,8 +560,12 @@ class ManagedRunController { try { await this.cancelAttempt(run.friendlyId); } catch (error) { - console.error("Failed to cancel attempt, shutting down", { - error, + this.sendDebugLog({ + runId: run.friendlyId, + message: "snapshot change: failed to cancel attempt", + properties: { + error: error instanceof Error ? error.message : String(error), + }, }); this.waitForNextRun(); @@ -518,47 +575,76 @@ class ManagedRunController { return; } case "FINISHED": { - console.log("Run is finished, will wait for next run"); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Run is finished, will wait for next run", + }); + + if (this.activeRunExecution) { + // Let's pretend we've just suspended the run. This will kill the process and should automatically wait for the next run. + // We still explicitly call waitForNextRun() afterwards in case of race conditions. Locks will prevent this from causing issues. + await this.taskRunProcess?.suspend(); + } + this.waitForNextRun(); + return; } case "QUEUED_EXECUTING": case "EXECUTING_WITH_WAITPOINTS": { - console.log("Run is executing with waitpoints", { snapshot }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Run is executing with waitpoints", + properties: { snapshot: snapshot.friendlyId }, + }); try { + // This should never throw. It should also never fail the run. await this.taskRunProcess?.cleanup(false); } catch (error) { - console.error("Failed to cleanup task run process", { error }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Failed to cleanup task run process", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); } if (snapshot.friendlyId !== this.snapshotFriendlyId) { - console.debug("Snapshot changed after cleanup, abort", { - oldSnapshotId: snapshot.friendlyId, - newSnapshotId: this.snapshotFriendlyId, + this.sendDebugLog({ + runId: run.friendlyId, + message: "Snapshot changed after cleanup, abort", + properties: { + oldSnapshotId: snapshot.friendlyId, + newSnapshotId: this.snapshotFriendlyId, + }, }); return; } - // TODO: Make this configurable and add wait debounce - await sleep(200); + await sleep(env.TRIGGER_PRE_SUSPEND_WAIT_MS); if (snapshot.friendlyId !== this.snapshotFriendlyId) { - console.debug("Snapshot changed after suspend threshold, abort", { - oldSnapshotId: snapshot.friendlyId, - newSnapshotId: this.snapshotFriendlyId, + this.sendDebugLog({ + runId: run.friendlyId, + message: "Snapshot changed after suspend threshold, abort", + properties: { + oldSnapshotId: snapshot.friendlyId, + newSnapshotId: this.snapshotFriendlyId, + }, }); return; } if (!this.runFriendlyId || !this.snapshotFriendlyId) { - console.error( - "handleSnapshotChange: Missing run ID or snapshot ID after suspension, abort", - { + this.sendDebugLog({ + runId: run.friendlyId, + message: + "handleSnapshotChange: Missing run ID or snapshot ID after suspension, abort", + properties: { runId: this.runFriendlyId, snapshotId: this.snapshotFriendlyId, - } - ); + }, + }); return; } @@ -568,8 +654,12 @@ class ManagedRunController { ); if (!suspendResult.success) { - console.error("Failed to suspend run, staying alive 🎢", { - error: suspendResult.error, + this.sendDebugLog({ + runId: run.friendlyId, + message: "Failed to suspend run, staying alive 🎢", + properties: { + error: suspendResult.error, + }, }); this.sendDebugLog({ @@ -585,10 +675,6 @@ class ManagedRunController { } if (!suspendResult.data.ok) { - console.error("Failed to suspend run, staying alive 🎢🎢", { - suspendResult: suspendResult.data, - }); - this.sendDebugLog({ runId: run.friendlyId, message: "checkpoint: failed to suspend run", @@ -601,23 +687,37 @@ class ManagedRunController { return; } - console.log("Suspending, any day now 🚬", { suspendResult: suspendResult.data }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Suspending, any day now 🚬", + properties: { ok: suspendResult.data.ok }, + }); return; } case "SUSPENDED": { - console.log("Run was suspended, kill the process and wait for more runs", { - run, - snapshot, + this.sendDebugLog({ + runId: run.friendlyId, + message: "Run was suspended, kill the process and wait for more runs", + properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, }); - this.waitForNextRun(); + // This will kill the process and fail the execution with a SuspendedProcessError + await this.taskRunProcess?.suspend(); + return; } case "PENDING_EXECUTING": { - console.log("Run is pending execution", { run, snapshot }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Run is pending execution", + properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, + }); if (completedWaitpoints.length === 0) { - console.log("No waitpoints to complete, nothing to do"); + this.sendDebugLog({ + runId: run.friendlyId, + message: "No waitpoints to complete, nothing to do", + }); return; } @@ -636,8 +736,6 @@ class ManagedRunController { ); if (!continuationResult.success) { - console.error("Failed to continue execution", { error: continuationResult.error }); - this.sendDebugLog({ runId: run.friendlyId, message: "failed to continue execution", @@ -653,17 +751,27 @@ class ManagedRunController { return; } case "EXECUTING": { - console.log("Run is now executing", { run, snapshot }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Run is now executing", + properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, + }); if (completedWaitpoints.length === 0) { return; } - console.log("Processing completed waitpoints", { completedWaitpoints }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Processing completed waitpoints", + properties: { completedWaitpoints: completedWaitpoints.length }, + }); if (!this.taskRunProcess) { - console.error("No task run process, ignoring completed waitpoints", { - completedWaitpoints, + this.sendDebugLog({ + runId: run.friendlyId, + message: "No task run process, ignoring completed waitpoints", + properties: { completedWaitpoints: completedWaitpoints.length }, }); return; } @@ -676,7 +784,11 @@ class ManagedRunController { } case "RUN_CREATED": case "QUEUED": { - console.log("Status change not handled", { status: snapshot.executionStatus }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Status change not handled", + properties: { status: snapshot.executionStatus }, + }); return; } default: { @@ -684,8 +796,6 @@ class ManagedRunController { } } } catch (error) { - console.error("handleSnapshotChange: unexpected error", { error }); - this.sendDebugLog({ runId: run.friendlyId, message: "snapshot change: unexpected error", @@ -701,18 +811,28 @@ class ManagedRunController { private async processEnvOverrides() { if (!this.metadataClient) { - logger.log("No metadata client, skipping env overrides"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "No metadata client, skipping env overrides", + }); return; } const overrides = await this.metadataClient.getEnvOverrides(); if (!overrides) { - logger.log("No env overrides, skipping"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "No env overrides, skipping", + }); return; } - logger.log("Processing env overrides", { env: overrides }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Processing env overrides", + properties: { ...overrides }, + }); if (overrides.TRIGGER_SUCCESS_EXIT_CODE) { this.successExitCode = overrides.TRIGGER_SUCCESS_EXIT_CODE; @@ -757,151 +877,226 @@ class ManagedRunController { } } + private activeRunExecution: Promise | null = null; + private async startAndExecuteRunAttempt({ runFriendlyId, snapshotFriendlyId, dequeuedAt, podScheduledAt, - isWarmStart = false, + isWarmStart, + skipLockCheckForImmediateRetry: skipLockCheck, }: { runFriendlyId: string; snapshotFriendlyId: string; dequeuedAt?: Date; podScheduledAt?: Date; isWarmStart?: boolean; + skipLockCheckForImmediateRetry?: boolean; }) { - if (!this.socket) { - console.warn("[ManagedRunController] Starting run without socket connection"); - } - - this.subscribeToRunNotifications({ - run: { friendlyId: runFriendlyId }, - snapshot: { friendlyId: snapshotFriendlyId }, - }); - - const attemptStartedAt = Date.now(); - - const start = await this.httpClient.startRunAttempt(runFriendlyId, snapshotFriendlyId, { - isWarmStart, - }); - - if (!start.success) { - console.error("[ManagedRunController] Failed to start run", { error: start.error }); - + if (!skipLockCheck && this.activeRunExecution) { this.sendDebugLog({ runId: runFriendlyId, - message: "failed to start run attempt", - properties: { - error: start.error, - }, + message: "startAndExecuteRunAttempt: already in progress", }); - - this.waitForNextRun(); return; } - const attemptDuration = Date.now() - attemptStartedAt; + const execution = async () => { + if (!this.socket) { + this.sendDebugLog({ + runId: runFriendlyId, + message: "Starting run without socket connection", + }); + } - const { run, snapshot, execution, envVars } = start.data; + this.subscribeToRunNotifications({ + run: { friendlyId: runFriendlyId }, + snapshot: { friendlyId: snapshotFriendlyId }, + }); - logger.debug("[ManagedRunController] Started run", { - runId: run.friendlyId, - snapshot: snapshot.friendlyId, - }); + const attemptStartedAt = Date.now(); - this.enterRunPhase(run, snapshot); + const start = await this.httpClient.startRunAttempt(runFriendlyId, snapshotFriendlyId, { + isWarmStart, + }); - const metrics = [ - { - name: "start", - event: "create_attempt", - timestamp: attemptStartedAt, - duration: attemptDuration, - }, - ] - .concat( - dequeuedAt - ? [ - { - name: "start", - event: "dequeue", - timestamp: dequeuedAt.getTime(), - duration: 0, - }, - ] - : [] - ) - .concat( - podScheduledAt - ? [ - { - name: "start", - event: "pod_scheduled", - timestamp: podScheduledAt.getTime(), - duration: 0, - }, - ] - : [] - ) satisfies TaskRunExecutionMetrics; + if (!start.success) { + this.sendDebugLog({ + runId: runFriendlyId, + message: "Failed to start run", + properties: { error: start.error }, + }); - const taskRunEnv = { - ...gatherProcessEnv(), - ...envVars, - }; + this.sendDebugLog({ + runId: runFriendlyId, + message: "failed to start run attempt", + properties: { + error: start.error, + }, + }); - try { - return await this.executeRun({ run, snapshot, envVars: taskRunEnv, execution, metrics }); - } catch (error) { - console.error("Error while executing attempt", { - error, - }); + this.waitForNextRun(); + return; + } + + const attemptDuration = Date.now() - attemptStartedAt; - console.log("Submitting attempt completion", { + const { run, snapshot, execution, envVars } = start.data; + + this.sendDebugLog({ runId: run.friendlyId, - snapshotId: snapshot.friendlyId, - updatedSnapshotId: this.snapshotFriendlyId, + message: "Started run", + properties: { snapshot: snapshot.friendlyId }, }); - const completion = { - id: execution.run.id, - ok: false, - retry: undefined, - error: TaskRunProcess.parseExecuteError(error), - } satisfies TaskRunFailedExecutionResult; - - const completionResult = await this.httpClient.completeRunAttempt( - run.friendlyId, - this.snapshotFriendlyId ?? snapshot.friendlyId, - { completion } - ); - - if (!completionResult.success) { - console.error("Failed to submit completion after error", { - error: completionResult.error, + this.enterRunPhase(run, snapshot); + + const metrics = [ + { + name: "start", + event: "create_attempt", + timestamp: attemptStartedAt, + duration: attemptDuration, + }, + ] + .concat( + dequeuedAt + ? [ + { + name: "start", + event: "dequeue", + timestamp: dequeuedAt.getTime(), + duration: 0, + }, + ] + : [] + ) + .concat( + podScheduledAt + ? [ + { + name: "start", + event: "pod_scheduled", + timestamp: podScheduledAt.getTime(), + duration: 0, + }, + ] + : [] + ) satisfies TaskRunExecutionMetrics; + + const taskRunEnv = { + ...gatherProcessEnv(), + ...envVars, + }; + + try { + return await this.executeRun({ + run, + snapshot, + envVars: taskRunEnv, + execution, + metrics, + isWarmStart, + }); + } catch (error) { + if (error instanceof SuspendedProcessError) { + this.sendDebugLog({ + runId: run.friendlyId, + message: "Run was suspended and task run process was killed, waiting for next run", + properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, + }); + + this.waitForNextRun(); + return; + } + + this.sendDebugLog({ + runId: run.friendlyId, + message: "Error while executing attempt", + properties: { error: error instanceof Error ? error.message : String(error) }, }); this.sendDebugLog({ runId: run.friendlyId, - message: "completion: failed to submit after error", + message: "Submitting attempt completion", properties: { - error: completionResult.error, + snapshotId: snapshot.friendlyId, + updatedSnapshotId: this.snapshotFriendlyId, }, }); - this.waitForNextRun(); - return; - } + const completion = { + id: execution.run.id, + ok: false, + retry: undefined, + error: TaskRunProcess.parseExecuteError(error), + } satisfies TaskRunFailedExecutionResult; + + const completionResult = await this.httpClient.completeRunAttempt( + run.friendlyId, + // FIXME: if the snapshot has changed since starting the run, this won't be accurate + // ..but we probably shouldn't fetch the latest snapshot either because we may be in an "unhealthy" state while the next runner has already taken over + this.snapshotFriendlyId ?? snapshot.friendlyId, + { completion } + ); - logger.log("Attempt completion submitted after error", completionResult.data.result); + if (!completionResult.success) { + this.sendDebugLog({ + runId: run.friendlyId, + message: "Failed to submit completion after error", + properties: { error: completionResult.error }, + }); - try { - await this.handleCompletionResult(completion, completionResult.data.result); - } catch (error) { - console.error("Failed to handle completion result after error", { error }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "completion: failed to submit after error", + properties: { + error: completionResult.error, + }, + }); - this.waitForNextRun(); - return; + this.waitForNextRun(); + return; + } + + this.sendDebugLog({ + runId: run.friendlyId, + message: "Attempt completion submitted after error", + properties: { + attemptStatus: completionResult.data.result.attemptStatus, + runId: completionResult.data.result.run.friendlyId, + snapshotId: completionResult.data.result.snapshot.friendlyId, + }, + }); + + try { + await this.handleCompletionResult(completion, completionResult.data.result); + } catch (error) { + this.sendDebugLog({ + runId: run.friendlyId, + message: "Failed to handle completion result after error", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); + + this.waitForNextRun(); + return; + } } + }; + + this.activeRunExecution = execution(); + + try { + await this.activeRunExecution; + } catch (error) { + this.sendDebugLog({ + runId: runFriendlyId, + message: "startAndExecuteRunAttempt: unexpected error", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); + } finally { + this.activeRunExecution = null; } } @@ -912,7 +1107,10 @@ class ManagedRunController { * configured duration. */ private async waitForNextRun() { if (this.waitForNextRunLock) { - console.warn("waitForNextRun: already in progress"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: already in progress", + }); return; } @@ -920,37 +1118,60 @@ class ManagedRunController { const previousRunId = this.runFriendlyId; try { - logger.debug("waitForNextRun: waiting for next run"); - - this.enterWarmStartPhase(); + // If there's a run execution in progress, we need to kill it and wait for it to finish + if (this.activeRunExecution) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: waiting for existing run execution to finish", + }); + await this.activeRunExecution; + } - // Kill the run process + // Just for good measure await this.taskRunProcess?.kill("SIGKILL"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: waiting for next run", + }); + + this.enterWarmStartPhase(); + if (!this.warmStartClient) { - console.error("waitForNextRun: warm starts disabled, shutting down"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: warm starts disabled, shutting down", + }); this.exitProcess(this.successExitCode); } if (this.taskRunProcess) { - logger.debug("waitForNextRun: eagerly recreating task run process with options"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: eagerly recreating task run process with options", + }); this.taskRunProcess = new TaskRunProcess({ ...this.taskRunProcess.options, isWarmStart: true, }).initialize(); } else { - logger.debug( - "waitForNextRun: no existing task run process, so we can't eagerly recreate it" - ); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: no existing task run process, so we can't eagerly recreate it", + }); } // Check the service is up and get additional warm start config const connect = await this.warmStartClient.connect(); if (!connect.success) { - console.error("waitForNextRun: failed to connect to warm start service", { - warmStartUrl: env.TRIGGER_WARM_START_URL, - error: connect.error, + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: failed to connect to warm start service", + properties: { + warmStartUrl: env.TRIGGER_WARM_START_URL, + error: connect.error, + }, }); this.exitProcess(this.successExitCode); } @@ -959,9 +1180,13 @@ class ManagedRunController { connect.data.connectionTimeoutMs ?? env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS; const keepaliveMs = connect.data.keepaliveMs ?? env.TRIGGER_WARM_START_KEEPALIVE_MS; - console.log("waitForNextRun: connected to warm start service", { - connectionTimeoutMs, - keepaliveMs, + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: connected to warm start service", + properties: { + connectionTimeoutMs, + keepaliveMs, + }, }); if (previousRunId) { @@ -976,9 +1201,13 @@ class ManagedRunController { } if (!connectionTimeoutMs || !keepaliveMs) { - console.error("waitForNextRun: warm starts disabled after connect", { - connectionTimeoutMs, - keepaliveMs, + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: warm starts disabled after connect", + properties: { + connectionTimeoutMs, + keepaliveMs, + }, }); this.exitProcess(this.successExitCode); } @@ -990,11 +1219,18 @@ class ManagedRunController { }); if (!nextRun) { - console.error("waitForNextRun: warm start failed, shutting down"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: warm start failed, shutting down", + }); this.exitProcess(this.successExitCode); } - console.log("waitForNextRun: got next run", { nextRun }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: got next run", + properties: { nextRun: nextRun.run.friendlyId }, + }); this.startAndExecuteRunAttempt({ runFriendlyId: nextRun.run.friendlyId, @@ -1004,7 +1240,11 @@ class ManagedRunController { }).finally(() => {}); return; } catch (error) { - console.error("waitForNextRun: unexpected error", { error }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: unexpected error", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); this.exitProcess(this.failureExitCode); } finally { this.waitForNextRunLock = false; @@ -1012,7 +1252,11 @@ class ManagedRunController { } private exitProcess(code?: number): never { - logger.log("Exiting process", { code }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Exiting process", + properties: { code }, + }); if (this.taskRunProcess?.isPreparedForNextRun) { this.taskRunProcess.forceExit(); } @@ -1030,29 +1274,25 @@ class ManagedRunController { }, }); this.socket.on("run:notify", async ({ version, run }) => { - console.log("[ManagedRunController] Received run notification", { version, run }); - this.sendDebugLog({ runId: run.friendlyId, message: "run:notify received by runner", + properties: { version, runId: run.friendlyId }, }); if (!this.runFriendlyId) { - logger.debug("[ManagedRunController] Ignoring notification, no local run ID", { + this.sendDebugLog({ runId: run.friendlyId, - currentRunId: this.runFriendlyId, - currentSnapshotId: this.snapshotFriendlyId, + message: "run:notify: ignoring notification, no local run ID", + properties: { + currentRunId: this.runFriendlyId, + currentSnapshotId: this.snapshotFriendlyId, + }, }); return; } if (run.friendlyId !== this.runFriendlyId) { - console.log("[ManagedRunController] Ignoring notification for different run", { - runId: run.friendlyId, - currentRunId: this.runFriendlyId, - currentSnapshotId: this.snapshotFriendlyId, - }); - this.sendDebugLog({ runId: run.friendlyId, message: "run:notify: ignoring notification for different run", @@ -1071,8 +1311,6 @@ class ManagedRunController { const latestSnapshot = await this.httpClient.getRunExecutionData(this.runFriendlyId); if (!latestSnapshot.success) { - console.error("Failed to get latest snapshot data", latestSnapshot.error); - this.sendDebugLog({ runId: this.runFriendlyId, message: "run:notify: failed to get latest snapshot data", @@ -1088,7 +1326,10 @@ class ManagedRunController { await this.handleSnapshotChange(latestSnapshot.data.execution); }); this.socket.on("connect", () => { - console.log("[ManagedRunController] Connected to supervisor"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Connected to supervisor", + }); // This should handle the case where we reconnect after being restored if (this.state.phase === "RUN") { @@ -1097,10 +1338,18 @@ class ManagedRunController { } }); this.socket.on("connect_error", (error) => { - console.error("[ManagedRunController] Connection error", { error }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Connection error", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); }); this.socket.on("disconnect", (reason, description) => { - console.log("[ManagedRunController] Disconnected from supervisor", { reason, description }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Disconnected from supervisor", + properties: { reason, description: description?.toString() }, + }); }); } @@ -1110,8 +1359,10 @@ class ManagedRunController { envVars, execution, metrics, + isWarmStart, }: WorkloadRunAttemptStartResponseBody & { metrics?: TaskRunExecutionMetrics; + isWarmStart?: boolean; }) { this.snapshotPoller.start(); @@ -1126,38 +1377,57 @@ class ManagedRunController { engine: "V2", }, machine: execution.machine, + isWarmStart, }).initialize(); } - logger.log("executing task run process", { - attemptId: execution.attempt.id, - runId: execution.run.id, + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "executing task run process", + properties: { + attemptId: execution.attempt.id, + runId: execution.run.id, + }, }); - const completion = await this.taskRunProcess.execute({ - payload: { - execution, - traceContext: execution.run.traceContext ?? {}, - metrics, + const completion = await this.taskRunProcess.execute( + { + payload: { + execution, + traceContext: execution.run.traceContext ?? {}, + metrics, + }, + messageId: run.friendlyId, + env: envVars, }, - messageId: run.friendlyId, - env: envVars, - }); + isWarmStart + ); - logger.log("Completed run", completion); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Completed run", + properties: { completion: completion.ok }, + }); try { + // The execution has finished, so we can cleanup the task run process. Killing it should be safe. await this.taskRunProcess.cleanup(true); } catch (error) { - console.error("Failed to cleanup task run process, submitting completion anyway", { - error, + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Failed to cleanup task run process, submitting completion anyway", + properties: { error: error instanceof Error ? error.message : String(error) }, }); } if (!this.runFriendlyId || !this.snapshotFriendlyId) { - console.error("executeRun: Missing run ID or snapshot ID after execution", { + this.sendDebugLog({ runId: this.runFriendlyId, - snapshotId: this.snapshotFriendlyId, + message: "executeRun: Missing run ID or snapshot ID after execution", + properties: { + runId: this.runFriendlyId, + snapshotId: this.snapshotFriendlyId, + }, }); this.waitForNextRun(); @@ -1173,8 +1443,12 @@ class ManagedRunController { ); if (!completionResult.success) { - console.error("Failed to submit completion", { - error: completionResult.error, + this.sendDebugLog({ + runId: run.friendlyId, + message: "completion: failed to submit", + properties: { + error: completionResult.error, + }, }); this.sendDebugLog({ @@ -1189,12 +1463,24 @@ class ManagedRunController { return; } - logger.log("Attempt completion submitted", completionResult.data.result); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Attempt completion submitted", + properties: { + attemptStatus: completionResult.data.result.attemptStatus, + runId: completionResult.data.result.run.friendlyId, + snapshotId: completionResult.data.result.snapshot.friendlyId, + }, + }); try { await this.handleCompletionResult(completion, completionResult.data.result); } catch (error) { - console.error("Failed to handle completion result", { error }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Failed to handle completion result", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); this.waitForNextRun(); return; @@ -1205,33 +1491,55 @@ class ManagedRunController { completion: TaskRunExecutionResult, result: CompleteRunAttemptResult ) { - logger.debug("[ManagedRunController] Handling completion result", { completion, result }); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Handling completion result", + properties: { + completion: completion.ok, + attemptStatus: result.attemptStatus, + snapshotId: result.snapshot.friendlyId, + runId: result.run.friendlyId, + }, + }); const { attemptStatus, snapshot: completionSnapshot, run } = result; try { this.updateRunPhase(run, completionSnapshot); } catch (error) { - console.error("Failed to update run phase after completion", { error }); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Failed to update run phase after completion", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); this.waitForNextRun(); return; } if (attemptStatus === "RUN_FINISHED") { - logger.debug("Run finished"); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Run finished", + }); this.waitForNextRun(); return; } if (attemptStatus === "RUN_PENDING_CANCEL") { - logger.debug("Run pending cancel"); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Run pending cancel", + }); return; } if (attemptStatus === "RETRY_QUEUED") { - logger.debug("Retry queued"); + this.sendDebugLog({ + runId: run.friendlyId, + message: "Retry queued", + }); this.waitForNextRun(); return; @@ -1255,6 +1563,8 @@ class ManagedRunController { this.startAndExecuteRunAttempt({ runFriendlyId: run.friendlyId, snapshotFriendlyId: this.snapshotFriendlyId, + skipLockCheckForImmediateRetry: true, + isWarmStart: true, }).finally(() => {}); return; } @@ -1268,30 +1578,54 @@ class ManagedRunController { date, properties, }: { - runId: string; + runId?: string; message: string; date?: Date; properties?: WorkloadDebugLogRequestBody["properties"]; }) { + if (!runId) { + runId = this.runFriendlyId; + } + + if (!runId) { + runId = env.TRIGGER_RUN_ID; + } + + if (!runId) { + return; + } + + const mergedProperties = { + ...properties, + runId, + runnerId: this.runnerId, + workerName: this.workerInstanceName, + }; + + console.log(message, mergedProperties); + this.httpClient.sendDebugLog(runId, { message, time: date ?? new Date(), - properties: { - ...properties, - runnerId: this.runnerId, - workerName: this.workerInstanceName, - }, + properties: mergedProperties, }); } async cancelAttempt(runId: string) { - logger.log("cancelling attempt", { runId }); + this.sendDebugLog({ + runId, + message: "cancelling attempt", + properties: { runId }, + }); await this.taskRunProcess?.cancel(); } async start() { - logger.debug("[ManagedRunController] Starting up"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Starting up", + }); // Websocket notifications are only an optimisation so we don't need to wait for a successful connection this.createSocket(); @@ -1313,7 +1647,10 @@ class ManagedRunController { } async stop() { - logger.debug("[ManagedRunController] Shutting down"); + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Shutting down", + }); if (this.taskRunProcess) { await this.taskRunProcess.cleanup(true); diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index df46da8ff5..abe7c93389 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -29,6 +29,7 @@ import { internalErrorFromUnexpectedExit, GracefulExitTimeoutError, UnexpectedExitError, + SuspendedProcessError, } from "@trigger.dev/core/v3/errors"; export type OnWaitForDurationMessage = InferSocketMessageSchema< @@ -73,6 +74,7 @@ export class TaskRunProcess { private _gracefulExitTimeoutElapsed: boolean = false; private _isBeingKilled: boolean = false; private _isBeingCancelled: boolean = false; + private _isBeingSuspended: boolean = false; private _stderr: Array = []; public onTaskRunHeartbeat: Evt = new Evt(); @@ -213,7 +215,10 @@ export class TaskRunProcess { await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000); } - async execute(params: TaskRunProcessExecuteParams): Promise { + async execute( + params: TaskRunProcessExecuteParams, + isWarmStart?: boolean + ): Promise { this._isPreparedForNextRun = false; let resolver: (value: TaskRunExecutionResult) => void; @@ -249,7 +254,7 @@ export class TaskRunProcess { metadata: this.options.serverWorker, metrics, env: params.env, - isWarmStart: this.options.isWarmStart, + isWarmStart: isWarmStart ?? this.options.isWarmStart, }); } @@ -347,7 +352,11 @@ export class TaskRunProcess { // Order matters, this has to be before the graceful exit timeout rejecter(new GracefulExitTimeoutError()); } else if (this._isBeingKilled) { - rejecter(new CleanupProcessError()); + if (this._isBeingSuspended) { + rejecter(new SuspendedProcessError()); + } else { + rejecter(new CleanupProcessError()); + } } else { rejecter( new UnexpectedExitError( @@ -428,6 +437,11 @@ export class TaskRunProcess { } } + async suspend() { + this._isBeingSuspended = true; + await this.kill("SIGKILL"); + } + forceExit() { try { this._isBeingKilled = true; diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index ef16d3d16a..04fe3902e7 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -497,6 +497,14 @@ export class CleanupProcessError extends Error { } } +export class SuspendedProcessError extends Error { + constructor() { + super("Suspended"); + + this.name = "SuspendedProcessError"; + } +} + export class CancelledProcessError extends Error { constructor() { super("Cancelled"); diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 6ec68b6bb8..1f77fd6d3f 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -89,7 +89,8 @@ export class TaskExecutor { execution: TaskRunExecution, worker: ServerBackgroundWorker, traceContext: Record, - signal?: AbortSignal + signal?: AbortSignal, + isWarmStart?: boolean ): Promise<{ result: TaskRunExecutionResult }> { const ctx = TaskRunContext.parse(execution); const attemptMessage = `Attempt ${execution.attempt.number}`; @@ -102,7 +103,7 @@ export class TaskExecutor { taskContext.setGlobalTaskContext({ ctx, worker, - isWarmStart: this._isWarmStart, + isWarmStart: isWarmStart ?? this._isWarmStart, }); if (execution.run.metadata) {