From 2d32e093bd99ae5fdd5cb019b3d608f5c3fab3c0 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 22 Apr 2025 08:11:20 +0100 Subject: [PATCH 1/6] handle queued status change gracefully --- packages/cli-v3/src/entryPoints/managed/execution.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts index 4b8a11866f..58d41443dc 100644 --- a/packages/cli-v3/src/entryPoints/managed/execution.ts +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -269,6 +269,13 @@ export class RunExecution { this.abortExecution(); return; } + case "QUEUED": { + this.sendDebugLog("Run was re-queued", snapshotMetadata); + + // Pretend we've just suspended the run. This will kill the process without failing the run. + await this.taskRunProcess?.suspend(); + return; + } case "FINISHED": { this.sendDebugLog("Run is finished", snapshotMetadata); @@ -402,8 +409,7 @@ export class RunExecution { return; } - case "RUN_CREATED": - case "QUEUED": { + case "RUN_CREATED": { this.sendDebugLog("Invalid status change", snapshotMetadata); this.abortExecution(); From 3aeb86b57b4e94bda2f650bb05596bf4ff5c6ea4 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Apr 2025 13:54:09 +0100 Subject: [PATCH 2/6] add resource monitor to references --- references/hello-world/src/resourceMonitor.ts | 488 ++++++++++++++++++ 1 file changed, 488 insertions(+) create mode 100644 references/hello-world/src/resourceMonitor.ts diff --git a/references/hello-world/src/resourceMonitor.ts b/references/hello-world/src/resourceMonitor.ts new file mode 100644 index 0000000000..9b4bf28501 --- /dev/null +++ b/references/hello-world/src/resourceMonitor.ts @@ -0,0 +1,488 @@ +import { promisify } from "node:util"; +import { exec } from "node:child_process"; +import os from "node:os"; +import { promises as fs } from "node:fs"; +import { type Context, logger } from "@trigger.dev/sdk"; + +const execAsync = promisify(exec); + +export type DiskMetrics = { + total: number; + used: number; + free: number; + percentUsed: number; + warning?: string; +}; + +export type MemoryMetrics = { + total: number; + free: number; + used: number; + percentUsed: number; +}; + +export type NodeProcessMetrics = { + memoryUsage: number; + memoryUsagePercent: number; +}; + +export type TargetProcessMetrics = { + method: string; + processName: string; + count: number; + processes: ProcessInfo[]; + averages: { + cpu: number; + memory: number; + rss: number; + vsz: number; + } | null; + totals: { + cpu: number; + memory: number; + rss: number; + vsz: number; + } | null; +}; + +export type ProcessMetrics = { + node: NodeProcessMetrics; + target: TargetProcessMetrics | null; +}; + +type ProcessInfo = { + user: string; + pid: number; + cpu: number; + mem: number; + vsz: number; + rss: number; + command: string; +}; + +export type SystemMetrics = { + disk: DiskMetrics; + memory: MemoryMetrics; +}; + +export type ResourceMonitorConfig = { + dirName?: string; + processName?: string; + ctx: Context; +}; + +// Constants +const DISK_LIMIT_GB = 10; +const DISK_LIMIT_BYTES = DISK_LIMIT_GB * 1024 * 1024 * 1024; // 10Gi in bytes + +/** + * Utility class for monitoring system resources and process metrics + */ +export class ResourceMonitor { + private logInterval: NodeJS.Timeout | null = null; + private logger: typeof logger; + private dirName: string; + private processName: string; 
+ private ctx: Context; + + constructor(config: ResourceMonitorConfig) { + this.logger = logger; + this.dirName = config.dirName ?? "/tmp"; + this.processName = config.processName ?? "node"; + this.ctx = config.ctx; + } + + /** + * Start periodic resource monitoring + * @param intervalMs Monitoring interval in milliseconds + */ + startMonitoring(intervalMs = 10000): void { + if (intervalMs < 1000) { + intervalMs = 1000; + this.logger.warn("ResourceMonitor: intervalMs is less than 1000, setting to 1000"); + } + + if (this.logInterval) { + clearInterval(this.logInterval); + } + + this.logInterval = setInterval(this.logResources.bind(this), intervalMs); + } + + /** + * Stop resource monitoring + */ + stopMonitoring(): void { + if (this.logInterval) { + clearInterval(this.logInterval); + this.logInterval = null; + } + } + + private async logResources() { + try { + await this.logResourceSnapshot("RESOURCE_MONITOR"); + } catch (error) { + this.logger.error( + `Resource monitoring error: ${error instanceof Error ? error.message : String(error)}` + ); + } + } + + /** + * Get combined system metrics (disk and memory) + */ + private async getSystemMetrics(): Promise { + const [disk, memory] = await Promise.all([this.getDiskMetrics(), this.getMemoryMetrics()]); + return { disk, memory }; + } + + /** + * Get disk space information + */ + private async getDiskMetrics(): Promise { + try { + // Even with permission errors, du will output a total + const { stdout, stderr } = await execAsync(`du -sb ${this.dirName} || true`); + + // Get the last line of stdout which contains the total + const lastLine = stdout.split("\n").filter(Boolean).pop() || ""; + const usedBytes = parseInt(lastLine.split("\t")[0], 10); + + const effectiveTotal = DISK_LIMIT_BYTES; + const effectiveUsed = Math.min(usedBytes, DISK_LIMIT_BYTES); + const effectiveFree = effectiveTotal - effectiveUsed; + const percentUsed = (effectiveUsed / effectiveTotal) * 100; + + const metrics: DiskMetrics = { + total: effectiveTotal, + used: effectiveUsed, + free: effectiveFree, + percentUsed, + }; + + // If we had permission errors, add a warning + if (stderr.includes("Permission denied") || stderr.includes("cannot access")) { + metrics.warning = "Some directories were not accessible"; + } else if (stderr.includes("No such file or directory")) { + metrics.warning = "The directory does not exist"; + } + + return metrics; + } catch (error) { + this.logger.error( + `Error getting disk metrics: ${error instanceof Error ? error.message : String(error)}` + ); + return { + free: DISK_LIMIT_BYTES, + total: DISK_LIMIT_BYTES, + used: 0, + percentUsed: 0, + warning: "Failed to measure disk usage", + }; + } + } + + /** + * Get memory metrics + */ + private getMemoryMetrics(): MemoryMetrics { + const total = os.totalmem(); + const free = os.freemem(); + const used = total - free; + const percentUsed = (used / total) * 100; + + return { total, free, used, percentUsed }; + } + + /** + * Get process-specific metrics using /proc filesystem + */ + private async getProcMetrics(pids: number[]): Promise { + return Promise.all( + pids.map(async (pid) => { + try { + // Read process status + const status = await fs.readFile(`/proc/${pid}/status`, "utf8"); + const cmdline = await fs.readFile(`/proc/${pid}/cmdline`, "utf8"); + const stat = await fs.readFile(`/proc/${pid}/stat`, "utf8"); + + // Parse VmRSS (resident set size) from status + const rss = parseInt(status.match(/VmRSS:\s+(\d+)/)?.[1] ?? 
"0", 10); + // Parse VmSize (virtual memory size) from status + const vsz = parseInt(status.match(/VmSize:\s+(\d+)/)?.[1] ?? "0", 10); + // Get process owner + const user = (await fs.stat(`/proc/${pid}`)).uid.toString(); + + // Parse CPU stats from /proc/[pid]/stat + const stats = stat.split(" "); + const utime = parseInt(stats[13], 10); + const stime = parseInt(stats[14], 10); + const starttime = parseInt(stats[21], 10); + + // Calculate CPU percentage + const totalTime = utime + stime; + const uptime = os.uptime(); + const hertz = 100; // Usually 100 on Linux + const elapsedTime = uptime - starttime / hertz; + const cpuUsage = 100 * (totalTime / hertz / elapsedTime); + + // Calculate memory percentage against total system memory + const totalMem = os.totalmem(); + const memoryPercent = (rss * 1024 * 100) / totalMem; + + return { + user, + pid, + cpu: cpuUsage, + mem: memoryPercent, + vsz, + rss, + command: cmdline.replace(/\0/g, " ").trim(), + }; + } catch (error) { + return null; + } + }) + ).then((results) => results.filter((r): r is ProcessInfo => r !== null)); + } + + /** + * Find PIDs for a process name using /proc filesystem + */ + private async findPidsByName(processName: string): Promise { + try { + const pids: number[] = []; + const procDirs = await fs.readdir("/proc"); + + for (const dir of procDirs) { + if (!/^\d+$/.test(dir)) continue; + + try { + const cmdline = await fs.readFile(`/proc/${dir}/cmdline`, "utf8"); + if (cmdline.includes(processName)) { + pids.push(parseInt(dir, 10)); + } + } catch { + // Ignore errors reading individual process info + continue; + } + } + + return pids; + } catch { + return []; + } + } + + /** + * Get process-specific metrics + */ + private async getProcessMetrics(): Promise { + // Get Node.js process metrics + const totalMemory = os.totalmem(); + // Convert GB to bytes (machine.memory is in GB) + const machineMemoryBytes = this.ctx.machine + ? 
this.ctx.machine.memory * 1024 * 1024 * 1024 + : totalMemory; + const nodeMemoryUsage = process.memoryUsage().rss; + + // Node process percentage is based on machine memory if available, otherwise system memory + const nodeMemoryPercent = (nodeMemoryUsage / machineMemoryBytes) * 100; + + const nodeMetrics: NodeProcessMetrics = { + memoryUsage: nodeMemoryUsage, + memoryUsagePercent: nodeMemoryPercent, + }; + + let method = "ps"; + + try { + let processes: ProcessInfo[] = []; + + // Try ps first, fall back to /proc if it fails + try { + const { stdout: psOutput } = await execAsync( + `ps aux | grep ${this.processName} | grep -v grep` + ); + + if (psOutput.trim()) { + processes = psOutput + .trim() + .split("\n") + .map((line) => { + const parts = line.trim().split(/\s+/); + return { + user: parts[0], + pid: parseInt(parts[1], 10), + cpu: parseFloat(parts[2]), + mem: parseFloat(parts[3]), + vsz: parseInt(parts[4], 10), + rss: parseInt(parts[5], 10), + command: parts.slice(10).join(" "), + }; + }); + } + } catch { + // ps failed, try /proc instead + method = "proc"; + const pids = await this.findPidsByName(this.processName); + processes = await this.getProcMetrics(pids); + } + + if (processes.length === 0) { + return { + node: nodeMetrics, + target: { + method, + processName: this.processName, + count: 0, + processes: [], + averages: null, + totals: null, + }, + }; + } + + // For CPU: + // - ps shows CPU percentage per core (e.g., 100% = 1 core) + // - machine.cpu is in cores (e.g., 0.5 = half a core) + // - we want to show percentage of allocated CPU (e.g., 100% = using all allocated CPU) + const availableCpu = this.ctx.machine?.cpu ?? os.cpus().length; + const cpuNormalizer = availableCpu * 100; // Convert to basis points for better precision with fractional CPUs + + // For Memory: + // - ps 'mem' is already a percentage of system memory + // - we need to convert it to a percentage of machine memory + // - if machine memory is 0.5GB and system has 16GB, we multiply the percentage by 32 + const memoryScaleFactor = this.ctx.machine ? 
totalMemory / machineMemoryBytes : 1; + + const totals = processes.reduce( + (acc, proc) => ({ + cpu: acc.cpu + proc.cpu, + // Scale memory percentage to machine memory + // TODO: test this + memory: acc.memory + proc.mem * memoryScaleFactor, + rss: acc.rss + proc.rss, + vsz: acc.vsz + proc.vsz, + }), + { cpu: 0, memory: 0, rss: 0, vsz: 0 } + ); + + const count = processes.length; + + const averages = { + cpu: totals.cpu / (count * cpuNormalizer), + memory: totals.memory / count, + rss: totals.rss / count, + vsz: totals.vsz / count, + }; + + return { + node: nodeMetrics, + target: { + method, + processName: this.processName, + count, + processes, + averages, + totals: { + cpu: totals.cpu / cpuNormalizer, + memory: totals.memory, + rss: totals.rss, + vsz: totals.vsz, + }, + }, + }; + } catch (error) { + return { + node: nodeMetrics, + target: { + method, + processName: this.processName, + count: 0, + processes: [], + averages: null, + totals: null, + }, + }; + } + } + + /** + * Log a snapshot of current resource usage + */ + async logResourceSnapshot(label = "Resource Snapshot"): Promise { + try { + const [systemMetrics, processMetrics] = await Promise.all([ + this.getSystemMetrics(), + this.getProcessMetrics(), + ]); + + const formatBytes = (bytes: number) => (bytes / (1024 * 1024)).toFixed(2); + const formatPercent = (value: number) => value.toFixed(1); + + this.logger.info(label, { + system: { + disk: { + limitGiB: DISK_LIMIT_GB, + dirName: this.dirName, + usedGiB: (systemMetrics.disk.used / (1024 * 1024 * 1024)).toFixed(2), + freeGiB: (systemMetrics.disk.free / (1024 * 1024 * 1024)).toFixed(2), + percentUsed: formatPercent(systemMetrics.disk.percentUsed), + warning: systemMetrics.disk.warning, + }, + memory: { + freeGB: (systemMetrics.memory.free / (1024 * 1024 * 1024)).toFixed(2), + percentUsed: formatPercent(systemMetrics.memory.percentUsed), + }, + }, + constraints: this.ctx.machine + ? { + cpu: this.ctx.machine.cpu, + memoryGB: this.ctx.machine.memory, + diskGB: DISK_LIMIT_BYTES / (1024 * 1024 * 1024), + } + : { + cpu: os.cpus().length, + memoryGB: Math.floor(os.totalmem() / (1024 * 1024 * 1024)), + note: "Using system resources (no machine constraints specified)", + }, + process: { + node: { + memoryUsageMB: formatBytes(processMetrics.node.memoryUsage), + memoryUsagePercent: formatPercent(processMetrics.node.memoryUsagePercent), + }, + target: processMetrics.target + ? { + method: processMetrics.target.method, + processName: processMetrics.target.processName, + count: processMetrics.target.count, + averages: processMetrics.target.averages + ? { + cpuPercent: formatPercent(processMetrics.target.averages.cpu * 100), + memoryPercent: formatPercent(processMetrics.target.averages.memory), + rssMB: formatBytes(processMetrics.target.averages.rss * 1024), + vszMB: formatBytes(processMetrics.target.averages.vsz * 1024), + } + : null, + totals: processMetrics.target.totals + ? { + cpuPercent: formatPercent(processMetrics.target.totals.cpu * 100), + memoryPercent: formatPercent(processMetrics.target.totals.memory), + rssMB: formatBytes(processMetrics.target.totals.rss * 1024), + vszMB: formatBytes(processMetrics.target.totals.vsz * 1024), + } + : null, + } + : null, + }, + timestamp: new Date().toISOString(), + }); + } catch (error) { + this.logger.error( + `Error logging resource snapshot: ${error instanceof Error ? 
error.message : String(error)}` + ); + } + } +} From 13afaa2c353077de1830e8dbd31ac73e2102f83f Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 17 Apr 2025 13:54:20 +0100 Subject: [PATCH 3/6] add resource monitor example --- references/hello-world/src/trigger/example.ts | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index b72fa03fa9..040818f1de 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -1,5 +1,6 @@ import { batch, logger, task, timeout, wait } from "@trigger.dev/sdk"; import { setTimeout } from "timers/promises"; +import { ResourceMonitor } from "../resourceMonitor.js"; export const helloWorldTask = task({ id: "hello-world", @@ -206,3 +207,28 @@ export const hooksTask = task({ logger.info("Hello, world from the cleanup hook", { payload }); }, }); + +export const resourceMonitorTest = task({ + id: "resource-monitor-test", + run: async (payload: { dirName?: string; processName?: string }, { ctx }) => { + logger.info("Hello, resources!", { payload }); + + const resMon = new ResourceMonitor({ + ctx, + dirName: payload.dirName ?? "/tmp", + processName: payload.processName ?? "node", + }); + + resMon.startMonitoring(1_000); + + resMon.logResourceSnapshot(); + + await wait.for({ seconds: 5 }); + + resMon.logResourceSnapshot(); + + return { + message: "Hello, resources!", + }; + }, +}); From 19ebee6f7c05c4390b872d726426d5120cf3ef2b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 22 Apr 2025 08:31:25 +0100 Subject: [PATCH 4/6] improve runtime manager debug logs --- .../src/v3/runtime/managedRuntimeManager.ts | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/packages/core/src/v3/runtime/managedRuntimeManager.ts b/packages/core/src/v3/runtime/managedRuntimeManager.ts index 2a569a6773..d23f800d76 100644 --- a/packages/core/src/v3/runtime/managedRuntimeManager.ts +++ b/packages/core/src/v3/runtime/managedRuntimeManager.ts @@ -27,12 +27,10 @@ export class ManagedRuntimeManager implements RuntimeManager { private ipc: ExecutorToWorkerProcessConnection, private showLogs: boolean ) { - setTimeout(() => { - this.log("Runtime status", { - resolversbyWaitId: this.resolversByWaitId.keys(), - resolversByWaitpoint: this.resolversByWaitpoint.keys(), - }); - }, 1000); + // Log out the runtime status on a long interval to help debug stuck executions + setInterval(() => { + this.log("[DEBUG] ManagedRuntimeManager status", this.status); + }, 300_000); } disable(): void { @@ -178,14 +176,14 @@ export class ManagedRuntimeManager implements RuntimeManager { } if (!waitId) { - this.log("No waitId found for waitpoint", waitpoint); + this.log("No waitId found for waitpoint", { ...this.status, ...waitpoint }); return; } const resolve = this.resolversByWaitId.get(waitId); if (!resolve) { - this.log("No resolver found for waitId", waitId); + this.log("No resolver found for waitId", { ...this.status, waitId }); return; } @@ -227,4 +225,11 @@ export class ManagedRuntimeManager implements RuntimeManager { if (!this.showLogs) return; console.log(`[${new Date().toISOString()}] ${message}`, args); } + + private get status() { + return { + resolversbyWaitId: Array.from(this.resolversByWaitId.keys()), + resolversByWaitpoint: Array.from(this.resolversByWaitpoint.keys()), + }; + } } From 
f8ad30e43467f636dd35681dbdde0d930096160d Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 22 Apr 2025 08:53:13 +0100 Subject: [PATCH 5/6] fix resource monitor example --- references/hello-world/src/trigger/example.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 040818f1de..ae3a65007d 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -221,11 +221,13 @@ export const resourceMonitorTest = task({ resMon.startMonitoring(1_000); - resMon.logResourceSnapshot(); + await resMon.logResourceSnapshot(); await wait.for({ seconds: 5 }); - resMon.logResourceSnapshot(); + await resMon.logResourceSnapshot(); + + resMon.stopMonitoring(); return { message: "Hello, resources!", From 1c8f8a85dd13ad9702c02e9eb1ea974d8ca74d48 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 22 Apr 2025 15:30:29 +0100 Subject: [PATCH 6/6] add changeset --- .changeset/wet-deers-think.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/wet-deers-think.md diff --git a/.changeset/wet-deers-think.md b/.changeset/wet-deers-think.md new file mode 100644 index 0000000000..9002d7b94f --- /dev/null +++ b/.changeset/wet-deers-think.md @@ -0,0 +1,6 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Fix QUEUED status snapshot handler
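
Usage note: the `resource-monitor-test` reference task added in patches 2, 3 and 5 can be exercised from any backend code via the SDK. Below is a minimal sketch, not part of the patch series itself — the `@trigger.dev/sdk/v3` import path, the `tasks.trigger` call shape, and the relative path to `example.js` are assumptions based on the standard SDK surface and the hello-world reference layout.

import { tasks } from "@trigger.dev/sdk/v3";
import type { resourceMonitorTest } from "./trigger/example.js";

// Trigger a run of the resource-monitor-test task. Both payload fields are
// optional; the task itself defaults dirName to "/tmp" and processName to "node".
const handle = await tasks.trigger<typeof resourceMonitorTest>("resource-monitor-test", {
  dirName: "/tmp",
  processName: "node",
});

// The handle id can be used to look the run up in the dashboard or via runs.retrieve().
console.log("Triggered resource-monitor-test run:", handle.id);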