diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 1b2eb52ddc..5009206a29 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -1,14 +1,7 @@ import { randomUUID } from "crypto"; import { env as stdEnv } from "std-env"; import { z } from "zod"; - -const BoolEnv = z.preprocess((val) => { - if (typeof val !== "string") { - return val; - } - - return ["true", "1"].includes(val.toLowerCase().trim()); -}, z.boolean()); +import { AdditionalEnvVars, BoolEnv } from "./envUtil.js"; const Env = z.object({ // This will come from `spec.nodeName` in k8s @@ -30,6 +23,11 @@ const Env = z.object({ TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller + // Runner settings + RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), + RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), + RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) + // Dequeue settings (provider mode) TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000), diff --git a/apps/supervisor/src/envUtil.test.ts b/apps/supervisor/src/envUtil.test.ts new file mode 100644 index 0000000000..c3d35758f1 --- /dev/null +++ b/apps/supervisor/src/envUtil.test.ts @@ -0,0 +1,80 @@ +import { describe, it, expect } from "vitest"; +import { BoolEnv, AdditionalEnvVars } from "./envUtil.js"; + +describe("BoolEnv", () => { + it("should parse string 'true' as true", () => { + expect(BoolEnv.parse("true")).toBe(true); + expect(BoolEnv.parse("TRUE")).toBe(true); + expect(BoolEnv.parse("True")).toBe(true); + }); + + it("should parse string '1' as true", () => { + expect(BoolEnv.parse("1")).toBe(true); + }); + + it("should parse string 'false' as false", () => { + expect(BoolEnv.parse("false")).toBe(false); + expect(BoolEnv.parse("FALSE")).toBe(false); + expect(BoolEnv.parse("False")).toBe(false); + }); + + it("should handle whitespace", () => { + expect(BoolEnv.parse(" true ")).toBe(true); + expect(BoolEnv.parse(" 1 ")).toBe(true); + }); + + it("should pass through boolean values", () => { + expect(BoolEnv.parse(true)).toBe(true); + expect(BoolEnv.parse(false)).toBe(false); + }); + + it("should return false for invalid inputs", () => { + expect(BoolEnv.parse("invalid")).toBe(false); + expect(BoolEnv.parse("")).toBe(false); + }); +}); + +describe("AdditionalEnvVars", () => { + it("should parse single key-value pair", () => { + expect(AdditionalEnvVars.parse("FOO=bar")).toEqual({ FOO: "bar" }); + }); + + it("should parse multiple key-value pairs", () => { + expect(AdditionalEnvVars.parse("FOO=bar,BAZ=qux")).toEqual({ + FOO: "bar", + BAZ: "qux", + }); + }); + + it("should handle whitespace", () => { + expect(AdditionalEnvVars.parse(" FOO = bar , BAZ = qux ")).toEqual({ + FOO: "bar", + BAZ: "qux", + }); + }); + + it("should return undefined for empty string", () => { + expect(AdditionalEnvVars.parse("")).toBeUndefined(); + }); + + it("should return undefined for invalid format", () => { + expect(AdditionalEnvVars.parse("invalid")).toBeUndefined(); + }); + + it("should skip invalid pairs but include valid ones", () => { + expect(AdditionalEnvVars.parse("FOO=bar,INVALID,BAZ=qux")).toEqual({ + FOO: "bar", + BAZ: "qux", + }); + }); + + it("should pass through undefined", () => { + expect(AdditionalEnvVars.parse(undefined)).toBeUndefined(); + }); + + it("should handle empty values", () => { + expect(AdditionalEnvVars.parse("FOO=,BAR=value")).toEqual({ + BAR: "value", + }); + }); +}); diff --git a/apps/supervisor/src/envUtil.ts b/apps/supervisor/src/envUtil.ts new file mode 100644 index 0000000000..41dd5ca22a --- /dev/null +++ b/apps/supervisor/src/envUtil.ts @@ -0,0 +1,39 @@ +import { z } from "zod"; + +export const BoolEnv = z.preprocess((val) => { + if (typeof val !== "string") { + return val; + } + + return ["true", "1"].includes(val.toLowerCase().trim()); +}, z.boolean()); + +export const AdditionalEnvVars = z.preprocess((val) => { + if (typeof val !== "string") { + return val; + } + + if (!val) { + return undefined; + } + + try { + const result = val.split(",").reduce( + (acc, pair) => { + const [key, value] = pair.split("="); + if (!key || !value) { + return acc; + } + acc[key.trim()] = value.trim(); + return acc; + }, + {} as Record + ); + + // Return undefined if no valid key-value pairs were found + return Object.keys(result).length === 0 ? undefined : result; + } catch (error) { + console.warn("Failed to parse additional env vars", { error, val }); + return undefined; + } +}, z.record(z.string(), z.string()).optional()); diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 27b9998427..3d9947e00a 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -2,7 +2,7 @@ import { SupervisorSession } from "@trigger.dev/core/v3/workers"; import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; import { env } from "./env.js"; import { WorkloadServer } from "./workloadServer/index.js"; -import { type WorkloadManager } from "./workloadManager/types.js"; +import type { WorkloadManagerOptions, WorkloadManager } from "./workloadManager/types.js"; import Docker from "dockerode"; import { z } from "zod"; import { type DequeuedMessage } from "@trigger.dev/core/v3"; @@ -50,16 +50,23 @@ class ManagedSupervisor { console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets }); } - const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL; - const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN; - const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL; - if (this.warmStartUrl) { this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", { warmStartUrl: this.warmStartUrl, }); } + const workloadManagerOptions = { + workloadApiProtocol: env.TRIGGER_WORKLOAD_API_PROTOCOL, + workloadApiDomain: env.TRIGGER_WORKLOAD_API_DOMAIN, + workloadApiPort: env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL, + warmStartUrl: this.warmStartUrl, + imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","), + heartbeatIntervalSeconds: env.RUNNER_HEARTBEAT_INTERVAL_SECONDS, + snapshotPollIntervalSeconds: env.RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS, + additionalEnvVars: env.RUNNER_ADDITIONAL_ENV_VARS, + } satisfies WorkloadManagerOptions; + if (this.isKubernetes) { if (env.POD_CLEANER_ENABLED) { this.logger.log("[ManagedWorker] 🧹 Pod cleaner enabled", { @@ -92,21 +99,10 @@ class ManagedSupervisor { } this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), ""); - this.workloadManager = new KubernetesWorkloadManager({ - workloadApiProtocol, - workloadApiDomain, - workloadApiPort: workloadApiPortExternal, - warmStartUrl: this.warmStartUrl, - imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","), - }); + this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions); } else { this.resourceMonitor = new DockerResourceMonitor(new Docker()); - this.workloadManager = new DockerWorkloadManager({ - workloadApiProtocol, - workloadApiDomain, - workloadApiPort: workloadApiPortExternal, - warmStartUrl: this.warmStartUrl, - }); + this.workloadManager = new DockerWorkloadManager(workloadManagerOptions); } this.workerSession = new SupervisorSession({ diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 704e564726..9641e4e25a 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -45,6 +45,24 @@ export class DockerWorkloadManager implements WorkloadManager { runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`); } + if (this.opts.heartbeatIntervalSeconds) { + runArgs.push( + `--env=TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}` + ); + } + + if (this.opts.snapshotPollIntervalSeconds) { + runArgs.push( + `--env=TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS=${this.opts.snapshotPollIntervalSeconds}` + ); + } + + if (this.opts.additionalEnvVars) { + Object.entries(this.opts.additionalEnvVars).forEach(([key, value]) => { + runArgs.push(`--env=${key}=${value}`); + }); + } + if (env.ENFORCE_MACHINE_PRESETS) { runArgs.push(`--cpus=${opts.machine.cpu}`, `--memory=${opts.machine.memory}G`); runArgs.push(`--env=TRIGGER_MACHINE_CPU=${opts.machine.cpu}`); diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 50e9b81f47..26e86a1809 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -134,6 +134,28 @@ export class KubernetesWorkloadManager implements WorkloadManager { ...(this.opts.warmStartUrl ? [{ name: "TRIGGER_WARM_START_URL", value: this.opts.warmStartUrl }] : []), + ...(this.opts.heartbeatIntervalSeconds + ? [ + { + name: "TRIGGER_HEARTBEAT_INTERVAL_SECONDS", + value: `${this.opts.heartbeatIntervalSeconds}`, + }, + ] + : []), + ...(this.opts.snapshotPollIntervalSeconds + ? [ + { + name: "TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS", + value: `${this.opts.snapshotPollIntervalSeconds}`, + }, + ] + : []), + ...(this.opts.additionalEnvVars + ? Object.entries(this.opts.additionalEnvVars).map(([key, value]) => ({ + name: key, + value: value, + })) + : []), ], }, ], diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index ed06abc8c8..6eddef22c2 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -6,6 +6,9 @@ export interface WorkloadManagerOptions { workloadApiPort: number; warmStartUrl?: string; imagePullSecrets?: string[]; + heartbeatIntervalSeconds?: number; + snapshotPollIntervalSeconds?: number; + additionalEnvVars?: Record; } export interface WorkloadManager { diff --git a/apps/webapp/app/v3/featureFlags.server.ts b/apps/webapp/app/v3/featureFlags.server.ts index 1cc57ed48c..6f7c3edce5 100644 --- a/apps/webapp/app/v3/featureFlags.server.ts +++ b/apps/webapp/app/v3/featureFlags.server.ts @@ -1,8 +1,12 @@ import { z } from "zod"; import { prisma, PrismaClientOrTransaction } from "~/db.server"; +export const FEATURE_FLAG = { + defaultWorkerInstanceGroupId: "defaultWorkerInstanceGroupId", +} as const; + const FeatureFlagCatalog = { - defaultWorkerInstanceGroupId: z.string(), + [FEATURE_FLAG.defaultWorkerInstanceGroupId]: z.string(), }; type FeatureFlagKey = keyof typeof FeatureFlagCatalog; diff --git a/apps/webapp/app/v3/services/worker/workerGroupService.server.ts b/apps/webapp/app/v3/services/worker/workerGroupService.server.ts index c654dd3bf5..e33c3056fe 100644 --- a/apps/webapp/app/v3/services/worker/workerGroupService.server.ts +++ b/apps/webapp/app/v3/services/worker/workerGroupService.server.ts @@ -2,7 +2,7 @@ import { WorkerInstanceGroup, WorkerInstanceGroupType } from "@trigger.dev/datab import { WithRunEngine } from "../baseService.server"; import { WorkerGroupTokenService } from "./workerGroupTokenService.server"; import { logger } from "~/services/logger.server"; -import { makeFlags, makeSetFlags } from "~/v3/featureFlags.server"; +import { FEATURE_FLAG, makeFlags, makeSetFlags } from "~/v3/featureFlags.server"; export class WorkerGroupService extends WithRunEngine { private readonly defaultNamePrefix = "worker_group"; @@ -49,14 +49,14 @@ export class WorkerGroupService extends WithRunEngine { const getFlag = makeFlags(this._prisma); const defaultWorkerInstanceGroupId = await getFlag({ - key: "defaultWorkerInstanceGroupId", + key: FEATURE_FLAG.defaultWorkerInstanceGroupId, }); // If there's no global default yet we should set it to the new worker group if (!defaultWorkerInstanceGroupId) { const setFlag = makeSetFlags(this._prisma); await setFlag({ - key: "defaultWorkerInstanceGroupId", + key: FEATURE_FLAG.defaultWorkerInstanceGroupId, value: workerGroup.id, }); } @@ -169,7 +169,7 @@ export class WorkerGroupService extends WithRunEngine { const flags = makeFlags(this._prisma); const defaultWorkerInstanceGroupId = await flags({ - key: "defaultWorkerInstanceGroupId", + key: FEATURE_FLAG.defaultWorkerInstanceGroupId, }); if (!defaultWorkerInstanceGroupId) { diff --git a/internal-packages/database/prisma/migrations/20250331105838_make_checkpoint_image_ref_optional/migration.sql b/internal-packages/database/prisma/migrations/20250331105838_make_checkpoint_image_ref_optional/migration.sql new file mode 100644 index 0000000000..b5989cc616 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250331105838_make_checkpoint_image_ref_optional/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRunCheckpoint" ALTER COLUMN "imageRef" DROP NOT NULL; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 216ed36851..b73a9f8e1f 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -2064,7 +2064,7 @@ model TaskRunCheckpoint { type TaskRunCheckpointType location String - imageRef String + imageRef String? reason String? metadata String? diff --git a/packages/cli-v3/src/config.ts b/packages/cli-v3/src/config.ts index 222d05312d..8cca2d1605 100644 --- a/packages/cli-v3/src/config.ts +++ b/packages/cli-v3/src/config.ts @@ -173,7 +173,7 @@ async function resolveConfig( ["run_engine_v2" as const].concat(config.compatibilityFlags ?? []) ); - const defaultRuntime: BuildRuntime = features.run_engine_v2 ? "node-22" : DEFAULT_RUNTIME; + const defaultRuntime: BuildRuntime = features.run_engine_v2 ? "node" : DEFAULT_RUNTIME; const mergedConfig = defu( { diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index c45a651c1d..efcfe8086e 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -161,7 +161,7 @@ export type CheckpointType = z.infer; export const CheckpointInput = z.object({ type: CheckpointType, location: z.string(), - imageRef: z.string(), + imageRef: z.string().nullish(), reason: z.string().nullish(), }); @@ -217,7 +217,7 @@ export const DequeueMessageCheckpoint = z.object({ id: z.string(), type: CheckpointType, location: z.string(), - imageRef: z.string(), + imageRef: z.string().nullish(), reason: z.string().nullish(), }); export type DequeueMessageCheckpoint = z.infer;