Skip to content

Fix run container exits after OOM retries #1701

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 12, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 58 additions & 28 deletions apps/webapp/app/v3/services/completeAttempt.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Attributes } from "@opentelemetry/api";
import {
MachinePresetName,
TaskRunContext,
TaskRunError,
TaskRunErrorCodes,
Expand All @@ -8,9 +9,7 @@ import {
TaskRunExecutionRetry,
TaskRunFailedExecutionResult,
TaskRunSuccessfulExecutionResult,
exceptionEventEnhancer,
flattenAttributes,
internalErrorFromUnexpectedExit,
isManualOutOfMemoryError,
sanitizeError,
shouldRetryError,
Expand All @@ -32,8 +31,8 @@ import { CancelAttemptService } from "./cancelAttempt.server";
import { CreateCheckpointService } from "./createCheckpoint.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
import { RetryAttemptService } from "./retryAttempt.server";
import { updateMetadataService } from "~/services/metadata/updateMetadata.server";
import { getTaskEventStoreTableForRun } from "../taskEventStore.server";
import { socketIo } from "../handleSocketIo.server";

type FoundAttempt = Awaited<ReturnType<typeof findAttempt>>;

Expand Down Expand Up @@ -256,9 +255,12 @@ export class CompleteAttemptService extends BaseService {

let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
let isOOMRetry = false;
let isOOMAttempt = isOOMError(completion.error);
let isOnMaxOOMMachine = false;
let oomMachine: MachinePresetName | undefined;

//OOM errors should retry (if an OOM machine is specified)
if (isOOMError(completion.error)) {
//OOM errors should retry (if an OOM machine is specified, and we're not already on it)
if (isOOMAttempt) {
const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({
run: {
...taskRunAttempt.taskRun,
Expand All @@ -268,10 +270,10 @@ export class CompleteAttemptService extends BaseService {
execution,
});

if (
retryConfig?.outOfMemory?.machine &&
retryConfig.outOfMemory.machine !== taskRunAttempt.taskRun.machinePreset
) {
oomMachine = retryConfig?.outOfMemory?.machine;
isOnMaxOOMMachine = oomMachine === taskRunAttempt.taskRun.machinePreset;

if (oomMachine && !isOnMaxOOMMachine) {
//we will retry
isOOMRetry = true;
retriableError = true;
Expand All @@ -290,7 +292,7 @@ export class CompleteAttemptService extends BaseService {
id: taskRunAttempt.taskRunId,
},
data: {
machinePreset: retryConfig.outOfMemory.machine,
machinePreset: oomMachine,
},
});
}
Expand All @@ -309,11 +311,17 @@ export class CompleteAttemptService extends BaseService {
environment,
checkpoint,
forceRequeue: isOOMRetry,
oomMachine,
});
}

// The attempt has failed and we won't retry

if (isOOMAttempt && isOnMaxOOMMachine && environment.type !== "DEVELOPMENT") {
// The attempt failed due to an OOM error but we're already on the machine we should retry on
exitRun(taskRunAttempt.taskRunId);
}

// Now we need to "complete" the task run event/span
await eventRepository.completeEvent(
getTaskEventStoreTableForRun(taskRunAttempt.taskRun),
Expand Down Expand Up @@ -507,6 +515,11 @@ export class CompleteAttemptService extends BaseService {

if (forceRequeue) {
logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id });

// The run won't know it should shut down as we make the decision to force requeue here
// This also ensures that this change is backwards compatible with older workers
exitRun(run.id);

await retryViaQueue();
return;
}
Expand Down Expand Up @@ -544,6 +557,7 @@ export class CompleteAttemptService extends BaseService {
environment,
checkpoint,
forceRequeue = false,
oomMachine,
}: {
execution: TaskRunExecution;
executionRetry: TaskRunExecutionRetry;
Expand All @@ -552,29 +566,38 @@ export class CompleteAttemptService extends BaseService {
environment: AuthenticatedEnvironment;
checkpoint?: CheckpointData;
forceRequeue?: boolean;
/** Setting this will also alter the retry span message */
oomMachine?: MachinePresetName;
}) {
const retryAt = new Date(executionRetry.timestamp);

// Retry the task run
await eventRepository.recordEvent(`Retry #${execution.attempt.number} delay`, {
taskSlug: taskRunAttempt.taskRun.taskIdentifier,
environment,
attributes: {
metadata: this.#generateMetadataAttributesForNextAttempt(execution),
properties: {
retryAt: retryAt.toISOString(),
},
runId: taskRunAttempt.taskRun.friendlyId,
style: {
icon: "schedule-attempt",
await eventRepository.recordEvent(
`Retry #${execution.attempt.number} delay${oomMachine ? " after OOM" : ""}`,
{
taskSlug: taskRunAttempt.taskRun.taskIdentifier,
environment,
attributes: {
metadata: this.#generateMetadataAttributesForNextAttempt(execution),
properties: {
retryAt: retryAt.toISOString(),
previousMachine: oomMachine
? taskRunAttempt.taskRun.machinePreset ?? undefined
: undefined,
nextMachine: oomMachine,
},
runId: taskRunAttempt.taskRun.friendlyId,
style: {
icon: "schedule-attempt",
},
queueId: taskRunAttempt.queueId,
queueName: taskRunAttempt.taskRun.queue,
},
queueId: taskRunAttempt.queueId,
queueName: taskRunAttempt.taskRun.queue,
},
context: taskRunAttempt.taskRun.traceContext as Record<string, string | undefined>,
spanIdSeed: `retry-${taskRunAttempt.number + 1}`,
endTime: retryAt,
});
context: taskRunAttempt.taskRun.traceContext as Record<string, string | undefined>,
spanIdSeed: `retry-${taskRunAttempt.number + 1}`,
endTime: retryAt,
}
);

logger.debug("[CompleteAttemptService] Retrying", {
taskRun: taskRunAttempt.taskRun.friendlyId,
Expand Down Expand Up @@ -753,3 +776,10 @@ function isOOMError(error: TaskRunError) {

return false;
}

/**
 * Emits a `REQUEST_RUN_CANCELLATION` message (payload version "v1") for the
 * given run on the socket.io coordinator namespace.
 *
 * The value returned by `emit` is discarded; nothing is awaited here.
 *
 * @param runId - Identifier of the run, forwarded unchanged in the payload.
 */
function exitRun(runId: string) {
  // `as const` keeps the version as the literal "v1" rather than widening to string.
  const cancellationRequest = { version: "v1" as const, runId };

  socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", cancellationRequest);
}
Loading