Skip to content

Misc v4 checkpoint fixes #1859

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
356a90b
logs for optional services
nicktrn Mar 26, 2025
a3a57d9
print env vars on startup in debug mode
nicktrn Mar 26, 2025
d2dd535
routes need to explicitly ask to keep connection alive
nicktrn Mar 27, 2025
af8adf5
Merge remote-tracking branch 'origin/main' into fix/v4-misc
nicktrn Mar 27, 2025
29ce91a
log indicators for now
nicktrn Mar 27, 2025
47f3c33
make workload api listen host configurable
nicktrn Mar 27, 2025
30077c6
expose supervisor metrics and make more configurable
nicktrn Mar 27, 2025
d48aa89
Merge branch 'fix/checkpoints' into fix/v4-misc
nicktrn Mar 27, 2025
a7efef1
configurable pull secrets, no defaults
nicktrn Mar 27, 2025
208711f
remove restore route
nicktrn Mar 31, 2025
3ca58dc
run controller to handle queued executing
nicktrn Mar 31, 2025
756973f
fix v3 deploys in v4 project
nicktrn Mar 31, 2025
ea07b62
update admin worker route
nicktrn Mar 31, 2025
356d810
only start pod cleaner et al in k8s mode
nicktrn Mar 31, 2025
9d6b2ce
set new worker group as default if none yet
nicktrn Mar 31, 2025
21da4e8
Merge remote-tracking branch 'origin/main' into fix/v4-misc
nicktrn Mar 31, 2025
bf36a53
make image ref optional
nicktrn Mar 31, 2025
b74d15d
checkpoint image ref is optional for output as well
nicktrn Mar 31, 2025
3218787
export feature flag const
nicktrn Mar 31, 2025
b8f9dc1
one last image ref type fix
nicktrn Mar 31, 2025
1b35d6b
Merge remote-tracking branch 'origin/main' into fix/v4-checkpoints
nicktrn Mar 31, 2025
0309078
make runner intervals configurable
nicktrn Apr 1, 2025
75c796a
ability to set arbitrary env vars on new runners
nicktrn Apr 1, 2025
9bcc67d
set default runtime back to node 21
nicktrn Apr 1, 2025
405f5e9
move all runner env vars to the same section
nicktrn Apr 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import { randomUUID } from "crypto";
import { env as stdEnv } from "std-env";
import { z } from "zod";

const BoolEnv = z.preprocess((val) => {
if (typeof val !== "string") {
return val;
}

return ["true", "1"].includes(val.toLowerCase().trim());
}, z.boolean());
import { AdditionalEnvVars, BoolEnv } from "./envUtil.js";

