diff --git a/.changeset/lazy-panthers-shop.md b/.changeset/lazy-panthers-shop.md new file mode 100644 index 0000000000..fa622e087e --- /dev/null +++ b/.changeset/lazy-panthers-shop.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Improve structured logs diff --git a/.changeset/rare-beds-accept.md b/.changeset/rare-beds-accept.md new file mode 100644 index 0000000000..dccd97a96a --- /dev/null +++ b/.changeset/rare-beds-accept.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Add verbose structured log level diff --git a/apps/supervisor/.env.example b/apps/supervisor/.env.example index da91ebb6aa..652fc03942 100644 --- a/apps/supervisor/.env.example +++ b/apps/supervisor/.env.example @@ -7,7 +7,7 @@ MANAGED_WORKER_SECRET=managed-secret # Point this at the webapp in prod TRIGGER_API_URL=http://localhost:3030 -# Point this at the OTel collector in prod +# Point this at the webapp or an OTel collector in prod OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:3030/otel # Use this on macOS # OTEL_EXPORTER_OTLP_ENDPOINT=http://host.docker.internal:3030/otel diff --git a/apps/supervisor/package.json b/apps/supervisor/package.json index 8b29055f9a..7d0c148cb3 100644 --- a/apps/supervisor/package.json +++ b/apps/supervisor/package.json @@ -16,16 +16,13 @@ "@kubernetes/client-node": "^1.0.0", "@trigger.dev/core": "workspace:*", "dockerode": "^4.0.3", - "nanoid": "^5.0.9", "prom-client": "^15.1.0", "socket.io": "4.7.4", "std-env": "^3.8.0", - "tinyexec": "^0.3.1", "zod": "3.23.8" }, "devDependencies": { "@types/dockerode": "^3.3.33", - "docker-api-ts": "^0.2.2", "vitest": "^1.4.0" } } diff --git a/apps/supervisor/src/clients/kubernetes.ts b/apps/supervisor/src/clients/kubernetes.ts index 77fd4144d6..f66e57e435 100644 --- a/apps/supervisor/src/clients/kubernetes.ts +++ b/apps/supervisor/src/clients/kubernetes.ts @@ -3,9 +3,12 @@ import { Informer } from "@kubernetes/client-node"; import { ListPromise } from "@kubernetes/client-node"; import { KubernetesObject } from "@kubernetes/client-node"; import { assertExhaustive } from "@trigger.dev/core/utils"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; export const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local"; +const logger = new SimpleStructuredLogger("kubernetes-client"); + export function createK8sApi() { const kubeConfig = getKubeConfig(); @@ -31,7 +34,7 @@ export function createK8sApi() { export type K8sApi = ReturnType; function getKubeConfig() { - console.log("getKubeConfig()", { RUNTIME_ENV }); + logger.debug("getKubeConfig()", { RUNTIME_ENV }); const kubeConfig = new k8s.KubeConfig(); diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index cd9bf5bead..a4f6596011 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -29,6 +29,17 @@ const Env = z.object({ RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) RUNNER_DOCKER_AUTOREMOVE: BoolEnv.default(true), + /** + * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:`. + * Any other value is taken as a custom network's name to which all runners should connect to. + * + * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. + * + * **WARNING**: Specifying multiple networks will slightly increase startup times. + * + * @default "host" + */ + RUNNER_DOCKER_NETWORKS: z.string().default("host"), // Dequeue settings (provider mode) TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), @@ -43,14 +54,14 @@ const Env = z.object({ TRIGGER_METADATA_URL: z.string().optional(), // Used by the workload manager, e.g docker/k8s - DOCKER_NETWORK: z.string().default("host"), OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), ENFORCE_MACHINE_PRESETS: z.coerce.boolean().default(false), KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv // Used by the resource monitor - OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), - OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), + RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), + RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), + RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), // Kubernetes specific settings KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), diff --git a/apps/supervisor/src/envUtil.ts b/apps/supervisor/src/envUtil.ts index 41dd5ca22a..95d44d6c45 100644 --- a/apps/supervisor/src/envUtil.ts +++ b/apps/supervisor/src/envUtil.ts @@ -1,4 +1,7 @@ import { z } from "zod"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; + +const logger = new SimpleStructuredLogger("env-util"); export const BoolEnv = z.preprocess((val) => { if (typeof val !== "string") { @@ -33,7 +36,7 @@ export const AdditionalEnvVars = z.preprocess((val) => { // Return undefined if no valid key-value pairs were found return Object.keys(result).length === 0 ? undefined : result; } catch (error) { - console.warn("Failed to parse additional env vars", { error, val }); + logger.warn("Failed to parse additional env vars", { error, val }); return undefined; } }, z.record(z.string(), z.string()).optional()); diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index b32e4b00ef..eb5ca4679f 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -9,6 +9,7 @@ import { type DequeuedMessage } from "@trigger.dev/core/v3"; import { DockerResourceMonitor, KubernetesResourceMonitor, + NoopResourceMonitor, type ResourceMonitor, } from "./resourceMonitor.js"; import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js"; @@ -33,7 +34,7 @@ class ManagedSupervisor { private readonly metricsServer?: HttpServer; private readonly workloadServer: WorkloadServer; private readonly workloadManager: WorkloadManager; - private readonly logger = new SimpleStructuredLogger("managed-worker"); + private readonly logger = new SimpleStructuredLogger("managed-supervisor"); private readonly resourceMonitor: ResourceMonitor; private readonly checkpointClient?: CheckpointClient; @@ -47,11 +48,11 @@ class ManagedSupervisor { const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, ...envWithoutSecrets } = env; if (env.DEBUG) { - console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets }); + this.logger.debug("Starting up", { envWithoutSecrets }); } if (this.warmStartUrl) { - this.logger.log("[ManagedWorker] ๐Ÿ”ฅ Warm starts enabled", { + this.logger.log("๐Ÿ”ฅ Warm starts enabled", { warmStartUrl: this.warmStartUrl, }); } @@ -69,9 +70,19 @@ class ManagedSupervisor { dockerAutoremove: env.RUNNER_DOCKER_AUTOREMOVE, } satisfies WorkloadManagerOptions; + this.resourceMonitor = env.RESOURCE_MONITOR_ENABLED + ? this.isKubernetes + ? new KubernetesResourceMonitor(createK8sApi(), env.TRIGGER_WORKER_INSTANCE_NAME) + : new DockerResourceMonitor(new Docker()) + : new NoopResourceMonitor(); + + this.workloadManager = this.isKubernetes + ? new KubernetesWorkloadManager(workloadManagerOptions) + : new DockerWorkloadManager(workloadManagerOptions); + if (this.isKubernetes) { if (env.POD_CLEANER_ENABLED) { - this.logger.log("[ManagedWorker] ๐Ÿงน Pod cleaner enabled", { + this.logger.log("๐Ÿงน Pod cleaner enabled", { namespace: env.KUBERNETES_NAMESPACE, batchSize: env.POD_CLEANER_BATCH_SIZE, intervalMs: env.POD_CLEANER_INTERVAL_MS, @@ -83,11 +94,11 @@ class ManagedSupervisor { intervalMs: env.POD_CLEANER_INTERVAL_MS, }); } else { - this.logger.warn("[ManagedWorker] Pod cleaner disabled"); + this.logger.warn("Pod cleaner disabled"); } if (env.FAILED_POD_HANDLER_ENABLED) { - this.logger.log("[ManagedWorker] ๐Ÿ” Failed pod handler enabled", { + this.logger.log("๐Ÿ” Failed pod handler enabled", { namespace: env.KUBERNETES_NAMESPACE, reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS, }); @@ -97,17 +108,14 @@ class ManagedSupervisor { reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS, }); } else { - this.logger.warn("[ManagedWorker] Failed pod handler disabled"); + this.logger.warn("Failed pod handler disabled"); } + } - this.resourceMonitor = new KubernetesResourceMonitor( - createK8sApi(), - env.TRIGGER_WORKER_INSTANCE_NAME + if (env.TRIGGER_DEQUEUE_INTERVAL_MS > env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS) { + this.logger.warn( + `โš ๏ธ TRIGGER_DEQUEUE_INTERVAL_MS (${env.TRIGGER_DEQUEUE_INTERVAL_MS}) is greater than TRIGGER_DEQUEUE_IDLE_INTERVAL_MS (${env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS}) - did you mix them up?` ); - this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions); - } else { - this.resourceMonitor = new DockerResourceMonitor(new Docker()); - this.workloadManager = new DockerWorkloadManager(workloadManagerOptions); } this.workerSession = new SupervisorSession({ @@ -123,12 +131,17 @@ class ManagedSupervisor { runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, preDequeue: async () => { + if (!env.RESOURCE_MONITOR_ENABLED) { + return {}; + } + if (this.isKubernetes) { // Not used in k8s for now return {}; } const resources = await this.resourceMonitor.getNodeResources(); + return { maxResources: { cpu: resources.cpuAvailable, @@ -144,7 +157,7 @@ class ManagedSupervisor { }); if (env.TRIGGER_CHECKPOINT_URL) { - this.logger.log("[ManagedWorker] ๐Ÿฅถ Checkpoints enabled", { + this.logger.log("๐Ÿฅถ Checkpoints enabled", { checkpointUrl: env.TRIGGER_CHECKPOINT_URL, }); @@ -155,43 +168,34 @@ class ManagedSupervisor { }); } - // setInterval(async () => { - // const resources = await this.resourceMonitor.getNodeResources(true); - // this.logger.debug("[ManagedWorker] Current resources", { resources }); - // }, 1000); - this.workerSession.on("runNotification", async ({ time, run }) => { - this.logger.log("[ManagedWorker] runNotification", { time, run }); + this.logger.log("runNotification", { time, run }); this.workloadServer.notifyRun({ run }); }); this.workerSession.on("runQueueMessage", async ({ time, message }) => { - this.logger.log( - `[ManagedWorker] Received message with timestamp ${time.toLocaleString()}`, - message - ); + this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message); if (message.completedWaitpoints.length > 0) { - this.logger.debug("[ManagedWorker] Run has completed waitpoints", { + this.logger.debug("Run has completed waitpoints", { runId: message.run.id, completedWaitpoints: message.completedWaitpoints.length, }); - // TODO: Do something with them or if we don't need the data here, maybe we shouldn't even send it } if (!message.image) { - this.logger.error("[ManagedWorker] Run has no image", { runId: message.run.id }); + this.logger.error("Run has no image", { runId: message.run.id }); return; } const { checkpoint, ...rest } = message; if (checkpoint) { - this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id }); + this.logger.log("Restoring run", { runId: message.run.id }); if (!this.checkpointClient) { - this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id }); + this.logger.error("No checkpoint client", { runId: message.run.id }); return; } @@ -206,23 +210,23 @@ class ManagedSupervisor { }); if (didRestore) { - this.logger.log("[ManagedWorker] Restore successful", { runId: message.run.id }); + this.logger.log("Restore successful", { runId: message.run.id }); } else { - this.logger.error("[ManagedWorker] Restore failed", { runId: message.run.id }); + this.logger.error("Restore failed", { runId: message.run.id }); } } catch (error) { - this.logger.error("[ManagedWorker] Failed to restore run", { error }); + this.logger.error("Failed to restore run", { error }); } return; } - this.logger.log("[ManagedWorker] Scheduling run", { runId: message.run.id }); + this.logger.log("Scheduling run", { runId: message.run.id }); const didWarmStart = await this.tryWarmStart(message); if (didWarmStart) { - this.logger.log("[ManagedWorker] Warm start successful", { runId: message.run.id }); + this.logger.log("Warm start successful", { runId: message.run.id }); return; } @@ -249,7 +253,7 @@ class ManagedSupervisor { // memory: message.run.machine.memory, // }); } catch (error) { - this.logger.error("[ManagedWorker] Failed to create workload", { error }); + this.logger.error("Failed to create workload", { error }); } }); @@ -277,12 +281,12 @@ class ManagedSupervisor { } async onRunConnected({ run }: { run: { friendlyId: string } }) { - this.logger.debug("[ManagedWorker] Run connected", { run }); + this.logger.debug("Run connected", { run }); this.workerSession.subscribeToRunNotifications([run.friendlyId]); } async onRunDisconnected({ run }: { run: { friendlyId: string } }) { - this.logger.debug("[ManagedWorker] Run disconnected", { run }); + this.logger.debug("Run disconnected", { run }); this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]); } @@ -303,7 +307,7 @@ class ManagedSupervisor { }); if (!res.ok) { - this.logger.error("[ManagedWorker] Warm start failed", { + this.logger.error("Warm start failed", { runId: dequeuedMessage.run.id, }); return false; @@ -313,7 +317,7 @@ class ManagedSupervisor { const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data); if (!parsedData.success) { - this.logger.error("[ManagedWorker] Warm start response invalid", { + this.logger.error("Warm start response invalid", { runId: dequeuedMessage.run.id, data, }); @@ -322,7 +326,7 @@ class ManagedSupervisor { return parsedData.data.didWarmStart; } catch (error) { - this.logger.error("[ManagedWorker] Warm start error", { + this.logger.error("Warm start error", { runId: dequeuedMessage.run.id, error, }); @@ -331,7 +335,7 @@ class ManagedSupervisor { } async start() { - this.logger.log("[ManagedWorker] Starting up"); + this.logger.log("Starting up"); // Optional services await this.podCleaner?.start(); @@ -339,21 +343,21 @@ class ManagedSupervisor { await this.metricsServer?.start(); if (env.TRIGGER_WORKLOAD_API_ENABLED) { - this.logger.log("[ManagedWorker] Workload API enabled", { + this.logger.log("Workload API enabled", { protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL, domain: env.TRIGGER_WORKLOAD_API_DOMAIN, port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL, }); await this.workloadServer.start(); } else { - this.logger.warn("[ManagedWorker] Workload API disabled"); + this.logger.warn("Workload API disabled"); } await this.workerSession.start(); } async stop() { - this.logger.log("[ManagedWorker] Shutting down"); + this.logger.log("Shutting down"); await this.workerSession.stop(); // Optional services diff --git a/apps/supervisor/src/resourceMonitor.ts b/apps/supervisor/src/resourceMonitor.ts index 2aa844b9c7..507a52bbf6 100644 --- a/apps/supervisor/src/resourceMonitor.ts +++ b/apps/supervisor/src/resourceMonitor.ts @@ -1,5 +1,4 @@ import type Docker from "dockerode"; -import type * as TDocker from "docker-api-ts"; import type { MachineResources } from "@trigger.dev/core/v3"; import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; import { env } from "./env.js"; @@ -71,18 +70,21 @@ export abstract class ResourceMonitor { } protected applyOverrides(resources: NodeResources): NodeResources { - if (!env.OVERRIDE_CPU_TOTAL && !env.OVERRIDE_MEMORY_TOTAL_GB) { + if ( + !env.RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL && + !env.RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB + ) { return resources; } logger.debug("[ResourceMonitor] ๐Ÿ›ก๏ธ Applying resource overrides", { - cpuTotal: env.OVERRIDE_CPU_TOTAL, - memoryTotalGb: env.OVERRIDE_MEMORY_TOTAL_GB, + cpuTotal: env.RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL, + memoryTotalGb: env.RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB, }); - const cpuTotal = env.OVERRIDE_CPU_TOTAL ?? resources.cpuTotal; - const memoryTotal = env.OVERRIDE_MEMORY_TOTAL_GB - ? this.gbToBytes(env.OVERRIDE_MEMORY_TOTAL_GB) + const cpuTotal = env.RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL ?? resources.cpuTotal; + const memoryTotal = env.RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB + ? this.gbToBytes(env.RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB) : resources.memoryTotal; const cpuDiff = cpuTotal - resources.cpuTotal; @@ -100,6 +102,11 @@ export abstract class ResourceMonitor { } } +type SystemInfo = { + NCPU: number | undefined; + MemTotal: number | undefined; +}; + export class DockerResourceMonitor extends ResourceMonitor { private docker: Docker; @@ -114,7 +121,7 @@ export class DockerResourceMonitor extends ResourceMonitor { return this.cachedResources; } - const info: TDocker.SystemInfo = await this.docker.info(); + const info: SystemInfo = await this.docker.info(); const stats = await this.docker.listContainers({ all: true }); // Get system-wide resources @@ -208,6 +215,21 @@ export class KubernetesResourceMonitor extends ResourceMonitor { } } +export class NoopResourceMonitor extends ResourceMonitor { + constructor() { + super(NoopResourceParser); + } + + async getNodeResources(): Promise { + return { + cpuTotal: 0, + cpuAvailable: Infinity, + memoryTotal: 0, + memoryAvailable: Infinity, + }; + } +} + abstract class ResourceParser { abstract cpu(cpu: number | string): number; abstract memory(memory: number | string): number; @@ -244,3 +266,13 @@ class KubernetesResourceParser extends ResourceParser { return parseInt(memory); } } + +class NoopResourceParser extends ResourceParser { + cpu(cpu: number): number { + return cpu; + } + + memory(memory: number): number { + return memory; + } +} diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 171e2c0971..3669e492a9 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -4,88 +4,172 @@ import { type WorkloadManagerCreateOptions, type WorkloadManagerOptions, } from "./types.js"; -import { x } from "tinyexec"; import { env } from "../env.js"; import { getDockerHostDomain, getRunnerId } from "../util.js"; +import Docker from "dockerode"; +import { tryCatch } from "@trigger.dev/core"; export class DockerWorkloadManager implements WorkloadManager { - private readonly logger = new SimpleStructuredLogger("docker-workload-provider"); + private readonly logger = new SimpleStructuredLogger("docker-workload-manager"); + private readonly docker: Docker; + + private readonly runnerNetworks: string[]; constructor(private opts: WorkloadManagerOptions) { + this.docker = new Docker(); + if (opts.workloadApiDomain) { - this.logger.warn("[DockerWorkloadProvider] โš ๏ธ Custom workload API domain", { + this.logger.warn("โš ๏ธ Custom workload API domain", { domain: opts.workloadApiDomain, }); } + + this.runnerNetworks = env.RUNNER_DOCKER_NETWORKS.split(","); } async create(opts: WorkloadManagerCreateOptions) { - this.logger.log("[DockerWorkloadProvider] Creating container", { opts }); + this.logger.log("create()", { opts }); const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); - const runArgs = [ - "run", - "--detach", - `--network=${env.DOCKER_NETWORK}`, - `--env=TRIGGER_DEQUEUED_AT_MS=${opts.dequeuedAt.getTime()}`, - `--env=TRIGGER_POD_SCHEDULED_AT_MS=${Date.now()}`, - `--env=TRIGGER_ENV_ID=${opts.envId}`, - `--env=TRIGGER_RUN_ID=${opts.runFriendlyId}`, - `--env=TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`, - `--env=TRIGGER_SUPERVISOR_API_PROTOCOL=${this.opts.workloadApiProtocol}`, - `--env=TRIGGER_SUPERVISOR_API_PORT=${this.opts.workloadApiPort}`, - `--env=TRIGGER_SUPERVISOR_API_DOMAIN=${this.opts.workloadApiDomain ?? getDockerHostDomain()}`, - `--env=TRIGGER_WORKER_INSTANCE_NAME=${env.TRIGGER_WORKER_INSTANCE_NAME}`, - `--env=OTEL_EXPORTER_OTLP_ENDPOINT=${env.OTEL_EXPORTER_OTLP_ENDPOINT}`, - `--env=TRIGGER_RUNNER_ID=${runnerId}`, - `--hostname=${runnerId}`, - `--name=${runnerId}`, + // Build environment variables + const envVars: string[] = [ + `TRIGGER_DEQUEUED_AT_MS=${opts.dequeuedAt.getTime()}`, + `TRIGGER_POD_SCHEDULED_AT_MS=${Date.now()}`, + `TRIGGER_ENV_ID=${opts.envId}`, + `TRIGGER_RUN_ID=${opts.runFriendlyId}`, + `TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`, + `TRIGGER_SUPERVISOR_API_PROTOCOL=${this.opts.workloadApiProtocol}`, + `TRIGGER_SUPERVISOR_API_PORT=${this.opts.workloadApiPort}`, + `TRIGGER_SUPERVISOR_API_DOMAIN=${this.opts.workloadApiDomain ?? getDockerHostDomain()}`, + `TRIGGER_WORKER_INSTANCE_NAME=${env.TRIGGER_WORKER_INSTANCE_NAME}`, + `OTEL_EXPORTER_OTLP_ENDPOINT=${env.OTEL_EXPORTER_OTLP_ENDPOINT}`, + `TRIGGER_RUNNER_ID=${runnerId}`, ]; - if (this.opts.dockerAutoremove) { - runArgs.push("--rm"); - } - if (this.opts.warmStartUrl) { - runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`); + envVars.push(`TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`); } if (this.opts.metadataUrl) { - runArgs.push(`--env=TRIGGER_METADATA_URL=${this.opts.metadataUrl}`); + envVars.push(`TRIGGER_METADATA_URL=${this.opts.metadataUrl}`); } if (this.opts.heartbeatIntervalSeconds) { - runArgs.push( - `--env=TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}` - ); + envVars.push(`TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}`); } if (this.opts.snapshotPollIntervalSeconds) { - runArgs.push( - `--env=TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS=${this.opts.snapshotPollIntervalSeconds}` + envVars.push( + `TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS=${this.opts.snapshotPollIntervalSeconds}` ); } if (this.opts.additionalEnvVars) { Object.entries(this.opts.additionalEnvVars).forEach(([key, value]) => { - runArgs.push(`--env=${key}=${value}`); + envVars.push(`${key}=${value}`); }); } + const hostConfig: Docker.HostConfig = { + AutoRemove: !!this.opts.dockerAutoremove, + }; + + const [firstNetwork, ...remainingNetworks] = this.runnerNetworks; + + // Always attach the first network at container creation time. This has the following benefits: + // - If there is only a single network to attach, this will prevent having to make a separate request. + // - If there are multiple networks to attach, this will ensure the runner won't also be connected to the bridge network + hostConfig.NetworkMode = firstNetwork; + if (env.ENFORCE_MACHINE_PRESETS) { - runArgs.push(`--cpus=${opts.machine.cpu}`, `--memory=${opts.machine.memory}G`); - runArgs.push(`--env=TRIGGER_MACHINE_CPU=${opts.machine.cpu}`); - runArgs.push(`--env=TRIGGER_MACHINE_MEMORY=${opts.machine.memory}`); + envVars.push(`TRIGGER_MACHINE_CPU=${opts.machine.cpu}`); + envVars.push(`TRIGGER_MACHINE_MEMORY=${opts.machine.memory}`); + + hostConfig.NanoCpus = opts.machine.cpu * 1e9; + hostConfig.Memory = opts.machine.memory * 1024 * 1024 * 1024; } - runArgs.push(`${opts.image}`); + const containerCreateOpts: Docker.ContainerCreateOptions = { + Env: envVars, + name: runnerId, + Hostname: runnerId, + HostConfig: hostConfig, + Image: opts.image, + AttachStdout: false, + AttachStderr: false, + AttachStdin: false, + }; try { - const { stdout, stderr } = await x("docker", runArgs); - this.logger.debug("[DockerWorkloadProvider] Create succeeded", { stdout, stderr }); + // Create container + const container = await this.docker.createContainer(containerCreateOpts); + + // If there are multiple networks to attach to we need to attach the remaining ones after creation + if (remainingNetworks.length > 0) { + await this.attachContainerToNetworks({ + containerId: container.id, + networkNames: remainingNetworks, + }); + } + + // Start container + const startResult = await container.start(); + + this.logger.debug("create succeeded", { + opts, + startResult, + containerId: container.id, + containerCreateOpts, + }); } catch (error) { - this.logger.error("[DockerWorkloadProvider] Create failed:", { opts, error }); + this.logger.error("create failed:", { opts, error, containerCreateOpts }); + } + } + + private async attachContainerToNetworks({ + containerId, + networkNames, + }: { + containerId: string; + networkNames: string[]; + }) { + this.logger.debug("Attaching container to networks", { containerId, networkNames }); + + const [error, networkResults] = await tryCatch( + this.docker.listNetworks({ + filters: { + // Full name matches only to prevent unexpected results + name: networkNames.map((name) => `^${name}$`), + }, + }) + ); + + if (error) { + this.logger.error("Failed to list networks", { networkNames }); + return; + } + + const results = await Promise.allSettled( + networkResults.map((networkInfo) => { + const network = this.docker.getNetwork(networkInfo.Id); + return network.connect({ Container: containerId }); + }) + ); + + if (results.some((r) => r.status === "rejected")) { + this.logger.error("Failed to attach container to some networks", { + containerId, + networkNames, + results, + }); + return; } + + this.logger.debug("Attached container to networks", { + containerId, + networkNames, + results, + }); } } diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index cc41e2bfbf..fb7c12c17a 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -63,6 +63,8 @@ type WorkloadServerOptions = { export class WorkloadServer extends EventEmitter { private checkpointClient?: CheckpointClient; + private readonly logger = new SimpleStructuredLogger("workload-server"); + private readonly httpServer: HttpServer; private readonly websocketServer: Namespace< WorkloadClientToServerEvents, @@ -151,7 +153,7 @@ export class WorkloadServer extends EventEmitter { ); if (!startResponse.success) { - console.error("Failed to start run", { + this.logger.error("Failed to start run", { params, error: startResponse.error, }); @@ -171,7 +173,6 @@ export class WorkloadServer extends EventEmitter { paramsSchema: WorkloadActionParams, bodySchema: WorkloadRunAttemptCompleteRequestBody, handler: async ({ req, reply, params, body }) => { - console.log("headers", req.headers); const completeResponse = await this.workerClient.completeRunAttempt( params.runFriendlyId, params.snapshotFriendlyId, @@ -180,7 +181,7 @@ export class WorkloadServer extends EventEmitter { ); if (!completeResponse.success) { - console.error("Failed to complete run", { + this.logger.error("Failed to complete run", { params, error: completeResponse.error, }); @@ -208,7 +209,7 @@ export class WorkloadServer extends EventEmitter { ); if (!heartbeatResponse.success) { - console.error("Failed to heartbeat run", { + this.logger.error("Failed to heartbeat run", { params, error: heartbeatResponse.error, }); @@ -228,7 +229,7 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, handler: async ({ reply, params, req }) => { - console.debug("Suspend request", { params, headers: req.headers }); + this.logger.debug("Suspend request", { params, headers: req.headers }); if (!this.checkpointClient) { reply.json( @@ -247,7 +248,7 @@ export class WorkloadServer extends EventEmitter { const projectRef = this.projectRefFromRequest(req); if (!runnerId || !deploymentVersion || !projectRef) { - console.error("Invalid headers for suspend request", { + this.logger.error("Invalid headers for suspend request", { ...params, headers: req.headers, }); @@ -283,7 +284,7 @@ export class WorkloadServer extends EventEmitter { }); if (!suspendResult) { - console.error("Failed to suspend run", { params }); + this.logger.error("Failed to suspend run", { params }); return; } }, @@ -295,7 +296,7 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, handler: async ({ req, reply, params }) => { - console.debug("Run continuation request", { params }); + this.logger.debug("Run continuation request", { params }); const continuationResult = await this.workerClient.continueRunExecution( params.runFriendlyId, @@ -304,7 +305,7 @@ export class WorkloadServer extends EventEmitter { ); if (!continuationResult.success) { - console.error("Failed to continue run execution", { params }); + this.logger.error("Failed to continue run execution", { params }); reply.json( { ok: false, @@ -329,7 +330,7 @@ export class WorkloadServer extends EventEmitter { ); if (!latestSnapshotResponse.success) { - console.error("Failed to get latest snapshot", { + this.logger.error("Failed to get latest snapshot", { runId: params.runFriendlyId, error: latestSnapshotResponse.error, }); @@ -355,7 +356,7 @@ export class WorkloadServer extends EventEmitter { ); if (!sinceSnapshotResponse.success) { - console.error("Failed to get snapshots since", { + this.logger.error("Failed to get snapshots since", { runId: params.runFriendlyId, error: sinceSnapshotResponse.error, }); @@ -393,7 +394,7 @@ export class WorkloadServer extends EventEmitter { ); if (!dequeueResponse.success) { - console.error("Failed to get latest snapshot", { + this.logger.error("Failed to get latest snapshot", { deploymentId: params.deploymentId, error: dequeueResponse.error, }); @@ -417,14 +418,14 @@ export class WorkloadServer extends EventEmitter { > = io.of("/workload"); websocketServer.on("disconnect", (socket) => { - console.log("[WorkloadSocket] disconnect", socket.id); + this.logger.log("[WS] disconnect", socket.id); }); websocketServer.use(async (socket, next) => { - function setSocketDataFromHeader( + const setSocketDataFromHeader = ( dataKey: keyof typeof socket.data, headerName: string, required: boolean = true - ) { + ) => { const value = socket.handshake.headers[headerName]; if (value) { @@ -440,27 +441,26 @@ export class WorkloadServer extends EventEmitter { } if (required) { - console.error("[WorkloadSocket] missing required header", { headerName }); + this.logger.error("[WS] missing required header", { headerName }); throw new Error("missing header"); } - } + }; try { setSocketDataFromHeader("deploymentId", WORKLOAD_HEADERS.DEPLOYMENT_ID); setSocketDataFromHeader("runnerId", WORKLOAD_HEADERS.RUNNER_ID); } catch (error) { - console.error("[WorkloadSocket] setSocketDataFromHeader error", { error }); + this.logger.error("[WS] setSocketDataFromHeader error", { error }); socket.disconnect(true); return; } - console.debug("[WorkloadSocket] auth success", socket.data); + this.logger.debug("[WS] auth success", socket.data); next(); }); websocketServer.on("connection", (socket) => { - const logger = new SimpleStructuredLogger("workload-namespace", undefined, { - namespace: "workload", + const socketLogger = this.logger.child({ socketId: socket.id, socketData: socket.data, }); @@ -475,11 +475,11 @@ export class WorkloadServer extends EventEmitter { }; const runConnected = (friendlyId: string) => { - logger.debug("runConnected", { ...getSocketMetadata() }); + socketLogger.debug("runConnected", { ...getSocketMetadata() }); // If there's already a run ID set, we should "disconnect" it from this socket if (socket.data.runFriendlyId && socket.data.runFriendlyId !== friendlyId) { - logger.debug("runConnected: disconnecting existing run", { + socketLogger.debug("runConnected: disconnecting existing run", { ...getSocketMetadata(), newRunId: friendlyId, oldRunId: socket.data.runFriendlyId, @@ -493,14 +493,14 @@ export class WorkloadServer extends EventEmitter { }; const runDisconnected = (friendlyId: string) => { - logger.debug("runDisconnected", { ...getSocketMetadata() }); + socketLogger.debug("runDisconnected", { ...getSocketMetadata() }); this.runSockets.delete(friendlyId); this.emit("runDisconnected", { run: { friendlyId } }); socket.data.runFriendlyId = undefined; }; - logger.log("wsServer socket connected", { ...getSocketMetadata() }); + socketLogger.log("wsServer socket connected", { ...getSocketMetadata() }); // FIXME: where does this get set? if (socket.data.runFriendlyId) { @@ -508,7 +508,7 @@ export class WorkloadServer extends EventEmitter { } socket.on("disconnecting", (reason, description) => { - logger.log("Socket disconnecting", { ...getSocketMetadata(), reason, description }); + socketLogger.log("Socket disconnecting", { ...getSocketMetadata(), reason, description }); if (socket.data.runFriendlyId) { runDisconnected(socket.data.runFriendlyId); @@ -516,11 +516,11 @@ export class WorkloadServer extends EventEmitter { }); socket.on("disconnect", (reason, description) => { - logger.log("Socket disconnected", { ...getSocketMetadata(), reason, description }); + socketLogger.log("Socket disconnected", { ...getSocketMetadata(), reason, description }); }); socket.on("error", (error) => { - logger.error("Socket error", { + socketLogger.error("Socket error", { ...getSocketMetadata(), error: { name: error.name, @@ -531,7 +531,7 @@ export class WorkloadServer extends EventEmitter { }); socket.on("run:start", async (message) => { - const log = logger.child({ + const log = socketLogger.child({ eventName: "run:start", ...getSocketMetadata(), ...message, @@ -547,7 +547,7 @@ export class WorkloadServer extends EventEmitter { }); socket.on("run:stop", async (message) => { - const log = logger.child({ + const log = socketLogger.child({ eventName: "run:stop", ...getSocketMetadata(), ...message, @@ -571,7 +571,7 @@ export class WorkloadServer extends EventEmitter { const runSocket = this.runSockets.get(run.friendlyId); if (!runSocket) { - console.debug("[WorkloadServer] notifyRun: Run socket not found", { run }); + this.logger.debug("notifyRun: Run socket not found", { run }); this.workerClient.sendDebugLog(run.friendlyId, { time: new Date(), @@ -582,14 +582,14 @@ export class WorkloadServer extends EventEmitter { } runSocket.emit("run:notify", { version: "1", run }); - console.debug("[WorkloadServer] run:notify sent", { run }); + this.logger.debug("run:notify sent", { run }); this.workerClient.sendDebugLog(run.friendlyId, { time: new Date(), message: "run:notify supervisor -> runner", }); } catch (error) { - console.error("[WorkloadServer] Error in notifyRun", { run, error }); + this.logger.error("Error in notifyRun", { run, error }); this.workerClient.sendDebugLog(run.friendlyId, { time: new Date(), diff --git a/packages/core/src/v3/runEngineWorker/supervisor/http.ts b/packages/core/src/v3/runEngineWorker/supervisor/http.ts index 43305b456a..407b9deb5f 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/http.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/http.ts @@ -24,6 +24,7 @@ import { getDefaultWorkerHeaders } from "./util.js"; import { wrapZodFetch } from "../../zodfetch.js"; import { createHeaders } from "../util.js"; import { WORKER_HEADERS } from "../consts.js"; +import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; type SupervisorHttpClientOptions = SupervisorClientCommonOptions; @@ -33,6 +34,8 @@ export class SupervisorHttpClient { private readonly instanceName: string; private readonly defaultHeaders: Record; + private readonly logger = new SimpleStructuredLogger("supervisor-http-client"); + constructor(opts: SupervisorHttpClientOptions) { this.apiUrl = opts.apiUrl.replace(/\/$/, ""); this.workerToken = opts.workerToken; @@ -217,10 +220,10 @@ export class SupervisorHttpClient { ); if (!res.success) { - console.error("Failed to send debug log", res); + this.logger.error("Failed to send debug log", { error: res.error }); } } catch (error) { - console.error("Failed to send debug log", { error }); + this.logger.error("Failed to send debug log (caught error)", { error }); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index 273c7bbe0a..6eb5572bf3 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -1,3 +1,4 @@ +import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; import { SupervisorHttpClient } from "./http.js"; import { WorkerApiDequeueResponseBody } from "./schemas.js"; import { PreDequeueFn, PreSkipFn } from "./types.js"; @@ -19,6 +20,8 @@ export class RunQueueConsumer { private readonly maxRunCount?: number; private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; + private readonly logger = new SimpleStructuredLogger("queue-consumer"); + private intervalMs: number; private idleIntervalMs: number; private isEnabled: boolean; @@ -52,42 +55,52 @@ export class RunQueueConsumer { } private async dequeue() { - // Incredibly verbose logging for debugging purposes - // console.debug("[RunQueueConsumer] dequeue()", { enabled: this.isEnabled }); + this.logger.verbose("dequeue()", { + enabled: this.isEnabled, + intervalMs: this.intervalMs, + idleIntervalMs: this.idleIntervalMs, + maxRunCount: this.maxRunCount, + preDequeue: !!this.preDequeue, + preSkip: !!this.preSkip, + }); if (!this.isEnabled) { + this.logger.warn("dequeue() - not enabled"); return; } let preDequeueResult: Awaited> | undefined; if (this.preDequeue) { - // console.debug("[RunQueueConsumer] preDequeue()"); + this.logger.verbose("preDequeue()"); try { preDequeueResult = await this.preDequeue(); } catch (preDequeueError) { - console.error("[RunQueueConsumer] preDequeue error", { error: preDequeueError }); + this.logger.error("preDequeue error", { error: preDequeueError }); } } - // console.debug("[RunQueueConsumer] preDequeueResult", { preDequeueResult }); + this.logger.verbose("preDequeueResult", { preDequeueResult }); if ( preDequeueResult?.skipDequeue || preDequeueResult?.maxResources?.cpu === 0 || preDequeueResult?.maxResources?.memory === 0 ) { + this.logger.debug("skipping dequeue", { preDequeueResult }); + if (this.preSkip) { - console.debug("[RunQueueConsumer] preSkip()"); + this.logger.debug("preSkip()"); try { await this.preSkip(); } catch (preSkipError) { - console.error("[RunQueueConsumer] preSkip error", { error: preSkipError }); + this.logger.error("preSkip error", { error: preSkipError }); } } - return this.scheduleNextDequeue(this.idleIntervalMs); + this.scheduleNextDequeue(this.idleIntervalMs); + return; } let nextIntervalMs = this.idleIntervalMs; @@ -99,7 +112,7 @@ export class RunQueueConsumer { }); if (!response.success) { - console.error("[RunQueueConsumer] Failed to dequeue", { error: response.error }); + this.logger.error("Failed to dequeue", { error: response.error }); } else { try { await this.onDequeue(response.data); @@ -108,17 +121,21 @@ export class RunQueueConsumer { nextIntervalMs = this.intervalMs; } } catch (handlerError) { - console.error("[RunQueueConsumer] onDequeue error", { error: handlerError }); + this.logger.error("onDequeue error", { error: handlerError }); } } } catch (clientError) { - console.error("[RunQueueConsumer] client.dequeue error", { error: clientError }); + this.logger.error("client.dequeue error", { error: clientError }); } this.scheduleNextDequeue(nextIntervalMs); } scheduleNextDequeue(delayMs: number) { + if (delayMs === this.idleIntervalMs && this.idleIntervalMs !== this.intervalMs) { + this.logger.verbose("scheduled dequeue with idle interval", { delayMs }); + } + setTimeout(this.dequeue.bind(this), delayMs); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index 3f19bb97de..b97a147216 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -9,6 +9,7 @@ import { io, Socket } from "socket.io-client"; import { WorkerClientToServerEvents, WorkerServerToClientEvents } from "../types.js"; import { getDefaultWorkerHeaders } from "./util.js"; import { IntervalService } from "../../utils/interval.js"; +import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; type SupervisorSessionOptions = SupervisorClientCommonOptions & { queueConsumerEnabled?: boolean; @@ -25,6 +26,8 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & { export class SupervisorSession extends EventEmitter { public readonly httpClient: SupervisorHttpClient; + private readonly logger = new SimpleStructuredLogger("supervisor-session"); + private readonly runNotificationsEnabled: boolean; private runNotificationsSocket?: Socket; @@ -54,30 +57,27 @@ export class SupervisorSession extends EventEmitter { this.heartbeat = new IntervalService({ onInterval: async () => { - console.debug("[SupervisorSession] Sending heartbeat"); + this.logger.debug("Sending heartbeat"); const body = this.getHeartbeatBody(); const response = await this.httpClient.heartbeatWorker(body); if (!response.success) { - console.error("[SupervisorSession] Heartbeat failed", { error: response.error }); + this.logger.error("Heartbeat failed", { error: response.error }); } }, intervalMs: opts.heartbeatIntervalSeconds * 1000, leadingEdge: false, onError: async (error) => { - console.error("[SupervisorSession] Failed to send heartbeat", { error }); + this.logger.error("Failed to send heartbeat", { error }); }, }); } private async onDequeue(messages: WorkerApiDequeueResponseBody): Promise { - // Incredibly verbose logging for debugging purposes - // console.log("[SupervisorSession] Dequeued messages", { count: messages.length }); - // console.debug("[SupervisorSession] Dequeued messages with contents", messages); + this.logger.verbose("Dequeued messages with contents", { count: messages.length, messages }); for (const message of messages) { - console.log("[SupervisorSession] Emitting message", { message }); this.emit("runQueueMessage", { time: new Date(), message, @@ -86,10 +86,10 @@ export class SupervisorSession extends EventEmitter { } subscribeToRunNotifications(runFriendlyIds: string[]) { - console.log("[SupervisorSession] Subscribing to run notifications", { runFriendlyIds }); + this.logger.debug("Subscribing to run notifications", { runFriendlyIds }); if (!this.runNotificationsSocket) { - console.error("[SupervisorSession] Socket not connected"); + this.logger.error("Socket not connected"); return; } @@ -106,10 +106,10 @@ export class SupervisorSession extends EventEmitter { } unsubscribeFromRunNotifications(runFriendlyIds: string[]) { - console.log("[SupervisorSession] Unsubscribing from run notifications", { runFriendlyIds }); + this.logger.debug("Unsubscribing from run notifications", { runFriendlyIds }); if (!this.runNotificationsSocket) { - console.error("[SupervisorSession] Socket not connected"); + this.logger.error("Socket not connected"); return; } @@ -137,26 +137,22 @@ export class SupervisorSession extends EventEmitter { extraHeaders: getDefaultWorkerHeaders(this.opts), }); socket.on("run:notify", ({ version, run }) => { - console.log("[SupervisorSession][WS] Received run notification", { version, run }); + this.logger.debug("[WS] Received run notification", { version, run }); this.emit("runNotification", { time: new Date(), run }); - this.httpClient - .sendDebugLog(run.friendlyId, { - time: new Date(), - message: "run:notify received by supervisor", - }) - .catch((error) => { - console.error("[SupervisorSession] Failed to send debug log", { error }); - }); + this.httpClient.sendDebugLog(run.friendlyId, { + time: new Date(), + message: "run:notify received by supervisor", + }); }); socket.on("connect", () => { - console.log("[SupervisorSession][WS] Connected to platform"); + this.logger.log("[WS] Connected to platform"); }); socket.on("connect_error", (error) => { - console.error("[SupervisorSession][WS] Connection error", { error }); + this.logger.error("[WS] Connection error", { error }); }); socket.on("disconnect", (reason, description) => { - console.log("[SupervisorSession][WS] Disconnected from platform", { reason, description }); + this.logger.log("[WS] Disconnected from platform", { reason, description }); }); return socket; @@ -170,30 +166,30 @@ export class SupervisorSession extends EventEmitter { }); if (!connect.success) { - console.error("[SupervisorSession][HTTP] Failed to connect", { error: connect.error }); - throw new Error("[SupervisorSession][HTTP] Failed to connect"); + this.logger.error("Failed to connect", { error: connect.error }); + throw new Error("[SupervisorSession]Failed to connect"); } const { workerGroup } = connect.data; - console.log("[SupervisorSession][HTTP] Connected to platform", { + this.logger.log("Connected to platform", { type: workerGroup.type, name: workerGroup.name, }); if (this.queueConsumerEnabled) { - console.log("[SupervisorSession] Queue consumer enabled"); + this.logger.log("Queue consumer enabled"); await Promise.allSettled(this.queueConsumers.map(async (q) => q.start())); this.heartbeat.start(); } else { - console.warn("[SupervisorSession] Queue consumer disabled"); + this.logger.warn("Queue consumer disabled"); } if (this.runNotificationsEnabled) { - console.log("[SupervisorSession] Run notifications enabled"); + this.logger.log("Run notifications enabled"); this.runNotificationsSocket = this.createRunNotificationsSocket(); } else { - console.warn("[SupervisorSession] Run notifications disabled"); + this.logger.warn("Run notifications disabled"); } } diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index a4d37409a2..259b902687 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -116,7 +116,9 @@ export type MachineConfig = z.infer; export const MachinePreset = z.object({ name: MachinePresetName, + /** unit: vCPU */ cpu: z.number(), + /** unit: GB */ memory: z.number(), centsPerMs: z.number(), }); diff --git a/packages/core/src/v3/serverOnly/httpServer.ts b/packages/core/src/v3/serverOnly/httpServer.ts index bdbfcb7589..4c4fa4eeb1 100644 --- a/packages/core/src/v3/serverOnly/httpServer.ts +++ b/packages/core/src/v3/serverOnly/httpServer.ts @@ -5,7 +5,7 @@ import { HttpReply, getJsonBody } from "../apps/http.js"; import { Registry, Histogram, Counter } from "prom-client"; import { tryCatch } from "../../utils.js"; -const logger = new SimpleStructuredLogger("worker-http"); +const logger = new SimpleStructuredLogger("http-server"); type RouteHandler< TParams extends z.ZodFirstPartySchemaTypes = z.ZodUnknown, @@ -124,7 +124,7 @@ export class HttpServer { try { const { url, method } = req; - logger.log(`${method} ${url?.split("?")[0]}`, { url }); + logger.debug(`${method} ${url?.split("?")[0]}`, { url }); if (!url) { logger.error("Request URL is empty", { method }); diff --git a/packages/core/src/v3/utils/structuredLogger.ts b/packages/core/src/v3/utils/structuredLogger.ts index 72c675aecd..1aae399bbf 100644 --- a/packages/core/src/v3/utils/structuredLogger.ts +++ b/packages/core/src/v3/utils/structuredLogger.ts @@ -15,12 +15,17 @@ export enum LogLevel { "warn", "info", "debug", + "verbose", } export class SimpleStructuredLogger implements StructuredLogger { + private prettyPrint = ["1", "true"].includes(process.env.PRETTY_LOGS ?? ""); + constructor( private name: string, - private level: LogLevel = ["1", "true"].includes(process.env.DEBUG ?? "") + private level: LogLevel = ["1", "true"].includes(process.env.VERBOSE ?? "") + ? LogLevel.verbose + : ["1", "true"].includes(process.env.DEBUG ?? "") ? LogLevel.debug : LogLevel.info, private fields?: Record @@ -60,6 +65,12 @@ export class SimpleStructuredLogger implements StructuredLogger { this.#structuredLog(console.debug, message, "debug", ...args); } + verbose(message: string, ...args: StructuredArgs) { + if (this.level < LogLevel.verbose) return; + + this.#structuredLog(console.debug, message, "verbose", ...args); + } + addFields(fields: Record) { this.fields = { ...this.fields, @@ -82,6 +93,10 @@ export class SimpleStructuredLogger implements StructuredLogger { ...(args.length === 1 ? args[0] : args), }; - loggerFunction(JSON.stringify(structuredLog)); + if (this.prettyPrint) { + loggerFunction(JSON.stringify(structuredLog, null, 2)); + } else { + loggerFunction(JSON.stringify(structuredLog)); + } } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 56c69c4e99..8ef216fb69 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -161,9 +161,6 @@ importers: dockerode: specifier: ^4.0.3 version: 4.0.4 - nanoid: - specifier: ^5.0.9 - version: 5.1.2 prom-client: specifier: ^15.1.0 version: 15.1.0 @@ -173,9 +170,6 @@ importers: std-env: specifier: ^3.8.0 version: 3.8.1 - tinyexec: - specifier: ^0.3.1 - version: 0.3.2 zod: specifier: 3.23.8 version: 3.23.8 @@ -183,9 +177,6 @@ importers: '@types/dockerode': specifier: ^3.3.33 version: 3.3.35 - docker-api-ts: - specifier: ^0.2.2 - version: 0.2.2 vitest: specifier: ^1.4.0 version: 1.6.0(@types/node@20.14.14) @@ -22606,10 +22597,6 @@ packages: /dlv@1.1.3: resolution: {integrity: sha512-+HlytyjlPKnIG8XuRG8WvmBP8xs8P71y+SKKS6ZXWoEgLuePxtDoUEiH7WkdePWrQ5JBpE6aoVqfZfJUQkjXwA==} - /docker-api-ts@0.2.2: - resolution: {integrity: sha512-ayoc0OuS6lY7b64GeUtKcPzbKMkK70Vh3BYLKKG13cXX+/gGS9LyTNVvvJyvZ19Y6kbE4Kbv+2gwRUD17UVTRA==} - dev: true - /docker-compose@0.24.8: resolution: {integrity: sha512-plizRs/Vf15H+GCVxq2EUvyPK7ei9b/cVesHvjnX4xaXjM9spHe2Ytq0BitndFgvTJ3E3NljPNUEl7BAN43iZw==} engines: {node: '>= 6.0.0'}