Skip to content

Misc v4 fixes and improved logging #1831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Mar 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions apps/supervisor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,15 @@
api_url=http://localhost:3030
wg_name=my-worker

# edit these
# edit this
admin_pat=tr_pat_...
project_id=clsw6q8wz...

curl -sS \
-X POST \
"$api_url/admin/api/v1/workers" \
-H "Authorization: Bearer $admin_pat" \
-H "Content-Type: application/json" \
-d "{
\"name\": \"$wg_name\",
\"makeDefault\": true,
\"projectId\": \"$project_id\"
}"
-d "{\"name\": \"$wg_name\"}"
```

2. Create `.env` and set the worker token
Expand All @@ -47,3 +42,26 @@ pnpm exec trigger deploy --self-hosted
# The additional network flag is required on linux
pnpm exec trigger deploy --self-hosted --network host
```

## Additional worker groups

When adding more worker groups you might also want to make them the default for a specific project. This will allow you to test it without having to change the global default:

```sh
api_url=http://localhost:3030
wg_name=my-worker

# edit these
admin_pat=tr_pat_...
project_id=clsw6q8wz...

curl -sS \
-X POST \
"$api_url/admin/api/v1/workers" \
-H "Authorization: Bearer $admin_pat" \
-H "Content-Type: application/json" \
-d "{
\"name\": \"$wg_name\",
\"makeDefaultForProjectId\": \"$project_id\"
}"
```
8 changes: 8 additions & 0 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const Env = z.object({
.transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase()))
.default("http"),
TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default
TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"),
TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on
TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller

Expand All @@ -41,6 +42,7 @@ const Env = z.object({
DOCKER_NETWORK: z.string().default("host"),
OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(),
ENFORCE_MACHINE_PRESETS: z.coerce.boolean().default(false),
KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv

// Used by the resource monitor
OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),
Expand All @@ -53,7 +55,10 @@ const Env = z.object({
EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),

// Metrics
METRICS_ENABLED: BoolEnv.default(true),
METRICS_COLLECT_DEFAULTS: BoolEnv.default(true),
METRICS_HOST: z.string().default("127.0.0.1"),
METRICS_PORT: z.coerce.number().int().default(9090),

// Pod cleaner
POD_CLEANER_ENABLED: BoolEnv.default(true),
Expand All @@ -63,6 +68,9 @@ const Env = z.object({
// Failed pod handler
FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true),
FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000),

// Debug
DEBUG: BoolEnv.default(false),
});

export const env = Env.parse(stdEnv);
97 changes: 58 additions & 39 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ if (env.METRICS_COLLECT_DEFAULTS) {

class ManagedSupervisor {
private readonly workerSession: SupervisorSession;
private readonly httpServer: HttpServer;
private readonly metricsServer?: HttpServer;
private readonly workloadServer: WorkloadServer;
private readonly workloadManager: WorkloadManager;
private readonly logger = new SimpleStructuredLogger("managed-worker");
Expand All @@ -44,24 +44,15 @@ class ManagedSupervisor {
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;

constructor() {
const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL;
const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;
const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, ...envWithoutSecrets } = env;

if (env.POD_CLEANER_ENABLED) {
this.podCleaner = new PodCleaner({
namespace: env.KUBERNETES_NAMESPACE,
batchSize: env.POD_CLEANER_BATCH_SIZE,
intervalMs: env.POD_CLEANER_INTERVAL_MS,
});
if (env.DEBUG) {
console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets });
}

if (env.FAILED_POD_HANDLER_ENABLED) {
this.failedPodHandler = new FailedPodHandler({
namespace: env.KUBERNETES_NAMESPACE,
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
});
}
const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL;
const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;

if (this.warmStartUrl) {
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
Expand All @@ -70,12 +61,43 @@ class ManagedSupervisor {
}

if (this.isKubernetes) {
if (env.POD_CLEANER_ENABLED) {
this.logger.log("[ManagedWorker] 🧹 Pod cleaner enabled", {
namespace: env.KUBERNETES_NAMESPACE,
batchSize: env.POD_CLEANER_BATCH_SIZE,
intervalMs: env.POD_CLEANER_INTERVAL_MS,
});
this.podCleaner = new PodCleaner({
register,
namespace: env.KUBERNETES_NAMESPACE,
batchSize: env.POD_CLEANER_BATCH_SIZE,
intervalMs: env.POD_CLEANER_INTERVAL_MS,
});
} else {
this.logger.warn("[ManagedWorker] Pod cleaner disabled");
}

if (env.FAILED_POD_HANDLER_ENABLED) {
this.logger.log("[ManagedWorker] 🔁 Failed pod handler enabled", {
namespace: env.KUBERNETES_NAMESPACE,
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
});
this.failedPodHandler = new FailedPodHandler({
register,
namespace: env.KUBERNETES_NAMESPACE,
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
});
} else {
this.logger.warn("[ManagedWorker] Failed pod handler disabled");
}

this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), "");
this.workloadManager = new KubernetesWorkloadManager({
workloadApiProtocol,
workloadApiDomain,
workloadApiPort: workloadApiPortExternal,
warmStartUrl: this.warmStartUrl,
imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","),
});
} else {
this.resourceMonitor = new DockerResourceMonitor(new Docker());
Expand Down Expand Up @@ -224,16 +246,21 @@ class ManagedSupervisor {
}
});