const Env = z.object({
// This will come from `spec.nodeName` in k8s
Expand All @@ -30,6 +23,11 @@ const Env = z.object({
TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on
TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller

// Runner settings
RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(),
RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(),
RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv)

// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
Expand Down
80 changes: 80 additions & 0 deletions apps/supervisor/src/envUtil.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { describe, it, expect } from "vitest";
import { BoolEnv, AdditionalEnvVars } from "./envUtil.js";

describe("BoolEnv", () => {
it("should parse string 'true' as true", () => {
expect(BoolEnv.parse("true")).toBe(true);
expect(BoolEnv.parse("TRUE")).toBe(true);
expect(BoolEnv.parse("True")).toBe(true);
});

it("should parse string '1' as true", () => {
expect(BoolEnv.parse("1")).toBe(true);
});

it("should parse string 'false' as false", () => {
expect(BoolEnv.parse("false")).toBe(false);
expect(BoolEnv.parse("FALSE")).toBe(false);
expect(BoolEnv.parse("False")).toBe(false);
});

it("should handle whitespace", () => {
expect(BoolEnv.parse(" true ")).toBe(true);
expect(BoolEnv.parse(" 1 ")).toBe(true);
});

it("should pass through boolean values", () => {
expect(BoolEnv.parse(true)).toBe(true);
expect(BoolEnv.parse(false)).toBe(false);
});

it("should return false for invalid inputs", () => {
expect(BoolEnv.parse("invalid")).toBe(false);
expect(BoolEnv.parse("")).toBe(false);
});
});

describe("AdditionalEnvVars", () => {
it("should parse single key-value pair", () => {
expect(AdditionalEnvVars.parse("FOO=bar")).toEqual({ FOO: "bar" });
});

it("should parse multiple key-value pairs", () => {
expect(AdditionalEnvVars.parse("FOO=bar,BAZ=qux")).toEqual({
FOO: "bar",
BAZ: "qux",
});
});

it("should handle whitespace", () => {
expect(AdditionalEnvVars.parse(" FOO = bar , BAZ = qux ")).toEqual({
FOO: "bar",
BAZ: "qux",
});
});

it("should return undefined for empty string", () => {
expect(AdditionalEnvVars.parse("")).toBeUndefined();
});

it("should return undefined for invalid format", () => {
expect(AdditionalEnvVars.parse("invalid")).toBeUndefined();
});

it("should skip invalid pairs but include valid ones", () => {
expect(AdditionalEnvVars.parse("FOO=bar,INVALID,BAZ=qux")).toEqual({
FOO: "bar",
BAZ: "qux",
});
});

it("should pass through undefined", () => {
expect(AdditionalEnvVars.parse(undefined)).toBeUndefined();
});

it("should handle empty values", () => {
expect(AdditionalEnvVars.parse("FOO=,BAR=value")).toEqual({
BAR: "value",
});
});
});
39 changes: 39 additions & 0 deletions apps/supervisor/src/envUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { z } from "zod";

export const BoolEnv = z.preprocess((val) => {
if (typeof val !== "string") {
return val;
}

return ["true", "1"].includes(val.toLowerCase().trim());
}, z.boolean());

export const AdditionalEnvVars = z.preprocess((val) => {
if (typeof val !== "string") {
return val;
}

if (!val) {
return undefined;
}

try {
const result = val.split(",").reduce(
(acc, pair) => {
const [key, value] = pair.split("=");
if (!key || !value) {
return acc;
}
acc[key.trim()] = value.trim();
return acc;
},
{} as Record<string, string>
);

// Return undefined if no valid key-value pairs were found
return Object.keys(result).length === 0 ? undefined : result;
} catch (error) {
console.warn("Failed to parse additional env vars", { error, val });
return undefined;
}
}, z.record(z.string(), z.string()).optional());
32 changes: 14 additions & 18 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { SupervisorSession } from "@trigger.dev/core/v3/workers";
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
import { env } from "./env.js";
import { WorkloadServer } from "./workloadServer/index.js";
import { type WorkloadManager } from "./workloadManager/types.js";
import type { WorkloadManagerOptions, WorkloadManager } from "./workloadManager/types.js";
import Docker from "dockerode";
import { z } from "zod";
import { type DequeuedMessage } from "@trigger.dev/core/v3";
Expand Down Expand Up @@ -50,16 +50,23 @@ class ManagedSupervisor {
console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets });
}

const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL;
const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;

if (this.warmStartUrl) {
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
warmStartUrl: this.warmStartUrl,
});
}

const workloadManagerOptions = {
workloadApiProtocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
workloadApiDomain: env.TRIGGER_WORKLOAD_API_DOMAIN,
workloadApiPort: env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL,
warmStartUrl: this.warmStartUrl,
imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","),
heartbeatIntervalSeconds: env.RUNNER_HEARTBEAT_INTERVAL_SECONDS,
snapshotPollIntervalSeconds: env.RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS,
additionalEnvVars: env.RUNNER_ADDITIONAL_ENV_VARS,
} satisfies WorkloadManagerOptions;

if (this.isKubernetes) {
if (env.POD_CLEANER_ENABLED) {
this.logger.log("[ManagedWorker] 🧹 Pod cleaner enabled", {
Expand Down Expand Up @@ -92,21 +99,10 @@ class ManagedSupervisor {
}

this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), "");
this.workloadManager = new KubernetesWorkloadManager({
workloadApiProtocol,
workloadApiDomain,
workloadApiPort: workloadApiPortExternal,
warmStartUrl: this.warmStartUrl,
imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","),
});
this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions);
} else {
this.resourceMonitor = new DockerResourceMonitor(new Docker());
this.workloadManager = new DockerWorkloadManager({
workloadApiProtocol,
workloadApiDomain,
workloadApiPort: workloadApiPortExternal,
warmStartUrl: this.warmStartUrl,
});
this.workloadManager = new DockerWorkloadManager(workloadManagerOptions);
}

