
Remove supervisor docker binary requirement #2062


Merged · 12 commits · May 17, 2025
5 changes: 5 additions & 0 deletions .changeset/lazy-panthers-shop.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Improve structured logs
5 changes: 5 additions & 0 deletions .changeset/rare-beds-accept.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add verbose structured log level
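
For reference, a minimal sketch of how these changesets surface in the supervisor code below. The constructor and the log/debug/warn methods all appear in this diff; the verbose method name is an assumption based on the changeset above:

```ts
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";

// Scoped logger; the name is included in every structured log line.
const logger = new SimpleStructuredLogger("my-service");

logger.log("Starting up", { port: 3030 });
logger.debug("getKubeConfig()", { RUNTIME_ENV: "local" });
logger.warn("Pod cleaner disabled");

// New verbose level from the changeset above (method name assumed,
// the diff itself only shows log/debug/warn/error in use):
// logger.verbose("Detailed trace output", { step: 1 });
```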
2 changes: 1 addition & 1 deletion apps/supervisor/.env.example
@@ -7,7 +7,7 @@ MANAGED_WORKER_SECRET=managed-secret
# Point this at the webapp in prod
TRIGGER_API_URL=http://localhost:3030

# Point this at the OTel collector in prod
# Point this at the webapp or an OTel collector in prod
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:3030/otel
# Use this on macOS
# OTEL_EXPORTER_OTLP_ENDPOINT=http://host.docker.internal:3030/otel
3 changes: 0 additions & 3 deletions apps/supervisor/package.json
@@ -16,16 +16,13 @@
"@kubernetes/client-node": "^1.0.0",
"@trigger.dev/core": "workspace:*",
"dockerode": "^4.0.3",
"nanoid": "^5.0.9",
"prom-client": "^15.1.0",
"socket.io": "4.7.4",
"std-env": "^3.8.0",
"tinyexec": "^0.3.1",
"zod": "3.23.8"
},
"devDependencies": {
"@types/dockerode": "^3.3.33",
"docker-api-ts": "^0.2.2",
"vitest": "^1.4.0"
}
}
5 changes: 4 additions & 1 deletion apps/supervisor/src/clients/kubernetes.ts
@@ -3,9 +3,12 @@ import { Informer } from "@kubernetes/client-node";
import { ListPromise } from "@kubernetes/client-node";
import { KubernetesObject } from "@kubernetes/client-node";
import { assertExhaustive } from "@trigger.dev/core/utils";
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";

export const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";

const logger = new SimpleStructuredLogger("kubernetes-client");

export function createK8sApi() {
const kubeConfig = getKubeConfig();

@@ -31,7 +34,7 @@ export function createK8sApi() {
export type K8sApi = ReturnType<typeof createK8sApi>;

function getKubeConfig() {
console.log("getKubeConfig()", { RUNTIME_ENV });
logger.debug("getKubeConfig()", { RUNTIME_ENV });

const kubeConfig = new k8s.KubeConfig();

17 changes: 14 additions & 3 deletions apps/supervisor/src/env.ts
@@ -29,6 +29,17 @@ const Env = z.object({
RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(),
RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv)
RUNNER_DOCKER_AUTOREMOVE: BoolEnv.default(true),
/**
* Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:<name|id>`.
   * Any other value is treated as the name of a custom network to which all runners should connect.
   *
   * Accepts a comma-separated list of values to attach runners to multiple networks. Additional networks are interpreted as network names and will be attached after container creation.
*
* **WARNING**: Specifying multiple networks will slightly increase startup times.
*
* @default "host"
*/
RUNNER_DOCKER_NETWORKS: z.string().default("host"),

// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
@@ -43,14 +54,14 @@
TRIGGER_METADATA_URL: z.string().optional(),

// Used by the workload manager, e.g. docker/k8s
DOCKER_NETWORK: z.string().default("host"),
OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(),
ENFORCE_MACHINE_PRESETS: z.coerce.boolean().default(false),
KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv

// Used by the resource monitor
OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),
OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(),
RESOURCE_MONITOR_ENABLED: BoolEnv.default(false),
RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),
RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(),

// Kubernetes specific settings
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),
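
For context on the new RUNNER_DOCKER_NETWORKS option above: a minimal sketch of the documented semantics using dockerode. The supervisor's actual workload manager code is not part of this diff, so the helper name and structure here are illustrative only:

```ts
import Docker from "dockerode";

// Illustrative only; the real logic lives in the supervisor's Docker
// workload manager, which this diff does not show.
async function createRunnerContainer(docker: Docker, image: string, networksCsv: string) {
  const [primary, ...extra] = networksCsv.split(",").map((n) => n.trim());

  const container = await docker.createContainer({
    Image: image,
    HostConfig: {
      // The first value acts as the network mode: "bridge", "host", "none",
      // "container:<name|id>", or a custom network name.
      NetworkMode: primary,
      AutoRemove: true,
    },
  });

  // Additional values are treated as network names and can only be attached
  // once the container exists, hence the startup-time warning above.
  for (const name of extra) {
    await docker.getNetwork(name).connect({ Container: container.id });
  }

  await container.start();
  return container;
}
```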
5 changes: 4 additions & 1 deletion apps/supervisor/src/envUtil.ts
@@ -1,4 +1,7 @@
import { z } from "zod";
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";

const logger = new SimpleStructuredLogger("env-util");

export const BoolEnv = z.preprocess((val) => {
if (typeof val !== "string") {
@@ -33,7 +36,7 @@ export const AdditionalEnvVars = z.preprocess((val) => {
// Return undefined if no valid key-value pairs were found
return Object.keys(result).length === 0 ? undefined : result;
} catch (error) {
console.warn("Failed to parse additional env vars", { error, val });
logger.warn("Failed to parse additional env vars", { error, val });
return undefined;
}
}, z.record(z.string(), z.string()).optional());
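
A quick illustration of the format AdditionalEnvVars accepts. Expected results are inferred from the visible parser code; the top of the preprocessor is elided in this diff:

```ts
import { AdditionalEnvVars } from "./envUtil.js";

// Comma-separated KEY=value pairs parse into a record:
AdditionalEnvVars.parse("FOO=bar,BAZ=qux"); // => { FOO: "bar", BAZ: "qux" }

// Inputs with no valid pairs resolve to undefined rather than throwing;
// parse failures are logged via logger.warn as shown above.
AdditionalEnvVars.parse(""); // => undefined (assumed)
```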
94 changes: 49 additions & 45 deletions apps/supervisor/src/index.ts
@@ -9,6 +9,7 @@ import { type DequeuedMessage } from "@trigger.dev/core/v3";
import {
DockerResourceMonitor,
KubernetesResourceMonitor,
NoopResourceMonitor,
type ResourceMonitor,
} from "./resourceMonitor.js";
import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js";
@@ -33,7 +34,7 @@ class ManagedSupervisor {
private readonly metricsServer?: HttpServer;
private readonly workloadServer: WorkloadServer;
private readonly workloadManager: WorkloadManager;
private readonly logger = new SimpleStructuredLogger("managed-worker");
private readonly logger = new SimpleStructuredLogger("managed-supervisor");
private readonly resourceMonitor: ResourceMonitor;
private readonly checkpointClient?: CheckpointClient;

@@ -47,11 +48,11 @@
const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, ...envWithoutSecrets } = env;

if (env.DEBUG) {
console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets });
this.logger.debug("Starting up", { envWithoutSecrets });
}

if (this.warmStartUrl) {
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
this.logger.log("🔥 Warm starts enabled", {
warmStartUrl: this.warmStartUrl,
});
}
@@ -69,9 +70,19 @@
dockerAutoremove: env.RUNNER_DOCKER_AUTOREMOVE,
} satisfies WorkloadManagerOptions;

this.resourceMonitor = env.RESOURCE_MONITOR_ENABLED
? this.isKubernetes
? new KubernetesResourceMonitor(createK8sApi(), env.TRIGGER_WORKER_INSTANCE_NAME)
: new DockerResourceMonitor(new Docker())
: new NoopResourceMonitor();

this.workloadManager = this.isKubernetes
? new KubernetesWorkloadManager(workloadManagerOptions)
: new DockerWorkloadManager(workloadManagerOptions);

if (this.isKubernetes) {
if (env.POD_CLEANER_ENABLED) {
this.logger.log("[ManagedWorker] 🧹 Pod cleaner enabled", {
this.logger.log("🧹 Pod cleaner enabled", {
namespace: env.KUBERNETES_NAMESPACE,
batchSize: env.POD_CLEANER_BATCH_SIZE,
intervalMs: env.POD_CLEANER_INTERVAL_MS,
@@ -83,11 +94,11 @@
intervalMs: env.POD_CLEANER_INTERVAL_MS,
});
} else {
this.logger.warn("[ManagedWorker] Pod cleaner disabled");
this.logger.warn("Pod cleaner disabled");
}

if (env.FAILED_POD_HANDLER_ENABLED) {
this.logger.log("[ManagedWorker] 🔁 Failed pod handler enabled", {
this.logger.log("🔁 Failed pod handler enabled", {
namespace: env.KUBERNETES_NAMESPACE,
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
});
@@ -97,17 +108,14 @@
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
});
} else {
this.logger.warn("[ManagedWorker] Failed pod handler disabled");
this.logger.warn("Failed pod handler disabled");
}
}

this.resourceMonitor = new KubernetesResourceMonitor(
createK8sApi(),
env.TRIGGER_WORKER_INSTANCE_NAME
if (env.TRIGGER_DEQUEUE_INTERVAL_MS > env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS) {
this.logger.warn(
`⚠️ TRIGGER_DEQUEUE_INTERVAL_MS (${env.TRIGGER_DEQUEUE_INTERVAL_MS}) is greater than TRIGGER_DEQUEUE_IDLE_INTERVAL_MS (${env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS}) - did you mix them up?`
);
this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions);
} else {
this.resourceMonitor = new DockerResourceMonitor(new Docker());
this.workloadManager = new DockerWorkloadManager(workloadManagerOptions);
}

this.workerSession = new SupervisorSession({
@@ -123,12 +131,17 @@
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
preDequeue: async () => {
if (!env.RESOURCE_MONITOR_ENABLED) {
return {};
}

if (this.isKubernetes) {
// Not used in k8s for now
return {};
}

const resources = await this.resourceMonitor.getNodeResources();

return {
maxResources: {
cpu: resources.cpuAvailable,
@@ -144,7 +157,7 @@
});

if (env.TRIGGER_CHECKPOINT_URL) {
this.logger.log("[ManagedWorker] 🥶 Checkpoints enabled", {
this.logger.log("🥶 Checkpoints enabled", {
checkpointUrl: env.TRIGGER_CHECKPOINT_URL,
});

@@ -155,43 +168,34 @@
});
}

// setInterval(async () => {
// const resources = await this.resourceMonitor.getNodeResources(true);
// this.logger.debug("[ManagedWorker] Current resources", { resources });
// }, 1000);

this.workerSession.on("runNotification", async ({ time, run }) => {
this.logger.log("[ManagedWorker] runNotification", { time, run });
this.logger.log("runNotification", { time, run });

this.workloadServer.notifyRun({ run });
});

this.workerSession.on("runQueueMessage", async ({ time, message }) => {
this.logger.log(
`[ManagedWorker] Received message with timestamp ${time.toLocaleString()}`,
message
);
this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message);

if (message.completedWaitpoints.length > 0) {
this.logger.debug("[ManagedWorker] Run has completed waitpoints", {
this.logger.debug("Run has completed waitpoints", {
runId: message.run.id,
completedWaitpoints: message.completedWaitpoints.length,
});
// TODO: Do something with them or if we don't need the data here, maybe we shouldn't even send it
}

if (!message.image) {
this.logger.error("[ManagedWorker] Run has no image", { runId: message.run.id });
this.logger.error("Run has no image", { runId: message.run.id });
return;
}

const { checkpoint, ...rest } = message;

if (checkpoint) {
this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id });
this.logger.log("Restoring run", { runId: message.run.id });

if (!this.checkpointClient) {
this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id });
this.logger.error("No checkpoint client", { runId: message.run.id });
return;
}

@@ -206,23 +210,23 @@
});

if (didRestore) {
this.logger.log("[ManagedWorker] Restore successful", { runId: message.run.id });
this.logger.log("Restore successful", { runId: message.run.id });
} else {
this.logger.error("[ManagedWorker] Restore failed", { runId: message.run.id });
this.logger.error("Restore failed", { runId: message.run.id });
}
} catch (error) {
this.logger.error("[ManagedWorker] Failed to restore run", { error });
this.logger.error("Failed to restore run", { error });
}

return;
}

this.logger.log("[ManagedWorker] Scheduling run", { runId: message.run.id });
this.logger.log("Scheduling run", { runId: message.run.id });

const didWarmStart = await this.tryWarmStart(message);

if (didWarmStart) {
this.logger.log("[ManagedWorker] Warm start successful", { runId: message.run.id });
this.logger.log("Warm start successful", { runId: message.run.id });
return;
}

@@ -249,7 +253,7 @@
// memory: message.run.machine.memory,
// });
} catch (error) {
this.logger.error("[ManagedWorker] Failed to create workload", { error });
this.logger.error("Failed to create workload", { error });
}
});

@@ -277,12 +281,12 @@
}

async onRunConnected({ run }: { run: { friendlyId: string } }) {
this.logger.debug("[ManagedWorker] Run connected", { run });
this.logger.debug("Run connected", { run });
this.workerSession.subscribeToRunNotifications([run.friendlyId]);
}

async onRunDisconnected({ run }: { run: { friendlyId: string } }) {
this.logger.debug("[ManagedWorker] Run disconnected", { run });
this.logger.debug("Run disconnected", { run });
this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]);
}

@@ -303,7 +307,7 @@
});

if (!res.ok) {
this.logger.error("[ManagedWorker] Warm start failed", {
this.logger.error("Warm start failed", {
runId: dequeuedMessage.run.id,
});
return false;
@@ -313,7 +317,7 @@
const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data);

if (!parsedData.success) {
this.logger.error("[ManagedWorker] Warm start response invalid", {
this.logger.error("Warm start response invalid", {
runId: dequeuedMessage.run.id,
data,
});
@@ -322,7 +326,7 @@

return parsedData.data.didWarmStart;
} catch (error) {
this.logger.error("[ManagedWorker] Warm start error", {
this.logger.error("Warm start error", {
runId: dequeuedMessage.run.id,
error,
});
@@ -331,29 +335,29 @@
}

async start() {
this.logger.log("[ManagedWorker] Starting up");
this.logger.log("Starting up");

// Optional services
await this.podCleaner?.start();
await this.failedPodHandler?.start();
await this.metricsServer?.start();

if (env.TRIGGER_WORKLOAD_API_ENABLED) {
this.logger.log("[ManagedWorker] Workload API enabled", {
this.logger.log("Workload API enabled", {
protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
domain: env.TRIGGER_WORKLOAD_API_DOMAIN,
port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL,
});
await this.workloadServer.start();
} else {
this.logger.warn("[ManagedWorker] Workload API disabled");
this.logger.warn("Workload API disabled");
}

await this.workerSession.start();
}

async stop() {
this.logger.log("[ManagedWorker] Shutting down");
this.logger.log("Shutting down");
await this.workerSession.stop();

// Optional services
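
The diff imports a new NoopResourceMonitor, used when RESOURCE_MONITOR_ENABLED is false, but its implementation is not shown here. A plausible sketch, assuming only the getNodeResources() shape visible in preDequeue above:

```ts
// Hypothetical sketch; the real class lives in apps/supervisor/src/resourceMonitor.ts,
// which this diff does not include.
class NoopResourceMonitorSketch {
  // preDequeue reads cpuAvailable/memoryAvailable; report effectively
  // unlimited resources so dequeueing is never constrained.
  async getNodeResources(): Promise<{ cpuAvailable: number; memoryAvailable: number }> {
    return { cpuAvailable: Number.MAX_SAFE_INTEGER, memoryAvailable: Number.MAX_SAFE_INTEGER };
  }
}
```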