Skip to content

Fixes for v4 waits and restores #1868

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions apps/supervisor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"build": "tsc",
"dev": "tsx --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
"start": "node dist/index.js",
"test:watch": "vitest",
"test:run": "vitest --no-file-parallelism --run",
"test:watch": "vitest --no-file-parallelism",
"typecheck": "tsc --noEmit"
},
"dependencies": {
Expand All @@ -24,6 +25,7 @@
},
"devDependencies": {
"@types/dockerode": "^3.3.33",
"docker-api-ts": "^0.2.2"
"docker-api-ts": "^0.2.2",
"vitest": "^1.4.0"
}
}
1 change: 1 addition & 0 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const Env = z.object({
// Optional services
TRIGGER_WARM_START_URL: z.string().optional(),
TRIGGER_CHECKPOINT_URL: z.string().optional(),
TRIGGER_METADATA_URL: z.string().optional(),

// Used by the workload manager, e.g docker/k8s
DOCKER_NETWORK: z.string().default("host"),
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class ManagedSupervisor {
workloadApiDomain: env.TRIGGER_WORKLOAD_API_DOMAIN,
workloadApiPort: env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL,
warmStartUrl: this.warmStartUrl,
metadataUrl: env.TRIGGER_METADATA_URL,
imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","),
heartbeatIntervalSeconds: env.RUNNER_HEARTBEAT_INTERVAL_SECONDS,
snapshotPollIntervalSeconds: env.RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS,
Expand Down
107 changes: 105 additions & 2 deletions apps/supervisor/src/services/failedPodHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,102 @@ describe("FailedPodHandler Integration Tests", () => {
await handler.stop();
}
}, 60000);

// Integration test: pods whose container exits with
// FailedPodHandler.GRACEFUL_SHUTDOWN_EXIT_CODE must be counted under the
// "GracefulShutdown" status in processedPodsTotal (never "Failed") and must
// still end up deleted. Two batches are used so that both pods that already
// exist when the handler starts and pods created while it is running are covered.
// The trailing 30000 is the per-test timeout in milliseconds.
it("should handle graceful shutdown pods differently", async () => {
const handler = new FailedPodHandler({ namespace, k8s, register });

try {
// Create first batch of pods before starting handler
const firstBatchPodNames = await createTestPods({
k8sApi: k8s,
namespace,
count: 2,
exitCode: FailedPodHandler.GRACEFUL_SHUTDOWN_EXIT_CODE,
});

// Wait for pods to reach Failed state
await waitForPodsPhase({
k8sApi: k8s,
namespace,
podNames: firstBatchPodNames,
phase: "Failed",
});

// Start the handler
await handler.start();

// Wait for first batch to be deleted
await waitForPodsDeletion({
k8sApi: k8s,
namespace,
podNames: firstBatchPodNames,
});

// Create second batch of pods after handler is running
const secondBatchPodNames = await createTestPods({
k8sApi: k8s,
namespace,
count: 3,
exitCode: FailedPodHandler.GRACEFUL_SHUTDOWN_EXIT_CODE,
});

// Wait for second batch to be deleted
// (no explicit wait for the "Failed" phase here; the test only waits for the
// already-running handler to remove the pods)
await waitForPodsDeletion({
k8sApi: k8s,
namespace,
podNames: secondBatchPodNames,
});

// Verify metrics
const metrics = handler.getMetrics();

// Check informer events were recorded for both batches
const informerEvents = await metrics.informerEventsTotal.get();
expect(informerEvents.values).toContainEqual(
expect.objectContaining({
labels: expect.objectContaining({
namespace,
verb: "add",
}),
value: 5, // 2 from first batch + 3 from second batch
})
);

// Check pods were processed as graceful shutdowns
const processedPods = await metrics.processedPodsTotal.get();

// Should not be marked as Failed
const failedPods = processedPods.values.find(
(v) => v.labels.namespace === namespace && v.labels.status === "Failed"
);
expect(failedPods).toBeUndefined();

// Should be marked as GracefulShutdown
const gracefulShutdowns = processedPods.values.find(
(v) => v.labels.namespace === namespace && v.labels.status === "GracefulShutdown"
);
expect(gracefulShutdowns).toBeDefined();
expect(gracefulShutdowns?.value).toBe(5); // Total from both batches

// Check pods were still deleted
// Note: the deletion counter is asserted under the "Failed" status label,
// not "GracefulShutdown" — only processedPodsTotal uses the new label in this test.
const deletedPods = await metrics.deletedPodsTotal.get();
expect(deletedPods.values).toContainEqual(
expect.objectContaining({
labels: expect.objectContaining({
namespace,
status: "Failed",
}),
value: 5, // Total from both batches
})
);

// Check no deletion errors were recorded
const deletionErrors = await metrics.deletionErrorsTotal.get();
expect(deletionErrors.values).toHaveLength(0);
} finally {
// Always stop the handler, even when an assertion above throws
await handler.stop();
}
}, 30000);
});

