Skip to content

Managed run controller revamp #1927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 40 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8e84846
update nypm to support text-based bun lockfiles
nicktrn Apr 10, 2025
a3c94fd
fix retry spans
nicktrn Apr 10, 2025
74e7c27
only download debug logs if admin
nicktrn Apr 10, 2025
392b96f
add nypm changeset
nicktrn Apr 10, 2025
d3d6c48
Merge remote-tracking branch 'origin/main' into fix/v4-beta.0
nicktrn Apr 11, 2025
060bbc1
pull out env override logic
nicktrn Apr 11, 2025
40b4dfb
use runner env gather helper
nicktrn Apr 11, 2025
ce2558e
handle dev flushing failures gracefully
nicktrn Apr 11, 2025
9bc9a15
fix path normalization for init.ts
nicktrn Apr 11, 2025
81b8c4b
add logger
nicktrn Apr 11, 2025
0fd10c9
add execution heartbeat service
nicktrn Apr 11, 2025
990ac46
add snapshot poller service
nicktrn Apr 11, 2025
030d4f8
fix poller
nicktrn Apr 11, 2025
21ad68c
add changesets
nicktrn Apr 11, 2025
aebe34e
Merge remote-tracking branch 'origin/main' into fix/v4-beta.0
nicktrn Apr 11, 2025
bbd7ea6
create socket in constructor
nicktrn Apr 11, 2025
1afe3a7
enable strictPropertyInitialization
nicktrn Apr 11, 2025
9cf17ce
deprecate dequeue from version
nicktrn Apr 11, 2025
6fff053
start is not async
nicktrn Apr 11, 2025
76ac4a8
dependency injection in prep for tests
nicktrn Apr 11, 2025
1f76bc7
add warm start count to all controller logs
nicktrn Apr 11, 2025
e77b14b
add restore count
nicktrn Apr 11, 2025
fa6866e
Merge remote-tracking branch 'origin/main' into fix/controller-rework
nicktrn Apr 11, 2025
71cd80e
pull out run execution logic
nicktrn Apr 14, 2025
0e2171e
Merge remote-tracking branch 'origin/main' into fix/controller-rework
nicktrn Apr 14, 2025
e03f417
temp disable pre
nicktrn Apr 14, 2025
235372d
add a controller log when starting an execution
nicktrn Apr 14, 2025
650533b
refactor execution and squash some bugs
nicktrn Apr 14, 2025
fdf14e0
cleanup completed docker containers by default
nicktrn Apr 15, 2025
1643743
execution fixes and logging improvements
nicktrn Apr 15, 2025
eea6ce3
don't throw afet abort cleanup
nicktrn Apr 15, 2025
eea2106
poller should use private interval
nicktrn Apr 15, 2025
a57a967
Merge remote-tracking branch 'origin/main' into fix/controller-rework
nicktrn Apr 15, 2025
b61b360
rename heartbeat service file
nicktrn Apr 15, 2025
a434e98
rename HeartbeatService to IntervalService
nicktrn Apr 15, 2025
1023d13
restore old heartbeat service but deprecate it
nicktrn Apr 15, 2025
419dd21
use the new interval service everywhere
nicktrn Apr 15, 2025
dce194f
Revert "temp disable pre"
nicktrn Apr 15, 2025
2d0bef9
add changeset
nicktrn Apr 15, 2025
df845e1
replace all run engine find uniques with find first
nicktrn Apr 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/tricky-houses-invite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Managed run controller performance and reliability improvements
2 changes: 1 addition & 1 deletion .configs/tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

"strict": true,
"alwaysStrict": true,
"strictPropertyInitialization": false,
"strictPropertyInitialization": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"noUnusedLocals": false,
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const Env = z.object({
RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(),
RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(),
RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv)
RUNNER_DOCKER_AUTOREMOVE: BoolEnv.default(true),

// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class ManagedSupervisor {
heartbeatIntervalSeconds: env.RUNNER_HEARTBEAT_INTERVAL_SECONDS,
snapshotPollIntervalSeconds: env.RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS,
additionalEnvVars: env.RUNNER_ADDITIONAL_ENV_VARS,
dockerAutoremove: env.RUNNER_DOCKER_AUTOREMOVE,
} satisfies WorkloadManagerOptions;

