Add v4 timeline metrics to prod runs #1884

Merged 9 commits on Apr 4, 2025
1 change: 1 addition & 0 deletions apps/supervisor/src/index.ts
@@ -224,6 +224,7 @@ class ManagedSupervisor {

try {
await this.workloadManager.create({
dequeuedAt: message.dequeuedAt,
envId: message.environment.id,
envType: message.environment.type,
image: message.image,
2 changes: 2 additions & 0 deletions apps/supervisor/src/workloadManager/docker.ts
@@ -28,6 +28,8 @@ export class DockerWorkloadManager implements WorkloadManager {
"run",
"--detach",
`--network=${env.DOCKER_NETWORK}`,
`--env=TRIGGER_DEQUEUED_AT_MS=${opts.dequeuedAt.getTime()}`,
`--env=TRIGGER_POD_SCHEDULED_AT_MS=${Date.now()}`,
`--env=TRIGGER_ENV_ID=${opts.envId}`,
`--env=TRIGGER_RUN_ID=${opts.runFriendlyId}`,
`--env=TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`,
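
Both new flags mirror the Kubernetes manager below: dequeuedAt travels with the dequeue message and is passed through by the supervisor (see the index.ts change above), while the pod-scheduled timestamp is stamped with Date.now() at the moment the workload is created.
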
14 changes: 13 additions & 1 deletion apps/supervisor/src/workloadManager/kubernetes.ts
@@ -61,6 +61,14 @@ export class KubernetesWorkloadManager implements WorkloadManager {
],
resources: this.#getResourcesForMachine(opts.machine),
env: [
{
name: "TRIGGER_DEQUEUED_AT_MS",
value: opts.dequeuedAt.getTime().toString(),
},
{
name: "TRIGGER_POD_SCHEDULED_AT_MS",
value: Date.now().toString(),
},
{
name: "TRIGGER_RUN_ID",
value: opts.runFriendlyId,
@@ -97,7 +105,11 @@
},
{
name: "TRIGGER_WORKER_INSTANCE_NAME",
value: env.TRIGGER_WORKER_INSTANCE_NAME,
valueFrom: {
fieldRef: {
fieldPath: "spec.nodeName",
},
},
},
{
name: "OTEL_EXPORTER_OTLP_ENDPOINT",
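
One change here is unrelated to the new metrics: TRIGGER_WORKER_INSTANCE_NAME is now populated via the Kubernetes downward API (a fieldRef on spec.nodeName) instead of being copied from the supervisor's own TRIGGER_WORKER_INSTANCE_NAME variable, so each runner reports the node it was actually scheduled onto.
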
1 change: 1 addition & 0 deletions apps/supervisor/src/workloadManager/types.ts
@@ -21,6 +21,7 @@ export interface WorkloadManagerCreateOptions {
machine: MachinePreset;
version: string;
nextAttemptNumber?: number;
dequeuedAt: Date;
// identifiers
envId: string;
envType: EnvironmentType;
2 changes: 1 addition & 1 deletion apps/webapp/app/utils/timelineSpanEvents.ts
@@ -171,7 +171,7 @@ function getAdminOnlyForEvent(event: string): boolean {
return true;
}
case "import": {
return true;
return false;
}
case "lazy_payload": {
return true;
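
The one-line change above makes the "import" span event visible to non-admin users instead of admin-only. A rough sketch of the effect, reusing names from the test file below (the sample span events and the exact display name are assumptions):

  // Hypothetical usage; sampleSpanEvents mirrors the test fixtures.
  const visible = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, /* isAdmin */ false);

  // Before this change: ["Dequeued", "Launched"]
  // After this change: ["Dequeued", "Launched", "Importing ..."], because "import"
  // events are no longer filtered out for non-admin viewers.
  console.log(visible.map((event) => event.name));
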
6 changes: 3 additions & 3 deletions apps/webapp/test/timelineSpanEvents.test.ts
@@ -62,11 +62,11 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, false);

// Only dequeue and fork events should be visible for non-admins
expect(result.length).toBe(2);
expect(result.length).toBe(3);
expect(result.some((event) => event.name === "Dequeued")).toBe(true);
expect(result.some((event) => event.name === "Launched")).toBe(true);
expect(result.some((event) => event.name === "Attempt created")).toBe(false);
expect(result.some((event) => event.name.includes("Importing"))).toBe(false);
expect(result.some((event) => event.name.includes("Importing"))).toBe(true);
});

test("should include all events when isAdmin is true", () => {
@@ -220,7 +220,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
expect(result.some((event) => event.name === "Attempt created")).toBe(false);
});

test("should filter import events for non-admin when fork event exists", () => {
test.skip("should filter import events for non-admin when fork event exists", () => {
const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, false);

// With fork event, import should be hidden for non-admins
15 changes: 6 additions & 9 deletions packages/cli-v3/src/entryPoints/dev-run-controller.ts
@@ -369,11 +369,15 @@ export class DevRunController {
try {
await this.cancelAttempt();
} catch (error) {
logger.debug("Failed to cancel attempt, shutting down", {
logger.debug("Failed to cancel attempt, killing task run process", {
error,
});

//todo kill the process?
try {
await this.taskRunProcess?.kill("SIGKILL");
} catch (error) {
logger.debug("Failed to cancel attempt, failed to kill task run process", { error });
}

return;
}
@@ -512,8 +516,6 @@
snapshot: snapshot.friendlyId,
});

// TODO: We may already be executing this run, this may be a new attempt
// This is the only case where incrementing the attempt number is allowed
this.enterRunPhase(run, snapshot);

const metrics = [
@@ -539,9 +541,6 @@
try {
return await this.executeRun({ run, snapshot, execution, envVars, metrics });
} catch (error) {
// TODO: Handle the case where we're in the warm start phase or executing a new run
// This can happen if we kill the run while it's still executing, e.g. after receiving an attempt number mismatch

logger.debug("Error while executing attempt", {
error,
});
@@ -570,8 +569,6 @@
error: completionResult.error,
});

// TODO: Maybe we should keep retrying for a while longer

this.runFinished();
return;
}
107 changes: 99 additions & 8 deletions packages/cli-v3/src/entryPoints/managed-run-controller.ts
@@ -8,6 +8,7 @@ import {
type CompleteRunAttemptResult,
HeartbeatService,
type RunExecutionData,
type TaskRunExecutionMetrics,
type TaskRunExecutionResult,
type TaskRunFailedExecutionResult,
WorkerManifest,
@@ -25,6 +26,11 @@ import { assertExhaustive } from "../utilities/assertExhaustive.js";
import { setTimeout as sleep } from "timers/promises";
import { io, type Socket } from "socket.io-client";

const DateEnv = z
.string()
.transform((val) => new Date(parseInt(val, 10)))
.pipe(z.date());

// All IDs are friendly IDs
const Env = z.object({
// Set at build time
@@ -50,6 +56,10 @@
TRIGGER_RUNNER_ID: z.string(),
TRIGGER_METADATA_URL: z.string().optional(),

// Timeline metrics
TRIGGER_POD_SCHEDULED_AT_MS: DateEnv,
TRIGGER_DEQUEUED_AT_MS: DateEnv,

// May be overridden
TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]),
TRIGGER_SUPERVISOR_API_DOMAIN: z.string(),
@@ -238,6 +248,14 @@ class ManagedRunController {

if (!response.success) {
console.error("[ManagedRunController] Heartbeat failed", { error: response.error });

this.sendDebugLog({
runId: this.runFriendlyId,
message: "heartbeat: failed",
properties: {
error: response.error,
},
});
}
},
intervalMs: this.heartbeatIntervalSeconds * 1000,
@@ -620,6 +638,14 @@
if (!continuationResult.success) {
console.error("Failed to continue execution", { error: continuationResult.error });

this.sendDebugLog({
runId: run.friendlyId,
message: "failed to continue execution",
properties: {
error: continuationResult.error,
},
});

this.waitForNextRun();
return;
}
@@ -734,10 +760,14 @@
private async startAndExecuteRunAttempt({
runFriendlyId,
snapshotFriendlyId,
dequeuedAt,
podScheduledAt,
isWarmStart = false,
}: {
runFriendlyId: string;
snapshotFriendlyId: string;
dequeuedAt?: Date;
podScheduledAt?: Date;
isWarmStart?: boolean;
}) {
if (!this.socket) {
@@ -749,39 +779,79 @@
snapshot: { friendlyId: snapshotFriendlyId },
});

const attemptStartedAt = Date.now();

const start = await this.httpClient.startRunAttempt(runFriendlyId, snapshotFriendlyId, {
isWarmStart,
});

if (!start.success) {
console.error("[ManagedRunController] Failed to start run", { error: start.error });

this.sendDebugLog({
runId: runFriendlyId,
message: "failed to start run attempt",
properties: {
error: start.error,
},
});

this.waitForNextRun();
return;
}

const attemptDuration = Date.now() - attemptStartedAt;

const { run, snapshot, execution, envVars } = start.data;

logger.debug("[ManagedRunController] Started run", {
runId: run.friendlyId,
snapshot: snapshot.friendlyId,
});

// TODO: We may already be executing this run, this may be a new attempt
// This is the only case where incrementing the attempt number is allowed
this.enterRunPhase(run, snapshot);

const metrics = [
{
name: "start",
event: "create_attempt",
timestamp: attemptStartedAt,
duration: attemptDuration,
},
]
.concat(
dequeuedAt
? [
{
name: "start",
event: "dequeue",
timestamp: dequeuedAt.getTime(),
duration: 0,
},
]
: []
)
.concat(
podScheduledAt
? [
{
name: "start",
event: "pod_scheduled",
timestamp: podScheduledAt.getTime(),
duration: 0,
},
]
: []
) satisfies TaskRunExecutionMetrics;

const taskRunEnv = {
...gatherProcessEnv(),
...envVars,
};

try {
return await this.executeRun({ run, snapshot, envVars: taskRunEnv, execution });
return await this.executeRun({ run, snapshot, envVars: taskRunEnv, execution, metrics });
} catch (error) {
// TODO: Handle the case where we're in the warm start phase or executing a new run
// This can happen if we kill the run while it's still executing, e.g. after receiving an attempt number mismatch

console.error("Error while executing attempt", {
error,
});
@@ -810,7 +880,13 @@
error: completionResult.error,
});

// TODO: Maybe we should keep retrying for a while longer
this.sendDebugLog({
runId: run.friendlyId,
message: "completion: failed to submit after error",
properties: {
error: completionResult.error,
},
});

this.waitForNextRun();
return;
@@ -923,6 +999,7 @@
this.startAndExecuteRunAttempt({
runFriendlyId: nextRun.run.friendlyId,
snapshotFriendlyId: nextRun.snapshot.friendlyId,
dequeuedAt: nextRun.dequeuedAt,
isWarmStart: true,
}).finally(() => {});
return;
@@ -1032,7 +1109,10 @@
snapshot,
envVars,
execution,
}: WorkloadRunAttemptStartResponseBody) {
metrics,
}: WorkloadRunAttemptStartResponseBody & {
metrics?: TaskRunExecutionMetrics;
}) {
this.snapshotPoller.start();

if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) {
Expand All @@ -1058,6 +1138,7 @@ class ManagedRunController {
payload: {
execution,
traceContext: execution.run.traceContext ?? {},
metrics,
},
messageId: run.friendlyId,
env: envVars,
@@ -1096,6 +1177,14 @@
error: completionResult.error,
});

this.sendDebugLog({
runId: run.friendlyId,
message: "completion: failed to submit",
properties: {
error: completionResult.error,
},
});

this.waitForNextRun();
return;
}
@@ -1212,6 +1301,8 @@
this.startAndExecuteRunAttempt({
runFriendlyId: env.TRIGGER_RUN_ID,
snapshotFriendlyId: env.TRIGGER_SNAPSHOT_ID,
dequeuedAt: env.TRIGGER_DEQUEUED_AT_MS,
podScheduledAt: env.TRIGGER_POD_SCHEDULED_AT_MS,
}).finally(() => {});
return;
}
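
For reference, a minimal sketch (not part of the diff) of the metrics array the managed controller now assembles when both timestamps are present; the numeric values are invented and the annotation reuses the TaskRunExecutionMetrics type imported above:

  const metrics: TaskRunExecutionMetrics = [
    { name: "start", event: "dequeue", timestamp: 1743770000000, duration: 0 },
    { name: "start", event: "pod_scheduled", timestamp: 1743770001000, duration: 0 },
    { name: "start", event: "create_attempt", timestamp: 1743770002000, duration: 450 },
  ];

These entries travel in the execute payload (payload.metrics) to the task run process, which is how the dequeue, pod-scheduled, and attempt-creation timings end up on the run timeline.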