async function createTestPods({
Expand All @@ -325,6 +421,7 @@ async function createTestPods({
namePrefix = "test-pod",
command = ["/bin/sh", "-c", shouldFail ? "exit 1" : "exit 0"],
randomizeName = true,
exitCode,
}: {
k8sApi: K8sApi;
namespace: string;
Expand All @@ -334,9 +431,15 @@ async function createTestPods({
namePrefix?: string;
command?: string[];
randomizeName?: boolean;
exitCode?: number;
}) {
const createdPods: string[] = [];

// If exitCode is specified, override the command
if (exitCode !== undefined) {
command = ["/bin/sh", "-c", `exit ${exitCode}`];
}

for (let i = 0; i < count; i++) {
const podName = randomizeName
? `${namePrefix}-${i}-${Math.random().toString(36).substring(2, 15)}`
Expand All @@ -352,7 +455,7 @@ async function createTestPods({
restartPolicy: "Never",
containers: [
{
name: "test",
name: "run-controller", // Changed to match the name we check in failedPodHandler
image: "busybox:1.37.0",
command,
},
Expand Down Expand Up @@ -470,7 +573,7 @@ async function deleteAllPodsInNamespace({
const podNames = pods.items.map((p) => p.metadata?.name ?? "");

// Delete all pods
await k8sApi.core.deleteCollectionNamespacedPod({ namespace });
await k8sApi.core.deleteCollectionNamespacedPod({ namespace, gracePeriodSeconds: 0 });

// Wait for all pods to be deleted
await waitForPodsDeletion({ k8sApi, namespace, podNames });
Expand Down
19 changes: 18 additions & 1 deletion apps/supervisor/src/services/failedPodHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Counter, Registry, Histogram } from "prom-client";
import { register } from "../metrics.js";
import { setTimeout } from "timers/promises";

type PodStatus = "Pending" | "Running" | "Succeeded" | "Failed" | "Unknown";
type PodStatus = "Pending" | "Running" | "Succeeded" | "Failed" | "Unknown" | "GracefulShutdown";

export type FailedPodHandlerOptions = {
namespace: string;
Expand Down Expand Up @@ -34,6 +34,8 @@ export class FailedPodHandler {
private readonly processingDurationSeconds: Histogram<string>;
private readonly informerEventsTotal: Counter;

static readonly GRACEFUL_SHUTDOWN_EXIT_CODE = 200;

constructor(opts: FailedPodHandlerOptions) {
this.id = Math.random().toString(36).substring(2, 15);
this.logger = new SimpleStructuredLogger("failed-pod-handler", LogLevel.debug, {
Expand Down Expand Up @@ -206,6 +208,21 @@ export class FailedPodHandler {

private async processFailedPod(pod: V1Pod) {
this.logger.info("pod-failed: processing pod", this.podSummary(pod));

const mainContainer = pod.status?.containerStatuses?.find((c) => c.name === "run-controller");

// If it's our special "graceful shutdown" exit code, don't process it further, just delete it
if (
mainContainer?.state?.terminated?.exitCode === FailedPodHandler.GRACEFUL_SHUTDOWN_EXIT_CODE
) {
this.logger.debug("pod-failed: graceful shutdown detected", this.podSummary(pod));
this.processedPodsTotal.inc({
namespace: this.namespace,
status: "GracefulShutdown",
});
return;
}

this.processedPodsTotal.inc({
namespace: this.namespace,
status: this.podStatus(pod),
Expand Down
2 changes: 1 addition & 1 deletion apps/supervisor/src/services/podCleaner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe("PodCleaner Integration Tests", () => {
register.clear();

// Delete all pods in the namespace
await k8s.core.deleteCollectionNamespacedPod({ namespace });
await k8s.core.deleteCollectionNamespacedPod({ namespace, gracePeriodSeconds: 0 });
});

it("should clean up succeeded pods", async () => {
Expand Down
4 changes: 4 additions & 0 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ export class DockerWorkloadManager implements WorkloadManager {
runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`);
}

if (this.opts.metadataUrl) {
runArgs.push(`--env=TRIGGER_METADATA_URL=${this.opts.metadataUrl}`);
}

if (this.opts.heartbeatIntervalSeconds) {
runArgs.push(
`--env=TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}`
Expand Down
3 changes: 3 additions & 0 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ export class KubernetesWorkloadManager implements WorkloadManager {
...(this.opts.warmStartUrl
? [{ name: "TRIGGER_WARM_START_URL", value: this.opts.warmStartUrl }]
: []),
...(this.opts.metadataUrl
? [{ name: "TRIGGER_METADATA_URL", value: this.opts.metadataUrl }]
: []),
...(this.opts.heartbeatIntervalSeconds
? [
{
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export interface WorkloadManagerOptions {
workloadApiDomain?: string; // If unset, will use orchestrator-specific default
workloadApiPort: number;
warmStartUrl?: string;
metadataUrl?: string;
imagePullSecrets?: string[];
heartbeatIntervalSeconds?: number;
snapshotPollIntervalSeconds?: number;
Expand Down
37 changes: 21 additions & 16 deletions packages/cli-v3/src/entryPoints/managed-run-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const Env = z.object({
TRIGGER_WORKER_INSTANCE_NAME: z.string(),
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),
TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5),
TRIGGER_SUCCESS_EXIT_CODE: z.coerce.number().default(0),
TRIGGER_FAILURE_EXIT_CODE: z.coerce.number().default(1),
});

const env = Env.parse(stdEnv);
Expand All @@ -82,6 +84,8 @@ type Metadata = {
TRIGGER_WORKER_INSTANCE_NAME: string | undefined;
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: number | undefined;
TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: number | undefined;
TRIGGER_SUCCESS_EXIT_CODE: number | undefined;
TRIGGER_FAILURE_EXIT_CODE: number | undefined;
};

class MetadataClient {
Expand Down Expand Up @@ -122,6 +126,9 @@ class ManagedRunController {
private workerApiUrl: string;
private workerInstanceName: string;

private successExitCode = env.TRIGGER_SUCCESS_EXIT_CODE;
private failureExitCode = env.TRIGGER_FAILURE_EXIT_CODE;

private state:
| {
phase: "RUN";
Expand Down Expand Up @@ -220,11 +227,7 @@ class ManagedRunController {

const response = await this.httpClient.heartbeatRun(
this.runFriendlyId,
this.snapshotFriendlyId,
{
cpu: 0,
memory: 0,
}
this.snapshotFriendlyId
);

if (!response.success) {
Expand Down Expand Up @@ -669,6 +672,14 @@ class ManagedRunController {

logger.log("Processing env overrides", { env: overrides });

// Apply exit code overrides. Compare against null/undefined instead of relying
// on truthiness: 0 is a valid exit code (and the default success code), so a
// truthiness check would silently ignore an override that sets a code back to 0.
if (overrides.TRIGGER_SUCCESS_EXIT_CODE != null) {
  this.successExitCode = overrides.TRIGGER_SUCCESS_EXIT_CODE;
}

if (overrides.TRIGGER_FAILURE_EXIT_CODE != null) {
  this.failureExitCode = overrides.TRIGGER_FAILURE_EXIT_CODE;
}

if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) {
this.heartbeatIntervalSeconds = overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS;
this.runHeartbeat.updateInterval(this.heartbeatIntervalSeconds * 1000);
Expand Down Expand Up @@ -821,7 +832,7 @@ class ManagedRunController {

if (!this.warmStartClient) {
console.error("waitForNextRun: warm starts disabled, shutting down");
this.exitProcess(0);
this.exitProcess(this.successExitCode);
}

// Check the service is up and get additional warm start config
Expand All @@ -832,7 +843,7 @@ class ManagedRunController {
warmStartUrl: env.TRIGGER_WARM_START_URL,
error: connect.error,
});
this.exitProcess(0);
this.exitProcess(this.successExitCode);
}

const connectionTimeoutMs =
Expand Down Expand Up @@ -860,7 +871,7 @@ class ManagedRunController {
connectionTimeoutMs,
keepaliveMs,
});
this.exitProcess(0);
this.exitProcess(this.successExitCode);
}

const nextRun = await this.warmStartClient.warmStart({
Expand All @@ -871,7 +882,7 @@ class ManagedRunController {

if (!nextRun) {
console.error("waitForNextRun: warm start failed, shutting down");
this.exitProcess(0);
this.exitProcess(this.successExitCode);
}

console.log("waitForNextRun: got next run", { nextRun });
Expand All @@ -884,7 +895,7 @@ class ManagedRunController {
return;
} catch (error) {
console.error("waitForNextRun: unexpected error", { error });
this.exitProcess(1);
this.exitProcess(this.failureExitCode);
} finally {
this.waitForNextRunLock = false;
}
Expand Down Expand Up @@ -1112,12 +1123,6 @@ class ManagedRunController {
async start() {
logger.debug("[ManagedRunController] Starting up");

// TODO: remove this after testing
setTimeout(() => {
console.error("[ManagedRunController] Exiting after 5 minutes");
this.exitProcess(1);
}, 60 * 5000);

// Websocket notifications are only an optimisation so we don't need to wait for a successful connection
this.createSocket();

Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/v3/runEngineWorker/supervisor/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ export const WorkerApiDequeueResponseBody = DequeuedMessage.array();
export type WorkerApiDequeueResponseBody = z.infer<typeof WorkerApiDequeueResponseBody>;

export const WorkerApiRunHeartbeatRequestBody = z.object({
cpu: z.number(),
memory: z.number(),
cpu: z.number().optional(),
memory: z.number().optional(),
});
export type WorkerApiRunHeartbeatRequestBody = z.infer<typeof WorkerApiRunHeartbeatRequestBody>;

Expand Down
Loading