From 2b3c7091f9e7682defdcad0939a4f326a7bc2055 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 08:21:51 +0000 Subject: [PATCH 01/16] handle warm start service failure on supervisor side --- apps/supervisor/src/index.ts | 48 +++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 7ea3f03e0a..8b44a8b3a7 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -214,33 +214,41 @@ class ManagedSupervisor { const warmStartUrlWithPath = new URL("/warm-start", this.warmStartUrl); - const res = await fetch(warmStartUrlWithPath.href, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ dequeuedMessage }), - }); - - if (!res.ok) { - this.logger.error("[ManagedWorker] Warm start failed", { - runId: dequeuedMessage.run.id, + try { + const res = await fetch(warmStartUrlWithPath.href, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ dequeuedMessage }), }); - return false; - } - const data = await res.json(); - const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data); + if (!res.ok) { + this.logger.error("[ManagedWorker] Warm start failed", { + runId: dequeuedMessage.run.id, + }); + return false; + } + + const data = await res.json(); + const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data); + + if (!parsedData.success) { + this.logger.error("[ManagedWorker] Warm start response invalid", { + runId: dequeuedMessage.run.id, + data, + }); + return false; + } - if (!parsedData.success) { - this.logger.error("[ManagedWorker] Warm start response invalid", { + return parsedData.data.didWarmStart; + } catch (error) { + this.logger.error("[ManagedWorker] Warm start error", { runId: dequeuedMessage.run.id, - data, + error, }); return false; } - - return parsedData.data.didWarmStart; } async start() { From 22cb3a0f4c51f08bd731372123885018d78b56e5 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 12:23:17 +0000 Subject: [PATCH 02/16] export zodfetch wrapper --- packages/cli-v3/src/apiClient.ts | 51 +------------------ packages/core/src/v3/apiClient/core.ts | 49 ++++++++++++++++++ .../src/v3/runEngineWorker/supervisor/http.ts | 49 +----------------- .../src/v3/runEngineWorker/workload/http.ts | 49 +----------------- 4 files changed, 52 insertions(+), 146 deletions(-) diff --git a/packages/cli-v3/src/apiClient.ts b/packages/cli-v3/src/apiClient.ts index 07d46621e9..20d9c61081 100644 --- a/packages/cli-v3/src/apiClient.ts +++ b/packages/cli-v3/src/apiClient.ts @@ -31,9 +31,8 @@ import { DevDequeueRequestBody, DevDequeueResponseBody, PromoteDeploymentResponseBody, - ListRunResponse, } from "@trigger.dev/core/v3"; -import { zodfetch, zodfetchSSE, ApiError } from "@trigger.dev/core/v3/zodfetch"; +import { ApiResult, wrapZodFetch, zodfetchSSE } from "@trigger.dev/core/v3/zodfetch"; import { logger } from "./utilities/logger.js"; import { WorkloadDebugLogRequestBody, @@ -41,7 +40,6 @@ import { WorkloadHeartbeatResponseBody, WorkloadRunAttemptCompleteRequestBody, WorkloadRunAttemptCompleteResponseBody, - WorkloadRunAttemptStartRequestBody, WorkloadRunAttemptStartResponseBody, WorkloadRunLatestSnapshotResponseBody, } from "@trigger.dev/core/v3/workers"; @@ -644,50 +642,3 @@ export class CliApiClient { ); } } - -type ApiResult = - | { success: true; data: TSuccessResult } - | { - success: false; - error: string; - }; - -async function wrapZodFetch( - schema: T, - url: string, - requestInit?: RequestInit -): Promise>> { - try { - const response = await zodfetch(schema, url, requestInit, { - retry: { - minTimeoutInMs: 500, - maxTimeoutInMs: 5000, - maxAttempts: 5, - factor: 2, - randomize: false, - }, - }); - - return { - success: true, - data: response, - }; - } catch (error) { - if (error instanceof ApiError) { - return { - success: false, - error: error.message, - }; - } else if (error instanceof Error) { - return { - success: false, - error: error.message, - }; - } else { - return { - success: false, - error: String(error), - }; - } - } -} diff --git a/packages/core/src/v3/apiClient/core.ts b/packages/core/src/v3/apiClient/core.ts index 10d710e5d5..16b9947b38 100644 --- a/packages/core/src/v3/apiClient/core.ts +++ b/packages/core/src/v3/apiClient/core.ts @@ -692,3 +692,52 @@ export function zodfetchSSE { return new ZodFetchSSEResult(options); } + +export type ApiResult = + | { success: true; data: TSuccessResult } + | { + success: false; + error: string; + }; + +export async function wrapZodFetch( + schema: T, + url: string, + requestInit?: RequestInit, + options?: ZodFetchOptions> +): Promise>> { + try { + const response = await zodfetch(schema, url, requestInit, { + retry: { + minTimeoutInMs: 500, + maxTimeoutInMs: 5000, + maxAttempts: 5, + factor: 2, + randomize: false, + }, + ...options, + }); + + return { + success: true, + data: response, + }; + } catch (error) { + if (error instanceof ApiError) { + return { + success: false, + error: error.message, + }; + } else if (error instanceof Error) { + return { + success: false, + error: error.message, + }; + } else { + return { + success: false, + error: String(error), + }; + } + } +} diff --git a/packages/core/src/v3/runEngineWorker/supervisor/http.ts b/packages/core/src/v3/runEngineWorker/supervisor/http.ts index fa2249e47a..6a4bd49cdd 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/http.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/http.ts @@ -18,7 +18,7 @@ import { } from "./schemas.js"; import { SupervisorClientCommonOptions } from "./types.js"; import { getDefaultWorkerHeaders } from "./util.js"; -import { ApiError, zodfetch } from "../../zodfetch.js"; +import { wrapZodFetch } from "../../zodfetch.js"; import { createHeaders } from "../util.js"; import { WORKER_HEADERS } from "../consts.js"; @@ -236,50 +236,3 @@ export class SupervisorHttpClient { }); } } - -type ApiResult = - | { success: true; data: TSuccessResult } - | { - success: false; - error: string; - }; - -async function wrapZodFetch( - schema: T, - url: string, - requestInit?: RequestInit -): Promise>> { - try { - const response = await zodfetch(schema, url, requestInit, { - retry: { - minTimeoutInMs: 500, - maxTimeoutInMs: 5000, - maxAttempts: 5, - factor: 2, - randomize: false, - }, - }); - - return { - success: true, - data: response, - }; - } catch (error) { - if (error instanceof ApiError) { - return { - success: false, - error: error.message, - }; - } else if (error instanceof Error) { - return { - success: false, - error: error.message, - }; - } else { - return { - success: false, - error: String(error), - }; - } - } -} diff --git a/packages/core/src/v3/runEngineWorker/workload/http.ts b/packages/core/src/v3/runEngineWorker/workload/http.ts index cc36e89682..6069ef34ff 100644 --- a/packages/core/src/v3/runEngineWorker/workload/http.ts +++ b/packages/core/src/v3/runEngineWorker/workload/http.ts @@ -14,7 +14,7 @@ import { } from "./schemas.js"; import { WorkloadClientCommonOptions } from "./types.js"; import { getDefaultWorkloadHeaders } from "./util.js"; -import { ApiError, zodfetch } from "../../zodfetch.js"; +import { wrapZodFetch } from "../../zodfetch.js"; type WorkloadHttpClientOptions = WorkloadClientCommonOptions; @@ -163,50 +163,3 @@ export class WorkloadHttpClient { ); } } - -type ApiResult = - | { success: true; data: TSuccessResult } - | { - success: false; - error: string; - }; - -async function wrapZodFetch( - schema: T, - url: string, - requestInit?: RequestInit -): Promise>> { - try { - const response = await zodfetch(schema, url, requestInit, { - retry: { - minTimeoutInMs: 500, - maxTimeoutInMs: 5000, - maxAttempts: 5, - factor: 2, - randomize: false, - }, - }); - - return { - success: true, - data: response, - }; - } catch (error) { - if (error instanceof ApiError) { - return { - success: false, - error: error.message, - }; - } else if (error instanceof Error) { - return { - success: false, - error: error.message, - }; - } else { - return { - success: false, - error: String(error), - }; - } - } -} From 41f03d2b7050641c1c0966a9c053caac4234126b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 12:24:40 +0000 Subject: [PATCH 03/16] add warm start client --- .../src/entryPoints/managed-run-controller.ts | 153 ++++++---------- packages/core/src/v3/schemas/index.ts | 1 + packages/core/src/v3/schemas/warmStart.ts | 8 + packages/core/src/v3/workers/index.ts | 1 + .../core/src/v3/workers/warmStartClient.ts | 167 ++++++++++++++++++ 5 files changed, 229 insertions(+), 101 deletions(-) create mode 100644 packages/core/src/v3/schemas/warmStart.ts create mode 100644 packages/core/src/v3/workers/warmStartClient.ts diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index a33e8382ff..a59c20ab30 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -6,7 +6,6 @@ import { randomUUID } from "crypto"; import { readJSONFile } from "../utilities/fileSystem.js"; import { CompleteRunAttemptResult, - DequeuedMessage, HeartbeatService, RunExecutionData, TaskRunExecutionResult, @@ -14,6 +13,7 @@ import { WorkerManifest, } from "@trigger.dev/core/v3"; import { + WarmStartClient, WORKLOAD_HEADERS, WorkloadClientToServerEvents, WorkloadHttpClient, @@ -699,41 +699,56 @@ class ManagedRunController { if (!env.TRIGGER_WARM_START_URL) { console.error("waitForNextRun: warm starts disabled, shutting down"); - process.exit(0); + this.exitProcess(0); } - const warmStartUrl = new URL("/warm-start", env.TRIGGER_WARM_START_URL); - - const res = await longPoll( - warmStartUrl.href, - { - method: "GET", - headers: { - "x-trigger-workload-controller-id": env.TRIGGER_WORKLOAD_CONTROLLER_ID, - "x-trigger-deployment-id": env.TRIGGER_DEPLOYMENT_ID, - "x-trigger-deployment-version": env.TRIGGER_DEPLOYMENT_VERSION, - "x-trigger-machine-cpu": env.TRIGGER_MACHINE_CPU, - "x-trigger-machine-memory": env.TRIGGER_MACHINE_MEMORY, - "x-trigger-worker-instance-name": env.TRIGGER_WORKER_INSTANCE_NAME, - }, - }, - // TODO: get these from the warm start service instead - { - timeoutMs: env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS, - totalDurationMs: env.TRIGGER_WARM_START_TOTAL_DURATION_MS, - } - ); + const warmStartClient = new WarmStartClient({ + apiUrl: new URL(env.TRIGGER_WARM_START_URL), + controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, + deploymentId: env.TRIGGER_DEPLOYMENT_ID, + machineCpu: env.TRIGGER_MACHINE_CPU, + machineMemory: env.TRIGGER_MACHINE_MEMORY, + }); - if (!res.ok) { - console.error("waitForNextRun: failed to poll for next run", { - error: res.error, - timeoutMs: env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS, - totalDurationMs: env.TRIGGER_WARM_START_TOTAL_DURATION_MS, + // Check the service is up and get additional warm start config + const connect = await warmStartClient.connect(); + + if (!connect.success) { + console.error("waitForNextRun: failed to connect to warm start service", { + warmStartUrl: env.TRIGGER_WARM_START_URL, + error: connect.error, }); - process.exit(0); + this.exitProcess(0); } - const nextRun = DequeuedMessage.parse(res.data); + const connectionTimeoutMs = + connect.data.connectionTimeoutMs ?? env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS; + const totalWarmStartDurationMs = + connect.data.totalWarmStartDurationMs ?? env.TRIGGER_WARM_START_TOTAL_DURATION_MS; + + console.log("waitForNextRun: connected to warm start service", { + connectionTimeoutMs, + totalWarmStartDurationMs, + }); + + if (!connectionTimeoutMs || !totalWarmStartDurationMs) { + console.error("waitForNextRun: warm starts disabled after connect", { + connectionTimeoutMs, + totalWarmStartDurationMs, + }); + this.exitProcess(0); + } + + const nextRun = await warmStartClient.warmStart({ + workerInstanceName: env.TRIGGER_WORKER_INSTANCE_NAME, + connectionTimeoutMs, + totalWarmStartDurationMs, + }); + + if (!nextRun) { + console.error("waitForNextRun: warm start failed, shutting down"); + this.exitProcess(0); + } console.log("waitForNextRun: got next run", { nextRun }); @@ -745,12 +760,17 @@ class ManagedRunController { return; } catch (error) { console.error("waitForNextRun: unexpected error", { error }); - process.exit(1); + this.exitProcess(1); } finally { this.waitForNextRunLock = false; } } + private exitProcess(code?: number): never { + logger.log("Exiting process", { code }); + process.exit(code); + } + createSocket() { const wsUrl = new URL("/workload", this.workerApiUrl); @@ -971,7 +991,7 @@ class ManagedRunController { // TODO: remove this after testing setTimeout(() => { console.error("[ManagedRunController] Exiting after 5 minutes"); - process.exit(1); + this.exitProcess(1); }, 60 * 5000); // Websocket notifications are only an optimisation so we don't need to wait for a successful connection @@ -1027,72 +1047,3 @@ async function loadWorkerManifest() { const manifest = await readJSONFile("./index.json"); return WorkerManifest.parse(manifest); } - -const longPoll = async ( - url: string, - requestInit: Omit, - { - timeoutMs, - totalDurationMs, - }: { - timeoutMs: number; - totalDurationMs: number; - } -): Promise< - | { - ok: true; - data: T; - } - | { - ok: false; - error: string; - } -> => { - logger.debug("Long polling", { url, requestInit, timeoutMs, totalDurationMs }); - - const endTime = Date.now() + totalDurationMs; - - while (Date.now() < endTime) { - try { - const controller = new AbortController(); - const signal = controller.signal; - - // TODO: Think about using a random timeout instead - const timeoutId = setTimeout(() => controller.abort(), timeoutMs); - - const response = await fetch(url, { ...requestInit, signal }); - - clearTimeout(timeoutId); - - if (response.ok) { - const data = await response.json(); - - return { - ok: true, - data, - }; - } else { - return { - ok: false, - error: `Server error: ${response.status}`, - }; - } - } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - console.log("Long poll request timed out, retrying..."); - continue; - } else { - console.error("Error during fetch, retrying...", error); - - // TODO: exponential backoff - await sleep(1000); - continue; - } - } - } - - return { - ok: false, - error: "TotalDurationExceeded", - }; -}; diff --git a/packages/core/src/v3/schemas/index.ts b/packages/core/src/v3/schemas/index.ts index 9e3c468306..0f0c753122 100644 --- a/packages/core/src/v3/schemas/index.ts +++ b/packages/core/src/v3/schemas/index.ts @@ -13,3 +13,4 @@ export * from "./build.js"; export * from "./runEngine.js"; export * from "./webhooks.js"; export * from "./checkpoints.js"; +export * from "./warmStart.js"; diff --git a/packages/core/src/v3/schemas/warmStart.ts b/packages/core/src/v3/schemas/warmStart.ts new file mode 100644 index 0000000000..069f4134e1 --- /dev/null +++ b/packages/core/src/v3/schemas/warmStart.ts @@ -0,0 +1,8 @@ +import { z } from "zod"; + +export const WarmStartConnectResponse = z.object({ + connectionTimeoutMs: z.number().optional(), + totalWarmStartDurationMs: z.number().optional(), +}); + +export type WarmStartConnectResponse = z.infer; diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 2ac0d01b84..7b51da7879 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -19,3 +19,4 @@ export { StandardWaitUntilManager } from "../waitUntil/manager.js"; export { ManagedRuntimeManager } from "../runtime/managedRuntimeManager.js"; export * from "../runEngineWorker/index.js"; export { StandardRunTimelineMetricsManager } from "../runTimelineMetrics/runTimelineMetricsManager.js"; +export { WarmStartClient, type WarmStartClientOptions } from "../workers/warmStartClient.js"; diff --git a/packages/core/src/v3/workers/warmStartClient.ts b/packages/core/src/v3/workers/warmStartClient.ts new file mode 100644 index 0000000000..b57020111b --- /dev/null +++ b/packages/core/src/v3/workers/warmStartClient.ts @@ -0,0 +1,167 @@ +import { DequeuedMessage } from "../schemas/runEngine.js"; +import { SimpleStructuredLogger } from "../utils/structuredLogger.js"; +import { WarmStartConnectResponse } from "../schemas/warmStart.js"; +import { ApiResult, wrapZodFetch } from "../zodfetch.js"; +import { ExponentialBackoff } from "../apps/backoff.js"; + +export type WarmStartClientOptions = { + apiUrl: URL; + controllerId: string; + deploymentId: string; + machineCpu: string; + machineMemory: string; +}; + +export class WarmStartClient { + private readonly logger = new SimpleStructuredLogger("warm-start-client"); + private readonly apiUrl: URL; + private backoff = new ExponentialBackoff("FullJitter"); + + private get connectUrl() { + return new URL("/connect", this.apiUrl); + } + + private get warmStartUrl() { + return new URL("/warm-start", this.apiUrl); + } + + constructor(private opts: WarmStartClientOptions) { + this.apiUrl = opts.apiUrl; + } + + async connect(): Promise> { + return wrapZodFetch( + WarmStartConnectResponse, + this.connectUrl.href, + { + method: "GET", + headers: { + "Content-Type": "application/json", + }, + }, + { + retry: { + minTimeoutInMs: 200, + maxTimeoutInMs: 2000, + maxAttempts: 3, + factor: 2, + randomize: false, + }, + } + ); + } + + async warmStart({ + workerInstanceName, + connectionTimeoutMs, + totalWarmStartDurationMs, + }: { + workerInstanceName: string; + connectionTimeoutMs: number; + totalWarmStartDurationMs: number; + }): Promise { + const res = await this.longPoll( + this.warmStartUrl.href, + { + method: "GET", + headers: { + "x-trigger-workload-controller-id": this.opts.controllerId, + "x-trigger-deployment-id": this.opts.deploymentId, + "x-trigger-machine-cpu": this.opts.machineCpu, + "x-trigger-machine-memory": this.opts.machineMemory, + "x-trigger-worker-instance-name": workerInstanceName, + }, + }, + { + timeoutMs: connectionTimeoutMs, + totalDurationMs: totalWarmStartDurationMs, + } + ); + + if (!res.ok) { + this.logger.error("warmStart: failed", { + error: res.error, + connectionTimeoutMs, + totalWarmStartDurationMs, + }); + return null; + } + + const nextRun = DequeuedMessage.parse(res.data); + + this.logger.debug("warmStart: got next run", { nextRun }); + + return nextRun; + } + + private async longPoll( + url: string, + requestInit: Omit, + { + timeoutMs, + totalDurationMs, + }: { + timeoutMs: number; + totalDurationMs: number; + } + ): Promise< + | { + ok: true; + data: T; + } + | { + ok: false; + error: string; + } + > { + this.logger.debug("Long polling", { url, requestInit, timeoutMs, totalDurationMs }); + + const endTime = Date.now() + totalDurationMs; + + let retries = 0; + + while (Date.now() < endTime) { + try { + const controller = new AbortController(); + const signal = controller.signal; + + const timeoutId = setTimeout(() => controller.abort(), timeoutMs); + + const response = await fetch(url, { ...requestInit, signal }); + + clearTimeout(timeoutId); + + if (response.ok) { + const data = await response.json(); + + return { + ok: true, + data, + }; + } else { + return { + ok: false, + error: `Server error: ${response.status}`, + }; + } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + this.logger.log("Long poll request timed out, retrying..."); + continue; + } else { + this.logger.error("Error during fetch, retrying...", { error }); + + // Wait with exponential backoff + await this.backoff.wait(retries++); + + continue; + } + } + } + + return { + ok: false, + error: "TotalDurationExceeded", + }; + } +} From 7d691e551e9ba1ae186283614a667fd12c53cca0 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 14:00:26 +0000 Subject: [PATCH 04/16] rename to keepalive --- .../src/entryPoints/managed-run-controller.ts | 15 ++++++++------- packages/core/src/v3/schemas/warmStart.ts | 2 +- packages/core/src/v3/serverOnly/httpServer.ts | 2 +- packages/core/src/v3/workers/warmStartClient.ts | 8 ++++---- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index a59c20ab30..e7c2817fea 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -46,9 +46,10 @@ const Env = z.object({ OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), TRIGGER_WARM_START_URL: z.string().optional(), TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS: z.coerce.number().default(30_000), - TRIGGER_WARM_START_TOTAL_DURATION_MS: z.coerce.number().default(300_000), + TRIGGER_WARM_START_KEEPALIVE_MS: z.coerce.number().default(300_000), TRIGGER_MACHINE_CPU: z.string().default("0"), TRIGGER_MACHINE_MEMORY: z.string().default("0"), + // FIXME: This could change between restores TRIGGER_WORKER_INSTANCE_NAME: z.string(), TRIGGER_RUNNER_ID: z.string(), }); @@ -723,18 +724,18 @@ class ManagedRunController { const connectionTimeoutMs = connect.data.connectionTimeoutMs ?? env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS; - const totalWarmStartDurationMs = - connect.data.totalWarmStartDurationMs ?? env.TRIGGER_WARM_START_TOTAL_DURATION_MS; + const keepaliveMs = + connect.data.keepaliveMs ?? env.TRIGGER_WARM_START_KEEPALIVE_MS; console.log("waitForNextRun: connected to warm start service", { connectionTimeoutMs, - totalWarmStartDurationMs, + keepaliveMs, }); - if (!connectionTimeoutMs || !totalWarmStartDurationMs) { + if (!connectionTimeoutMs || !keepaliveMs) { console.error("waitForNextRun: warm starts disabled after connect", { connectionTimeoutMs, - totalWarmStartDurationMs, + keepaliveMs, }); this.exitProcess(0); } @@ -742,7 +743,7 @@ class ManagedRunController { const nextRun = await warmStartClient.warmStart({ workerInstanceName: env.TRIGGER_WORKER_INSTANCE_NAME, connectionTimeoutMs, - totalWarmStartDurationMs, + keepaliveMs, }); if (!nextRun) { diff --git a/packages/core/src/v3/schemas/warmStart.ts b/packages/core/src/v3/schemas/warmStart.ts index 069f4134e1..1d0e43268e 100644 --- a/packages/core/src/v3/schemas/warmStart.ts +++ b/packages/core/src/v3/schemas/warmStart.ts @@ -2,7 +2,7 @@ import { z } from "zod"; export const WarmStartConnectResponse = z.object({ connectionTimeoutMs: z.number().optional(), - totalWarmStartDurationMs: z.number().optional(), + keepaliveMs: z.number().optional(), }); export type WarmStartConnectResponse = z.infer; diff --git a/packages/core/src/v3/serverOnly/httpServer.ts b/packages/core/src/v3/serverOnly/httpServer.ts index 3c38f3968a..30c06db5fd 100644 --- a/packages/core/src/v3/serverOnly/httpServer.ts +++ b/packages/core/src/v3/serverOnly/httpServer.ts @@ -16,7 +16,7 @@ type RouteHandler< req: IncomingMessage; res: ServerResponse; reply: HttpReply; -}) => Promise; +}) => Promise; interface RouteDefinition< TParams extends z.ZodFirstPartySchemaTypes = z.ZodUnknown, diff --git a/packages/core/src/v3/workers/warmStartClient.ts b/packages/core/src/v3/workers/warmStartClient.ts index b57020111b..2270335899 100644 --- a/packages/core/src/v3/workers/warmStartClient.ts +++ b/packages/core/src/v3/workers/warmStartClient.ts @@ -54,11 +54,11 @@ export class WarmStartClient { async warmStart({ workerInstanceName, connectionTimeoutMs, - totalWarmStartDurationMs, + keepaliveMs, }: { workerInstanceName: string; connectionTimeoutMs: number; - totalWarmStartDurationMs: number; + keepaliveMs: number; }): Promise { const res = await this.longPoll( this.warmStartUrl.href, @@ -74,7 +74,7 @@ export class WarmStartClient { }, { timeoutMs: connectionTimeoutMs, - totalDurationMs: totalWarmStartDurationMs, + totalDurationMs: keepaliveMs, } ); @@ -82,7 +82,7 @@ export class WarmStartClient { this.logger.error("warmStart: failed", { error: res.error, connectionTimeoutMs, - totalWarmStartDurationMs, + keepaliveMs, }); return null; } From 3f7e758e9a114c505ba94b554622884e741f03cc Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 14:08:08 +0000 Subject: [PATCH 05/16] add missing warm start header --- packages/cli-v3/src/entryPoints/managed-run-controller.ts | 1 + packages/core/src/v3/workers/warmStartClient.ts | 2 ++ 2 files changed, 3 insertions(+) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index e7c2817fea..c7c10cb3f9 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -707,6 +707,7 @@ class ManagedRunController { apiUrl: new URL(env.TRIGGER_WARM_START_URL), controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, deploymentId: env.TRIGGER_DEPLOYMENT_ID, + deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, machineCpu: env.TRIGGER_MACHINE_CPU, machineMemory: env.TRIGGER_MACHINE_MEMORY, }); diff --git a/packages/core/src/v3/workers/warmStartClient.ts b/packages/core/src/v3/workers/warmStartClient.ts index 2270335899..4a70ec8971 100644 --- a/packages/core/src/v3/workers/warmStartClient.ts +++ b/packages/core/src/v3/workers/warmStartClient.ts @@ -8,6 +8,7 @@ export type WarmStartClientOptions = { apiUrl: URL; controllerId: string; deploymentId: string; + deploymentVersion: string; machineCpu: string; machineMemory: string; }; @@ -67,6 +68,7 @@ export class WarmStartClient { headers: { "x-trigger-workload-controller-id": this.opts.controllerId, "x-trigger-deployment-id": this.opts.deploymentId, + "x-trigger-deployment-version": this.opts.deploymentVersion, "x-trigger-machine-cpu": this.opts.machineCpu, "x-trigger-machine-memory": this.opts.machineMemory, "x-trigger-worker-instance-name": workerInstanceName, From 20b2166b008937fdac9a8d28fef12bc711ab30fd Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 14:13:23 +0000 Subject: [PATCH 06/16] make heartbeat and snapshot poll interval configurable --- .../cli-v3/src/entryPoints/managed-run-controller.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index c7c10cb3f9..15017d7dd2 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -52,6 +52,8 @@ const Env = z.object({ // FIXME: This could change between restores TRIGGER_WORKER_INSTANCE_NAME: z.string(), TRIGGER_RUNNER_ID: z.string(), + TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), + TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5), }); const env = Env.parse(stdEnv); @@ -60,7 +62,6 @@ logger.loggerLevel = "debug"; type ManagedRunControllerOptions = { workerManifest: WorkerManifest; - heartbeatIntervalSeconds?: number; }; type Run = { @@ -247,9 +248,8 @@ class ManagedRunController { logger.debug("[ManagedRunController] Creating controller", { env }); this.workerManifest = opts.workerManifest; - // TODO: This should be dynamic and set by (or at least overridden by) the managed worker / platform - this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30; - this.snapshotPollIntervalSeconds = 5; + this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; + this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; @@ -725,8 +725,7 @@ class ManagedRunController { const connectionTimeoutMs = connect.data.connectionTimeoutMs ?? env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS; - const keepaliveMs = - connect.data.keepaliveMs ?? env.TRIGGER_WARM_START_KEEPALIVE_MS; + const keepaliveMs = connect.data.keepaliveMs ?? env.TRIGGER_WARM_START_KEEPALIVE_MS; console.log("waitForNextRun: connected to warm start service", { connectionTimeoutMs, From 8bcd42c4ec3de59226338115bb6a185e7184cffa Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 14:19:33 +0000 Subject: [PATCH 07/16] create warm start client in constructor --- .../src/entryPoints/managed-run-controller.ts | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 15017d7dd2..c466cd3e7e 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -79,6 +79,7 @@ class ManagedRunController { private workerManifest: WorkerManifest; private readonly httpClient: WorkloadHttpClient; + private readonly warmStartClient: WarmStartClient | undefined; private socket: Socket; @@ -259,6 +260,17 @@ class ManagedRunController { runnerId: env.TRIGGER_RUNNER_ID, }); + if (env.TRIGGER_WARM_START_URL) { + this.warmStartClient = new WarmStartClient({ + apiUrl: new URL(env.TRIGGER_WARM_START_URL), + controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, + deploymentId: env.TRIGGER_DEPLOYMENT_ID, + deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, + machineCpu: env.TRIGGER_MACHINE_CPU, + machineMemory: env.TRIGGER_MACHINE_MEMORY, + }); + } + this.snapshotPoller = new HeartbeatService({ heartbeat: async () => { if (!this.runFriendlyId) { @@ -698,22 +710,13 @@ class ManagedRunController { // Kill the run process await this.taskRunProcess?.kill("SIGKILL"); - if (!env.TRIGGER_WARM_START_URL) { + if (!this.warmStartClient) { console.error("waitForNextRun: warm starts disabled, shutting down"); this.exitProcess(0); } - const warmStartClient = new WarmStartClient({ - apiUrl: new URL(env.TRIGGER_WARM_START_URL), - controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, - deploymentId: env.TRIGGER_DEPLOYMENT_ID, - deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, - machineCpu: env.TRIGGER_MACHINE_CPU, - machineMemory: env.TRIGGER_MACHINE_MEMORY, - }); - // Check the service is up and get additional warm start config - const connect = await warmStartClient.connect(); + const connect = await this.warmStartClient.connect(); if (!connect.success) { console.error("waitForNextRun: failed to connect to warm start service", { @@ -740,7 +743,7 @@ class ManagedRunController { this.exitProcess(0); } - const nextRun = await warmStartClient.warmStart({ + const nextRun = await this.warmStartClient.warmStart({ workerInstanceName: env.TRIGGER_WORKER_INSTANCE_NAME, connectionTimeoutMs, keepaliveMs, From a2484d70a6751c2af5523d7c32919091b3181993 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 14:42:44 +0000 Subject: [PATCH 08/16] add warm start run debug log --- .../cli-v3/src/entryPoints/managed-run-controller.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index c466cd3e7e..941d238a87 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -701,6 +701,7 @@ class ManagedRunController { } this.waitForNextRunLock = true; + const previousRunId = this.runFriendlyId; try { logger.debug("waitForNextRun: waiting for next run"); @@ -735,6 +736,17 @@ class ManagedRunController { keepaliveMs, }); + if (previousRunId) { + this.httpClient.sendDebugLog(previousRunId, { + time: new Date(), + message: "warm start: received config", + properties: { + connectionTimeoutMs, + keepaliveMs, + }, + }); + } + if (!connectionTimeoutMs || !keepaliveMs) { console.error("waitForNextRun: warm starts disabled after connect", { connectionTimeoutMs, From cd7a5bd9f0d4214fe0b4f9fcf77d616e83ded808 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 13 Mar 2025 17:43:21 +0000 Subject: [PATCH 09/16] re-enable checkpoints and improve error messages --- apps/supervisor/src/index.ts | 4 +++ apps/supervisor/src/workloadServer/index.ts | 21 ++++++------ .../src/entryPoints/managed-run-controller.ts | 34 +++++++++++++++---- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 8b44a8b3a7..a0f95865a5 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -87,6 +87,10 @@ class ManagedSupervisor { }); if (env.TRIGGER_CHECKPOINT_URL) { + this.logger.log("[ManagedWorker] 🥶 Checkpoints enabled", { + checkpointUrl: env.TRIGGER_CHECKPOINT_URL, + }); + this.checkpointClient = new CheckpointClient({ apiUrl: new URL(env.TRIGGER_CHECKPOINT_URL), workerClient: this.workerSession.httpClient, diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 2981b07390..42ccde3531 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -200,17 +200,11 @@ export class WorkloadServer extends EventEmitter { handler: async ({ reply, params, req }) => { console.debug("Suspend request", { params, headers: req.headers }); - const runnerId = this.runnerIdFromRequest(req); - - if (!runnerId) { - console.error("Invalid headers for suspend request", { - ...params, - headers: req.headers, - }); + if (!this.checkpointClient) { reply.json( { ok: false, - error: "Invalid headers", + error: "Checkpoints disabled", } satisfies WorkloadSuspendRunResponseBody, false, 400 @@ -218,12 +212,17 @@ export class WorkloadServer extends EventEmitter { return; } - if (!this.checkpointClient) { - console.error("Checkpoint client unavailable - suspending impossible", { params }); + const runnerId = this.runnerIdFromRequest(req); + + if (!runnerId) { + console.error("Invalid headers for suspend request", { + ...params, + headers: req.headers, + }); reply.json( { ok: false, - error: "Suspends are not enabled", + error: "Invalid headers", } satisfies WorkloadSuspendRunResponseBody, false, 400 diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 941d238a87..0d2675b20d 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -485,13 +485,6 @@ class ManagedRunController { return; } - const disableSuspend = true; - - if (disableSuspend) { - console.log("Suspend disabled, will carry on waiting"); - return; - } - const suspendResult = await this.httpClient.suspendRun( this.runFriendlyId, this.snapshotFriendlyId @@ -501,6 +494,33 @@ class ManagedRunController { console.error("Failed to suspend run, staying alive 🎶", { error: suspendResult.error, }); + + this.httpClient.sendDebugLog(run.friendlyId, { + time: new Date(), + message: "checkpoint: suspend request failed", + properties: { + snapshotId: snapshot.friendlyId, + error: suspendResult.error, + }, + }); + + return; + } + + if (!suspendResult.data.ok) { + console.error("Failed to suspend run, staying alive 🎶🎶", { + suspendResult: suspendResult.data, + }); + + this.httpClient.sendDebugLog(run.friendlyId, { + time: new Date(), + message: "checkpoint: failed to suspend run", + properties: { + snapshotId: snapshot.friendlyId, + error: suspendResult.data.error, + }, + }); + return; } From d5fc929958371a5ecf14ade8d77aafb73ed50862 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Mar 2025 10:13:46 +0000 Subject: [PATCH 10/16] reduce run pod container name cardinality --- apps/supervisor/src/workloadManager/kubernetes.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index b4e4a7e19a..3dbb84aefd 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -56,7 +56,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { terminationGracePeriodSeconds: 60 * 60, containers: [ { - name: runnerId, + name: "run-controller", image: opts.image, ports: [ { From 4c22b2899d3a92faf4401216a7d4cf150ce9ca63 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Mar 2025 10:33:47 +0000 Subject: [PATCH 11/16] move runner id generator into core --- apps/supervisor/src/util.ts | 24 ------------------- packages/core/src/v3/isomorphic/friendlyId.ts | 22 +++++++++++++++++ 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/apps/supervisor/src/util.ts b/apps/supervisor/src/util.ts index 0d465d4699..6ec9cd57d1 100644 --- a/apps/supervisor/src/util.ts +++ b/apps/supervisor/src/util.ts @@ -1,30 +1,6 @@ -import { customAlphabet } from "nanoid"; - export function getDockerHostDomain() { const isMacOs = process.platform === "darwin"; const isWindows = process.platform === "win32"; return isMacOs || isWindows ? "host.docker.internal" : "localhost"; } - -export class IdGenerator { - private alphabet: string; - private length: number; - private prefix: string; - - constructor({ alphabet, length, prefix }: { alphabet: string; length: number; prefix: string }) { - this.alphabet = alphabet; - this.length = length; - this.prefix = prefix; - } - - generate(): string { - return `${this.prefix}${customAlphabet(this.alphabet, this.length)()}`; - } -} - -export const RunnerId = new IdGenerator({ - alphabet: "123456789abcdefghijkmnopqrstuvwxyz", - length: 20, - prefix: "runner_", -}); diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index cf0a7a1708..42b8e0dde8 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -90,3 +90,25 @@ export const RunId = new IdUtil("run"); export const SnapshotId = new IdUtil("snapshot"); export const WaitpointId = new IdUtil("waitpoint"); export const BatchId = new IdUtil("batch"); + +export class IdGenerator { + private alphabet: string; + private length: number; + private prefix: string; + + constructor({ alphabet, length, prefix }: { alphabet: string; length: number; prefix: string }) { + this.alphabet = alphabet; + this.length = length; + this.prefix = prefix; + } + + generate(): string { + return `${this.prefix}${customAlphabet(this.alphabet, this.length)()}`; + } +} + +export const RunnerId = new IdGenerator({ + alphabet: "123456789abcdefghijkmnopqrstuvwxyz", + length: 20, + prefix: "runner_", +}); From 68dc7804d644caa076124230ba1f91ac8be05419 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Mar 2025 10:37:52 +0000 Subject: [PATCH 12/16] fix runner id import --- apps/supervisor/src/workloadManager/kubernetes.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 3dbb84aefd..88f6b488bc 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -4,7 +4,7 @@ import { type WorkloadManagerCreateOptions, type WorkloadManagerOptions, } from "./types.js"; -import { RunnerId } from "../util.js"; +import { RunnerId } from "@trigger.dev/core/v3/isomorphic"; import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3"; import { env } from "../env.js"; import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js"; From 07b06dc880f8c4eb97be06d01b4bedfd554f02ee Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Mar 2025 16:17:52 +0000 Subject: [PATCH 13/16] log when no checkpoint client and we try to restore --- apps/supervisor/src/index.ts | 7 ++++++- apps/supervisor/src/workloadManager/kubernetes.ts | 1 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index a0f95865a5..94088e885a 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -130,8 +130,13 @@ class ManagedSupervisor { if (message.checkpoint) { this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id }); + if (!this.checkpointClient) { + this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id }); + return; + } + try { - const didRestore = await this.checkpointClient?.restoreRun({ + const didRestore = await this.checkpointClient.restoreRun({ runFriendlyId: message.run.friendlyId, snapshotFriendlyId: message.snapshot.friendlyId, checkpoint: message.checkpoint, diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 88f6b488bc..39b8227f16 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -48,7 +48,6 @@ export class KubernetesWorkloadManager implements WorkloadManager { app: "task-run", "app.kubernetes.io/part-of": "trigger-worker", "app.kubernetes.io/component": "create", - run: opts.runId, }, }, spec: { From ae8b4b3ef2b9337f529a10c1568d34c5ff924fe6 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Mar 2025 16:39:21 +0000 Subject: [PATCH 14/16] move run controller constructor to the top --- .../src/entryPoints/managed-run-controller.ts | 208 +++++++++--------- 1 file changed, 104 insertions(+), 104 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 0d2675b20d..9d3c6fcf0c 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -101,6 +101,110 @@ class ManagedRunController { phase: "IDLE" | "WARM_START"; } = { phase: "IDLE" }; + constructor(opts: ManagedRunControllerOptions) { + logger.debug("[ManagedRunController] Creating controller", { env }); + + this.workerManifest = opts.workerManifest; + this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; + this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; + + this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; + + this.httpClient = new WorkloadHttpClient({ + workerApiUrl: this.workerApiUrl, + deploymentId: env.TRIGGER_DEPLOYMENT_ID, + runnerId: env.TRIGGER_RUNNER_ID, + }); + + if (env.TRIGGER_WARM_START_URL) { + this.warmStartClient = new WarmStartClient({ + apiUrl: new URL(env.TRIGGER_WARM_START_URL), + controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, + deploymentId: env.TRIGGER_DEPLOYMENT_ID, + deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, + machineCpu: env.TRIGGER_MACHINE_CPU, + machineMemory: env.TRIGGER_MACHINE_MEMORY, + }); + } + + this.snapshotPoller = new HeartbeatService({ + heartbeat: async () => { + if (!this.runFriendlyId) { + logger.debug("[ManagedRunController] Skipping snapshot poll, no run ID"); + return; + } + + console.debug("[ManagedRunController] Polling for latest snapshot"); + + this.httpClient.sendDebugLog(this.runFriendlyId, { + time: new Date(), + message: `snapshot poll: started`, + properties: { + snapshotId: this.snapshotFriendlyId, + }, + }); + + const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); + + if (!response.success) { + console.error("[ManagedRunController] Snapshot poll failed", { error: response.error }); + + this.httpClient.sendDebugLog(this.runFriendlyId, { + time: new Date(), + message: `snapshot poll: failed`, + properties: { + snapshotId: this.snapshotFriendlyId, + error: response.error, + }, + }); + + return; + } + + await this.handleSnapshotChange(response.data.execution); + }, + intervalMs: this.snapshotPollIntervalSeconds * 1000, + leadingEdge: false, + onError: async (error) => { + console.error("[ManagedRunController] Failed to poll for snapshot", { error }); + }, + }); + + this.runHeartbeat = new HeartbeatService({ + heartbeat: async () => { + if (!this.runFriendlyId || !this.snapshotFriendlyId) { + logger.debug("[ManagedRunController] Skipping heartbeat, no run ID or snapshot ID"); + return; + } + + console.debug("[ManagedRunController] Sending heartbeat"); + + const response = await this.httpClient.heartbeatRun( + this.runFriendlyId, + this.snapshotFriendlyId, + { + cpu: 0, + memory: 0, + } + ); + + if (!response.success) { + console.error("[ManagedRunController] Heartbeat failed", { error: response.error }); + } + }, + intervalMs: this.heartbeatIntervalSeconds * 1000, + leadingEdge: false, + onError: async (error) => { + console.error("[ManagedRunController] Failed to send heartbeat", { error }); + }, + }); + + process.on("SIGTERM", async () => { + logger.debug("[ManagedRunController] Received SIGTERM, stopping worker"); + await this.stop(); + }); + } + private enterRunPhase(run: Run, snapshot: Snapshot) { this.onExitRunPhase(run); this.state = { phase: "RUN", run, snapshot }; @@ -245,110 +349,6 @@ class ManagedRunController { return this.state.snapshot.friendlyId; } - constructor(opts: ManagedRunControllerOptions) { - logger.debug("[ManagedRunController] Creating controller", { env }); - - this.workerManifest = opts.workerManifest; - this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; - this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; - - this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; - - this.httpClient = new WorkloadHttpClient({ - workerApiUrl: this.workerApiUrl, - deploymentId: env.TRIGGER_DEPLOYMENT_ID, - runnerId: env.TRIGGER_RUNNER_ID, - }); - - if (env.TRIGGER_WARM_START_URL) { - this.warmStartClient = new WarmStartClient({ - apiUrl: new URL(env.TRIGGER_WARM_START_URL), - controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, - deploymentId: env.TRIGGER_DEPLOYMENT_ID, - deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, - machineCpu: env.TRIGGER_MACHINE_CPU, - machineMemory: env.TRIGGER_MACHINE_MEMORY, - }); - } - - this.snapshotPoller = new HeartbeatService({ - heartbeat: async () => { - if (!this.runFriendlyId) { - logger.debug("[ManagedRunController] Skipping snapshot poll, no run ID"); - return; - } - - console.debug("[ManagedRunController] Polling for latest snapshot"); - - this.httpClient.sendDebugLog(this.runFriendlyId, { - time: new Date(), - message: `snapshot poll: started`, - properties: { - snapshotId: this.snapshotFriendlyId, - }, - }); - - const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); - - if (!response.success) { - console.error("[ManagedRunController] Snapshot poll failed", { error: response.error }); - - this.httpClient.sendDebugLog(this.runFriendlyId, { - time: new Date(), - message: `snapshot poll: failed`, - properties: { - snapshotId: this.snapshotFriendlyId, - error: response.error, - }, - }); - - return; - } - - await this.handleSnapshotChange(response.data.execution); - }, - intervalMs: this.snapshotPollIntervalSeconds * 1000, - leadingEdge: false, - onError: async (error) => { - console.error("[ManagedRunController] Failed to poll for snapshot", { error }); - }, - }); - - this.runHeartbeat = new HeartbeatService({ - heartbeat: async () => { - if (!this.runFriendlyId || !this.snapshotFriendlyId) { - logger.debug("[ManagedRunController] Skipping heartbeat, no run ID or snapshot ID"); - return; - } - - console.debug("[ManagedRunController] Sending heartbeat"); - - const response = await this.httpClient.heartbeatRun( - this.runFriendlyId, - this.snapshotFriendlyId, - { - cpu: 0, - memory: 0, - } - ); - - if (!response.success) { - console.error("[ManagedRunController] Heartbeat failed", { error: response.error }); - } - }, - intervalMs: this.heartbeatIntervalSeconds * 1000, - leadingEdge: false, - onError: async (error) => { - console.error("[ManagedRunController] Failed to send heartbeat", { error }); - }, - }); - - process.on("SIGTERM", async () => { - logger.debug("[ManagedRunController] Received SIGTERM, stopping worker"); - await this.stop(); - }); - } - private handleSnapshotChangeLock = false; private async handleSnapshotChange({ From 78b850828c08c1123d1ef4371f5a9c626a5e239a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Mar 2025 17:49:50 +0000 Subject: [PATCH 15/16] fix import --- apps/supervisor/src/workloadManager/docker.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 950ad2a343..704e564726 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -1,4 +1,5 @@ import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import { RunnerId } from "@trigger.dev/core/v3/isomorphic"; import { type WorkloadManager, type WorkloadManagerCreateOptions, @@ -6,7 +7,7 @@ import { } from "./types.js"; import { x } from "tinyexec"; import { env } from "../env.js"; -import { getDockerHostDomain, RunnerId } from "../util.js"; +import { getDockerHostDomain } from "../util.js"; export class DockerWorkloadManager implements WorkloadManager { private readonly logger = new SimpleStructuredLogger("docker-workload-provider"); From 0491cdbcdcd18f53f590d687957a24ce5ac50a81 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 14 Mar 2025 17:50:57 +0000 Subject: [PATCH 16/16] support env overrides after restore --- .../src/entryPoints/managed-run-controller.ts | 103 ++++++++++++++++-- .../src/v3/runEngineWorker/workload/http.ts | 6 +- packages/core/src/v3/utils/heartbeat.ts | 5 + 3 files changed, 104 insertions(+), 10 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 9d3c6fcf0c..5eab5839f7 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -36,9 +36,6 @@ const Env = z.object({ NODE_EXTRA_CA_CERTS: z.string().optional(), // Set at runtime - TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), - TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), - TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), TRIGGER_WORKLOAD_CONTROLLER_ID: z.string().default(`controller_${randomUUID()}`), TRIGGER_ENV_ID: z.string(), TRIGGER_RUN_ID: z.string().optional(), // This is only useful for cold starts @@ -49,9 +46,14 @@ const Env = z.object({ TRIGGER_WARM_START_KEEPALIVE_MS: z.coerce.number().default(300_000), TRIGGER_MACHINE_CPU: z.string().default("0"), TRIGGER_MACHINE_MEMORY: z.string().default("0"), - // FIXME: This could change between restores - TRIGGER_WORKER_INSTANCE_NAME: z.string(), TRIGGER_RUNNER_ID: z.string(), + TRIGGER_METADATA_URL: z.string().optional(), + + // May be overridden + TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), + TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), + TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), + TRIGGER_WORKER_INSTANCE_NAME: z.string(), TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5), }); @@ -73,6 +75,33 @@ type Snapshot = { friendlyId: string; }; +type Metadata = { + TRIGGER_SUPERVISOR_API_PROTOCOL: string | undefined; + TRIGGER_SUPERVISOR_API_DOMAIN: string | undefined; + TRIGGER_SUPERVISOR_API_PORT: number | undefined; + TRIGGER_WORKER_INSTANCE_NAME: string | undefined; + TRIGGER_HEARTBEAT_INTERVAL_SECONDS: number | undefined; + TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: number | undefined; +}; + +class MetadataClient { + private readonly url: URL; + + constructor(url: string) { + this.url = new URL(url); + } + + async getEnvOverrides(): Promise { + try { + const response = await fetch(new URL("/env", this.url)); + return response.json(); + } catch (error) { + console.error("Failed to fetch metadata", { error }); + return null; + } + } +} + class ManagedRunController { private taskRunProcess?: TaskRunProcess; @@ -80,16 +109,18 @@ class ManagedRunController { private readonly httpClient: WorkloadHttpClient; private readonly warmStartClient: WarmStartClient | undefined; + private readonly metadataClient?: MetadataClient; private socket: Socket; private readonly runHeartbeat: HeartbeatService; - private readonly heartbeatIntervalSeconds: number; + private heartbeatIntervalSeconds: number; private readonly snapshotPoller: HeartbeatService; - private readonly snapshotPollIntervalSeconds: number; + private snapshotPollIntervalSeconds: number; - private readonly workerApiUrl: string; + private workerApiUrl: string; + private workerInstanceName: string; private state: | { @@ -105,10 +136,16 @@ class ManagedRunController { logger.debug("[ManagedRunController] Creating controller", { env }); this.workerManifest = opts.workerManifest; + this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; + if (env.TRIGGER_METADATA_URL) { + this.metadataClient = new MetadataClient(env.TRIGGER_METADATA_URL); + } + this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; + this.workerInstanceName = env.TRIGGER_WORKER_INSTANCE_NAME; this.httpClient = new WorkloadHttpClient({ workerApiUrl: this.workerApiUrl, @@ -549,6 +586,9 @@ class ManagedRunController { // Short delay to give websocket time to reconnect await sleep(100); + // Env may have changed after restore + await this.processEnvOverrides(); + // We need to let the platform know we're ready to continue const continuationResult = await this.httpClient.continueRunExecution( run.friendlyId, @@ -611,6 +651,51 @@ class ManagedRunController { } } + private async processEnvOverrides() { + if (!this.metadataClient) { + logger.log("No metadata client, skipping env overrides"); + return; + } + + const overrides = await this.metadataClient.getEnvOverrides(); + + if (!overrides) { + logger.log("No env overrides, skipping"); + return; + } + + logger.log("Processing env overrides", { env: overrides }); + + if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) { + this.heartbeatIntervalSeconds = overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; + this.runHeartbeat.updateInterval(this.heartbeatIntervalSeconds * 1000); + } + + if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) { + this.snapshotPollIntervalSeconds = overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; + this.snapshotPoller.updateInterval(this.snapshotPollIntervalSeconds * 1000); + } + + if (overrides.TRIGGER_WORKER_INSTANCE_NAME) { + this.workerInstanceName = overrides.TRIGGER_WORKER_INSTANCE_NAME; + } + + if ( + overrides.TRIGGER_SUPERVISOR_API_PROTOCOL || + overrides.TRIGGER_SUPERVISOR_API_DOMAIN || + overrides.TRIGGER_SUPERVISOR_API_PORT + ) { + const protocol = + overrides.TRIGGER_SUPERVISOR_API_PROTOCOL ?? env.TRIGGER_SUPERVISOR_API_PROTOCOL; + const domain = overrides.TRIGGER_SUPERVISOR_API_DOMAIN ?? env.TRIGGER_SUPERVISOR_API_DOMAIN; + const port = overrides.TRIGGER_SUPERVISOR_API_PORT ?? env.TRIGGER_SUPERVISOR_API_PORT; + + this.workerApiUrl = `${protocol}://${domain}:${port}`; + + this.httpClient.updateApiUrl(this.workerApiUrl); + } + } + private async startAndExecuteRunAttempt({ runFriendlyId, snapshotFriendlyId, @@ -776,7 +861,7 @@ class ManagedRunController { } const nextRun = await this.warmStartClient.warmStart({ - workerInstanceName: env.TRIGGER_WORKER_INSTANCE_NAME, + workerInstanceName: this.workerInstanceName, connectionTimeoutMs, keepaliveMs, }); diff --git a/packages/core/src/v3/runEngineWorker/workload/http.ts b/packages/core/src/v3/runEngineWorker/workload/http.ts index 6069ef34ff..1505e55f05 100644 --- a/packages/core/src/v3/runEngineWorker/workload/http.ts +++ b/packages/core/src/v3/runEngineWorker/workload/http.ts @@ -19,7 +19,7 @@ import { wrapZodFetch } from "../../zodfetch.js"; type WorkloadHttpClientOptions = WorkloadClientCommonOptions; export class WorkloadHttpClient { - private readonly apiUrl: string; + private apiUrl: string; private readonly deploymentId: string; private readonly defaultHeaders: Record; @@ -37,6 +37,10 @@ export class WorkloadHttpClient { } } + updateApiUrl(apiUrl: string) { + this.apiUrl = apiUrl.replace(/\/$/, ""); + } + async heartbeatRun(runId: string, snapshotId: string, body: WorkloadHeartbeatRequestBody) { return wrapZodFetch( WorkloadHeartbeatResponseBody, diff --git a/packages/core/src/v3/utils/heartbeat.ts b/packages/core/src/v3/utils/heartbeat.ts index 3b47c6a54a..0684bd73c5 100644 --- a/packages/core/src/v3/utils/heartbeat.ts +++ b/packages/core/src/v3/utils/heartbeat.ts @@ -54,6 +54,11 @@ export class HeartbeatService { this.#scheduleNextHeartbeat(); } + updateInterval(intervalMs: number) { + this._intervalMs = intervalMs; + this.resetCurrentInterval(); + } + #doHeartbeat = async () => { this.#clearNextHeartbeat();