if (this.isKubernetes) {
Expand Down
12 changes: 6 additions & 6 deletions apps/supervisor/src/services/podCleaner.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
import { K8sApi } from "../clients/kubernetes.js";
import { createK8sApi } from "../clients/kubernetes.js";
import { HeartbeatService } from "@trigger.dev/core/v3";
import { IntervalService } from "@trigger.dev/core/v3";
import { Counter, Gauge, Registry } from "prom-client";
import { register } from "../metrics.js";

Expand All @@ -19,7 +19,7 @@ export class PodCleaner {
private readonly namespace: string;

private readonly batchSize: number;
private readonly deletionHeartbeat: HeartbeatService;
private readonly deletionInterval: IntervalService;

// Metrics
private readonly register: Registry;
Expand All @@ -32,10 +32,10 @@ export class PodCleaner {
this.namespace = opts.namespace;
this.batchSize = opts.batchSize ?? 500;

this.deletionHeartbeat = new HeartbeatService({
this.deletionInterval = new IntervalService({
intervalMs: opts.intervalMs ?? 10000,
leadingEdge: true,
heartbeat: this.deleteCompletedPods.bind(this),
onInterval: this.deleteCompletedPods.bind(this),
});

// Initialize metrics
Expand All @@ -57,11 +57,11 @@ export class PodCleaner {
}

async start() {
this.deletionHeartbeat.start();
this.deletionInterval.start();
}

async stop() {
this.deletionHeartbeat.stop();
this.deletionInterval.stop();
}

private async deleteCompletedPods() {
Expand Down
4 changes: 4 additions & 0 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ export class DockerWorkloadManager implements WorkloadManager {
`--name=${runnerId}`,
];

if (this.opts.dockerAutoremove) {
runArgs.push("--rm");
}

if (this.opts.warmStartUrl) {
runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`);
}
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface WorkloadManagerOptions {
heartbeatIntervalSeconds?: number;
snapshotPollIntervalSeconds?: number;
additionalEnvVars?: Record<string, string>;
dockerAutoremove?: boolean;
}

export interface WorkloadManager {
Expand Down
2 changes: 1 addition & 1 deletion apps/supervisor/src/workloadServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
logger.debug("runConnected", { ...getSocketMetadata() });

// If there's already a run ID set, we should "disconnect" it from this socket
if (socket.data.runFriendlyId) {
if (socket.data.runFriendlyId && socket.data.runFriendlyId !== friendlyId) {
logger.debug("runConnected: disconnecting existing run", {
...getSocketMetadata(),
newRunId: friendlyId,
Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/v3/authenticatedSocketConnection.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {
clientWebsocketMessages,
HeartbeatService,
IntervalService,
serverWebsocketMessages,
} from "@trigger.dev/core/v3";
import { ZodMessageHandler, ZodMessageSender } from "@trigger.dev/core/v3/zodMessageHandler";
Expand All @@ -19,7 +19,7 @@ export class AuthenticatedSocketConnection {
private _sender: ZodMessageSender<typeof serverWebsocketMessages>;
private _consumer: DevQueueConsumer;
private _messageHandler: ZodMessageHandler<typeof clientWebsocketMessages>;
private _pingService: HeartbeatService;
private _pingService: IntervalService;

constructor(
public ws: WebSocket,
Expand Down Expand Up @@ -75,8 +75,8 @@ export class AuthenticatedSocketConnection {
// });
});

this._pingService = new HeartbeatService({
heartbeat: async () => {
this._pingService = new IntervalService({
onInterval: async () => {
if (ws.readyState !== WebSocket.OPEN) {
logger.debug("[AuthenticatedSocketConnection] Websocket not open, skipping ping");
return;
Expand Down
10 changes: 4 additions & 6 deletions internal-packages/run-engine/src/engine/db/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export async function getWorkerDeploymentFromWorker(
prisma: PrismaClientOrTransaction,
workerId: string
): Promise<WorkerDeploymentWithWorkerTasks | null> {
const worker = await prisma.backgroundWorker.findUnique({
const worker = await prisma.backgroundWorker.findFirst({
where: {
id: workerId,
},
Expand Down Expand Up @@ -264,12 +264,10 @@ export async function getManagedWorkerFromCurrentlyPromotedDeployment(
prisma: PrismaClientOrTransaction,
environmentId: string
): Promise<WorkerDeploymentWithWorkerTasks | null> {
const promotion = await prisma.workerDeploymentPromotion.findUnique({
const promotion = await prisma.workerDeploymentPromotion.findFirst({
where: {
environmentId_label: {
environmentId,
label: CURRENT_DEPLOYMENT_LABEL,
},
environmentId,
label: CURRENT_DEPLOYMENT_LABEL,
},
include: {
deployment: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class BatchSystem {
*/
async #tryCompleteBatch({ batchId }: { batchId: string }) {
return startSpan(this.$.tracer, "#tryCompleteBatch", async (span) => {
const batch = await this.$.prisma.batchTaskRun.findUnique({
const batch = await this.$.prisma.batchTaskRun.findFirst({
select: {
status: true,
runtimeEnvironmentId: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,10 @@ export class RunAttemptSystem {
throw new ServiceValidationError("Task run is not locked", 400);
}

const queue = await prisma.taskQueue.findUnique({
const queue = await prisma.taskQueue.findFirst({
where: {
runtimeEnvironmentId_name: {
runtimeEnvironmentId: environment.id,
name: taskRun.queue,
},
runtimeEnvironmentId: environment.id,
name: taskRun.queue,
},
});

Expand Down Expand Up @@ -1199,7 +1197,7 @@ export class RunAttemptSystem {

async #getAuthenticatedEnvironmentFromRun(runId: string, tx?: PrismaClientOrTransaction) {
const prisma = tx ?? this.$.prisma;
const taskRun = await prisma.taskRun.findUnique({
const taskRun = await prisma.taskRun.findFirst({
where: {
id: runId,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class TtlSystem {
}

//only expire "PENDING" runs
const run = await prisma.taskRun.findUnique({ where: { id: runId } });
const run = await prisma.taskRun.findFirst({ where: { id: runId } });

if (!run) {
this.$.logger.debug("Could not find enqueued run to expire", {
Expand Down
16 changes: 6 additions & 10 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,10 @@ export class WaitpointSystem {
const prisma = tx ?? this.$.prisma;

const existingWaitpoint = idempotencyKey
? await prisma.waitpoint.findUnique({
? await prisma.waitpoint.findFirst({
where: {
environmentId_idempotencyKey: {
environmentId,
idempotencyKey,
},
environmentId,
idempotencyKey,
},
})
: undefined;
Expand Down Expand Up @@ -241,12 +239,10 @@ export class WaitpointSystem {
tags?: string[];
}): Promise<{ waitpoint: Waitpoint; isCached: boolean }> {
const existingWaitpoint = idempotencyKey
? await this.$.prisma.waitpoint.findUnique({
? await this.$.prisma.waitpoint.findFirst({
where: {
environmentId_idempotencyKey: {
environmentId,
idempotencyKey,
},
environmentId,
idempotencyKey,
},
})
: undefined;
Expand Down
22 changes: 10 additions & 12 deletions packages/cli-v3/e2e/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { TaskRunProcess } from "../src/executions/taskRunProcess.js";
import { createTestHttpServer } from "@epic-web/test-server/http";
import { TestCase, TestCaseRun } from "./fixtures.js";
import { access } from "node:fs/promises";
import { MachinePreset } from "@trigger.dev/core/v3";

export type PackageManager = "npm" | "pnpm" | "yarn";

Expand Down Expand Up @@ -295,6 +296,13 @@ export async function executeTestCaseRun({
},
});

const machine = {
name: "small-1x",
cpu: 1,
memory: 256,
centsPerMs: 0.0000001,
} satisfies MachinePreset;

try {
const taskRunProcess = new TaskRunProcess({
workerManifest: workerManifest!,
Expand All @@ -314,12 +322,7 @@ export async function executeTestCaseRun({
version: "1.0.0",
contentHash,
},
machine: {
name: "small-1x",
cpu: 1,
memory: 256,
centsPerMs: 0.0000001,
},
machineResources: machine,
}).initialize();

const result = await taskRunProcess.execute({
Expand Down Expand Up @@ -372,12 +375,7 @@ export async function executeTestCaseRun({
ref: "main",
name: "test",
},
machine: {
name: "small-1x",
cpu: 1,
memory: 256,
centsPerMs: 0.0000001,
},
machine,
},
},
messageId: "run_1234",
Expand Down
34 changes: 22 additions & 12 deletions packages/cli-v3/src/dev/devSupervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ export async function startWorkerRuntime(options: WorkerRuntimeOptions): Promise
* - Receiving snapshot update pings (via socket)
*/
class DevSupervisor implements WorkerRuntime {
private config: DevConfigResponseBody;
private config?: DevConfigResponseBody;
private disconnectPresence: (() => void) | undefined;
private lastManifest?: BuildManifest;
private latestWorkerId?: string;

/** Receive notifications when runs change state */
private socket: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
private socket?: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
private socketIsReconnecting = false;

/** Workers are versions of the code */
Expand Down Expand Up @@ -93,7 +93,7 @@ class DevSupervisor implements WorkerRuntime {

this.runLimiter = pLimit(maxConcurrentRuns);

this.#createSocket();
this.socket = this.#createSocket();

//start an SSE connection for presence
this.disconnectPresence = await this.#startPresenceConnection();
Expand All @@ -105,7 +105,7 @@ class DevSupervisor implements WorkerRuntime {
async shutdown(): Promise<void> {
this.disconnectPresence?.();
try {
this.socket.close();
this.socket?.close();
} catch (error) {
logger.debug("[DevSupervisor] shutdown, socket failed to close", { error });
}
Expand Down Expand Up @@ -187,6 +187,10 @@ class DevSupervisor implements WorkerRuntime {
* For the latest version we will pull from the main queue, so we don't specify that.
*/
async #dequeueRuns() {
if (!this.config) {
throw new Error("No config, can't dequeue runs");
}

if (!this.latestWorkerId) {
//try again later
logger.debug(`[DevSupervisor] dequeueRuns. No latest worker ID, trying again later`);
Expand Down Expand Up @@ -409,13 +413,14 @@ class DevSupervisor implements WorkerRuntime {
const wsUrl = new URL(this.options.client.apiURL);
wsUrl.pathname = "/dev-worker";

this.socket = io(wsUrl.href, {
const socket = io(wsUrl.href, {
transports: ["websocket"],
extraHeaders: {
Authorization: `Bearer ${this.options.client.accessToken}`,
},
});
this.socket.on("run:notify", async ({ version, run }) => {

socket.on("run:notify", async ({ version, run }) => {
logger.debug("[DevSupervisor] Received run notification", { version, run });

this.options.client.dev.sendDebugLog(run.friendlyId, {
Expand All @@ -434,10 +439,11 @@ class DevSupervisor implements WorkerRuntime {

await controller.getLatestSnapshot();
});
this.socket.on("connect", () => {

socket.on("connect", () => {
logger.debug("[DevSupervisor] Connected to supervisor");

if (this.socket.recovered || this.socketIsReconnecting) {
if (socket.recovered || this.socketIsReconnecting) {
logger.debug("[DevSupervisor] Socket recovered");
eventBus.emit("socketConnectionReconnected", `Connection was recovered`);
}
Expand All @@ -448,19 +454,21 @@ class DevSupervisor implements WorkerRuntime {
controller.resubscribeToRunNotifications();
}
});
this.socket.on("connect_error", (error) => {

socket.on("connect_error", (error) => {
logger.debug("[DevSupervisor] Connection error", { error });
});
this.socket.on("disconnect", (reason, description) => {

socket.on("disconnect", (reason, description) => {
logger.debug("[DevSupervisor] socket was disconnected", {
reason,
description,
active: this.socket.active,
active: socket.active,
});

if (reason === "io server disconnect") {
// the disconnection was initiated by the server, you need to manually reconnect
this.socket.connect();
socket.connect();
} else {
this.socketIsReconnecting = true;
eventBus.emit("socketConnectionDisconnected", reason);
Expand All @@ -472,6 +480,8 @@ class DevSupervisor implements WorkerRuntime {
connections: Array.from(this.socketConnections),
});
}, 5000);

return socket;
}

#subscribeToRunNotifications() {
Expand Down
Loading