// Used for health checks and metrics
this.httpServer = new HttpServer({ port: 8080, host: "0.0.0.0" }).route("/health", "GET", {
handler: async ({ reply }) => {
reply.text("OK");
},
});
if (env.METRICS_ENABLED) {
this.metricsServer = new HttpServer({
port: env.METRICS_PORT,
host: env.METRICS_HOST,
metrics: {
register,
expose: true,
},
});
}

// Responds to workload requests only
this.workloadServer = new WorkloadServer({
port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL,
host: env.TRIGGER_WORKLOAD_API_HOST_INTERNAL,
workerClient: this.workerSession.httpClient,
checkpointClient: this.checkpointClient,
});
Expand Down Expand Up @@ -299,13 +326,10 @@ class ManagedSupervisor {
async start() {
this.logger.log("[ManagedWorker] Starting up");

if (this.podCleaner) {
await this.podCleaner.start();
}

if (this.failedPodHandler) {
await this.failedPodHandler.start();
}
// Optional services
await this.podCleaner?.start();
await this.failedPodHandler?.start();
await this.metricsServer?.start();

if (env.TRIGGER_WORKLOAD_API_ENABLED) {
this.logger.log("[ManagedWorker] Workload API enabled", {
Expand All @@ -319,21 +343,16 @@ class ManagedSupervisor {
}

await this.workerSession.start();

await this.httpServer.start();
}

async stop() {
this.logger.log("[ManagedWorker] Shutting down");
await this.httpServer.stop();
await this.workerSession.stop();

if (this.podCleaner) {
await this.podCleaner.stop();
}

if (this.failedPodHandler) {
await this.failedPodHandler.stop();
}
// Optional services
await this.podCleaner?.stop();
await this.failedPodHandler?.stop();
await this.metricsServer?.stop();
}
}

Expand Down
13 changes: 5 additions & 8 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,15 @@ export class KubernetesWorkloadManager implements WorkloadManager {
}
}

private getImagePullSecrets(): k8s.V1LocalObjectReference[] | undefined {
return this.opts.imagePullSecrets?.map((name) => ({ name }));
}

get #defaultPodSpec(): Omit<k8s.V1PodSpec, "containers"> {
return {
restartPolicy: "Never",
automountServiceAccountToken: false,
imagePullSecrets: [
{
name: "registry-trigger",
},
{
name: "registry-trigger-failover",
},
],
imagePullSecrets: this.getImagePullSecrets(),
nodeSelector: {
nodetype: "worker-re2",
},
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export interface WorkloadManagerOptions {
workloadApiDomain?: string; // If unset, will use orchestrator-specific default
workloadApiPort: number;
warmStartUrl?: string;
imagePullSecrets?: string[];
}

export interface WorkloadManager {
Expand Down
15 changes: 14 additions & 1 deletion apps/supervisor/src/workloadServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
} from "@trigger.dev/core/v3/workers";
import { HttpServer, type CheckpointClient } from "@trigger.dev/core/v3/serverOnly";
import { type IncomingMessage } from "node:http";
import { register } from "../metrics.js";

// Use the official export when upgrading to socket.io@4.8.0
interface DefaultEventsMap {
Expand Down Expand Up @@ -121,7 +122,19 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
}

private createHttpServer({ host, port }: { host: string; port: number }) {
return new HttpServer({ port, host })
return new HttpServer({
port,
host,
metrics: {
register,
expose: false,
},
})
.route("/health", "GET", {
handler: async ({ reply }) => {
reply.text("OK");
},
})
.route(
"/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/attempts/start",
"POST",
Expand Down
10 changes: 4 additions & 6 deletions apps/webapp/app/routes/admin.api.v1.workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.serv
const RequestBodySchema = z.object({
name: z.string().optional(),
description: z.string().optional(),
projectId: z.string().optional(),
makeDefault: z.boolean().optional(),
makeDefaultForProjectId: z.string().optional(),
});

export async function action({ request }: ActionFunctionArgs) {
Expand All @@ -35,22 +34,21 @@ export async function action({ request }: ActionFunctionArgs) {

try {
const rawBody = await request.json();
const { name, description, projectId, makeDefault } = RequestBodySchema.parse(rawBody ?? {});
const { name, description, makeDefaultForProjectId } = RequestBodySchema.parse(rawBody ?? {});

const service = new WorkerGroupService();
const { workerGroup, token } = await service.createWorkerGroup({
name,
description,
});

if (makeDefault && projectId) {
if (makeDefaultForProjectId) {
await prisma.project.update({
where: {
id: projectId,
id: makeDefaultForProjectId,
},
data: {
defaultWorkerGroupId: workerGroup.id,
engine: "V2",
},
});
}
Expand Down

This file was deleted.

Loading