diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 7ea3f03e0a..94088e885a 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -87,6 +87,10 @@ class ManagedSupervisor { }); if (env.TRIGGER_CHECKPOINT_URL) { + this.logger.log("[ManagedWorker] 🥶 Checkpoints enabled", { + checkpointUrl: env.TRIGGER_CHECKPOINT_URL, + }); + this.checkpointClient = new CheckpointClient({ apiUrl: new URL(env.TRIGGER_CHECKPOINT_URL), workerClient: this.workerSession.httpClient, @@ -126,8 +130,13 @@ class ManagedSupervisor { if (message.checkpoint) { this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id }); + if (!this.checkpointClient) { + this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id }); + return; + } + try { - const didRestore = await this.checkpointClient?.restoreRun({ + const didRestore = await this.checkpointClient.restoreRun({ runFriendlyId: message.run.friendlyId, snapshotFriendlyId: message.snapshot.friendlyId, checkpoint: message.checkpoint, @@ -214,33 +223,41 @@ class ManagedSupervisor { const warmStartUrlWithPath = new URL("/warm-start", this.warmStartUrl); - const res = await fetch(warmStartUrlWithPath.href, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ dequeuedMessage }), - }); - - if (!res.ok) { - this.logger.error("[ManagedWorker] Warm start failed", { - runId: dequeuedMessage.run.id, + try { + const res = await fetch(warmStartUrlWithPath.href, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ dequeuedMessage }), }); - return false; - } - const data = await res.json(); - const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data); + if (!res.ok) { + this.logger.error("[ManagedWorker] Warm start failed", { + runId: dequeuedMessage.run.id, + }); + return false; + } + + const data = await res.json(); + const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data); - if (!parsedData.success) { - this.logger.error("[ManagedWorker] Warm start response invalid", { + if (!parsedData.success) { + this.logger.error("[ManagedWorker] Warm start response invalid", { + runId: dequeuedMessage.run.id, + data, + }); + return false; + } + + return parsedData.data.didWarmStart; + } catch (error) { + this.logger.error("[ManagedWorker] Warm start error", { runId: dequeuedMessage.run.id, - data, + error, }); return false; } - - return parsedData.data.didWarmStart; } async start() { diff --git a/apps/supervisor/src/util.ts b/apps/supervisor/src/util.ts index 0d465d4699..6ec9cd57d1 100644 --- a/apps/supervisor/src/util.ts +++ b/apps/supervisor/src/util.ts @@ -1,30 +1,6 @@ -import { customAlphabet } from "nanoid"; - export function getDockerHostDomain() { const isMacOs = process.platform === "darwin"; const isWindows = process.platform === "win32"; return isMacOs || isWindows ? "host.docker.internal" : "localhost"; } - -export class IdGenerator { - private alphabet: string; - private length: number; - private prefix: string; - - constructor({ alphabet, length, prefix }: { alphabet: string; length: number; prefix: string }) { - this.alphabet = alphabet; - this.length = length; - this.prefix = prefix; - } - - generate(): string { - return `${this.prefix}${customAlphabet(this.alphabet, this.length)()}`; - } -} - -export const RunnerId = new IdGenerator({ - alphabet: "123456789abcdefghijkmnopqrstuvwxyz", - length: 20, - prefix: "runner_", -}); diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 950ad2a343..704e564726 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -1,4 +1,5 @@ import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import { RunnerId } from "@trigger.dev/core/v3/isomorphic"; import { type WorkloadManager, type WorkloadManagerCreateOptions, @@ -6,7 +7,7 @@ import { } from "./types.js"; import { x } from "tinyexec"; import { env } from "../env.js"; -import { getDockerHostDomain, RunnerId } from "../util.js"; +import { getDockerHostDomain } from "../util.js"; export class DockerWorkloadManager implements WorkloadManager { private readonly logger = new SimpleStructuredLogger("docker-workload-provider"); diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index b4e4a7e19a..39b8227f16 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -4,7 +4,7 @@ import { type WorkloadManagerCreateOptions, type WorkloadManagerOptions, } from "./types.js"; -import { RunnerId } from "../util.js"; +import { RunnerId } from "@trigger.dev/core/v3/isomorphic"; import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3"; import { env } from "../env.js"; import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js"; @@ -48,7 +48,6 @@ export class KubernetesWorkloadManager implements WorkloadManager { app: "task-run", "app.kubernetes.io/part-of": "trigger-worker", "app.kubernetes.io/component": "create", - run: opts.runId, }, }, spec: { @@ -56,7 +55,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { terminationGracePeriodSeconds: 60 * 60, containers: [ { - name: runnerId, + name: "run-controller", image: opts.image, ports: [ { diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 2981b07390..42ccde3531 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -200,17 +200,11 @@ export class WorkloadServer extends EventEmitter { handler: async ({ reply, params, req }) => { console.debug("Suspend request", { params, headers: req.headers }); - const runnerId = this.runnerIdFromRequest(req); - - if (!runnerId) { - console.error("Invalid headers for suspend request", { - ...params, - headers: req.headers, - }); + if (!this.checkpointClient) { reply.json( { ok: false, - error: "Invalid headers", + error: "Checkpoints disabled", } satisfies WorkloadSuspendRunResponseBody, false, 400 @@ -218,12 +212,17 @@ export class WorkloadServer extends EventEmitter { return; } - if (!this.checkpointClient) { - console.error("Checkpoint client unavailable - suspending impossible", { params }); + const runnerId = this.runnerIdFromRequest(req); + + if (!runnerId) { + console.error("Invalid headers for suspend request", { + ...params, + headers: req.headers, + }); reply.json( { ok: false, - error: "Suspends are not enabled", + error: "Invalid headers", } satisfies WorkloadSuspendRunResponseBody, false, 400 diff --git a/packages/cli-v3/src/apiClient.ts b/packages/cli-v3/src/apiClient.ts index 07d46621e9..20d9c61081 100644 --- a/packages/cli-v3/src/apiClient.ts +++ b/packages/cli-v3/src/apiClient.ts @@ -31,9 +31,8 @@ import { DevDequeueRequestBody, DevDequeueResponseBody, PromoteDeploymentResponseBody, - ListRunResponse, } from "@trigger.dev/core/v3"; -import { zodfetch, zodfetchSSE, ApiError } from "@trigger.dev/core/v3/zodfetch"; +import { ApiResult, wrapZodFetch, zodfetchSSE } from "@trigger.dev/core/v3/zodfetch"; import { logger } from "./utilities/logger.js"; import { WorkloadDebugLogRequestBody, @@ -41,7 +40,6 @@ import { WorkloadHeartbeatResponseBody, WorkloadRunAttemptCompleteRequestBody, WorkloadRunAttemptCompleteResponseBody, - WorkloadRunAttemptStartRequestBody, WorkloadRunAttemptStartResponseBody, WorkloadRunLatestSnapshotResponseBody, } from "@trigger.dev/core/v3/workers"; @@ -644,50 +642,3 @@ export class CliApiClient { ); } } - -type ApiResult = - | { success: true; data: TSuccessResult } - | { - success: false; - error: string; - }; - -async function wrapZodFetch( - schema: T, - url: string, - requestInit?: RequestInit -): Promise>> { - try { - const response = await zodfetch(schema, url, requestInit, { - retry: { - minTimeoutInMs: 500, - maxTimeoutInMs: 5000, - maxAttempts: 5, - factor: 2, - randomize: false, - }, - }); - - return { - success: true, - data: response, - }; - } catch (error) { - if (error instanceof ApiError) { - return { - success: false, - error: error.message, - }; - } else if (error instanceof Error) { - return { - success: false, - error: error.message, - }; - } else { - return { - success: false, - error: String(error), - }; - } - } -} diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index a33e8382ff..5eab5839f7 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -6,7 +6,6 @@ import { randomUUID } from "crypto"; import { readJSONFile } from "../utilities/fileSystem.js"; import { CompleteRunAttemptResult, - DequeuedMessage, HeartbeatService, RunExecutionData, TaskRunExecutionResult, @@ -14,6 +13,7 @@ import { WorkerManifest, } from "@trigger.dev/core/v3"; import { + WarmStartClient, WORKLOAD_HEADERS, WorkloadClientToServerEvents, WorkloadHttpClient, @@ -36,9 +36,6 @@ const Env = z.object({ NODE_EXTRA_CA_CERTS: z.string().optional(), // Set at runtime - TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), - TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), - TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), TRIGGER_WORKLOAD_CONTROLLER_ID: z.string().default(`controller_${randomUUID()}`), TRIGGER_ENV_ID: z.string(), TRIGGER_RUN_ID: z.string().optional(), // This is only useful for cold starts @@ -46,11 +43,19 @@ const Env = z.object({ OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), TRIGGER_WARM_START_URL: z.string().optional(), TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS: z.coerce.number().default(30_000), - TRIGGER_WARM_START_TOTAL_DURATION_MS: z.coerce.number().default(300_000), + TRIGGER_WARM_START_KEEPALIVE_MS: z.coerce.number().default(300_000), TRIGGER_MACHINE_CPU: z.string().default("0"), TRIGGER_MACHINE_MEMORY: z.string().default("0"), - TRIGGER_WORKER_INSTANCE_NAME: z.string(), TRIGGER_RUNNER_ID: z.string(), + TRIGGER_METADATA_URL: z.string().optional(), + + // May be overridden + TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), + TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), + TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), + TRIGGER_WORKER_INSTANCE_NAME: z.string(), + TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), + TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5), }); const env = Env.parse(stdEnv); @@ -59,7 +64,6 @@ logger.loggerLevel = "debug"; type ManagedRunControllerOptions = { workerManifest: WorkerManifest; - heartbeatIntervalSeconds?: number; }; type Run = { @@ -71,22 +75,52 @@ type Snapshot = { friendlyId: string; }; +type Metadata = { + TRIGGER_SUPERVISOR_API_PROTOCOL: string | undefined; + TRIGGER_SUPERVISOR_API_DOMAIN: string | undefined; + TRIGGER_SUPERVISOR_API_PORT: number | undefined; + TRIGGER_WORKER_INSTANCE_NAME: string | undefined; + TRIGGER_HEARTBEAT_INTERVAL_SECONDS: number | undefined; + TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: number | undefined; +}; + +class MetadataClient { + private readonly url: URL; + + constructor(url: string) { + this.url = new URL(url); + } + + async getEnvOverrides(): Promise { + try { + const response = await fetch(new URL("/env", this.url)); + return response.json(); + } catch (error) { + console.error("Failed to fetch metadata", { error }); + return null; + } + } +} + class ManagedRunController { private taskRunProcess?: TaskRunProcess; private workerManifest: WorkerManifest; private readonly httpClient: WorkloadHttpClient; + private readonly warmStartClient: WarmStartClient | undefined; + private readonly metadataClient?: MetadataClient; private socket: Socket; private readonly runHeartbeat: HeartbeatService; - private readonly heartbeatIntervalSeconds: number; + private heartbeatIntervalSeconds: number; private readonly snapshotPoller: HeartbeatService; - private readonly snapshotPollIntervalSeconds: number; + private snapshotPollIntervalSeconds: number; - private readonly workerApiUrl: string; + private workerApiUrl: string; + private workerInstanceName: string; private state: | { @@ -98,6 +132,116 @@ class ManagedRunController { phase: "IDLE" | "WARM_START"; } = { phase: "IDLE" }; + constructor(opts: ManagedRunControllerOptions) { + logger.debug("[ManagedRunController] Creating controller", { env }); + + this.workerManifest = opts.workerManifest; + + this.heartbeatIntervalSeconds = env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; + this.snapshotPollIntervalSeconds = env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; + + if (env.TRIGGER_METADATA_URL) { + this.metadataClient = new MetadataClient(env.TRIGGER_METADATA_URL); + } + + this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; + this.workerInstanceName = env.TRIGGER_WORKER_INSTANCE_NAME; + + this.httpClient = new WorkloadHttpClient({ + workerApiUrl: this.workerApiUrl, + deploymentId: env.TRIGGER_DEPLOYMENT_ID, + runnerId: env.TRIGGER_RUNNER_ID, + }); + + if (env.TRIGGER_WARM_START_URL) { + this.warmStartClient = new WarmStartClient({ + apiUrl: new URL(env.TRIGGER_WARM_START_URL), + controllerId: env.TRIGGER_WORKLOAD_CONTROLLER_ID, + deploymentId: env.TRIGGER_DEPLOYMENT_ID, + deploymentVersion: env.TRIGGER_DEPLOYMENT_VERSION, + machineCpu: env.TRIGGER_MACHINE_CPU, + machineMemory: env.TRIGGER_MACHINE_MEMORY, + }); + } + + this.snapshotPoller = new HeartbeatService({ + heartbeat: async () => { + if (!this.runFriendlyId) { + logger.debug("[ManagedRunController] Skipping snapshot poll, no run ID"); + return; + } + + console.debug("[ManagedRunController] Polling for latest snapshot"); + + this.httpClient.sendDebugLog(this.runFriendlyId, { + time: new Date(), + message: `snapshot poll: started`, + properties: { + snapshotId: this.snapshotFriendlyId, + }, + }); + + const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); + + if (!response.success) { + console.error("[ManagedRunController] Snapshot poll failed", { error: response.error }); + + this.httpClient.sendDebugLog(this.runFriendlyId, { + time: new Date(), + message: `snapshot poll: failed`, + properties: { + snapshotId: this.snapshotFriendlyId, + error: response.error, + }, + }); + + return; + } + + await this.handleSnapshotChange(response.data.execution); + }, + intervalMs: this.snapshotPollIntervalSeconds * 1000, + leadingEdge: false, + onError: async (error) => { + console.error("[ManagedRunController] Failed to poll for snapshot", { error }); + }, + }); + + this.runHeartbeat = new HeartbeatService({ + heartbeat: async () => { + if (!this.runFriendlyId || !this.snapshotFriendlyId) { + logger.debug("[ManagedRunController] Skipping heartbeat, no run ID or snapshot ID"); + return; + } + + console.debug("[ManagedRunController] Sending heartbeat"); + + const response = await this.httpClient.heartbeatRun( + this.runFriendlyId, + this.snapshotFriendlyId, + { + cpu: 0, + memory: 0, + } + ); + + if (!response.success) { + console.error("[ManagedRunController] Heartbeat failed", { error: response.error }); + } + }, + intervalMs: this.heartbeatIntervalSeconds * 1000, + leadingEdge: false, + onError: async (error) => { + console.error("[ManagedRunController] Failed to send heartbeat", { error }); + }, + }); + + process.on("SIGTERM", async () => { + logger.debug("[ManagedRunController] Received SIGTERM, stopping worker"); + await this.stop(); + }); + } + private enterRunPhase(run: Run, snapshot: Snapshot) { this.onExitRunPhase(run); this.state = { phase: "RUN", run, snapshot }; @@ -242,100 +386,6 @@ class ManagedRunController { return this.state.snapshot.friendlyId; } - constructor(opts: ManagedRunControllerOptions) { - logger.debug("[ManagedRunController] Creating controller", { env }); - - this.workerManifest = opts.workerManifest; - // TODO: This should be dynamic and set by (or at least overridden by) the managed worker / platform - this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30; - this.snapshotPollIntervalSeconds = 5; - - this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`; - - this.httpClient = new WorkloadHttpClient({ - workerApiUrl: this.workerApiUrl, - deploymentId: env.TRIGGER_DEPLOYMENT_ID, - runnerId: env.TRIGGER_RUNNER_ID, - }); - - this.snapshotPoller = new HeartbeatService({ - heartbeat: async () => { - if (!this.runFriendlyId) { - logger.debug("[ManagedRunController] Skipping snapshot poll, no run ID"); - return; - } - - console.debug("[ManagedRunController] Polling for latest snapshot"); - - this.httpClient.sendDebugLog(this.runFriendlyId, { - time: new Date(), - message: `snapshot poll: started`, - properties: { - snapshotId: this.snapshotFriendlyId, - }, - }); - - const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); - - if (!response.success) { - console.error("[ManagedRunController] Snapshot poll failed", { error: response.error }); - - this.httpClient.sendDebugLog(this.runFriendlyId, { - time: new Date(), - message: `snapshot poll: failed`, - properties: { - snapshotId: this.snapshotFriendlyId, - error: response.error, - }, - }); - - return; - } - - await this.handleSnapshotChange(response.data.execution); - }, - intervalMs: this.snapshotPollIntervalSeconds * 1000, - leadingEdge: false, - onError: async (error) => { - console.error("[ManagedRunController] Failed to poll for snapshot", { error }); - }, - }); - - this.runHeartbeat = new HeartbeatService({ - heartbeat: async () => { - if (!this.runFriendlyId || !this.snapshotFriendlyId) { - logger.debug("[ManagedRunController] Skipping heartbeat, no run ID or snapshot ID"); - return; - } - - console.debug("[ManagedRunController] Sending heartbeat"); - - const response = await this.httpClient.heartbeatRun( - this.runFriendlyId, - this.snapshotFriendlyId, - { - cpu: 0, - memory: 0, - } - ); - - if (!response.success) { - console.error("[ManagedRunController] Heartbeat failed", { error: response.error }); - } - }, - intervalMs: this.heartbeatIntervalSeconds * 1000, - leadingEdge: false, - onError: async (error) => { - console.error("[ManagedRunController] Failed to send heartbeat", { error }); - }, - }); - - process.on("SIGTERM", async () => { - logger.debug("[ManagedRunController] Received SIGTERM, stopping worker"); - await this.stop(); - }); - } - private handleSnapshotChangeLock = false; private async handleSnapshotChange({ @@ -472,13 +522,6 @@ class ManagedRunController { return; } - const disableSuspend = true; - - if (disableSuspend) { - console.log("Suspend disabled, will carry on waiting"); - return; - } - const suspendResult = await this.httpClient.suspendRun( this.runFriendlyId, this.snapshotFriendlyId @@ -488,6 +531,33 @@ class ManagedRunController { console.error("Failed to suspend run, staying alive 🎶", { error: suspendResult.error, }); + + this.httpClient.sendDebugLog(run.friendlyId, { + time: new Date(), + message: "checkpoint: suspend request failed", + properties: { + snapshotId: snapshot.friendlyId, + error: suspendResult.error, + }, + }); + + return; + } + + if (!suspendResult.data.ok) { + console.error("Failed to suspend run, staying alive 🎶🎶", { + suspendResult: suspendResult.data, + }); + + this.httpClient.sendDebugLog(run.friendlyId, { + time: new Date(), + message: "checkpoint: failed to suspend run", + properties: { + snapshotId: snapshot.friendlyId, + error: suspendResult.data.error, + }, + }); + return; } @@ -516,6 +586,9 @@ class ManagedRunController { // Short delay to give websocket time to reconnect await sleep(100); + // Env may have changed after restore + await this.processEnvOverrides(); + // We need to let the platform know we're ready to continue const continuationResult = await this.httpClient.continueRunExecution( run.friendlyId, @@ -578,6 +651,51 @@ class ManagedRunController { } } + private async processEnvOverrides() { + if (!this.metadataClient) { + logger.log("No metadata client, skipping env overrides"); + return; + } + + const overrides = await this.metadataClient.getEnvOverrides(); + + if (!overrides) { + logger.log("No env overrides, skipping"); + return; + } + + logger.log("Processing env overrides", { env: overrides }); + + if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) { + this.heartbeatIntervalSeconds = overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS; + this.runHeartbeat.updateInterval(this.heartbeatIntervalSeconds * 1000); + } + + if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) { + this.snapshotPollIntervalSeconds = overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS; + this.snapshotPoller.updateInterval(this.snapshotPollIntervalSeconds * 1000); + } + + if (overrides.TRIGGER_WORKER_INSTANCE_NAME) { + this.workerInstanceName = overrides.TRIGGER_WORKER_INSTANCE_NAME; + } + + if ( + overrides.TRIGGER_SUPERVISOR_API_PROTOCOL || + overrides.TRIGGER_SUPERVISOR_API_DOMAIN || + overrides.TRIGGER_SUPERVISOR_API_PORT + ) { + const protocol = + overrides.TRIGGER_SUPERVISOR_API_PROTOCOL ?? env.TRIGGER_SUPERVISOR_API_PROTOCOL; + const domain = overrides.TRIGGER_SUPERVISOR_API_DOMAIN ?? env.TRIGGER_SUPERVISOR_API_DOMAIN; + const port = overrides.TRIGGER_SUPERVISOR_API_PORT ?? env.TRIGGER_SUPERVISOR_API_PORT; + + this.workerApiUrl = `${protocol}://${domain}:${port}`; + + this.httpClient.updateApiUrl(this.workerApiUrl); + } + } + private async startAndExecuteRunAttempt({ runFriendlyId, snapshotFriendlyId, @@ -688,6 +806,7 @@ class ManagedRunController { } this.waitForNextRunLock = true; + const previousRunId = this.runFriendlyId; try { logger.debug("waitForNextRun: waiting for next run"); @@ -697,43 +816,60 @@ class ManagedRunController { // Kill the run process await this.taskRunProcess?.kill("SIGKILL"); - if (!env.TRIGGER_WARM_START_URL) { + if (!this.warmStartClient) { console.error("waitForNextRun: warm starts disabled, shutting down"); - process.exit(0); + this.exitProcess(0); + } + + // Check the service is up and get additional warm start config + const connect = await this.warmStartClient.connect(); + + if (!connect.success) { + console.error("waitForNextRun: failed to connect to warm start service", { + warmStartUrl: env.TRIGGER_WARM_START_URL, + error: connect.error, + }); + this.exitProcess(0); } - const warmStartUrl = new URL("/warm-start", env.TRIGGER_WARM_START_URL); - - const res = await longPoll( - warmStartUrl.href, - { - method: "GET", - headers: { - "x-trigger-workload-controller-id": env.TRIGGER_WORKLOAD_CONTROLLER_ID, - "x-trigger-deployment-id": env.TRIGGER_DEPLOYMENT_ID, - "x-trigger-deployment-version": env.TRIGGER_DEPLOYMENT_VERSION, - "x-trigger-machine-cpu": env.TRIGGER_MACHINE_CPU, - "x-trigger-machine-memory": env.TRIGGER_MACHINE_MEMORY, - "x-trigger-worker-instance-name": env.TRIGGER_WORKER_INSTANCE_NAME, + const connectionTimeoutMs = + connect.data.connectionTimeoutMs ?? env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS; + const keepaliveMs = connect.data.keepaliveMs ?? env.TRIGGER_WARM_START_KEEPALIVE_MS; + + console.log("waitForNextRun: connected to warm start service", { + connectionTimeoutMs, + keepaliveMs, + }); + + if (previousRunId) { + this.httpClient.sendDebugLog(previousRunId, { + time: new Date(), + message: "warm start: received config", + properties: { + connectionTimeoutMs, + keepaliveMs, }, - }, - // TODO: get these from the warm start service instead - { - timeoutMs: env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS, - totalDurationMs: env.TRIGGER_WARM_START_TOTAL_DURATION_MS, - } - ); + }); + } - if (!res.ok) { - console.error("waitForNextRun: failed to poll for next run", { - error: res.error, - timeoutMs: env.TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS, - totalDurationMs: env.TRIGGER_WARM_START_TOTAL_DURATION_MS, + if (!connectionTimeoutMs || !keepaliveMs) { + console.error("waitForNextRun: warm starts disabled after connect", { + connectionTimeoutMs, + keepaliveMs, }); - process.exit(0); + this.exitProcess(0); } - const nextRun = DequeuedMessage.parse(res.data); + const nextRun = await this.warmStartClient.warmStart({ + workerInstanceName: this.workerInstanceName, + connectionTimeoutMs, + keepaliveMs, + }); + + if (!nextRun) { + console.error("waitForNextRun: warm start failed, shutting down"); + this.exitProcess(0); + } console.log("waitForNextRun: got next run", { nextRun }); @@ -745,12 +881,17 @@ class ManagedRunController { return; } catch (error) { console.error("waitForNextRun: unexpected error", { error }); - process.exit(1); + this.exitProcess(1); } finally { this.waitForNextRunLock = false; } } + private exitProcess(code?: number): never { + logger.log("Exiting process", { code }); + process.exit(code); + } + createSocket() { const wsUrl = new URL("/workload", this.workerApiUrl); @@ -971,7 +1112,7 @@ class ManagedRunController { // TODO: remove this after testing setTimeout(() => { console.error("[ManagedRunController] Exiting after 5 minutes"); - process.exit(1); + this.exitProcess(1); }, 60 * 5000); // Websocket notifications are only an optimisation so we don't need to wait for a successful connection @@ -1027,72 +1168,3 @@ async function loadWorkerManifest() { const manifest = await readJSONFile("./index.json"); return WorkerManifest.parse(manifest); } - -const longPoll = async ( - url: string, - requestInit: Omit, - { - timeoutMs, - totalDurationMs, - }: { - timeoutMs: number; - totalDurationMs: number; - } -): Promise< - | { - ok: true; - data: T; - } - | { - ok: false; - error: string; - } -> => { - logger.debug("Long polling", { url, requestInit, timeoutMs, totalDurationMs }); - - const endTime = Date.now() + totalDurationMs; - - while (Date.now() < endTime) { - try { - const controller = new AbortController(); - const signal = controller.signal; - - // TODO: Think about using a random timeout instead - const timeoutId = setTimeout(() => controller.abort(), timeoutMs); - - const response = await fetch(url, { ...requestInit, signal }); - - clearTimeout(timeoutId); - - if (response.ok) { - const data = await response.json(); - - return { - ok: true, - data, - }; - } else { - return { - ok: false, - error: `Server error: ${response.status}`, - }; - } - } catch (error) { - if (error instanceof Error && error.name === "AbortError") { - console.log("Long poll request timed out, retrying..."); - continue; - } else { - console.error("Error during fetch, retrying...", error); - - // TODO: exponential backoff - await sleep(1000); - continue; - } - } - } - - return { - ok: false, - error: "TotalDurationExceeded", - }; -}; diff --git a/packages/core/src/v3/apiClient/core.ts b/packages/core/src/v3/apiClient/core.ts index 10d710e5d5..16b9947b38 100644 --- a/packages/core/src/v3/apiClient/core.ts +++ b/packages/core/src/v3/apiClient/core.ts @@ -692,3 +692,52 @@ export function zodfetchSSE { return new ZodFetchSSEResult(options); } + +export type ApiResult = + | { success: true; data: TSuccessResult } + | { + success: false; + error: string; + }; + +export async function wrapZodFetch( + schema: T, + url: string, + requestInit?: RequestInit, + options?: ZodFetchOptions> +): Promise>> { + try { + const response = await zodfetch(schema, url, requestInit, { + retry: { + minTimeoutInMs: 500, + maxTimeoutInMs: 5000, + maxAttempts: 5, + factor: 2, + randomize: false, + }, + ...options, + }); + + return { + success: true, + data: response, + }; + } catch (error) { + if (error instanceof ApiError) { + return { + success: false, + error: error.message, + }; + } else if (error instanceof Error) { + return { + success: false, + error: error.message, + }; + } else { + return { + success: false, + error: String(error), + }; + } + } +} diff --git a/packages/core/src/v3/isomorphic/friendlyId.ts b/packages/core/src/v3/isomorphic/friendlyId.ts index cf0a7a1708..42b8e0dde8 100644 --- a/packages/core/src/v3/isomorphic/friendlyId.ts +++ b/packages/core/src/v3/isomorphic/friendlyId.ts @@ -90,3 +90,25 @@ export const RunId = new IdUtil("run"); export const SnapshotId = new IdUtil("snapshot"); export const WaitpointId = new IdUtil("waitpoint"); export const BatchId = new IdUtil("batch"); + +export class IdGenerator { + private alphabet: string; + private length: number; + private prefix: string; + + constructor({ alphabet, length, prefix }: { alphabet: string; length: number; prefix: string }) { + this.alphabet = alphabet; + this.length = length; + this.prefix = prefix; + } + + generate(): string { + return `${this.prefix}${customAlphabet(this.alphabet, this.length)()}`; + } +} + +export const RunnerId = new IdGenerator({ + alphabet: "123456789abcdefghijkmnopqrstuvwxyz", + length: 20, + prefix: "runner_", +}); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/http.ts b/packages/core/src/v3/runEngineWorker/supervisor/http.ts index fa2249e47a..6a4bd49cdd 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/http.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/http.ts @@ -18,7 +18,7 @@ import { } from "./schemas.js"; import { SupervisorClientCommonOptions } from "./types.js"; import { getDefaultWorkerHeaders } from "./util.js"; -import { ApiError, zodfetch } from "../../zodfetch.js"; +import { wrapZodFetch } from "../../zodfetch.js"; import { createHeaders } from "../util.js"; import { WORKER_HEADERS } from "../consts.js"; @@ -236,50 +236,3 @@ export class SupervisorHttpClient { }); } } - -type ApiResult = - | { success: true; data: TSuccessResult } - | { - success: false; - error: string; - }; - -async function wrapZodFetch( - schema: T, - url: string, - requestInit?: RequestInit -): Promise>> { - try { - const response = await zodfetch(schema, url, requestInit, { - retry: { - minTimeoutInMs: 500, - maxTimeoutInMs: 5000, - maxAttempts: 5, - factor: 2, - randomize: false, - }, - }); - - return { - success: true, - data: response, - }; - } catch (error) { - if (error instanceof ApiError) { - return { - success: false, - error: error.message, - }; - } else if (error instanceof Error) { - return { - success: false, - error: error.message, - }; - } else { - return { - success: false, - error: String(error), - }; - } - } -} diff --git a/packages/core/src/v3/runEngineWorker/workload/http.ts b/packages/core/src/v3/runEngineWorker/workload/http.ts index cc36e89682..1505e55f05 100644 --- a/packages/core/src/v3/runEngineWorker/workload/http.ts +++ b/packages/core/src/v3/runEngineWorker/workload/http.ts @@ -14,12 +14,12 @@ import { } from "./schemas.js"; import { WorkloadClientCommonOptions } from "./types.js"; import { getDefaultWorkloadHeaders } from "./util.js"; -import { ApiError, zodfetch } from "../../zodfetch.js"; +import { wrapZodFetch } from "../../zodfetch.js"; type WorkloadHttpClientOptions = WorkloadClientCommonOptions; export class WorkloadHttpClient { - private readonly apiUrl: string; + private apiUrl: string; private readonly deploymentId: string; private readonly defaultHeaders: Record; @@ -37,6 +37,10 @@ export class WorkloadHttpClient { } } + updateApiUrl(apiUrl: string) { + this.apiUrl = apiUrl.replace(/\/$/, ""); + } + async heartbeatRun(runId: string, snapshotId: string, body: WorkloadHeartbeatRequestBody) { return wrapZodFetch( WorkloadHeartbeatResponseBody, @@ -163,50 +167,3 @@ export class WorkloadHttpClient { ); } } - -type ApiResult = - | { success: true; data: TSuccessResult } - | { - success: false; - error: string; - }; - -async function wrapZodFetch( - schema: T, - url: string, - requestInit?: RequestInit -): Promise>> { - try { - const response = await zodfetch(schema, url, requestInit, { - retry: { - minTimeoutInMs: 500, - maxTimeoutInMs: 5000, - maxAttempts: 5, - factor: 2, - randomize: false, - }, - }); - - return { - success: true, - data: response, - }; - } catch (error) { - if (error instanceof ApiError) { - return { - success: false, - error: error.message, - }; - } else if (error instanceof Error) { - return { - success: false, - error: error.message, - }; - } else { - return { - success: false, - error: String(error), - }; - } - } -} diff --git a/packages/core/src/v3/schemas/index.ts b/packages/core/src/v3/schemas/index.ts index 9e3c468306..0f0c753122 100644 --- a/packages/core/src/v3/schemas/index.ts +++ b/packages/core/src/v3/schemas/index.ts @@ -13,3 +13,4 @@ export * from "./build.js"; export * from "./runEngine.js"; export * from "./webhooks.js"; export * from "./checkpoints.js"; +export * from "./warmStart.js"; diff --git a/packages/core/src/v3/schemas/warmStart.ts b/packages/core/src/v3/schemas/warmStart.ts new file mode 100644 index 0000000000..1d0e43268e --- /dev/null +++ b/packages/core/src/v3/schemas/warmStart.ts @@ -0,0 +1,8 @@ +import { z } from "zod"; + +export const WarmStartConnectResponse = z.object({ + connectionTimeoutMs: z.number().optional(), + keepaliveMs: z.number().optional(), +}); + +export type WarmStartConnectResponse = z.infer; diff --git a/packages/core/src/v3/serverOnly/httpServer.ts b/packages/core/src/v3/serverOnly/httpServer.ts index 3c38f3968a..30c06db5fd 100644 --- a/packages/core/src/v3/serverOnly/httpServer.ts +++ b/packages/core/src/v3/serverOnly/httpServer.ts @@ -16,7 +16,7 @@ type RouteHandler< req: IncomingMessage; res: ServerResponse; reply: HttpReply; -}) => Promise; +}) => Promise; interface RouteDefinition< TParams extends z.ZodFirstPartySchemaTypes = z.ZodUnknown, diff --git a/packages/core/src/v3/utils/heartbeat.ts b/packages/core/src/v3/utils/heartbeat.ts index 3b47c6a54a..0684bd73c5 100644 --- a/packages/core/src/v3/utils/heartbeat.ts +++ b/packages/core/src/v3/utils/heartbeat.ts @@ -54,6 +54,11 @@ export class HeartbeatService { this.#scheduleNextHeartbeat(); } + updateInterval(intervalMs: number) { + this._intervalMs = intervalMs; + this.resetCurrentInterval(); + } + #doHeartbeat = async () => { this.#clearNextHeartbeat(); diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 2ac0d01b84..7b51da7879 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -19,3 +19,4 @@ export { StandardWaitUntilManager } from "../waitUntil/manager.js"; export { ManagedRuntimeManager } from "../runtime/managedRuntimeManager.js"; export * from "../runEngineWorker/index.js"; export { StandardRunTimelineMetricsManager } from "../runTimelineMetrics/runTimelineMetricsManager.js"; +export { WarmStartClient, type WarmStartClientOptions } from "../workers/warmStartClient.js"; diff --git a/packages/core/src/v3/workers/warmStartClient.ts b/packages/core/src/v3/workers/warmStartClient.ts new file mode 100644 index 0000000000..4a70ec8971 --- /dev/null +++ b/packages/core/src/v3/workers/warmStartClient.ts @@ -0,0 +1,169 @@ +import { DequeuedMessage } from "../schemas/runEngine.js"; +import { SimpleStructuredLogger } from "../utils/structuredLogger.js"; +import { WarmStartConnectResponse } from "../schemas/warmStart.js"; +import { ApiResult, wrapZodFetch } from "../zodfetch.js"; +import { ExponentialBackoff } from "../apps/backoff.js"; + +export type WarmStartClientOptions = { + apiUrl: URL; + controllerId: string; + deploymentId: string; + deploymentVersion: string; + machineCpu: string; + machineMemory: string; +}; + +export class WarmStartClient { + private readonly logger = new SimpleStructuredLogger("warm-start-client"); + private readonly apiUrl: URL; + private backoff = new ExponentialBackoff("FullJitter"); + + private get connectUrl() { + return new URL("/connect", this.apiUrl); + } + + private get warmStartUrl() { + return new URL("/warm-start", this.apiUrl); + } + + constructor(private opts: WarmStartClientOptions) { + this.apiUrl = opts.apiUrl; + } + + async connect(): Promise> { + return wrapZodFetch( + WarmStartConnectResponse, + this.connectUrl.href, + { + method: "GET", + headers: { + "Content-Type": "application/json", + }, + }, + { + retry: { + minTimeoutInMs: 200, + maxTimeoutInMs: 2000, + maxAttempts: 3, + factor: 2, + randomize: false, + }, + } + ); + } + + async warmStart({ + workerInstanceName, + connectionTimeoutMs, + keepaliveMs, + }: { + workerInstanceName: string; + connectionTimeoutMs: number; + keepaliveMs: number; + }): Promise { + const res = await this.longPoll( + this.warmStartUrl.href, + { + method: "GET", + headers: { + "x-trigger-workload-controller-id": this.opts.controllerId, + "x-trigger-deployment-id": this.opts.deploymentId, + "x-trigger-deployment-version": this.opts.deploymentVersion, + "x-trigger-machine-cpu": this.opts.machineCpu, + "x-trigger-machine-memory": this.opts.machineMemory, + "x-trigger-worker-instance-name": workerInstanceName, + }, + }, + { + timeoutMs: connectionTimeoutMs, + totalDurationMs: keepaliveMs, + } + ); + + if (!res.ok) { + this.logger.error("warmStart: failed", { + error: res.error, + connectionTimeoutMs, + keepaliveMs, + }); + return null; + } + + const nextRun = DequeuedMessage.parse(res.data); + + this.logger.debug("warmStart: got next run", { nextRun }); + + return nextRun; + } + + private async longPoll( + url: string, + requestInit: Omit, + { + timeoutMs, + totalDurationMs, + }: { + timeoutMs: number; + totalDurationMs: number; + } + ): Promise< + | { + ok: true; + data: T; + } + | { + ok: false; + error: string; + } + > { + this.logger.debug("Long polling", { url, requestInit, timeoutMs, totalDurationMs }); + + const endTime = Date.now() + totalDurationMs; + + let retries = 0; + + while (Date.now() < endTime) { + try { + const controller = new AbortController(); + const signal = controller.signal; + + const timeoutId = setTimeout(() => controller.abort(), timeoutMs); + + const response = await fetch(url, { ...requestInit, signal }); + + clearTimeout(timeoutId); + + if (response.ok) { + const data = await response.json(); + + return { + ok: true, + data, + }; + } else { + return { + ok: false, + error: `Server error: ${response.status}`, + }; + } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + this.logger.log("Long poll request timed out, retrying..."); + continue; + } else { + this.logger.error("Error during fetch, retrying...", { error }); + + // Wait with exponential backoff + await this.backoff.wait(retries++); + + continue; + } + } + } + + return { + ok: false, + error: "TotalDurationExceeded", + }; + } +}