Skip to content

Warm start and restore improvements #1793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class ManagedSupervisor {
});

if (env.TRIGGER_CHECKPOINT_URL) {
this.logger.log("[ManagedWorker] 🥶 Checkpoints enabled", {
checkpointUrl: env.TRIGGER_CHECKPOINT_URL,
});

this.checkpointClient = new CheckpointClient({
apiUrl: new URL(env.TRIGGER_CHECKPOINT_URL),
workerClient: this.workerSession.httpClient,
Expand Down Expand Up @@ -126,8 +130,13 @@ class ManagedSupervisor {
if (message.checkpoint) {
this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id });

if (!this.checkpointClient) {
this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id });
return;
}

try {
const didRestore = await this.checkpointClient?.restoreRun({
const didRestore = await this.checkpointClient.restoreRun({
runFriendlyId: message.run.friendlyId,
snapshotFriendlyId: message.snapshot.friendlyId,
checkpoint: message.checkpoint,
Expand Down Expand Up @@ -214,33 +223,41 @@ class ManagedSupervisor {

const warmStartUrlWithPath = new URL("/warm-start", this.warmStartUrl);

const res = await fetch(warmStartUrlWithPath.href, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ dequeuedMessage }),
});

if (!res.ok) {
this.logger.error("[ManagedWorker] Warm start failed", {
runId: dequeuedMessage.run.id,
try {
const res = await fetch(warmStartUrlWithPath.href, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ dequeuedMessage }),
});
return false;
}

const data = await res.json();
const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data);
if (!res.ok) {
this.logger.error("[ManagedWorker] Warm start failed", {
runId: dequeuedMessage.run.id,
});
return false;
}

const data = await res.json();
const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data);

if (!parsedData.success) {
this.logger.error("[ManagedWorker] Warm start response invalid", {
if (!parsedData.success) {
this.logger.error("[ManagedWorker] Warm start response invalid", {
runId: dequeuedMessage.run.id,
data,
});
return false;
}

return parsedData.data.didWarmStart;
} catch (error) {
this.logger.error("[ManagedWorker] Warm start error", {
runId: dequeuedMessage.run.id,
data,
error,
});
return false;
}

return parsedData.data.didWarmStart;
}

async start() {
Expand Down
24 changes: 0 additions & 24 deletions apps/supervisor/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,6 @@
import { customAlphabet } from "nanoid";

export function getDockerHostDomain() {
const isMacOs = process.platform === "darwin";
const isWindows = process.platform === "win32";

return isMacOs || isWindows ? "host.docker.internal" : "localhost";
}

export class IdGenerator {
private alphabet: string;
private length: number;
private prefix: string;

constructor({ alphabet, length, prefix }: { alphabet: string; length: number; prefix: string }) {
this.alphabet = alphabet;
this.length = length;
this.prefix = prefix;
}

generate(): string {
return `${this.prefix}${customAlphabet(this.alphabet, this.length)()}`;
}
}

export const RunnerId = new IdGenerator({
alphabet: "123456789abcdefghijkmnopqrstuvwxyz",
length: 20,
prefix: "runner_",
});
3 changes: 2 additions & 1 deletion apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
import {
type WorkloadManager,
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { x } from "tinyexec";
import { env } from "../env.js";
import { getDockerHostDomain, RunnerId } from "../util.js";
import { getDockerHostDomain } from "../util.js";

export class DockerWorkloadManager implements WorkloadManager {
private readonly logger = new SimpleStructuredLogger("docker-workload-provider");
Expand Down
5 changes: 2 additions & 3 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { RunnerId } from "../util.js";
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
import { env } from "../env.js";
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
Expand Down Expand Up @@ -48,15 +48,14 @@ export class KubernetesWorkloadManager implements WorkloadManager {
app: "task-run",
"app.kubernetes.io/part-of": "trigger-worker",
"app.kubernetes.io/component": "create",
run: opts.runId,
},
},
spec: {
...this.#defaultPodSpec,
terminationGracePeriodSeconds: 60 * 60,
containers: [
{
name: runnerId,
name: "run-controller",
image: opts.image,
ports: [
{
Expand Down
21 changes: 10 additions & 11 deletions apps/supervisor/src/workloadServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,30 +200,29 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
handler: async ({ reply, params, req }) => {
console.debug("Suspend request", { params, headers: req.headers });

const runnerId = this.runnerIdFromRequest(req);

if (!runnerId) {
console.error("Invalid headers for suspend request", {
...params,
headers: req.headers,
});
if (!this.checkpointClient) {
reply.json(
{
ok: false,
error: "Invalid headers",
error: "Checkpoints disabled",
} satisfies WorkloadSuspendRunResponseBody,
false,
400
);
return;
}

if (!this.checkpointClient) {
console.error("Checkpoint client unavailable - suspending impossible", { params });
const runnerId = this.runnerIdFromRequest(req);

if (!runnerId) {
console.error("Invalid headers for suspend request", {
...params,
headers: req.headers,
});
reply.json(
{
ok: false,
error: "Suspends are not enabled",
error: "Invalid headers",
} satisfies WorkloadSuspendRunResponseBody,
false,
400
Expand Down
51 changes: 1 addition & 50 deletions packages/cli-v3/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,15 @@ import {
DevDequeueRequestBody,
DevDequeueResponseBody,
PromoteDeploymentResponseBody,
ListRunResponse,
} from "@trigger.dev/core/v3";
import { zodfetch, zodfetchSSE, ApiError } from "@trigger.dev/core/v3/zodfetch";
import { ApiResult, wrapZodFetch, zodfetchSSE } from "@trigger.dev/core/v3/zodfetch";
import { logger } from "./utilities/logger.js";
import {
WorkloadDebugLogRequestBody,
WorkloadHeartbeatRequestBody,
WorkloadHeartbeatResponseBody,
WorkloadRunAttemptCompleteRequestBody,
WorkloadRunAttemptCompleteResponseBody,
WorkloadRunAttemptStartRequestBody,
WorkloadRunAttemptStartResponseBody,
WorkloadRunLatestSnapshotResponseBody,
} from "@trigger.dev/core/v3/workers";
Expand Down Expand Up @@ -644,50 +642,3 @@ export class CliApiClient {
);
}
}

type ApiResult<TSuccessResult> =
| { success: true; data: TSuccessResult }
| {
success: false;
error: string;
};

async function wrapZodFetch<T extends z.ZodTypeAny>(
schema: T,
url: string,
requestInit?: RequestInit
): Promise<ApiResult<z.infer<T>>> {
try {
const response = await zodfetch(schema, url, requestInit, {
retry: {
minTimeoutInMs: 500,
maxTimeoutInMs: 5000,
maxAttempts: 5,
factor: 2,
randomize: false,
},
});

return {
success: true,
data: response,
};
} catch (error) {
if (error instanceof ApiError) {
return {
success: false,
error: error.message,
};
} else if (error instanceof Error) {
return {
success: false,
error: error.message,
};
} else {
return {
success: false,
error: String(error),
};
}
}
}
Loading