diff --git a/.changeset/tricky-houses-invite.md b/.changeset/tricky-houses-invite.md new file mode 100644 index 0000000000..e21e7b5818 --- /dev/null +++ b/.changeset/tricky-houses-invite.md @@ -0,0 +1,6 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Managed run controller performance and reliability improvements diff --git a/.configs/tsconfig.base.json b/.configs/tsconfig.base.json index 3ce4c2db29..2d560d22d0 100644 --- a/.configs/tsconfig.base.json +++ b/.configs/tsconfig.base.json @@ -10,7 +10,7 @@ "strict": true, "alwaysStrict": true, - "strictPropertyInitialization": false, + "strictPropertyInitialization": true, "skipLibCheck": true, "forceConsistentCasingInFileNames": true, "noUnusedLocals": false, diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index d7caccbd80..72498075cd 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -27,6 +27,7 @@ const Env = z.object({ RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) + RUNNER_DOCKER_AUTOREMOVE: BoolEnv.default(true), // Dequeue settings (provider mode) TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 350b81e3ff..811ee8746d 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -66,6 +66,7 @@ class ManagedSupervisor { heartbeatIntervalSeconds: env.RUNNER_HEARTBEAT_INTERVAL_SECONDS, snapshotPollIntervalSeconds: env.RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS, additionalEnvVars: env.RUNNER_ADDITIONAL_ENV_VARS, + dockerAutoremove: env.RUNNER_DOCKER_AUTOREMOVE, } satisfies WorkloadManagerOptions; if (this.isKubernetes) { diff --git a/apps/supervisor/src/services/podCleaner.ts b/apps/supervisor/src/services/podCleaner.ts index e39a98cfbe..56eaaeb88a 100644 --- a/apps/supervisor/src/services/podCleaner.ts +++ 
b/apps/supervisor/src/services/podCleaner.ts @@ -1,7 +1,7 @@ import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; import { K8sApi } from "../clients/kubernetes.js"; import { createK8sApi } from "../clients/kubernetes.js"; -import { HeartbeatService } from "@trigger.dev/core/v3"; +import { IntervalService } from "@trigger.dev/core/v3"; import { Counter, Gauge, Registry } from "prom-client"; import { register } from "../metrics.js"; @@ -19,7 +19,7 @@ export class PodCleaner { private readonly namespace: string; private readonly batchSize: number; - private readonly deletionHeartbeat: HeartbeatService; + private readonly deletionInterval: IntervalService; // Metrics private readonly register: Registry; @@ -32,10 +32,10 @@ export class PodCleaner { this.namespace = opts.namespace; this.batchSize = opts.batchSize ?? 500; - this.deletionHeartbeat = new HeartbeatService({ + this.deletionInterval = new IntervalService({ intervalMs: opts.intervalMs ?? 10000, leadingEdge: true, - heartbeat: this.deleteCompletedPods.bind(this), + onInterval: this.deleteCompletedPods.bind(this), }); // Initialize metrics @@ -57,11 +57,11 @@ export class PodCleaner { } async start() { - this.deletionHeartbeat.start(); + this.deletionInterval.start(); } async stop() { - this.deletionHeartbeat.stop(); + this.deletionInterval.stop(); } private async deleteCompletedPods() { diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 9e4ba29594..171e2c0971 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -43,6 +43,10 @@ export class DockerWorkloadManager implements WorkloadManager { `--name=${runnerId}`, ]; + if (this.opts.dockerAutoremove) { + runArgs.push("--rm"); + } + if (this.opts.warmStartUrl) { runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`); } diff --git a/apps/supervisor/src/workloadManager/types.ts 
b/apps/supervisor/src/workloadManager/types.ts index a5d7ed3c90..b3cd418f1e 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -10,6 +10,7 @@ export interface WorkloadManagerOptions { heartbeatIntervalSeconds?: number; snapshotPollIntervalSeconds?: number; additionalEnvVars?: Record; + dockerAutoremove?: boolean; } export interface WorkloadManager { diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index ed90c450c3..2dcf329736 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -452,7 +452,7 @@ export class WorkloadServer extends EventEmitter { logger.debug("runConnected", { ...getSocketMetadata() }); // If there's already a run ID set, we should "disconnect" it from this socket - if (socket.data.runFriendlyId) { + if (socket.data.runFriendlyId && socket.data.runFriendlyId !== friendlyId) { logger.debug("runConnected: disconnecting existing run", { ...getSocketMetadata(), newRunId: friendlyId, diff --git a/apps/webapp/app/v3/authenticatedSocketConnection.server.ts b/apps/webapp/app/v3/authenticatedSocketConnection.server.ts index a6de96b9c9..cd255c800b 100644 --- a/apps/webapp/app/v3/authenticatedSocketConnection.server.ts +++ b/apps/webapp/app/v3/authenticatedSocketConnection.server.ts @@ -1,6 +1,6 @@ import { clientWebsocketMessages, - HeartbeatService, + IntervalService, serverWebsocketMessages, } from "@trigger.dev/core/v3"; import { ZodMessageHandler, ZodMessageSender } from "@trigger.dev/core/v3/zodMessageHandler"; @@ -19,7 +19,7 @@ export class AuthenticatedSocketConnection { private _sender: ZodMessageSender; private _consumer: DevQueueConsumer; private _messageHandler: ZodMessageHandler; - private _pingService: HeartbeatService; + private _pingService: IntervalService; constructor( public ws: WebSocket, @@ -75,8 +75,8 @@ export class AuthenticatedSocketConnection { // }); }); - 
this._pingService = new HeartbeatService({ - heartbeat: async () => { + this._pingService = new IntervalService({ + onInterval: async () => { if (ws.readyState !== WebSocket.OPEN) { logger.debug("[AuthenticatedSocketConnection] Websocket not open, skipping ping"); return; diff --git a/internal-packages/run-engine/src/engine/db/worker.ts b/internal-packages/run-engine/src/engine/db/worker.ts index 8bc2817d63..e61e9e8d43 100644 --- a/internal-packages/run-engine/src/engine/db/worker.ts +++ b/internal-packages/run-engine/src/engine/db/worker.ts @@ -193,7 +193,7 @@ export async function getWorkerDeploymentFromWorker( prisma: PrismaClientOrTransaction, workerId: string ): Promise { - const worker = await prisma.backgroundWorker.findUnique({ + const worker = await prisma.backgroundWorker.findFirst({ where: { id: workerId, }, @@ -264,12 +264,10 @@ export async function getManagedWorkerFromCurrentlyPromotedDeployment( prisma: PrismaClientOrTransaction, environmentId: string ): Promise { - const promotion = await prisma.workerDeploymentPromotion.findUnique({ + const promotion = await prisma.workerDeploymentPromotion.findFirst({ where: { - environmentId_label: { - environmentId, - label: CURRENT_DEPLOYMENT_LABEL, - }, + environmentId, + label: CURRENT_DEPLOYMENT_LABEL, }, include: { deployment: { diff --git a/internal-packages/run-engine/src/engine/systems/batchSystem.ts b/internal-packages/run-engine/src/engine/systems/batchSystem.ts index 5f1948a831..8f0a14f4e3 100644 --- a/internal-packages/run-engine/src/engine/systems/batchSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/batchSystem.ts @@ -34,7 +34,7 @@ export class BatchSystem { */ async #tryCompleteBatch({ batchId }: { batchId: string }) { return startSpan(this.$.tracer, "#tryCompleteBatch", async (span) => { - const batch = await this.$.prisma.batchTaskRun.findUnique({ + const batch = await this.$.prisma.batchTaskRun.findFirst({ select: { status: true, runtimeEnvironmentId: true, diff --git 
a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 5ecf5ffd99..76b97c7d60 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -139,12 +139,10 @@ export class RunAttemptSystem { throw new ServiceValidationError("Task run is not locked", 400); } - const queue = await prisma.taskQueue.findUnique({ + const queue = await prisma.taskQueue.findFirst({ where: { - runtimeEnvironmentId_name: { - runtimeEnvironmentId: environment.id, - name: taskRun.queue, - }, + runtimeEnvironmentId: environment.id, + name: taskRun.queue, }, }); @@ -1199,7 +1197,7 @@ export class RunAttemptSystem { async #getAuthenticatedEnvironmentFromRun(runId: string, tx?: PrismaClientOrTransaction) { const prisma = tx ?? this.$.prisma; - const taskRun = await prisma.taskRun.findUnique({ + const taskRun = await prisma.taskRun.findFirst({ where: { id: runId, }, diff --git a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts index 12910f4634..f020fe2b3c 100644 --- a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts @@ -33,7 +33,7 @@ export class TtlSystem { } //only expire "PENDING" runs - const run = await prisma.taskRun.findUnique({ where: { id: runId } }); + const run = await prisma.taskRun.findFirst({ where: { id: runId } }); if (!run) { this.$.logger.debug("Could not find enqueued run to expire", { diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 669fcf0e26..b2eb9e5396 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -159,12 +159,10 @@ export class 
WaitpointSystem { const prisma = tx ?? this.$.prisma; const existingWaitpoint = idempotencyKey - ? await prisma.waitpoint.findUnique({ + ? await prisma.waitpoint.findFirst({ where: { - environmentId_idempotencyKey: { - environmentId, - idempotencyKey, - }, + environmentId, + idempotencyKey, }, }) : undefined; @@ -241,12 +239,10 @@ export class WaitpointSystem { tags?: string[]; }): Promise<{ waitpoint: Waitpoint; isCached: boolean }> { const existingWaitpoint = idempotencyKey - ? await this.$.prisma.waitpoint.findUnique({ + ? await this.$.prisma.waitpoint.findFirst({ where: { - environmentId_idempotencyKey: { - environmentId, - idempotencyKey, - }, + environmentId, + idempotencyKey, }, }) : undefined; diff --git a/packages/cli-v3/e2e/utils.ts b/packages/cli-v3/e2e/utils.ts index be158ef599..73530208c7 100644 --- a/packages/cli-v3/e2e/utils.ts +++ b/packages/cli-v3/e2e/utils.ts @@ -8,6 +8,7 @@ import { TaskRunProcess } from "../src/executions/taskRunProcess.js"; import { createTestHttpServer } from "@epic-web/test-server/http"; import { TestCase, TestCaseRun } from "./fixtures.js"; import { access } from "node:fs/promises"; +import { MachinePreset } from "@trigger.dev/core/v3"; export type PackageManager = "npm" | "pnpm" | "yarn"; @@ -295,6 +296,13 @@ export async function executeTestCaseRun({ }, }); + const machine = { + name: "small-1x", + cpu: 1, + memory: 256, + centsPerMs: 0.0000001, + } satisfies MachinePreset; + try { const taskRunProcess = new TaskRunProcess({ workerManifest: workerManifest!, @@ -314,12 +322,7 @@ export async function executeTestCaseRun({ version: "1.0.0", contentHash, }, - machine: { - name: "small-1x", - cpu: 1, - memory: 256, - centsPerMs: 0.0000001, - }, + machineResources: machine, }).initialize(); const result = await taskRunProcess.execute({ @@ -372,12 +375,7 @@ export async function executeTestCaseRun({ ref: "main", name: "test", }, - machine: { - name: "small-1x", - cpu: 1, - memory: 256, - centsPerMs: 0.0000001, - }, + machine, }, 
}, messageId: "run_1234", diff --git a/packages/cli-v3/src/dev/devSupervisor.ts b/packages/cli-v3/src/dev/devSupervisor.ts index e1445b4600..be00598471 100644 --- a/packages/cli-v3/src/dev/devSupervisor.ts +++ b/packages/cli-v3/src/dev/devSupervisor.ts @@ -49,13 +49,13 @@ export async function startWorkerRuntime(options: WorkerRuntimeOptions): Promise * - Receiving snapshot update pings (via socket) */ class DevSupervisor implements WorkerRuntime { - private config: DevConfigResponseBody; + private config?: DevConfigResponseBody; private disconnectPresence: (() => void) | undefined; private lastManifest?: BuildManifest; private latestWorkerId?: string; /** Receive notifications when runs change state */ - private socket: Socket; + private socket?: Socket; private socketIsReconnecting = false; /** Workers are versions of the code */ @@ -93,7 +93,7 @@ class DevSupervisor implements WorkerRuntime { this.runLimiter = pLimit(maxConcurrentRuns); - this.#createSocket(); + this.socket = this.#createSocket(); //start an SSE connection for presence this.disconnectPresence = await this.#startPresenceConnection(); @@ -105,7 +105,7 @@ class DevSupervisor implements WorkerRuntime { async shutdown(): Promise { this.disconnectPresence?.(); try { - this.socket.close(); + this.socket?.close(); } catch (error) { logger.debug("[DevSupervisor] shutdown, socket failed to close", { error }); } @@ -187,6 +187,10 @@ class DevSupervisor implements WorkerRuntime { * For the latest version we will pull from the main queue, so we don't specify that. */ async #dequeueRuns() { + if (!this.config) { + throw new Error("No config, can't dequeue runs"); + } + if (!this.latestWorkerId) { //try again later logger.debug(`[DevSupervisor] dequeueRuns. 
No latest worker ID, trying again later`); @@ -409,13 +413,14 @@ class DevSupervisor implements WorkerRuntime { const wsUrl = new URL(this.options.client.apiURL); wsUrl.pathname = "/dev-worker"; - this.socket = io(wsUrl.href, { + const socket = io(wsUrl.href, { transports: ["websocket"], extraHeaders: { Authorization: `Bearer ${this.options.client.accessToken}`, }, }); - this.socket.on("run:notify", async ({ version, run }) => { + + socket.on("run:notify", async ({ version, run }) => { logger.debug("[DevSupervisor] Received run notification", { version, run }); this.options.client.dev.sendDebugLog(run.friendlyId, { @@ -434,10 +439,11 @@ class DevSupervisor implements WorkerRuntime { await controller.getLatestSnapshot(); }); - this.socket.on("connect", () => { + + socket.on("connect", () => { logger.debug("[DevSupervisor] Connected to supervisor"); - if (this.socket.recovered || this.socketIsReconnecting) { + if (socket.recovered || this.socketIsReconnecting) { logger.debug("[DevSupervisor] Socket recovered"); eventBus.emit("socketConnectionReconnected", `Connection was recovered`); } @@ -448,19 +454,21 @@ class DevSupervisor implements WorkerRuntime { controller.resubscribeToRunNotifications(); } }); - this.socket.on("connect_error", (error) => { + + socket.on("connect_error", (error) => { logger.debug("[DevSupervisor] Connection error", { error }); }); - this.socket.on("disconnect", (reason, description) => { + + socket.on("disconnect", (reason, description) => { logger.debug("[DevSupervisor] socket was disconnected", { reason, description, - active: this.socket.active, + active: socket.active, }); if (reason === "io server disconnect") { // the disconnection was initiated by the server, you need to manually reconnect - this.socket.connect(); + socket.connect(); } else { this.socketIsReconnecting = true; eventBus.emit("socketConnectionDisconnected", reason); @@ -472,6 +480,8 @@ class DevSupervisor implements WorkerRuntime { connections: 
Array.from(this.socketConnections), }); }, 5000); + + return socket; } #subscribeToRunNotifications() { diff --git a/packages/cli-v3/src/entryPoints/dev-run-controller.ts b/packages/cli-v3/src/entryPoints/dev-run-controller.ts index ccfc68e259..f851bc07aa 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-controller.ts @@ -1,7 +1,7 @@ import { CompleteRunAttemptResult, DequeuedMessage, - HeartbeatService, + IntervalService, LogLevel, RunExecutionData, TaskRunExecution, @@ -44,9 +44,9 @@ export class DevRunController { private taskRunProcess?: TaskRunProcess; private readonly worker: BackgroundWorker; private readonly httpClient: CliApiClient; - private readonly runHeartbeat: HeartbeatService; + private readonly runHeartbeat: IntervalService; private readonly heartbeatIntervalSeconds: number; - private readonly snapshotPoller: HeartbeatService; + private readonly snapshotPoller: IntervalService; private readonly snapshotPollIntervalSeconds: number; private state: @@ -78,8 +78,8 @@ export class DevRunController { this.httpClient = opts.httpClient; - this.snapshotPoller = new HeartbeatService({ - heartbeat: async () => { + this.snapshotPoller = new IntervalService({ + onInterval: async () => { if (!this.runFriendlyId) { logger.debug("[DevRunController] Skipping snapshot poll, no run ID"); return; @@ -121,8 +121,8 @@ export class DevRunController { }, }); - this.runHeartbeat = new HeartbeatService({ - heartbeat: async () => { + this.runHeartbeat = new IntervalService({ + onInterval: async () => { if (!this.runFriendlyId || !this.snapshotFriendlyId) { logger.debug("[DevRunController] Skipping heartbeat, no run ID or snapshot ID"); return; @@ -619,7 +619,7 @@ export class DevRunController { version: this.opts.worker.serverWorker?.version, engine: "V2", }, - machine: execution.machine, + machineResources: execution.machine, }).initialize(); logger.debug("executing task run process", { diff --git 
a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index c41b50ad27..4baa701b05 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -1,1687 +1,12 @@ -import { logger } from "../utilities/logger.js"; -import { TaskRunProcess } from "../executions/taskRunProcess.js"; import { env as stdEnv } from "std-env"; -import { z } from "zod"; -import { randomUUID } from "crypto"; import { readJSONFile } from "../utilities/fileSystem.js"; -import { - type CompleteRunAttemptResult, - HeartbeatService, - type RunExecutionData, - SuspendedProcessError, - type TaskRunExecutionMetrics, - type TaskRunExecutionResult, - type TaskRunFailedExecutionResult, - WorkerManifest, -} from "@trigger.dev/core/v3"; -import { - WarmStartClient, - WORKLOAD_HEADERS, - type WorkloadClientToServerEvents, - type WorkloadDebugLogRequestBody, - WorkloadHttpClient, - type WorkloadServerToClientEvents, - type WorkloadRunAttemptStartResponseBody, -} from "@trigger.dev/core/v3/workers"; -import { assertExhaustive } from "../utilities/assertExhaustive.js"; -import { setTimeout as sleep } from "timers/promises"; -import { io, type Socket } from "socket.io-client"; +import { WorkerManifest } from "@trigger.dev/core/v3"; +import { ManagedRunController } from "./managed/controller.js"; -const DateEnv = z - .string() - .transform((val) => new Date(parseInt(val, 10))) - .pipe(z.date()); +const manifest = await readJSONFile("./index.json"); +const workerManifest = WorkerManifest.parse(manifest); -// All IDs are friendly IDs -const Env = z.object({ - // Set at build time - TRIGGER_CONTENT_HASH: z.string(), - TRIGGER_DEPLOYMENT_ID: z.string(), - TRIGGER_DEPLOYMENT_VERSION: z.string(), - TRIGGER_PROJECT_ID: z.string(), - TRIGGER_PROJECT_REF: z.string(), - NODE_ENV: z.string().default("production"), - NODE_EXTRA_CA_CERTS: z.string().optional(), - - // Set at runtime - 
TRIGGER_WORKLOAD_CONTROLLER_ID: z.string().default(`controller_${randomUUID()}`), - TRIGGER_ENV_ID: z.string(), - TRIGGER_RUN_ID: z.string().optional(), // This is only useful for cold starts - TRIGGER_SNAPSHOT_ID: z.string().optional(), // This is only useful for cold starts - OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), - TRIGGER_WARM_START_URL: z.string().optional(), - TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS: z.coerce.number().default(30_000), - TRIGGER_WARM_START_KEEPALIVE_MS: z.coerce.number().default(300_000), - TRIGGER_MACHINE_CPU: z.string().default("0"), - TRIGGER_MACHINE_MEMORY: z.string().default("0"), - TRIGGER_RUNNER_ID: z.string(), - TRIGGER_METADATA_URL: z.string().optional(), - TRIGGER_PRE_SUSPEND_WAIT_MS: z.coerce.number().default(200), - - // Timeline metrics - TRIGGER_POD_SCHEDULED_AT_MS: DateEnv, - TRIGGER_DEQUEUED_AT_MS: DateEnv, - - // May be overridden - TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), - TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), - TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), - TRIGGER_WORKER_INSTANCE_NAME: z.string(), - TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), - TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5), - TRIGGER_SUCCESS_EXIT_CODE: z.coerce.number().default(0), - TRIGGER_FAILURE_EXIT_CODE: z.coerce.number().default(1), -}); - -const env = Env.parse(stdEnv); - -logger.loggerLevel = "debug"; - -type ManagedRunControllerOptions = { - workerManifest: WorkerManifest; -}; - -type Run = { - friendlyId: string; - attemptNumber?: number | null; -}; - -type Snapshot = { - friendlyId: string; -}; - -type Metadata = { - TRIGGER_SUPERVISOR_API_PROTOCOL: string | undefined; - TRIGGER_SUPERVISOR_API_DOMAIN: string | undefined; - TRIGGER_SUPERVISOR_API_PORT: number | undefined; - TRIGGER_WORKER_INSTANCE_NAME: string | undefined; - TRIGGER_HEARTBEAT_INTERVAL_SECONDS: number | undefined; - TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: number | undefined; - 
TRIGGER_SUCCESS_EXIT_CODE: number | undefined; - TRIGGER_FAILURE_EXIT_CODE: number | undefined; - TRIGGER_RUNNER_ID: string | undefined; -}; - -class MetadataClient { - private readonly url: URL; - - constructor(url: string) { - this.url = new URL(url); - } - - async getEnvOverrides(): Promise { - try { - const response = await fetch(new URL("/env", this.url)); - return response.json(); - } catch (error) { - console.error("Failed to fetch metadata", { error }); - return null; - } - } -} - -class ManagedRunController { - private taskRunProcess?: TaskRunProcess; - - private workerManifest: WorkerManifest; - - private readonly httpClient: WorkloadHttpClient; - private readonly warmStartClient: WarmStartClient | undefined; - private readonly metadataClient?: MetadataClient; - - private socket: Socket; - - private readonly runHeartbeat: HeartbeatService; - private heartbeatIntervalSeconds: number; - - private readonly snapshotPoller: HeartbeatService; - private snapshotPollIntervalSeconds: number; - - private workerApiUrl: string; - private workerInstanceName: string; - - private runnerId: string; - - private successExitCode = env.TRIGGER_SUCCESS_EXIT_CODE; - private failureExitCode = env.TRIGGER_FAILURE_EXIT_CODE; - - private state: - | { - phase: "RUN"; - run: Run; - snapshot: Snapshot; - } - | { - phase: "IDLE" | "WARM_START"; - } = { phase: "IDLE" }; - - constructor(opts: ManagedRunControllerOptions) { - this.workerManifest = opts.workerManifest; - - this.runnerId = env.TRIGGER_RUNNER_ID; - - this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; - this.workerInstanceName = env.TRIGGER_WORKER_INSTANCE_NAME; - - this.httpClient = new WorkloadHttpClient({ - workerApiUrl: this.workerApiUrl, - runnerId: this.runnerId, - deploymentId: env.TRIGGER_DEPLOYMENT_ID, - deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, - projectRef: env.TRIGGER_PROJECT_REF, - }); - - const properties = { - ...env, 
- TRIGGER_POD_SCHEDULED_AT_MS: env.TRIGGER_POD_SCHEDULED_AT_MS.toISOString(), - TRIGGER_DEQUEUED_AT_MS: env.TRIGGER_DEQUEUED_AT_MS.toISOString(), - }; - - this.sendDebugLog({ - runId: env.TRIGGER_RUN_ID, - message: "Creating run controller", - properties, - }); - - this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; - this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; - - if (env.TRIGGER_METADATA_URL) { - this.metadataClient = new MetadataClient(env.TRIGGER_METADATA_URL); - } - - if (env.TRIGGER_WARM_START_URL) { - this.warmStartClient = new WarmStartClient({ - apiUrl: new URL(env.TRIGGER_WARM_START_URL), - controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, - deploymentId: env.TRIGGER_DEPLOYMENT_ID, - deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, - machineCpu: env.TRIGGER_MACHINE_CPU, - machineMemory: env.TRIGGER_MACHINE_MEMORY, - }); - } - - this.snapshotPoller = new HeartbeatService({ - heartbeat: async () => { - if (!this.runFriendlyId) { - this.sendDebugLog({ - runId: env.TRIGGER_RUN_ID, - message: "Skipping snapshot poll, no run ID", - }); - return; - } - - this.sendDebugLog({ - runId: env.TRIGGER_RUN_ID, - message: "Polling for latest snapshot", - }); - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: `snapshot poll: started`, - properties: { - snapshotId: this.snapshotFriendlyId, - }, - }); - - const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); - - if (!response.success) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Snapshot poll failed", - properties: { - error: response.error, - }, - }); - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: `snapshot poll: failed`, - properties: { - snapshotId: this.snapshotFriendlyId, - error: response.error, - }, - }); - - return; - } - - await this.handleSnapshotChange(response.data.execution); - }, - intervalMs: this.snapshotPollIntervalSeconds * 1000, - leadingEdge: false, - onError: async (error) 
=> { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Failed to poll for snapshot", - properties: { error: error instanceof Error ? error.message : String(error) }, - }); - }, - }); - - this.runHeartbeat = new HeartbeatService({ - heartbeat: async () => { - if (!this.runFriendlyId || !this.snapshotFriendlyId) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Skipping heartbeat, no run ID or snapshot ID", - }); - return; - } - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "heartbeat: started", - }); - - const response = await this.httpClient.heartbeatRun( - this.runFriendlyId, - this.snapshotFriendlyId - ); - - if (!response.success) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "heartbeat: failed", - properties: { - error: response.error, - }, - }); - } - }, - intervalMs: this.heartbeatIntervalSeconds * 1000, - leadingEdge: false, - onError: async (error) => { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Failed to send heartbeat", - properties: { error: error instanceof Error ? error.message : String(error) }, - }); - }, - }); - - process.on("SIGTERM", async () => { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Received SIGTERM, stopping worker", - }); - await this.stop(); - }); - } - - private enterRunPhase(run: Run, snapshot: Snapshot) { - this.onExitRunPhase(run); - this.state = { phase: "RUN", run, snapshot }; - - this.runHeartbeat.start(); - this.snapshotPoller.start(); - } - - private enterWarmStartPhase() { - this.onExitRunPhase(); - this.state = { phase: "WARM_START" }; - } - - // This should only be used when we're already executing a run. Attempt number changes are not allowed. 
- private updateRunPhase(run: Run, snapshot: Snapshot) { - if (this.state.phase !== "RUN") { - this.sendDebugLog({ - runId: run.friendlyId, - message: `updateRunPhase: Invalid phase for updating snapshot: ${this.state.phase}`, - properties: { - currentPhase: this.state.phase, - snapshotId: snapshot.friendlyId, - }, - }); - - throw new Error(`Invalid phase for updating snapshot: ${this.state.phase}`); - } - - if (this.state.run.friendlyId !== run.friendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: `updateRunPhase: Mismatched run IDs`, - properties: { - currentRunId: this.state.run.friendlyId, - newRunId: run.friendlyId, - currentSnapshotId: this.state.snapshot.friendlyId, - newSnapshotId: snapshot.friendlyId, - }, - }); - - throw new Error("Mismatched run IDs"); - } - - if (this.state.snapshot.friendlyId === snapshot.friendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "updateRunPhase: Snapshot not changed", - properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, - }); - - this.sendDebugLog({ - runId: run.friendlyId, - message: `updateRunPhase: Snapshot not changed`, - properties: { - snapshotId: snapshot.friendlyId, - }, - }); - - return; - } - - if (this.state.run.attemptNumber !== run.attemptNumber) { - this.sendDebugLog({ - runId: run.friendlyId, - message: `updateRunPhase: Attempt number changed`, - properties: { - oldAttemptNumber: this.state.run.attemptNumber ?? undefined, - newAttemptNumber: run.attemptNumber ?? 
undefined, - }, - }); - throw new Error("Attempt number changed"); - } - - this.state = { - phase: "RUN", - run: { - friendlyId: run.friendlyId, - attemptNumber: run.attemptNumber, - }, - snapshot: { - friendlyId: snapshot.friendlyId, - }, - }; - } - - private onExitRunPhase(newRun: Run | undefined = undefined) { - // We're not in a run phase, nothing to do - if (this.state.phase !== "RUN") { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "onExitRunPhase: Not in run phase, skipping", - properties: { phase: this.state.phase }, - }); - return; - } - - // This is still the same run, so we're not exiting the phase - if (newRun?.friendlyId === this.state.run.friendlyId) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "onExitRunPhase: Same run, skipping", - properties: { newRun: newRun?.friendlyId }, - }); - return; - } - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "onExitRunPhase: Exiting run phase", - properties: { newRun: newRun?.friendlyId }, - }); - - this.runHeartbeat.stop(); - this.snapshotPoller.stop(); - - const { run, snapshot } = this.state; - - this.unsubscribeFromRunNotifications({ run, snapshot }); - } - - private subscribeToRunNotifications({ run, snapshot }: { run: Run; snapshot: Snapshot }) { - this.socket.emit("run:start", { - version: "1", - run: { - friendlyId: run.friendlyId, - }, - snapshot: { - friendlyId: snapshot.friendlyId, - }, - }); - } - - private unsubscribeFromRunNotifications({ run, snapshot }: { run: Run; snapshot: Snapshot }) { - this.socket.emit("run:stop", { - version: "1", - run: { - friendlyId: run.friendlyId, - }, - snapshot: { - friendlyId: snapshot.friendlyId, - }, - }); - } - - private get runFriendlyId() { - if (this.state.phase !== "RUN") { - return undefined; - } - - return this.state.run.friendlyId; - } - - private get snapshotFriendlyId() { - if (this.state.phase !== "RUN") { - return; - } - - return this.state.snapshot.friendlyId; - } - - private handleSnapshotChangeLock = 
false; - - private async handleSnapshotChange({ - run, - snapshot, - completedWaitpoints, - }: Pick) { - if (this.handleSnapshotChangeLock) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "handleSnapshotChange: already in progress", - }); - return; - } - - this.handleSnapshotChangeLock = true; - - try { - if (!this.snapshotFriendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "handleSnapshotChange: Missing snapshot ID", - properties: { - newSnapshotId: snapshot.friendlyId, - newSnapshotStatus: snapshot.executionStatus, - }, - }); - - this.sendDebugLog({ - runId: run.friendlyId, - message: "snapshot change: missing snapshot ID", - properties: { - newSnapshotId: snapshot.friendlyId, - newSnapshotStatus: snapshot.executionStatus, - }, - }); - - return; - } - - if (this.snapshotFriendlyId === snapshot.friendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "handleSnapshotChange: snapshot not changed, skipping", - properties: { snapshot: snapshot.friendlyId }, - }); - - this.sendDebugLog({ - runId: run.friendlyId, - message: "snapshot change: skipping, no change", - properties: { - snapshotId: this.snapshotFriendlyId, - snapshotStatus: snapshot.executionStatus, - }, - }); - - return; - } - - this.sendDebugLog({ - runId: run.friendlyId, - message: `snapshot change: ${snapshot.executionStatus}`, - properties: { - oldSnapshotId: this.snapshotFriendlyId, - newSnapshotId: snapshot.friendlyId, - completedWaitpoints: completedWaitpoints.length, - }, - }); - - try { - this.updateRunPhase(run, snapshot); - } catch (error) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "snapshot change: failed to update run phase", - properties: { - currentPhase: this.state.phase, - error: error instanceof Error ? 
error.message : String(error), - }, - }); - - this.waitForNextRun(); - return; - } - - switch (snapshot.executionStatus) { - case "PENDING_CANCEL": { - try { - await this.cancelAttempt(run.friendlyId); - } catch (error) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "snapshot change: failed to cancel attempt", - properties: { - error: error instanceof Error ? error.message : String(error), - }, - }); - - this.waitForNextRun(); - return; - } - - return; - } - case "FINISHED": { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Run is finished, will wait for next run", - }); - - if (this.activeRunExecution) { - // Let's pretend we've just suspended the run. This will kill the process and should automatically wait for the next run. - // We still explicitly call waitForNextRun() afterwards in case of race conditions. Locks will prevent this from causing issues. - await this.taskRunProcess?.suspend(); - } - - this.waitForNextRun(); - - return; - } - case "QUEUED_EXECUTING": - case "EXECUTING_WITH_WAITPOINTS": { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Run is executing with waitpoints", - properties: { snapshot: snapshot.friendlyId }, - }); - - try { - // This should never throw. It should also never fail the run. - await this.taskRunProcess?.cleanup(false); - } catch (error) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Failed to cleanup task run process", - properties: { error: error instanceof Error ? 
error.message : String(error) }, - }); - } - - if (snapshot.friendlyId !== this.snapshotFriendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Snapshot changed after cleanup, abort", - properties: { - oldSnapshotId: snapshot.friendlyId, - newSnapshotId: this.snapshotFriendlyId, - }, - }); - return; - } - - await sleep(env.TRIGGER_PRE_SUSPEND_WAIT_MS); - - if (snapshot.friendlyId !== this.snapshotFriendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Snapshot changed after suspend threshold, abort", - properties: { - oldSnapshotId: snapshot.friendlyId, - newSnapshotId: this.snapshotFriendlyId, - }, - }); - return; - } - - if (!this.runFriendlyId || !this.snapshotFriendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: - "handleSnapshotChange: Missing run ID or snapshot ID after suspension, abort", - properties: { - runId: this.runFriendlyId, - snapshotId: this.snapshotFriendlyId, - }, - }); - return; - } - - const suspendResult = await this.httpClient.suspendRun( - this.runFriendlyId, - this.snapshotFriendlyId - ); - - if (!suspendResult.success) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Failed to suspend run, staying alive 🎶", - properties: { - error: suspendResult.error, - }, - }); - - this.sendDebugLog({ - runId: run.friendlyId, - message: "checkpoint: suspend request failed", - properties: { - snapshotId: snapshot.friendlyId, - error: suspendResult.error, - }, - }); - - return; - } - - if (!suspendResult.data.ok) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "checkpoint: failed to suspend run", - properties: { - snapshotId: snapshot.friendlyId, - error: suspendResult.data.error, - }, - }); - - return; - } - - this.sendDebugLog({ - runId: run.friendlyId, - message: "Suspending, any day now 🚬", - properties: { ok: suspendResult.data.ok }, - }); - return; - } - case "SUSPENDED": { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Run was suspended, kill the process and wait 
for more runs", - properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, - }); - - // This will kill the process and fail the execution with a SuspendedProcessError - await this.taskRunProcess?.suspend(); - - return; - } - case "PENDING_EXECUTING": { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Run is pending execution", - properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, - }); - - if (completedWaitpoints.length === 0) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "No waitpoints to complete, nothing to do", - }); - return; - } - - // There are waitpoints to complete so we've been restored after being suspended - - // Short delay to give websocket time to reconnect - await sleep(100); - - // Env may have changed after restore - await this.processEnvOverrides(); - - // We need to let the platform know we're ready to continue - const continuationResult = await this.httpClient.continueRunExecution( - run.friendlyId, - snapshot.friendlyId - ); - - if (!continuationResult.success) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "failed to continue execution", - properties: { - error: continuationResult.error, - }, - }); - - this.waitForNextRun(); - return; - } - - return; - } - case "EXECUTING": { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Run is now executing", - properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, - }); - - if (completedWaitpoints.length === 0) { - return; - } - - this.sendDebugLog({ - runId: run.friendlyId, - message: "Processing completed waitpoints", - properties: { completedWaitpoints: completedWaitpoints.length }, - }); - - if (!this.taskRunProcess) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "No task run process, ignoring completed waitpoints", - properties: { completedWaitpoints: completedWaitpoints.length }, - }); - return; - } - - for (const waitpoint of completedWaitpoints) { - 
this.taskRunProcess.waitpointCompleted(waitpoint); - } - - return; - } - case "RUN_CREATED": - case "QUEUED": { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Status change not handled", - properties: { status: snapshot.executionStatus }, - }); - return; - } - default: { - assertExhaustive(snapshot.executionStatus); - } - } - } catch (error) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "snapshot change: unexpected error", - properties: { - snapshotId: snapshot.friendlyId, - error: error instanceof Error ? error.message : String(error), - }, - }); - } finally { - this.handleSnapshotChangeLock = false; - } - } - - private async processEnvOverrides() { - if (!this.metadataClient) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "No metadata client, skipping env overrides", - }); - return; - } - - const overrides = await this.metadataClient.getEnvOverrides(); - - if (!overrides) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "No env overrides, skipping", - }); - return; - } - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Processing env overrides", - properties: { ...overrides }, - }); - - if (overrides.TRIGGER_SUCCESS_EXIT_CODE) { - this.successExitCode = overrides.TRIGGER_SUCCESS_EXIT_CODE; - } - - if (overrides.TRIGGER_FAILURE_EXIT_CODE) { - this.failureExitCode = overrides.TRIGGER_FAILURE_EXIT_CODE; - } - - if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) { - this.heartbeatIntervalSeconds = overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; - this.runHeartbeat.updateInterval(this.heartbeatIntervalSeconds * 1000); - } - - if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) { - this.snapshotPollIntervalSeconds = overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; - this.snapshotPoller.updateInterval(this.snapshotPollIntervalSeconds * 1000); - } - - if (overrides.TRIGGER_WORKER_INSTANCE_NAME) { - this.workerInstanceName = overrides.TRIGGER_WORKER_INSTANCE_NAME; - } - - if ( - 
overrides.TRIGGER_SUPERVISOR_API_PROTOCOL || - overrides.TRIGGER_SUPERVISOR_API_DOMAIN || - overrides.TRIGGER_SUPERVISOR_API_PORT - ) { - const protocol = - overrides.TRIGGER_SUPERVISOR_API_PROTOCOL ?? env.TRIGGER_SUPERVISOR_API_PROTOCOL; - const domain = overrides.TRIGGER_SUPERVISOR_API_DOMAIN ?? env.TRIGGER_SUPERVISOR_API_DOMAIN; - const port = overrides.TRIGGER_SUPERVISOR_API_PORT ?? env.TRIGGER_SUPERVISOR_API_PORT; - - this.workerApiUrl = `${protocol}://${domain}:${port}`; - - this.httpClient.updateApiUrl(this.workerApiUrl); - } - - if (overrides.TRIGGER_RUNNER_ID) { - this.runnerId = overrides.TRIGGER_RUNNER_ID; - this.httpClient.updateRunnerId(this.runnerId); - } - } - - private activeRunExecution: Promise | null = null; - - private async startAndExecuteRunAttempt({ - runFriendlyId, - snapshotFriendlyId, - dequeuedAt, - podScheduledAt, - isWarmStart, - skipLockCheckForImmediateRetry: skipLockCheck, - }: { - runFriendlyId: string; - snapshotFriendlyId: string; - dequeuedAt?: Date; - podScheduledAt?: Date; - isWarmStart?: boolean; - skipLockCheckForImmediateRetry?: boolean; - }) { - if (!skipLockCheck && this.activeRunExecution) { - this.sendDebugLog({ - runId: runFriendlyId, - message: "startAndExecuteRunAttempt: already in progress", - }); - return; - } - - const execution = async () => { - if (!this.socket) { - this.sendDebugLog({ - runId: runFriendlyId, - message: "Starting run without socket connection", - }); - } - - this.subscribeToRunNotifications({ - run: { friendlyId: runFriendlyId }, - snapshot: { friendlyId: snapshotFriendlyId }, - }); - - const attemptStartedAt = Date.now(); - - const start = await this.httpClient.startRunAttempt(runFriendlyId, snapshotFriendlyId, { - isWarmStart, - }); - - if (!start.success) { - this.sendDebugLog({ - runId: runFriendlyId, - message: "Failed to start run", - properties: { error: start.error }, - }); - - this.sendDebugLog({ - runId: runFriendlyId, - message: "failed to start run attempt", - properties: { - error: 
start.error, - }, - }); - - this.waitForNextRun(); - return; - } - - const attemptDuration = Date.now() - attemptStartedAt; - - const { run, snapshot, execution, envVars } = start.data; - - this.sendDebugLog({ - runId: run.friendlyId, - message: "Started run", - properties: { snapshot: snapshot.friendlyId }, - }); - - this.enterRunPhase(run, snapshot); - - const metrics = [ - { - name: "start", - event: "create_attempt", - timestamp: attemptStartedAt, - duration: attemptDuration, - }, - ] - .concat( - dequeuedAt - ? [ - { - name: "start", - event: "dequeue", - timestamp: dequeuedAt.getTime(), - duration: 0, - }, - ] - : [] - ) - .concat( - podScheduledAt - ? [ - { - name: "start", - event: "pod_scheduled", - timestamp: podScheduledAt.getTime(), - duration: 0, - }, - ] - : [] - ) satisfies TaskRunExecutionMetrics; - - const taskRunEnv = { - ...gatherProcessEnv(), - ...envVars, - }; - - try { - return await this.executeRun({ - run, - snapshot, - envVars: taskRunEnv, - execution, - metrics, - isWarmStart, - }); - } catch (error) { - if (error instanceof SuspendedProcessError) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Run was suspended and task run process was killed, waiting for next run", - properties: { run: run.friendlyId, snapshot: snapshot.friendlyId }, - }); - - this.waitForNextRun(); - return; - } - - this.sendDebugLog({ - runId: run.friendlyId, - message: "Error while executing attempt", - properties: { error: error instanceof Error ? 
error.message : String(error) }, - }); - - this.sendDebugLog({ - runId: run.friendlyId, - message: "Submitting attempt completion", - properties: { - snapshotId: snapshot.friendlyId, - updatedSnapshotId: this.snapshotFriendlyId, - }, - }); - - const completion = { - id: execution.run.id, - ok: false, - retry: undefined, - error: TaskRunProcess.parseExecuteError(error), - } satisfies TaskRunFailedExecutionResult; - - const completionResult = await this.httpClient.completeRunAttempt( - run.friendlyId, - // FIXME: if the snapshot has changed since starting the run, this won't be accurate - // ..but we probably shouldn't fetch the latest snapshot either because we may be in an "unhealthy" state while the next runner has already taken over - this.snapshotFriendlyId ?? snapshot.friendlyId, - { completion } - ); - - if (!completionResult.success) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Failed to submit completion after error", - properties: { error: completionResult.error }, - }); - - this.sendDebugLog({ - runId: run.friendlyId, - message: "completion: failed to submit after error", - properties: { - error: completionResult.error, - }, - }); - - this.waitForNextRun(); - return; - } - - this.sendDebugLog({ - runId: run.friendlyId, - message: "Attempt completion submitted after error", - properties: { - attemptStatus: completionResult.data.result.attemptStatus, - runId: completionResult.data.result.run.friendlyId, - snapshotId: completionResult.data.result.snapshot.friendlyId, - }, - }); - - try { - await this.handleCompletionResult(completion, completionResult.data.result); - } catch (error) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Failed to handle completion result after error", - properties: { error: error instanceof Error ? 
error.message : String(error) }, - }); - - this.waitForNextRun(); - return; - } - } - }; - - this.activeRunExecution = execution(); - - try { - await this.activeRunExecution; - } catch (error) { - this.sendDebugLog({ - runId: runFriendlyId, - message: "startAndExecuteRunAttempt: unexpected error", - properties: { error: error instanceof Error ? error.message : String(error) }, - }); - } finally { - this.activeRunExecution = null; - } - } - - private waitForNextRunLock = false; - - /** This will kill the child process before spinning up a new one. It will never throw, - * but may exit the process on any errors or when no runs are available after the - * configured duration. */ - private async waitForNextRun() { - if (this.waitForNextRunLock) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: already in progress", - }); - return; - } - - this.waitForNextRunLock = true; - const previousRunId = this.runFriendlyId; - - try { - // If there's a run execution in progress, we need to kill it and wait for it to finish - if (this.activeRunExecution) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: waiting for existing run execution to finish", - }); - await this.activeRunExecution; - } - - // Just for good measure - await this.taskRunProcess?.kill("SIGKILL"); - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: waiting for next run", - }); - - this.enterWarmStartPhase(); - - if (!this.warmStartClient) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: warm starts disabled, shutting down", - }); - this.exitProcess(this.successExitCode); - } - - if (this.taskRunProcess) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: eagerly recreating task run process with options", - }); - this.taskRunProcess = new TaskRunProcess({ - ...this.taskRunProcess.options, - isWarmStart: true, - }).initialize(); - } else { - this.sendDebugLog({ - 
runId: this.runFriendlyId, - message: "waitForNextRun: no existing task run process, so we can't eagerly recreate it", - }); - } - - // Check the service is up and get additional warm start config - const connect = await this.warmStartClient.connect(); - - if (!connect.success) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: failed to connect to warm start service", - properties: { - warmStartUrl: env.TRIGGER_WARM_START_URL, - error: connect.error, - }, - }); - this.exitProcess(this.successExitCode); - } - - const connectionTimeoutMs = - connect.data.connectionTimeoutMs ?? env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS; - const keepaliveMs = connect.data.keepaliveMs ?? env.TRIGGER_WARM_START_KEEPALIVE_MS; - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: connected to warm start service", - properties: { - connectionTimeoutMs, - keepaliveMs, - }, - }); - - if (previousRunId) { - this.sendDebugLog({ - runId: previousRunId, - message: "warm start: received config", - properties: { - connectionTimeoutMs, - keepaliveMs, - }, - }); - } - - if (!connectionTimeoutMs || !keepaliveMs) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: warm starts disabled after connect", - properties: { - connectionTimeoutMs, - keepaliveMs, - }, - }); - this.exitProcess(this.successExitCode); - } - - const nextRun = await this.warmStartClient.warmStart({ - workerInstanceName: this.workerInstanceName, - connectionTimeoutMs, - keepaliveMs, - }); - - if (!nextRun) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: warm start failed, shutting down", - }); - this.exitProcess(this.successExitCode); - } - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: got next run", - properties: { nextRun: nextRun.run.friendlyId }, - }); - - this.startAndExecuteRunAttempt({ - runFriendlyId: nextRun.run.friendlyId, - snapshotFriendlyId: 
nextRun.snapshot.friendlyId, - dequeuedAt: nextRun.dequeuedAt, - isWarmStart: true, - }).finally(() => {}); - return; - } catch (error) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "waitForNextRun: unexpected error", - properties: { error: error instanceof Error ? error.message : String(error) }, - }); - this.exitProcess(this.failureExitCode); - } finally { - this.waitForNextRunLock = false; - } - } - - private exitProcess(code?: number): never { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Exiting process", - properties: { code }, - }); - if (this.taskRunProcess?.isPreparedForNextRun) { - this.taskRunProcess.forceExit(); - } - process.exit(code); - } - - createSocket() { - const wsUrl = new URL("/workload", this.workerApiUrl); - - this.socket = io(wsUrl.href, { - transports: ["websocket"], - extraHeaders: { - [WORKLOAD_HEADERS.DEPLOYMENT_ID]: env.TRIGGER_DEPLOYMENT_ID, - [WORKLOAD_HEADERS.RUNNER_ID]: env.TRIGGER_RUNNER_ID, - }, - }); - this.socket.on("run:notify", async ({ version, run }) => { - this.sendDebugLog({ - runId: run.friendlyId, - message: "run:notify received by runner", - properties: { version, runId: run.friendlyId }, - }); - - if (!this.runFriendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "run:notify: ignoring notification, no local run ID", - properties: { - currentRunId: this.runFriendlyId, - currentSnapshotId: this.snapshotFriendlyId, - }, - }); - return; - } - - if (run.friendlyId !== this.runFriendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "run:notify: ignoring notification for different run", - properties: { - currentRunId: this.runFriendlyId, - currentSnapshotId: this.snapshotFriendlyId, - notificationRunId: run.friendlyId, - }, - }); - return; - } - - // Reset the (fallback) snapshot poll interval so we don't do unnecessary work - this.snapshotPoller.resetCurrentInterval(); - - const latestSnapshot = await 
this.httpClient.getRunExecutionData(this.runFriendlyId); - - if (!latestSnapshot.success) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "run:notify: failed to get latest snapshot data", - properties: { - currentRunId: this.runFriendlyId, - currentSnapshotId: this.snapshotFriendlyId, - error: latestSnapshot.error, - }, - }); - return; - } - - await this.handleSnapshotChange(latestSnapshot.data.execution); - }); - this.socket.on("connect", () => { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Connected to supervisor", - }); - - // This should handle the case where we reconnect after being restored - if (this.state.phase === "RUN") { - const { run, snapshot } = this.state; - this.subscribeToRunNotifications({ run, snapshot }); - } - }); - this.socket.on("connect_error", (error) => { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Connection error", - properties: { error: error instanceof Error ? error.message : String(error) }, - }); - }); - this.socket.on("disconnect", (reason, description) => { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Disconnected from supervisor", - properties: { reason, description: description?.toString() }, - }); - }); - } - - private async executeRun({ - run, - snapshot, - envVars, - execution, - metrics, - isWarmStart, - }: WorkloadRunAttemptStartResponseBody & { - metrics?: TaskRunExecutionMetrics; - isWarmStart?: boolean; - }) { - this.snapshotPoller.start(); - - if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) { - this.taskRunProcess = new TaskRunProcess({ - workerManifest: this.workerManifest, - env: envVars, - serverWorker: { - id: "unmanaged", - contentHash: env.TRIGGER_CONTENT_HASH, - version: env.TRIGGER_DEPLOYMENT_VERSION, - engine: "V2", - }, - machine: execution.machine, - isWarmStart, - }).initialize(); - } - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "executing task run process", - properties: { - attemptId: 
execution.attempt.id, - runId: execution.run.id, - }, - }); - - const completion = await this.taskRunProcess.execute( - { - payload: { - execution, - traceContext: execution.run.traceContext ?? {}, - metrics, - }, - messageId: run.friendlyId, - env: envVars, - }, - isWarmStart - ); - - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Completed run", - properties: { completion: completion.ok }, - }); - - try { - // The execution has finished, so we can cleanup the task run process. Killing it should be safe. - await this.taskRunProcess.cleanup(true); - } catch (error) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Failed to cleanup task run process, submitting completion anyway", - properties: { error: error instanceof Error ? error.message : String(error) }, - }); - } - - if (!this.runFriendlyId || !this.snapshotFriendlyId) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "executeRun: Missing run ID or snapshot ID after execution", - properties: { - runId: this.runFriendlyId, - snapshotId: this.snapshotFriendlyId, - }, - }); - - this.waitForNextRun(); - return; - } - - const completionResult = await this.httpClient.completeRunAttempt( - this.runFriendlyId, - this.snapshotFriendlyId, - { - completion, - } - ); - - if (!completionResult.success) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "completion: failed to submit", - properties: { - error: completionResult.error, - }, - }); - - this.sendDebugLog({ - runId: run.friendlyId, - message: "completion: failed to submit", - properties: { - error: completionResult.error, - }, - }); - - this.waitForNextRun(); - return; - } - - this.sendDebugLog({ - runId: run.friendlyId, - message: "Attempt completion submitted", - properties: { - attemptStatus: completionResult.data.result.attemptStatus, - runId: completionResult.data.result.run.friendlyId, - snapshotId: completionResult.data.result.snapshot.friendlyId, - }, - }); - - try { - await 
this.handleCompletionResult(completion, completionResult.data.result); - } catch (error) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Failed to handle completion result", - properties: { error: error instanceof Error ? error.message : String(error) }, - }); - - this.waitForNextRun(); - return; - } - } - - private async handleCompletionResult( - completion: TaskRunExecutionResult, - result: CompleteRunAttemptResult - ) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Handling completion result", - properties: { - completion: completion.ok, - attemptStatus: result.attemptStatus, - snapshotId: result.snapshot.friendlyId, - runId: result.run.friendlyId, - }, - }); - - const { attemptStatus, snapshot: completionSnapshot, run } = result; - - try { - this.updateRunPhase(run, completionSnapshot); - } catch (error) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Failed to update run phase after completion", - properties: { error: error instanceof Error ? 
error.message : String(error) }, - }); - - this.waitForNextRun(); - return; - } - - if (attemptStatus === "RUN_FINISHED") { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Run finished", - }); - - this.waitForNextRun(); - return; - } - - if (attemptStatus === "RUN_PENDING_CANCEL") { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Run pending cancel", - }); - return; - } - - if (attemptStatus === "RETRY_QUEUED") { - this.sendDebugLog({ - runId: run.friendlyId, - message: "Retry queued", - }); - - this.waitForNextRun(); - return; - } - - if (attemptStatus === "RETRY_IMMEDIATELY") { - if (completion.ok) { - throw new Error("Should retry but completion OK."); - } - - if (!completion.retry) { - throw new Error("Should retry but missing retry params."); - } - - await sleep(completion.retry.delay); - - if (!this.snapshotFriendlyId) { - throw new Error("Missing snapshot ID after retry"); - } - - this.startAndExecuteRunAttempt({ - runFriendlyId: run.friendlyId, - snapshotFriendlyId: this.snapshotFriendlyId, - skipLockCheckForImmediateRetry: true, - isWarmStart: true, - }).finally(() => {}); - return; - } - - assertExhaustive(attemptStatus); - } - - sendDebugLog({ - runId, - message, - date, - properties, - }: { - runId?: string; - message: string; - date?: Date; - properties?: WorkloadDebugLogRequestBody["properties"]; - }) { - if (!runId) { - runId = this.runFriendlyId; - } - - if (!runId) { - runId = env.TRIGGER_RUN_ID; - } - - if (!runId) { - return; - } - - const mergedProperties = { - ...properties, - runId, - runnerId: this.runnerId, - workerName: this.workerInstanceName, - }; - - console.log(message, mergedProperties); - - this.httpClient.sendDebugLog(runId, { - message, - time: date ?? 
new Date(), - properties: mergedProperties, - }); - } - - async cancelAttempt(runId: string) { - this.sendDebugLog({ - runId, - message: "cancelling attempt", - properties: { runId }, - }); - - await this.taskRunProcess?.cancel(); - } - - async start() { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Starting up", - }); - - // Websocket notifications are only an optimisation so we don't need to wait for a successful connection - this.createSocket(); - - // If we have run and snapshot IDs, we can start an attempt immediately - if (env.TRIGGER_RUN_ID && env.TRIGGER_SNAPSHOT_ID) { - this.startAndExecuteRunAttempt({ - runFriendlyId: env.TRIGGER_RUN_ID, - snapshotFriendlyId: env.TRIGGER_SNAPSHOT_ID, - dequeuedAt: env.TRIGGER_DEQUEUED_AT_MS, - podScheduledAt: env.TRIGGER_POD_SCHEDULED_AT_MS, - }).finally(() => {}); - return; - } - - // ..otherwise we need to wait for a run - this.waitForNextRun(); - return; - } - - async stop() { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "Shutting down", - }); - - if (this.taskRunProcess) { - await this.taskRunProcess.cleanup(true); - } - - this.runHeartbeat.stop(); - this.snapshotPoller.stop(); - - this.socket.close(); - } -} - -const workerManifest = await loadWorkerManifest(); - -const prodWorker = new ManagedRunController({ workerManifest }); -await prodWorker.start(); - -function gatherProcessEnv(): Record { - const $env = { - NODE_ENV: env.NODE_ENV, - NODE_EXTRA_CA_CERTS: env.NODE_EXTRA_CA_CERTS, - OTEL_EXPORTER_OTLP_ENDPOINT: env.OTEL_EXPORTER_OTLP_ENDPOINT, - }; - - // Filter out undefined values - return Object.fromEntries( - Object.entries($env).filter(([key, value]) => value !== undefined) - ) as Record; -} - -async function loadWorkerManifest() { - const manifest = await readJSONFile("./index.json"); - return WorkerManifest.parse(manifest); -} +new ManagedRunController({ + workerManifest, + env: stdEnv, +}).start(); diff --git a/packages/cli-v3/src/entryPoints/managed/controller.ts 
b/packages/cli-v3/src/entryPoints/managed/controller.ts new file mode 100644 index 0000000000..35fec13932 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/controller.ts @@ -0,0 +1,552 @@ +import { WorkerManifest } from "@trigger.dev/core/v3"; +import { + WarmStartClient, + WORKLOAD_HEADERS, + type WorkloadClientToServerEvents, + WorkloadHttpClient, + type WorkloadServerToClientEvents, +} from "@trigger.dev/core/v3/workers"; +import { io, type Socket } from "socket.io-client"; +import { RunnerEnv } from "./env.js"; +import { RunLogger, SendDebugLogOptions } from "./logger.js"; +import { EnvObject } from "std-env"; +import { RunExecution } from "./execution.js"; +import { tryCatch } from "@trigger.dev/core/utils"; + +type ManagedRunControllerOptions = { + workerManifest: WorkerManifest; + env: EnvObject; +}; + +type SupervisorSocket = Socket; + +export class ManagedRunController { + private readonly env: RunnerEnv; + private readonly workerManifest: WorkerManifest; + private readonly httpClient: WorkloadHttpClient; + private readonly warmStartClient: WarmStartClient | undefined; + private socket: SupervisorSocket; + private readonly logger: RunLogger; + + private warmStartCount = 0; + private restoreCount = 0; + + private currentExecution: RunExecution | null = null; + + constructor(opts: ManagedRunControllerOptions) { + const env = new RunnerEnv(opts.env); + this.env = env; + + this.workerManifest = opts.workerManifest; + + this.httpClient = new WorkloadHttpClient({ + workerApiUrl: this.workerApiUrl, + runnerId: this.runnerId, + deploymentId: env.TRIGGER_DEPLOYMENT_ID, + deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, + projectRef: env.TRIGGER_PROJECT_REF, + }); + + this.logger = new RunLogger({ + httpClient: this.httpClient, + env, + }); + + const properties = { + ...env.raw, + TRIGGER_POD_SCHEDULED_AT_MS: env.TRIGGER_POD_SCHEDULED_AT_MS.toISOString(), + TRIGGER_DEQUEUED_AT_MS: env.TRIGGER_DEQUEUED_AT_MS.toISOString(), + }; + + this.sendDebugLog({ + 
runId: env.TRIGGER_RUN_ID, + message: "Creating run controller", + properties, + }); + + if (env.TRIGGER_WARM_START_URL) { + this.warmStartClient = new WarmStartClient({ + apiUrl: new URL(env.TRIGGER_WARM_START_URL), + controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, + deploymentId: env.TRIGGER_DEPLOYMENT_ID, + deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, + machineCpu: env.TRIGGER_MACHINE_CPU, + machineMemory: env.TRIGGER_MACHINE_MEMORY, + }); + } + + // Websocket notifications are only an optimisation so we don't need to wait for a successful connection + this.socket = this.createSupervisorSocket(); + + process.on("SIGTERM", async () => { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Received SIGTERM, stopping worker", + }); + await this.stop(); + }); + } + + get metrics() { + return { + warmStartCount: this.warmStartCount, + restoreCount: this.restoreCount, + }; + } + + get runnerId() { + return this.env.TRIGGER_RUNNER_ID; + } + + get successExitCode() { + return this.env.TRIGGER_SUCCESS_EXIT_CODE; + } + + get failureExitCode() { + return this.env.TRIGGER_FAILURE_EXIT_CODE; + } + + get workerApiUrl() { + return this.env.TRIGGER_SUPERVISOR_API_URL; + } + + get workerInstanceName() { + return this.env.TRIGGER_WORKER_INSTANCE_NAME; + } + + private subscribeToRunNotifications(runFriendlyId: string, snapshotFriendlyId: string) { + this.socket.emit("run:start", { + version: "1", + run: { + friendlyId: runFriendlyId, + }, + snapshot: { + friendlyId: snapshotFriendlyId, + }, + }); + } + + private unsubscribeFromRunNotifications(runFriendlyId: string, snapshotFriendlyId: string) { + this.socket.emit("run:stop", { + version: "1", + run: { + friendlyId: runFriendlyId, + }, + snapshot: { + friendlyId: snapshotFriendlyId, + }, + }); + } + + private get runFriendlyId() { + return this.currentExecution?.runFriendlyId; + } + + private get snapshotFriendlyId() { + return this.currentExecution?.currentSnapshotFriendlyId; + } + + private lockedRunExecution: 
Promise | null = null; + + private async startRunExecution({ + runFriendlyId, + snapshotFriendlyId, + dequeuedAt, + podScheduledAt, + isWarmStart, + previousRunId, + }: { + runFriendlyId: string; + snapshotFriendlyId: string; + dequeuedAt?: Date; + podScheduledAt?: Date; + isWarmStart?: boolean; + previousRunId?: string; + }) { + this.sendDebugLog({ + runId: runFriendlyId, + message: "startAndExecuteRunAttempt()", + properties: { previousRunId }, + }); + + if (this.lockedRunExecution) { + this.sendDebugLog({ + runId: runFriendlyId, + message: "startAndExecuteRunAttempt: execution already locked", + }); + return; + } + + const execution = async () => { + if (!this.currentExecution || !this.currentExecution.isPreparedForNextRun) { + this.currentExecution = new RunExecution({ + workerManifest: this.workerManifest, + env: this.env, + httpClient: this.httpClient, + logger: this.logger, + }); + } + + // Subscribe to run notifications + this.subscribeToRunNotifications(runFriendlyId, snapshotFriendlyId); + + // We're prepared for the next run so we can start executing + await this.currentExecution.execute({ + runFriendlyId, + snapshotFriendlyId, + dequeuedAt, + podScheduledAt, + isWarmStart, + }); + }; + + this.lockedRunExecution = execution(); + + const [error] = await tryCatch(this.lockedRunExecution); + + if (error) { + this.sendDebugLog({ + runId: runFriendlyId, + message: "Error during execution", + properties: { error: error.message }, + }); + } + + const metrics = this.currentExecution?.metrics; + + if (metrics?.restoreCount) { + this.restoreCount += metrics.restoreCount; + } + + this.lockedRunExecution = null; + this.unsubscribeFromRunNotifications(runFriendlyId, snapshotFriendlyId); + this.waitForNextRun(); + } + + private waitForNextRunLock = false; + + /** + * This will eagerly create a new run execution. It will never throw, but may exit + * the process on any errors or when no runs are available after the configured duration. 
+ */ + private async waitForNextRun() { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun()", + }); + + if (this.waitForNextRunLock) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: already in progress, skipping", + }); + return; + } + + if (this.lockedRunExecution) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: execution locked, skipping", + }); + return; + } + + this.waitForNextRunLock = true; + + try { + if (!this.warmStartClient) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: warm starts disabled, shutting down", + }); + this.exitProcess(this.successExitCode); + } + + const previousRunId = this.runFriendlyId; + + if (this.currentExecution?.taskRunEnv) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: eagerly recreating task run process", + }); + + const previousTaskRunEnv = this.currentExecution.taskRunEnv; + + this.currentExecution = new RunExecution({ + workerManifest: this.workerManifest, + env: this.env, + httpClient: this.httpClient, + logger: this.logger, + }).prepareForExecution({ + taskRunEnv: previousTaskRunEnv, + }); + } + + // Check the service is up and get additional warm start config + const connect = await this.warmStartClient.connect(); + + if (!connect.success) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: failed to connect to warm start service", + properties: { + warmStartUrl: this.env.TRIGGER_WARM_START_URL, + error: connect.error, + }, + }); + this.exitProcess(this.successExitCode); + } + + const connectionTimeoutMs = + connect.data.connectionTimeoutMs ?? this.env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS; + const keepaliveMs = connect.data.keepaliveMs ?? 
this.env.TRIGGER_WARM_START_KEEPALIVE_MS; + + const warmStartConfig = { + connectionTimeoutMs, + keepaliveMs, + }; + + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: connected to warm start service", + properties: warmStartConfig, + }); + + if (!connectionTimeoutMs || !keepaliveMs) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: warm starts disabled after connect", + properties: warmStartConfig, + }); + this.exitProcess(this.successExitCode); + } + + const nextRun = await this.warmStartClient.warmStart({ + workerInstanceName: this.workerInstanceName, + connectionTimeoutMs, + keepaliveMs, + }); + + if (!nextRun) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: warm start failed, shutting down", + properties: warmStartConfig, + }); + this.exitProcess(this.successExitCode); + } + + this.warmStartCount++; + + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: got next run", + properties: { + ...warmStartConfig, + nextRunId: nextRun.run.friendlyId, + }, + }); + + this.startRunExecution({ + runFriendlyId: nextRun.run.friendlyId, + snapshotFriendlyId: nextRun.snapshot.friendlyId, + dequeuedAt: nextRun.dequeuedAt, + isWarmStart: true, + previousRunId, + }).finally(() => {}); + } catch (error) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "waitForNextRun: unexpected error", + properties: { error: error instanceof Error ? 
error.message : String(error) }, + }); + this.exitProcess(this.failureExitCode); + } finally { + this.waitForNextRunLock = false; + } + } + + private exitProcess(code?: number): never { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Exiting process", + properties: { code }, + }); + + this.currentExecution?.exit(); + + process.exit(code); + } + + createSupervisorSocket(): SupervisorSocket { + const wsUrl = new URL("/workload", this.workerApiUrl); + + const socket = io(wsUrl.href, { + transports: ["websocket"], + extraHeaders: { + [WORKLOAD_HEADERS.DEPLOYMENT_ID]: this.env.TRIGGER_DEPLOYMENT_ID, + [WORKLOAD_HEADERS.RUNNER_ID]: this.env.TRIGGER_RUNNER_ID, + }, + }) satisfies SupervisorSocket; + + socket.on("run:notify", async ({ version, run }) => { + this.sendDebugLog({ + runId: run.friendlyId, + message: "run:notify received by runner", + properties: { version, runId: run.friendlyId }, + }); + + if (!this.runFriendlyId) { + this.sendDebugLog({ + runId: run.friendlyId, + message: "run:notify: ignoring notification, no local run ID", + properties: { + currentRunId: this.runFriendlyId, + currentSnapshotId: this.snapshotFriendlyId, + }, + }); + return; + } + + if (run.friendlyId !== this.runFriendlyId) { + this.sendDebugLog({ + runId: run.friendlyId, + message: "run:notify: ignoring notification for different run", + properties: { + currentRunId: this.runFriendlyId, + currentSnapshotId: this.snapshotFriendlyId, + notificationRunId: run.friendlyId, + }, + }); + return; + } + + const latestSnapshot = await this.httpClient.getRunExecutionData(this.runFriendlyId); + + if (!latestSnapshot.success) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "run:notify: failed to get latest snapshot data", + properties: { + currentRunId: this.runFriendlyId, + currentSnapshotId: this.snapshotFriendlyId, + error: latestSnapshot.error, + }, + }); + return; + } + + const runExecutionData = latestSnapshot.data.execution; + + if (!this.currentExecution) { + 
this.sendDebugLog({ + runId: runExecutionData.run.friendlyId, + message: "handleSnapshotChange: no current execution", + }); + return; + } + + const [error] = await tryCatch(this.currentExecution.handleSnapshotChange(runExecutionData)); + + if (error) { + this.sendDebugLog({ + runId: runExecutionData.run.friendlyId, + message: "handleSnapshotChange: unexpected error", + properties: { error: error.message }, + }); + } + }); + + socket.on("connect", () => { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Socket connected to supervisor", + }); + + // This should handle the case where we reconnect after being restored + if ( + this.runFriendlyId && + this.snapshotFriendlyId && + this.runFriendlyId !== this.env.TRIGGER_RUN_ID + ) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Subscribing to notifications for in-progress run", + }); + this.subscribeToRunNotifications(this.runFriendlyId, this.snapshotFriendlyId); + } + }); + + socket.on("connect_error", (error) => { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Socket connection error", + properties: { error: error instanceof Error ? 
error.message : String(error) }, + }); + }); + + socket.on("disconnect", (reason, description) => { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Socket disconnected from supervisor", + properties: { reason, description: description?.toString() }, + }); + }); + + return socket; + } + + async cancelAttempt(runId: string) { + this.sendDebugLog({ + runId, + message: "cancelling attempt", + properties: { runId }, + }); + + await this.currentExecution?.cancel(); + } + + start() { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Starting up", + }); + + // If we have run and snapshot IDs, we can start an attempt immediately + if (this.env.TRIGGER_RUN_ID && this.env.TRIGGER_SNAPSHOT_ID) { + this.startRunExecution({ + runFriendlyId: this.env.TRIGGER_RUN_ID, + snapshotFriendlyId: this.env.TRIGGER_SNAPSHOT_ID, + dequeuedAt: this.env.TRIGGER_DEQUEUED_AT_MS, + podScheduledAt: this.env.TRIGGER_POD_SCHEDULED_AT_MS, + }).finally(() => {}); + return; + } + + // ..otherwise we need to wait for a run + this.waitForNextRun(); + return; + } + + async stop() { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Shutting down", + }); + + await this.currentExecution?.cancel(); + this.socket.close(); + } + + sendDebugLog(opts: SendDebugLogOptions) { + this.logger.sendDebugLog({ + ...opts, + message: `[controller] ${opts.message}`, + properties: { + ...opts.properties, + runnerWarmStartCount: this.warmStartCount, + runnerRestoreCount: this.restoreCount, + }, + }); + } +} diff --git a/packages/cli-v3/src/entryPoints/managed/env.ts b/packages/cli-v3/src/entryPoints/managed/env.ts new file mode 100644 index 0000000000..1355f68d82 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/env.ts @@ -0,0 +1,223 @@ +import { randomUUID } from "node:crypto"; +import { Metadata } from "./overrides.js"; +import { z } from "zod"; +import { EnvObject } from "std-env"; + +const DateEnv = z + .string() + .transform((val) => new Date(parseInt(val, 10))) + 
.pipe(z.date()); + +// All IDs are friendly IDs +const Env = z.object({ + // Set at build time + TRIGGER_CONTENT_HASH: z.string(), + TRIGGER_DEPLOYMENT_ID: z.string(), + TRIGGER_DEPLOYMENT_VERSION: z.string(), + TRIGGER_PROJECT_ID: z.string(), + TRIGGER_PROJECT_REF: z.string(), + NODE_ENV: z.string().default("production"), + NODE_EXTRA_CA_CERTS: z.string().optional(), + + // Set at runtime + TRIGGER_WORKLOAD_CONTROLLER_ID: z.string().default(`controller_${randomUUID()}`), + TRIGGER_ENV_ID: z.string(), + TRIGGER_RUN_ID: z.string().optional(), // This is only useful for cold starts + TRIGGER_SNAPSHOT_ID: z.string().optional(), // This is only useful for cold starts + OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), + TRIGGER_WARM_START_URL: z.string().optional(), + TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS: z.coerce.number().default(30_000), + TRIGGER_WARM_START_KEEPALIVE_MS: z.coerce.number().default(300_000), + TRIGGER_MACHINE_CPU: z.string().default("0"), + TRIGGER_MACHINE_MEMORY: z.string().default("0"), + TRIGGER_RUNNER_ID: z.string(), + TRIGGER_METADATA_URL: z.string().optional(), + TRIGGER_PRE_SUSPEND_WAIT_MS: z.coerce.number().default(200), + + // Timeline metrics + TRIGGER_POD_SCHEDULED_AT_MS: DateEnv, + TRIGGER_DEQUEUED_AT_MS: DateEnv, + + // May be overridden + TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), + TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), + TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), + TRIGGER_WORKER_INSTANCE_NAME: z.string(), + TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), + TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5), + TRIGGER_SUCCESS_EXIT_CODE: z.coerce.number().default(0), + TRIGGER_FAILURE_EXIT_CODE: z.coerce.number().default(1), +}); + +type Env = z.infer<typeof Env>; + +export class RunnerEnv { + private env: Env; + public readonly initial: Env; + + constructor(env: EnvObject) { + this.env = Env.parse(env); + this.initial = { ...this.env }; + } + + get raw() { + return this.env; + } + +
// Base environment variables + get NODE_ENV() { + return this.env.NODE_ENV; + } + get NODE_EXTRA_CA_CERTS() { + return this.env.NODE_EXTRA_CA_CERTS; + } + get OTEL_EXPORTER_OTLP_ENDPOINT() { + return this.env.OTEL_EXPORTER_OTLP_ENDPOINT; + } + get TRIGGER_CONTENT_HASH() { + return this.env.TRIGGER_CONTENT_HASH; + } + get TRIGGER_DEPLOYMENT_ID() { + return this.env.TRIGGER_DEPLOYMENT_ID; + } + get TRIGGER_DEPLOYMENT_VERSION() { + return this.env.TRIGGER_DEPLOYMENT_VERSION; + } + get TRIGGER_PROJECT_ID() { + return this.env.TRIGGER_PROJECT_ID; + } + get TRIGGER_PROJECT_REF() { + return this.env.TRIGGER_PROJECT_REF; + } + get TRIGGER_WORKLOAD_CONTROLLER_ID() { + return this.env.TRIGGER_WORKLOAD_CONTROLLER_ID; + } + get TRIGGER_ENV_ID() { + return this.env.TRIGGER_ENV_ID; + } + get TRIGGER_RUN_ID() { + return this.env.TRIGGER_RUN_ID; + } + get TRIGGER_SNAPSHOT_ID() { + return this.env.TRIGGER_SNAPSHOT_ID; + } + get TRIGGER_WARM_START_URL() { + return this.env.TRIGGER_WARM_START_URL; + } + get TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS() { + return this.env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS; + } + get TRIGGER_WARM_START_KEEPALIVE_MS() { + return this.env.TRIGGER_WARM_START_KEEPALIVE_MS; + } + get TRIGGER_MACHINE_CPU() { + return this.env.TRIGGER_MACHINE_CPU; + } + get TRIGGER_MACHINE_MEMORY() { + return this.env.TRIGGER_MACHINE_MEMORY; + } + get TRIGGER_METADATA_URL() { + return this.env.TRIGGER_METADATA_URL; + } + get TRIGGER_PRE_SUSPEND_WAIT_MS() { + return this.env.TRIGGER_PRE_SUSPEND_WAIT_MS; + } + get TRIGGER_POD_SCHEDULED_AT_MS() { + return this.env.TRIGGER_POD_SCHEDULED_AT_MS; + } + get TRIGGER_DEQUEUED_AT_MS() { + return this.env.TRIGGER_DEQUEUED_AT_MS; + } + + // Overridable values + get TRIGGER_SUCCESS_EXIT_CODE() { + return this.env.TRIGGER_SUCCESS_EXIT_CODE; + } + get TRIGGER_FAILURE_EXIT_CODE() { + return this.env.TRIGGER_FAILURE_EXIT_CODE; + } + get TRIGGER_HEARTBEAT_INTERVAL_SECONDS() { + return this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; + } + get 
TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS() { + return this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; + } + get TRIGGER_WORKER_INSTANCE_NAME() { + return this.env.TRIGGER_WORKER_INSTANCE_NAME; + } + get TRIGGER_RUNNER_ID() { + return this.env.TRIGGER_RUNNER_ID; + } + + get TRIGGER_SUPERVISOR_API_PROTOCOL() { + return this.env.TRIGGER_SUPERVISOR_API_PROTOCOL; + } + + get TRIGGER_SUPERVISOR_API_DOMAIN() { + return this.env.TRIGGER_SUPERVISOR_API_DOMAIN; + } + + get TRIGGER_SUPERVISOR_API_PORT() { + return this.env.TRIGGER_SUPERVISOR_API_PORT; + } + + get TRIGGER_SUPERVISOR_API_URL() { + return `${this.TRIGGER_SUPERVISOR_API_PROTOCOL}://${this.TRIGGER_SUPERVISOR_API_DOMAIN}:${this.TRIGGER_SUPERVISOR_API_PORT}`; + } + + /** Overrides existing env vars with new values */ + override(overrides: Metadata) { + if (overrides.TRIGGER_SUCCESS_EXIT_CODE) { + this.env.TRIGGER_SUCCESS_EXIT_CODE = overrides.TRIGGER_SUCCESS_EXIT_CODE; + } + + if (overrides.TRIGGER_FAILURE_EXIT_CODE) { + this.env.TRIGGER_FAILURE_EXIT_CODE = overrides.TRIGGER_FAILURE_EXIT_CODE; + } + + if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) { + this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS = overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; + } + + if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) { + this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS = + overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; + } + + if (overrides.TRIGGER_WORKER_INSTANCE_NAME) { + this.env.TRIGGER_WORKER_INSTANCE_NAME = overrides.TRIGGER_WORKER_INSTANCE_NAME; + } + + if (overrides.TRIGGER_SUPERVISOR_API_PROTOCOL) { + this.env.TRIGGER_SUPERVISOR_API_PROTOCOL = overrides.TRIGGER_SUPERVISOR_API_PROTOCOL as + | "http" + | "https"; + } + + if (overrides.TRIGGER_SUPERVISOR_API_DOMAIN) { + this.env.TRIGGER_SUPERVISOR_API_DOMAIN = overrides.TRIGGER_SUPERVISOR_API_DOMAIN; + } + + if (overrides.TRIGGER_SUPERVISOR_API_PORT) { + this.env.TRIGGER_SUPERVISOR_API_PORT = overrides.TRIGGER_SUPERVISOR_API_PORT; + } + + if (overrides.TRIGGER_RUNNER_ID) 
{ + this.env.TRIGGER_RUNNER_ID = overrides.TRIGGER_RUNNER_ID; + } + } + + // Helper method to get process env for task runs + gatherProcessEnv(): Record<string, string> { + const $env = { + NODE_ENV: this.NODE_ENV, + NODE_EXTRA_CA_CERTS: this.NODE_EXTRA_CA_CERTS, + OTEL_EXPORTER_OTLP_ENDPOINT: this.OTEL_EXPORTER_OTLP_ENDPOINT, + }; + + // Filter out undefined values + return Object.fromEntries( + Object.entries($env).filter(([, value]) => value !== undefined) + ) as Record<string, string>; + } +} diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts new file mode 100644 index 0000000000..bfefeee27f --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -0,0 +1,923 @@ +import { + type CompleteRunAttemptResult, + type RunExecutionData, + SuspendedProcessError, + type TaskRunExecutionMetrics, + type TaskRunExecutionResult, + TaskRunExecutionRetry, + type TaskRunFailedExecutionResult, + WorkerManifest, +} from "@trigger.dev/core/v3"; +import { type WorkloadRunAttemptStartResponseBody } from "@trigger.dev/core/v3/workers"; +import { TaskRunProcess } from "../../executions/taskRunProcess.js"; +import { RunLogger, SendDebugLogOptions } from "./logger.js"; +import { RunnerEnv } from "./env.js"; +import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers"; +import { setTimeout as sleep } from "timers/promises"; +import { RunExecutionHeartbeat } from "./heartbeat.js"; +import { RunExecutionSnapshotPoller } from "./poller.js"; +import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils"; +import { MetadataClient } from "./overrides.js"; +import { randomBytes } from "node:crypto"; + +class ExecutionAbortError extends Error { + constructor(message: string) { + super(message); + this.name = "ExecutionAbortError"; + } +} + +type RunExecutionOptions = { + workerManifest: WorkerManifest; + env: RunnerEnv; + httpClient: WorkloadHttpClient; + logger: RunLogger; +}; + +type RunExecutionPrepareOptions = { +
taskRunEnv: Record<string, string>; +}; + +type RunExecutionRunOptions = { + runFriendlyId: string; + snapshotFriendlyId: string; + dequeuedAt?: Date; + podScheduledAt?: Date; + isWarmStart?: boolean; +}; + +export class RunExecution { + private id: string; + private executionAbortController: AbortController; + + private _runFriendlyId?: string; + private currentSnapshotId?: string; + private currentTaskRunEnv?: Record<string, string>; + + private dequeuedAt?: Date; + private podScheduledAt?: Date; + private readonly workerManifest: WorkerManifest; + private readonly env: RunnerEnv; + private readonly httpClient: WorkloadHttpClient; + private readonly logger: RunLogger; + private restoreCount: number; + + private taskRunProcess?: TaskRunProcess; + private runHeartbeat?: RunExecutionHeartbeat; + private snapshotPoller?: RunExecutionSnapshotPoller; + + constructor(opts: RunExecutionOptions) { + this.id = randomBytes(4).toString("hex"); + this.workerManifest = opts.workerManifest; + this.env = opts.env; + this.httpClient = opts.httpClient; + this.logger = opts.logger; + + this.restoreCount = 0; + this.executionAbortController = new AbortController(); + } + + /** + * Prepares the execution with task run environment variables. + * This should be called before executing, typically after a successful run to prepare for the next one.
+ */ + public prepareForExecution(opts: RunExecutionPrepareOptions): this { + if (this.taskRunProcess) { + throw new Error("prepareForExecution called after process was already created"); + } + + if (this.isPreparedForNextRun) { + throw new Error("prepareForExecution called after execution was already prepared"); + } + + this.taskRunProcess = this.createTaskRunProcess({ + envVars: opts.taskRunEnv, + isWarmStart: true, + }); + + return this; + } + + private createTaskRunProcess({ + envVars, + isWarmStart, + }: { + envVars: Record<string, string>; + isWarmStart?: boolean; + }) { + return new TaskRunProcess({ + workerManifest: this.workerManifest, + env: { + ...envVars, + ...this.env.gatherProcessEnv(), + }, + serverWorker: { + id: "managed", + contentHash: this.env.TRIGGER_CONTENT_HASH, + version: this.env.TRIGGER_DEPLOYMENT_VERSION, + engine: "V2", + }, + machineResources: { + cpu: Number(this.env.TRIGGER_MACHINE_CPU), + memory: Number(this.env.TRIGGER_MACHINE_MEMORY), + }, + isWarmStart, + }).initialize(); + } + + /** + * Returns true if the execution has been prepared with task run env.
+ */ + get isPreparedForNextRun(): boolean { + return !!this.taskRunProcess?.isPreparedForNextRun; + } + + /** + * Called by the RunController when it receives a websocket notification + * or when the snapshot poller detects a change + */ + public async handleSnapshotChange(runData: RunExecutionData): Promise<void> { + const { run, snapshot, completedWaitpoints } = runData; + + const snapshotMetadata = { + incomingRunId: run.friendlyId, + incomingSnapshotId: snapshot.friendlyId, + completedWaitpoints: completedWaitpoints.length, + }; + + // Ensure we have run details + if (!this.runFriendlyId || !this.currentSnapshotId) { + this.sendDebugLog( + "handleSnapshotChange: missing run or snapshot ID", + snapshotMetadata, + run.friendlyId + ); + return; + } + + // Ensure the run ID matches + if (run.friendlyId !== this.runFriendlyId) { + // Send debug log to both runs + this.sendDebugLog("handleSnapshotChange: mismatched run IDs", snapshotMetadata); + this.sendDebugLog( + "handleSnapshotChange: mismatched run IDs", + snapshotMetadata, + run.friendlyId + ); + return; + } + + this.sendDebugLog(`enqueued snapshot change: ${snapshot.executionStatus}`, snapshotMetadata); + + this.snapshotChangeQueue.push(runData); + await this.processSnapshotChangeQueue(); + } + + private snapshotChangeQueue: RunExecutionData[] = []; + private snapshotChangeQueueLock = false; + + private async processSnapshotChangeQueue() { + if (this.snapshotChangeQueueLock) { + return; + } + + this.snapshotChangeQueueLock = true; + while (this.snapshotChangeQueue.length > 0) { + const runData = this.snapshotChangeQueue.shift(); + + if (!runData) { + continue; + } + + const [error] = await tryCatch(this.processSnapshotChange(runData)); + + if (error) { + this.sendDebugLog("Failed to process snapshot change", { error: error.message }); + } + } + this.snapshotChangeQueueLock = false; + } + + private async processSnapshotChange(runData: RunExecutionData): Promise<void> { + const { run, snapshot, completedWaitpoints } =
runData; + + const snapshotMetadata = { + incomingSnapshotId: snapshot.friendlyId, + completedWaitpoints: completedWaitpoints.length, + }; + + // Check if the incoming snapshot is newer than the current one + if (!this.currentSnapshotId || snapshot.friendlyId < this.currentSnapshotId) { + this.sendDebugLog( + "handleSnapshotChange: received older snapshot, skipping", + snapshotMetadata + ); + return; + } + + if (snapshot.friendlyId === this.currentSnapshotId) { + this.sendDebugLog("handleSnapshotChange: snapshot not changed", snapshotMetadata); + return; + } + + this.sendDebugLog(`snapshot change: ${snapshot.executionStatus}`, snapshotMetadata); + + // Reset the snapshot poll interval so we don't do unnecessary work + this.snapshotPoller?.resetCurrentInterval(); + + // Update internal state + this.currentSnapshotId = snapshot.friendlyId; + + // Update services + this.runHeartbeat?.updateSnapshotId(snapshot.friendlyId); + this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId); + + switch (snapshot.executionStatus) { + case "PENDING_CANCEL": { + const [error] = await tryCatch(this.cancel()); + + if (error) { + this.sendDebugLog("snapshot change: failed to cancel attempt", { + ...snapshotMetadata, + error: error.message, + }); + } + + this.abortExecution(); + return; + } + case "FINISHED": { + this.sendDebugLog("Run is finished", snapshotMetadata); + + // Pretend we've just suspended the run. This will kill the process without failing the run. 
+ await this.taskRunProcess?.suspend(); + return; + } + case "QUEUED_EXECUTING": + case "EXECUTING_WITH_WAITPOINTS": { + this.sendDebugLog("Run is executing with waitpoints", snapshotMetadata); + + const [error] = await tryCatch(this.taskRunProcess?.cleanup(false)); + + if (error) { + this.sendDebugLog("Failed to cleanup task run process, carrying on", { + ...snapshotMetadata, + error: error.message, + }); + } + + if (snapshot.friendlyId !== this.currentSnapshotId) { + this.sendDebugLog("Snapshot changed after cleanup, abort", snapshotMetadata); + + this.abortExecution(); + return; + } + + await sleep(this.env.TRIGGER_PRE_SUSPEND_WAIT_MS); + + if (snapshot.friendlyId !== this.currentSnapshotId) { + this.sendDebugLog("Snapshot changed after suspend threshold, abort", snapshotMetadata); + + this.abortExecution(); + return; + } + + if (!this.runFriendlyId || !this.currentSnapshotId) { + this.sendDebugLog( + "handleSnapshotChange: Missing run ID or snapshot ID after suspension, abort", + snapshotMetadata + ); + + this.abortExecution(); + return; + } + + const suspendResult = await this.httpClient.suspendRun( + this.runFriendlyId, + this.currentSnapshotId + ); + + if (!suspendResult.success) { + this.sendDebugLog("Failed to suspend run, staying alive 🎶", { + ...snapshotMetadata, + error: suspendResult.error, + }); + + this.sendDebugLog("checkpoint: suspend request failed", { + ...snapshotMetadata, + error: suspendResult.error, + }); + + // This is fine, we'll wait for the next status change + return; + } + + if (!suspendResult.data.ok) { + this.sendDebugLog("checkpoint: failed to suspend run", { + snapshotId: this.currentSnapshotId, + error: suspendResult.data.error, + }); + + // This is fine, we'll wait for the next status change + return; + } + + this.sendDebugLog("Suspending, any day now 🚬", snapshotMetadata); + + // Wait for next status change + return; + } + case "SUSPENDED": { + this.sendDebugLog("Run was suspended, kill the process", snapshotMetadata); + + // 
This will kill the process and fail the execution with a SuspendedProcessError + await this.taskRunProcess?.suspend(); + + return; + } + case "PENDING_EXECUTING": { + this.sendDebugLog("Run is pending execution", snapshotMetadata); + + if (completedWaitpoints.length === 0) { + this.sendDebugLog("No waitpoints to complete, nothing to do", snapshotMetadata); + return; + } + + const [error] = await tryCatch(this.restore()); + + if (error) { + this.sendDebugLog("Failed to restore execution", { + ...snapshotMetadata, + error: error.message, + }); + + this.abortExecution(); + return; + } + + return; + } + case "EXECUTING": { + this.sendDebugLog("Run is now executing", snapshotMetadata); + + if (completedWaitpoints.length === 0) { + return; + } + + this.sendDebugLog("Processing completed waitpoints", snapshotMetadata); + + if (!this.taskRunProcess) { + this.sendDebugLog("No task run process, ignoring completed waitpoints", snapshotMetadata); + + this.abortExecution(); + return; + } + + for (const waitpoint of completedWaitpoints) { + this.taskRunProcess.waitpointCompleted(waitpoint); + } + + return; + } + case "RUN_CREATED": + case "QUEUED": { + this.sendDebugLog("Invalid status change", snapshotMetadata); + + this.abortExecution(); + return; + } + default: { + assertExhaustive(snapshot.executionStatus); + } + } + } + + private async startAttempt({ + isWarmStart, + }: { + isWarmStart?: boolean; + }): Promise<WorkloadRunAttemptStartResponseBody & { metrics: TaskRunExecutionMetrics }> { + if (!this.runFriendlyId || !this.currentSnapshotId) { + throw new Error("Cannot start attempt: missing run or snapshot ID"); + } + + this.sendDebugLog("Starting attempt"); + + const attemptStartedAt = Date.now(); + + // Check for abort before each major async operation + if (this.executionAbortController.signal.aborted) { + throw new ExecutionAbortError("Execution aborted before start"); + } + + const start = await this.httpClient.startRunAttempt( + this.runFriendlyId, + this.currentSnapshotId, + { isWarmStart } + ); + + if
(this.executionAbortController.signal.aborted) { + throw new ExecutionAbortError("Execution aborted after start"); + } + + if (!start.success) { + throw new Error(`Start API call failed: ${start.error}`); + } + + // A snapshot was just created, so update the snapshot ID + this.currentSnapshotId = start.data.snapshot.friendlyId; + + const metrics = this.measureExecutionMetrics({ + attemptCreatedAt: attemptStartedAt, + dequeuedAt: this.dequeuedAt?.getTime(), + podScheduledAt: this.podScheduledAt?.getTime(), + }); + + this.sendDebugLog("Started attempt"); + + return { ...start.data, metrics }; + } + + /** + * Executes the run. This will return when the execution is complete and we should warm start. + * When this returns, the child process will have been cleaned up. + */ + public async execute(runOpts: RunExecutionRunOptions): Promise<void> { + // Setup initial state + this.runFriendlyId = runOpts.runFriendlyId; + this.currentSnapshotId = runOpts.snapshotFriendlyId; + this.dequeuedAt = runOpts.dequeuedAt; + this.podScheduledAt = runOpts.podScheduledAt; + + // Create and start services + this.runHeartbeat = new RunExecutionHeartbeat({ + runFriendlyId: this.runFriendlyId, + snapshotFriendlyId: this.currentSnapshotId, + httpClient: this.httpClient, + logger: this.logger, + heartbeatIntervalSeconds: this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS, + }); + this.snapshotPoller = new RunExecutionSnapshotPoller({ + runFriendlyId: this.runFriendlyId, + snapshotFriendlyId: this.currentSnapshotId, + httpClient: this.httpClient, + logger: this.logger, + snapshotPollIntervalSeconds: this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS, + handleSnapshotChange: this.handleSnapshotChange.bind(this), + }); + + this.runHeartbeat.start(); + this.snapshotPoller.start(); + + const [startError, start] = await tryCatch( + this.startAttempt({ isWarmStart: runOpts.isWarmStart }) + ); + + if (startError) { + this.sendDebugLog("Failed to start attempt", { error: startError.message }); + + this.stopServices();
+ return; + } + + const [executeError] = await tryCatch(this.executeRunWrapper({ ...start, isWarmStart: runOpts.isWarmStart })); + + if (executeError) { + this.sendDebugLog("Failed to execute run", { error: executeError.message }); + + this.stopServices(); + return; + } + + this.stopServices(); + } + + private async executeRunWrapper({ + run, + snapshot, + envVars, + execution, + metrics, + isWarmStart, + }: WorkloadRunAttemptStartResponseBody & { + metrics: TaskRunExecutionMetrics; + isWarmStart?: boolean; + }) { + this.currentTaskRunEnv = envVars; + + const [executeError] = await tryCatch( + this.executeRun({ + run, + snapshot, + envVars, + execution, + metrics, + isWarmStart, + }) + ); + + this.sendDebugLog("Run execution completed", { error: executeError?.message }); + + if (!executeError) { + this.stopServices(); + return; + } + + if (executeError instanceof SuspendedProcessError) { + this.sendDebugLog("Run was suspended", { + run: run.friendlyId, + snapshot: snapshot.friendlyId, + error: executeError.message, + }); + + return; + } + + if (executeError instanceof ExecutionAbortError) { + this.sendDebugLog("Run was interrupted", { + run: run.friendlyId, + snapshot: snapshot.friendlyId, + error: executeError.message, + }); + + return; + } + + this.sendDebugLog("Error while executing attempt", { + error: executeError.message, + runId: run.friendlyId, + snapshotId: snapshot.friendlyId, + }); + + const completion = { + id: execution.run.id, + ok: false, + retry: undefined, + error: TaskRunProcess.parseExecuteError(executeError), + } satisfies TaskRunFailedExecutionResult; + + const [completeError] = await tryCatch(this.complete({ completion })); + + if (completeError) { + this.sendDebugLog("Failed to complete run", { error: completeError.message }); + } + + this.stopServices(); + } + + private async executeRun({ + run, + snapshot, + envVars, + execution, + metrics, + isWarmStart, + }: WorkloadRunAttemptStartResponseBody & { + metrics: TaskRunExecutionMetrics; + isWarmStart?: boolean; + }) { + // To skip
this step and eagerly create the task run process, run prepareForExecution first + if (!this.taskRunProcess || !this.isPreparedForNextRun) { + this.taskRunProcess = this.createTaskRunProcess({ envVars, isWarmStart }); + } + + this.sendDebugLog("executing task run process", { runId: execution.run.id }); + + // Set up an abort handler that will cleanup the task run process + this.executionAbortController.signal.addEventListener("abort", async () => { + this.sendDebugLog("Execution aborted during task run, cleaning up process", { + runId: execution.run.id, + }); + + await this.taskRunProcess?.cleanup(true); + }); + + const completion = await this.taskRunProcess.execute( + { + payload: { + execution, + traceContext: execution.run.traceContext ?? {}, + metrics, + }, + messageId: run.friendlyId, + env: envVars, + }, + isWarmStart + ); + + // If we get here, the task completed normally + this.sendDebugLog("Completed run attempt", { attemptSuccess: completion.ok }); + + // The execution has finished, so we can cleanup the task run process. Killing it should be safe. + const [error] = await tryCatch(this.taskRunProcess.cleanup(true)); + + if (error) { + this.sendDebugLog("Failed to cleanup task run process, submitting completion anyway", { + error: error.message, + }); + } + + const [completionError] = await tryCatch(this.complete({ completion })); + + if (completionError) { + this.sendDebugLog("Failed to complete run", { error: completionError.message }); + } + } + + /** + * Cancels the current execution. 
+ */ + public async cancel(): Promise<void> { + this.sendDebugLog("cancelling attempt", { runId: this.runFriendlyId }); + + await this.taskRunProcess?.cancel(); + } + + public exit() { + if (this.isPreparedForNextRun) { + this.taskRunProcess?.forceExit(); + } + } + + private async complete({ completion }: { completion: TaskRunExecutionResult }): Promise<void> { + if (!this.runFriendlyId || !this.currentSnapshotId) { + throw new Error("Cannot complete run: missing run or snapshot ID"); + } + + const completionResult = await this.httpClient.completeRunAttempt( + this.runFriendlyId, + this.currentSnapshotId, + { completion } + ); + + if (!completionResult.success) { + throw new Error(`failed to submit completion: ${completionResult.error}`); + } + + await this.handleCompletionResult({ + completion, + result: completionResult.data.result, + }); + } + + private async handleCompletionResult({ + completion, + result, + }: { + completion: TaskRunExecutionResult; + result: CompleteRunAttemptResult; + }) { + this.sendDebugLog("Handling completion result", { + attemptSuccess: completion.ok, + attemptStatus: result.attemptStatus, + snapshotId: result.snapshot.friendlyId, + runId: result.run.friendlyId, + }); + + // Update our snapshot ID to match the completion result + // This ensures any subsequent API calls use the correct snapshot + this.currentSnapshotId = result.snapshot.friendlyId; + + const { attemptStatus } = result; + + if (attemptStatus === "RUN_FINISHED") { + this.sendDebugLog("Run finished"); + + return; + } + + if (attemptStatus === "RUN_PENDING_CANCEL") { + this.sendDebugLog("Run pending cancel"); + return; + } + + if (attemptStatus === "RETRY_QUEUED") { + this.sendDebugLog("Retry queued"); + + return; + } + + if (attemptStatus === "RETRY_IMMEDIATELY") { + if (completion.ok) { + throw new Error("Should retry but completion OK."); + } + + if (!completion.retry) { + throw new Error("Should retry but missing retry params."); + } + + await this.retryImmediately({ retryOpts:
completion.retry }); + return; + } + + assertExhaustive(attemptStatus); + } + + private measureExecutionMetrics({ + attemptCreatedAt, + dequeuedAt, + podScheduledAt, + }: { + attemptCreatedAt: number; + dequeuedAt?: number; + podScheduledAt?: number; + }): TaskRunExecutionMetrics { + const metrics: TaskRunExecutionMetrics = [ + { + name: "start", + event: "create_attempt", + timestamp: attemptCreatedAt, + duration: Date.now() - attemptCreatedAt, + }, + ]; + + if (dequeuedAt) { + metrics.push({ + name: "start", + event: "dequeue", + timestamp: dequeuedAt, + duration: 0, + }); + } + + if (podScheduledAt) { + metrics.push({ + name: "start", + event: "pod_scheduled", + timestamp: podScheduledAt, + duration: 0, + }); + } + + return metrics; + } + + private async retryImmediately({ retryOpts }: { retryOpts: TaskRunExecutionRetry }) { + this.sendDebugLog("Retrying run immediately", { + timestamp: retryOpts.timestamp, + delay: retryOpts.delay, + }); + + const delay = retryOpts.timestamp - Date.now(); + + if (delay > 0) { + // Wait for retry delay to pass + await sleep(delay); + } + + // Start and execute next attempt + const [startError, start] = await tryCatch(this.startAttempt({ isWarmStart: true })); + + if (startError) { + this.sendDebugLog("Failed to start attempt for retry", { error: startError.message }); + + this.stopServices(); + return; + } + + const [executeError] = await tryCatch(this.executeRunWrapper({ ...start, isWarmStart: true })); + + if (executeError) { + this.sendDebugLog("Failed to execute run for retry", { error: executeError.message }); + + this.stopServices(); + return; + } + + this.stopServices(); + } + + /** + * Restores a suspended execution from PENDING_EXECUTING + */ + private async restore(): Promise { + this.sendDebugLog("Restoring execution"); + + if (!this.runFriendlyId || !this.currentSnapshotId) { + throw new Error("Cannot restore: missing run or snapshot ID"); + } + + // Short delay to give websocket time to reconnect + await sleep(100); 
+ + // Process any env overrides + await this.processEnvOverrides(); + + const continuationResult = await this.httpClient.continueRunExecution( + this.runFriendlyId, + this.currentSnapshotId + ); + + if (!continuationResult.success) { + throw new Error(continuationResult.error); + } + + // Track restore count + this.restoreCount++; + } + + /** + * Processes env overrides from the metadata service. Generally called when we're resuming from a suspended state. + */ + private async processEnvOverrides() { + if (!this.env.TRIGGER_METADATA_URL) { + this.sendDebugLog("No metadata URL, skipping env overrides"); + return; + } + + const metadataClient = new MetadataClient(this.env.TRIGGER_METADATA_URL); + const overrides = await metadataClient.getEnvOverrides(); + + if (!overrides) { + this.sendDebugLog("No env overrides, skipping"); + return; + } + + this.sendDebugLog("Processing env overrides", overrides); + + // Override the env with the new values + this.env.override(overrides); + + // Update services with new values + if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) { + this.runHeartbeat?.updateInterval(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000); + } + if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) { + this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000); + } + if ( + overrides.TRIGGER_SUPERVISOR_API_PROTOCOL || + overrides.TRIGGER_SUPERVISOR_API_DOMAIN || + overrides.TRIGGER_SUPERVISOR_API_PORT + ) { + this.httpClient.updateApiUrl(this.env.TRIGGER_SUPERVISOR_API_URL); + } + if (overrides.TRIGGER_RUNNER_ID) { + this.httpClient.updateRunnerId(this.env.TRIGGER_RUNNER_ID); + } + } + + sendDebugLog( + message: string, + properties?: SendDebugLogOptions["properties"], + runIdOverride?: string + ) { + this.logger.sendDebugLog({ + runId: runIdOverride ?? 
this.runFriendlyId, + message: `[execution] ${message}`, + properties: { + ...properties, + runId: this.runFriendlyId, + snapshotId: this.currentSnapshotId, + executionId: this.id, + executionRestoreCount: this.restoreCount, + }, + }); + } + + // Ensure we can only set this once + private set runFriendlyId(id: string) { + if (this._runFriendlyId) { + throw new Error("Run ID already set"); + } + + this._runFriendlyId = id; + } + + public get runFriendlyId(): string | undefined { + return this._runFriendlyId; + } + + public get currentSnapshotFriendlyId(): string | undefined { + return this.currentSnapshotId; + } + + public get taskRunEnv(): Record | undefined { + return this.currentTaskRunEnv; + } + + public get metrics() { + return { + restoreCount: this.restoreCount, + }; + } + + get isAborted() { + return this.executionAbortController.signal.aborted; + } + + private abortExecution() { + if (this.isAborted) { + this.sendDebugLog("Execution already aborted"); + return; + } + + this.executionAbortController.abort(); + this.stopServices(); + } + + private stopServices() { + this.runHeartbeat?.stop(); + this.snapshotPoller?.stop(); + } +} diff --git a/packages/cli-v3/src/entryPoints/managed/heartbeat.ts b/packages/cli-v3/src/entryPoints/managed/heartbeat.ts new file mode 100644 index 0000000000..3b3c820c91 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/heartbeat.ts @@ -0,0 +1,92 @@ +import { IntervalService } from "@trigger.dev/core/v3"; +import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker"; +import { RunLogger } from "./logger.js"; + +export type RunExecutionHeartbeatOptions = { + runFriendlyId: string; + snapshotFriendlyId: string; + httpClient: WorkloadHttpClient; + logger: RunLogger; + heartbeatIntervalSeconds: number; +}; + +export class RunExecutionHeartbeat { + private readonly runFriendlyId: string; + private snapshotFriendlyId: string; + + private readonly httpClient: WorkloadHttpClient; + private readonly logger: RunLogger; 
+ private readonly heartbeatIntervalMs: number; + private readonly heartbeat: IntervalService; + + constructor(opts: RunExecutionHeartbeatOptions) { + this.runFriendlyId = opts.runFriendlyId; + this.snapshotFriendlyId = opts.snapshotFriendlyId; + this.httpClient = opts.httpClient; + this.logger = opts.logger; + this.heartbeatIntervalMs = opts.heartbeatIntervalSeconds * 1000; + + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "RunExecutionHeartbeat", + properties: { + runFriendlyId: this.runFriendlyId, + snapshotFriendlyId: this.snapshotFriendlyId, + heartbeatIntervalSeconds: opts.heartbeatIntervalSeconds, + }, + }); + + this.heartbeat = new IntervalService({ + onInterval: async () => { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "heartbeat: started", + }); + + const response = await this.httpClient.heartbeatRun( + this.runFriendlyId, + this.snapshotFriendlyId + ); + + if (!response.success) { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "heartbeat: failed", + properties: { + error: response.error, + }, + }); + } + }, + intervalMs: this.heartbeatIntervalMs, + leadingEdge: false, + onError: async (error) => { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "Failed to send heartbeat", + properties: { error: error instanceof Error ? 
error.message : String(error) }, + }); + }, + }); + } + + resetCurrentInterval() { + this.heartbeat.resetCurrentInterval(); + } + + updateSnapshotId(snapshotFriendlyId: string) { + this.snapshotFriendlyId = snapshotFriendlyId; + } + + updateInterval(intervalMs: number) { + this.heartbeat.updateInterval(intervalMs); + } + + start() { + this.heartbeat.start(); + } + + stop() { + this.heartbeat.stop(); + } +} diff --git a/packages/cli-v3/src/entryPoints/managed/logger.ts b/packages/cli-v3/src/entryPoints/managed/logger.ts new file mode 100644 index 0000000000..3a7a045476 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/logger.ts @@ -0,0 +1,52 @@ +import { + WorkloadDebugLogRequestBody, + WorkloadHttpClient, +} from "@trigger.dev/core/v3/runEngineWorker"; +import { RunnerEnv } from "./env.js"; + +export type SendDebugLogOptions = { + runId?: string; + message: string; + date?: Date; + properties?: WorkloadDebugLogRequestBody["properties"]; +}; + +export type RunLoggerOptions = { + httpClient: WorkloadHttpClient; + env: RunnerEnv; +}; + +export class RunLogger { + private readonly httpClient: WorkloadHttpClient; + private readonly env: RunnerEnv; + + constructor(private readonly opts: RunLoggerOptions) { + this.httpClient = opts.httpClient; + this.env = opts.env; + } + + sendDebugLog({ runId, message, date, properties }: SendDebugLogOptions) { + if (!runId) { + runId = this.env.TRIGGER_RUN_ID; + } + + if (!runId) { + return; + } + + const mergedProperties = { + ...properties, + runId, + runnerId: this.env.TRIGGER_RUNNER_ID, + workerName: this.env.TRIGGER_WORKER_INSTANCE_NAME, + }; + + console.log(message, mergedProperties); + + this.httpClient.sendDebugLog(runId, { + message, + time: date ?? 
new Date(), + properties: mergedProperties, + }); + } +} diff --git a/packages/cli-v3/src/entryPoints/managed/overrides.ts b/packages/cli-v3/src/entryPoints/managed/overrides.ts new file mode 100644 index 0000000000..872b5ad0b3 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/overrides.ts @@ -0,0 +1,29 @@ +export type Metadata = { + TRIGGER_SUPERVISOR_API_PROTOCOL: string | undefined; + TRIGGER_SUPERVISOR_API_DOMAIN: string | undefined; + TRIGGER_SUPERVISOR_API_PORT: number | undefined; + TRIGGER_WORKER_INSTANCE_NAME: string | undefined; + TRIGGER_HEARTBEAT_INTERVAL_SECONDS: number | undefined; + TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: number | undefined; + TRIGGER_SUCCESS_EXIT_CODE: number | undefined; + TRIGGER_FAILURE_EXIT_CODE: number | undefined; + TRIGGER_RUNNER_ID: string | undefined; +}; + +export class MetadataClient { + private readonly url: URL; + + constructor(url: string) { + this.url = new URL(url); + } + + async getEnvOverrides(): Promise { + try { + const response = await fetch(new URL("/env", this.url)); + return response.json(); + } catch (error) { + console.error("Failed to fetch metadata", { error }); + return null; + } + } +} diff --git a/packages/cli-v3/src/entryPoints/managed/poller.ts b/packages/cli-v3/src/entryPoints/managed/poller.ts new file mode 100644 index 0000000000..2decd401ee --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/poller.ts @@ -0,0 +1,121 @@ +import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker"; +import { RunLogger } from "./logger.js"; +import { IntervalService, RunExecutionData } from "@trigger.dev/core/v3"; + +export type RunExecutionSnapshotPollerOptions = { + runFriendlyId: string; + snapshotFriendlyId: string; + httpClient: WorkloadHttpClient; + logger: RunLogger; + snapshotPollIntervalSeconds: number; + handleSnapshotChange: (execution: RunExecutionData) => Promise; +}; + +export class RunExecutionSnapshotPoller { + private runFriendlyId: string; + private 
snapshotFriendlyId: string; + + private readonly httpClient: WorkloadHttpClient; + private readonly logger: RunLogger; + private readonly snapshotPollIntervalMs: number; + private readonly handleSnapshotChange: (runData: RunExecutionData) => Promise; + private readonly poller: IntervalService; + + constructor(opts: RunExecutionSnapshotPollerOptions) { + this.runFriendlyId = opts.runFriendlyId; + this.snapshotFriendlyId = opts.snapshotFriendlyId; + this.httpClient = opts.httpClient; + this.logger = opts.logger; + this.snapshotPollIntervalMs = opts.snapshotPollIntervalSeconds * 1000; + this.handleSnapshotChange = opts.handleSnapshotChange; + + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "RunExecutionSnapshotPoller", + properties: { + runFriendlyId: this.runFriendlyId, + snapshotFriendlyId: this.snapshotFriendlyId, + snapshotPollIntervalSeconds: opts.snapshotPollIntervalSeconds, + }, + }); + + this.poller = new IntervalService({ + onInterval: async () => { + if (!this.runFriendlyId) { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "Skipping snapshot poll, no run ID", + }); + return; + } + + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "Polling for latest snapshot", + }); + + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: `snapshot poll: started`, + properties: { + snapshotId: this.snapshotFriendlyId, + }, + }); + + const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); + + if (!response.success) { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "Snapshot poll failed", + properties: { + error: response.error, + }, + }); + + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: `snapshot poll: failed`, + properties: { + snapshotId: this.snapshotFriendlyId, + error: response.error, + }, + }); + + return; + } + + await this.handleSnapshotChange(response.data.execution); + }, + intervalMs: this.snapshotPollIntervalMs, + 
leadingEdge: false, + onError: async (error) => { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: "Failed to poll for snapshot", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); + }, + }); + } + + resetCurrentInterval() { + this.poller.resetCurrentInterval(); + } + + updateSnapshotId(snapshotFriendlyId: string) { + this.snapshotFriendlyId = snapshotFriendlyId; + } + + updateInterval(intervalMs: number) { + this.poller.updateInterval(intervalMs); + } + + start() { + this.poller.start(); + } + + stop() { + this.poller.stop(); + } +} diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index abe7c93389..96f68f0f42 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -1,7 +1,7 @@ import { CompletedWaitpoint, ExecutorToWorkerMessageCatalog, - MachinePreset, + MachinePresetResources, ServerBackgroundWorker, TaskRunErrorCodes, TaskRunExecution, @@ -50,7 +50,7 @@ export type TaskRunProcessOptions = { workerManifest: WorkerManifest; serverWorker: ServerBackgroundWorker; env: Record; - machine: MachinePreset; + machineResources: MachinePresetResources; isWarmStart?: boolean; cwd?: string; }; @@ -125,7 +125,7 @@ export class TaskRunProcess { } initialize() { - const { env: $env, workerManifest, cwd, machine } = this.options; + const { env: $env, workerManifest, cwd, machineResources: machine } = this.options; const maxOldSpaceSize = nodeOptionsWithMaxOldSpaceSize(undefined, machine); diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 1fb82cc714..4a214a4536 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -2,7 +2,13 @@ export function assertExhaustive(x: never): never { throw new Error("Unexpected object: " + x); } -export async function tryCatch(promise: Promise): Promise<[null, T] | [E, null]> { +export async function tryCatch( + promise: 
Promise | undefined +): Promise<[null, T] | [E, null]> { + if (!promise) { + return [null, undefined as T]; + } + try { const data = await promise; return [null, data]; diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts index 1f1f4d3076..8877393dca 100644 --- a/packages/core/src/v3/index.ts +++ b/packages/core/src/v3/index.ts @@ -67,6 +67,7 @@ export { } from "./utils/ioSerialization.js"; export * from "./utils/imageRef.js"; +export * from "./utils/interval.js"; export * from "./utils/heartbeat.js"; export * from "./config.js"; diff --git a/packages/core/src/v3/machines/index.ts b/packages/core/src/v3/machines/index.ts index 771f4345a4..e5dcb097dc 100644 --- a/packages/core/src/v3/machines/index.ts +++ b/packages/core/src/v3/machines/index.ts @@ -1,14 +1,17 @@ -import { MachinePreset } from "../schemas/common.js"; +import { MachinePresetResources } from "../schemas/common.js"; /** * Returns a value to be used for `--max-old-space-size`. It is in MiB. * Setting this correctly means V8 spends more times running Garbage Collection (GC). * It won't eliminate crashes but it will help avoid them. - * @param {MachinePreset} machine - The machine preset configuration containing memory specifications + * @param {MachinePresetResources} machine - The machine preset configuration containing memory specifications * @param {number} [overhead=0.2] - The memory overhead factor (0.2 = 20% reserved for system operations) * @returns {number} The calculated max old space size in MiB */ -export function maxOldSpaceSizeForMachine(machine: MachinePreset, overhead: number = 0.2): number { +export function maxOldSpaceSizeForMachine( + machine: MachinePresetResources, + overhead: number = 0.2 +): number { return Math.round(machine.memory * 1_024 * (1 - overhead)); } @@ -16,24 +19,27 @@ export function maxOldSpaceSizeForMachine(machine: MachinePreset, overhead: numb * Returns a flag to be used for `--max-old-space-size`. It is in MiB. 
* Setting this correctly means V8 spends more times running Garbage Collection (GC). * It won't eliminate crashes but it will help avoid them. - * @param {MachinePreset} machine - The machine preset configuration containing memory specifications + * @param {MachinePresetResources} machine - The machine preset configuration containing memory specifications * @param {number} [overhead=0.2] - The memory overhead factor (0.2 = 20% reserved for system operations) * @returns {string} The calculated max old space size flag */ -export function maxOldSpaceSizeFlag(machine: MachinePreset, overhead: number = 0.2): string { +export function maxOldSpaceSizeFlag( + machine: MachinePresetResources, + overhead: number = 0.2 +): string { return `--max-old-space-size=${maxOldSpaceSizeForMachine(machine, overhead)}`; } /** * Takes the existing NODE_OPTIONS value, removes any existing max-old-space-size flag, and adds a new one. * @param {string | undefined} existingOptions - The existing NODE_OPTIONS value - * @param {MachinePreset} machine - The machine preset configuration containing memory specifications + * @param {MachinePresetResources} machine - The machine preset configuration containing memory specifications * @param {number} [overhead=0.2] - The memory overhead factor (0.2 = 20% reserved for system operations) * @returns {string} The updated NODE_OPTIONS value with the new max-old-space-size flag */ export function nodeOptionsWithMaxOldSpaceSize( existingOptions: string | undefined, - machine: MachinePreset, + machine: MachinePresetResources, overhead: number = 0.2 ): string { let options = existingOptions ?? 
""; diff --git a/packages/core/src/v3/runEngineWorker/supervisor/http.ts b/packages/core/src/v3/runEngineWorker/supervisor/http.ts index 8814c84c35..4f899e4f22 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/http.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/http.ts @@ -81,6 +81,7 @@ export class SupervisorHttpClient { ); } + /** @deprecated Not currently used */ async dequeueFromVersion(deploymentId: string, maxRunCount = 1, runnerId?: string) { return wrapZodFetch( WorkerApiDequeueResponseBody, diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index 8dd90a3b98..747e1dae5e 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -8,7 +8,7 @@ import { VERSION } from "../../../version.js"; import { io, Socket } from "socket.io-client"; import { WorkerClientToServerEvents, WorkerServerToClientEvents } from "../types.js"; import { getDefaultWorkerHeaders } from "./util.js"; -import { HeartbeatService } from "../../utils/heartbeat.js"; +import { IntervalService } from "../../utils/interval.js"; type SupervisorSessionOptions = SupervisorClientCommonOptions & { queueConsumerEnabled?: boolean; @@ -29,7 +29,7 @@ export class SupervisorSession extends EventEmitter { private readonly queueConsumerEnabled: boolean; private readonly queueConsumer: RunQueueConsumer; - private readonly heartbeatService: HeartbeatService; + private readonly heartbeat: IntervalService; private readonly heartbeatIntervalSeconds: number; constructor(private opts: SupervisorSessionOptions) { @@ -50,8 +50,8 @@ export class SupervisorSession extends EventEmitter { // TODO: This should be dynamic and set by (or at least overridden by) the platform this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30; - this.heartbeatService = new HeartbeatService({ - heartbeat: async () => { + this.heartbeat = new 
IntervalService({ + onInterval: async () => { console.debug("[SupervisorSession] Sending heartbeat"); const body = this.getHeartbeatBody(); @@ -182,7 +182,7 @@ export class SupervisorSession extends EventEmitter { if (this.queueConsumerEnabled) { console.log("[SupervisorSession] Queue consumer enabled"); this.queueConsumer.start(); - this.heartbeatService.start(); + this.heartbeat.start(); } else { console.warn("[SupervisorSession] Queue consumer disabled"); } @@ -196,7 +196,7 @@ export class SupervisorSession extends EventEmitter { } async stop() { - this.heartbeatService.stop(); + this.heartbeat.stop(); this.runNotificationsSocket?.disconnect(); } diff --git a/packages/core/src/v3/runEngineWorker/workload/http.ts b/packages/core/src/v3/runEngineWorker/workload/http.ts index 9d97896f09..9dde07d35d 100644 --- a/packages/core/src/v3/runEngineWorker/workload/http.ts +++ b/packages/core/src/v3/runEngineWorker/workload/http.ts @@ -165,6 +165,7 @@ export class WorkloadHttpClient { } } + /** @deprecated Not currently used */ async dequeue() { return wrapZodFetch( WorkloadDequeueFromVersionResponseBody, diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index 030dd4dcee..a4d37409a2 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -123,6 +123,8 @@ export const MachinePreset = z.object({ export type MachinePreset = z.infer; +export type MachinePresetResources = Pick; + export const TaskRunBuiltInError = z.object({ type: z.literal("BUILT_IN_ERROR"), name: z.string(), diff --git a/packages/core/src/v3/utils/heartbeat.ts b/packages/core/src/v3/utils/heartbeat.ts index 0684bd73c5..c9bb0d97ed 100644 --- a/packages/core/src/v3/utils/heartbeat.ts +++ b/packages/core/src/v3/utils/heartbeat.ts @@ -5,6 +5,9 @@ type HeartbeatServiceOptions = { onError?: (error: unknown) => Promise; }; +/** + * @deprecated Use IntervalService instead + */ export class HeartbeatService { private _heartbeat: () 
=> Promise; private _intervalMs: number; diff --git a/packages/core/src/v3/utils/interval.ts b/packages/core/src/v3/utils/interval.ts new file mode 100644 index 0000000000..59fd0a94cb --- /dev/null +++ b/packages/core/src/v3/utils/interval.ts @@ -0,0 +1,95 @@ +type IntervalServiceOptions = { + onInterval: () => Promise; + onError?: (error: unknown) => Promise; + intervalMs?: number; + leadingEdge?: boolean; +}; + +export class IntervalService { + private _onInterval: () => Promise; + private _onError?: (error: unknown) => Promise; + + private _intervalMs: number; + private _nextInterval: NodeJS.Timeout | undefined; + private _leadingEdge: boolean; + private _isEnabled: boolean; + + constructor(opts: IntervalServiceOptions) { + this._onInterval = opts.onInterval; + this._onError = opts.onError; + + this._intervalMs = opts.intervalMs ?? 45_000; + this._nextInterval = undefined; + this._leadingEdge = opts.leadingEdge ?? false; + this._isEnabled = false; + } + + start() { + if (this._isEnabled) { + return; + } + + this._isEnabled = true; + + if (this._leadingEdge) { + this.#doInterval(); + } else { + this.#scheduleNextInterval(); + } + } + + stop() { + if (!this._isEnabled) { + return; + } + + this._isEnabled = false; + this.#clearNextInterval(); + } + + resetCurrentInterval() { + if (!this._isEnabled) { + return; + } + + this.#clearNextInterval(); + this.#scheduleNextInterval(); + } + + updateInterval(intervalMs: number) { + this._intervalMs = intervalMs; + this.resetCurrentInterval(); + } + + #doInterval = async () => { + this.#clearNextInterval(); + + if (!this._isEnabled) { + return; + } + + try { + await this._onInterval(); + } catch (error) { + if (this._onError) { + try { + await this._onError(error); + } catch (error) { + console.error("Error during interval error handler", error); + } + } + } + + this.#scheduleNextInterval(); + }; + + #clearNextInterval() { + if (this._nextInterval) { + clearTimeout(this._nextInterval); + } + } + + #scheduleNextInterval() { + 
this._nextInterval = setTimeout(this.#doInterval, this._intervalMs); + } +}