this.workerSession = new SupervisorSession({
Expand Down
18 changes: 18 additions & 0 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,24 @@ export class DockerWorkloadManager implements WorkloadManager {
runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`);
}

if (this.opts.heartbeatIntervalSeconds) {
runArgs.push(
`--env=TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}`
);
}

if (this.opts.snapshotPollIntervalSeconds) {
runArgs.push(
`--env=TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS=${this.opts.snapshotPollIntervalSeconds}`
);
}

if (this.opts.additionalEnvVars) {
Object.entries(this.opts.additionalEnvVars).forEach(([key, value]) => {
runArgs.push(`--env=${key}=${value}`);
});
}

if (env.ENFORCE_MACHINE_PRESETS) {
runArgs.push(`--cpus=${opts.machine.cpu}`, `--memory=${opts.machine.memory}G`);
runArgs.push(`--env=TRIGGER_MACHINE_CPU=${opts.machine.cpu}`);
Expand Down
22 changes: 22 additions & 0 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,28 @@ export class KubernetesWorkloadManager implements WorkloadManager {
...(this.opts.warmStartUrl
? [{ name: "TRIGGER_WARM_START_URL", value: this.opts.warmStartUrl }]
: []),
...(this.opts.heartbeatIntervalSeconds
? [
{
name: "TRIGGER_HEARTBEAT_INTERVAL_SECONDS",
value: `${this.opts.heartbeatIntervalSeconds}`,
},
]
: []),
...(this.opts.snapshotPollIntervalSeconds
? [
{
name: "TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS",
value: `${this.opts.snapshotPollIntervalSeconds}`,
},
]
: []),
...(this.opts.additionalEnvVars
? Object.entries(this.opts.additionalEnvVars).map(([key, value]) => ({
name: key,
value: value,
}))
: []),
],
},
],
Expand Down
3 changes: 3 additions & 0 deletions apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ export interface WorkloadManagerOptions {
workloadApiPort: number;
warmStartUrl?: string;
imagePullSecrets?: string[];
heartbeatIntervalSeconds?: number;
snapshotPollIntervalSeconds?: number;
additionalEnvVars?: Record<string, string>;
}

export interface WorkloadManager {
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/v3/featureFlags.server.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { z } from "zod";
import { prisma, PrismaClientOrTransaction } from "~/db.server";

export const FEATURE_FLAG = {
defaultWorkerInstanceGroupId: "defaultWorkerInstanceGroupId",
} as const;

const FeatureFlagCatalog = {
defaultWorkerInstanceGroupId: z.string(),
[FEATURE_FLAG.defaultWorkerInstanceGroupId]: z.string(),
};

type FeatureFlagKey = keyof typeof FeatureFlagCatalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { WorkerInstanceGroup, WorkerInstanceGroupType } from "@trigger.dev/datab
import { WithRunEngine } from "../baseService.server";
import { WorkerGroupTokenService } from "./workerGroupTokenService.server";
import { logger } from "~/services/logger.server";
import { makeFlags, makeSetFlags } from "~/v3/featureFlags.server";
import { FEATURE_FLAG, makeFlags, makeSetFlags } from "~/v3/featureFlags.server";

export class WorkerGroupService extends WithRunEngine {
private readonly defaultNamePrefix = "worker_group";
Expand Down Expand Up @@ -49,14 +49,14 @@ export class WorkerGroupService extends WithRunEngine {

const getFlag = makeFlags(this._prisma);
const defaultWorkerInstanceGroupId = await getFlag({
key: "defaultWorkerInstanceGroupId",
key: FEATURE_FLAG.defaultWorkerInstanceGroupId,
});

// If there's no global default yet we should set it to the new worker group
if (!defaultWorkerInstanceGroupId) {
const setFlag = makeSetFlags(this._prisma);
await setFlag({
key: "defaultWorkerInstanceGroupId",
key: FEATURE_FLAG.defaultWorkerInstanceGroupId,
value: workerGroup.id,
});
}
Expand Down Expand Up @@ -169,7 +169,7 @@ export class WorkerGroupService extends WithRunEngine {
const flags = makeFlags(this._prisma);

const defaultWorkerInstanceGroupId = await flags({
key: "defaultWorkerInstanceGroupId",
key: FEATURE_FLAG.defaultWorkerInstanceGroupId,
});

if (!defaultWorkerInstanceGroupId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "TaskRunCheckpoint" ALTER COLUMN "imageRef" DROP NOT NULL;
2 changes: 1 addition & 1 deletion internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -2064,7 +2064,7 @@ model TaskRunCheckpoint {

type TaskRunCheckpointType
location String
imageRef String
imageRef String?
reason String?
metadata String?

Expand Down
2 changes: 1 addition & 1 deletion packages/cli-v3/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ async function resolveConfig(
["run_engine_v2" as const].concat(config.compatibilityFlags ?? [])
);

const defaultRuntime: BuildRuntime = features.run_engine_v2 ? "node-22" : DEFAULT_RUNTIME;
const defaultRuntime: BuildRuntime = features.run_engine_v2 ? "node" : DEFAULT_RUNTIME;

const mergedConfig = defu(
{
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export type CheckpointType = z.infer<typeof CheckpointType>;
export const CheckpointInput = z.object({
type: CheckpointType,
location: z.string(),
imageRef: z.string(),
imageRef: z.string().nullish(),
reason: z.string().nullish(),
});

Expand Down Expand Up @@ -217,7 +217,7 @@ export const DequeueMessageCheckpoint = z.object({
id: z.string(),
type: CheckpointType,
location: z.string(),
imageRef: z.string(),
imageRef: z.string().nullish(),
reason: z.string().nullish(),
});
export type DequeueMessageCheckpoint = z.infer<typeof DequeueMessageCheckpoint>;
Expand Down