diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 13d120aba6..cc4472f1e7 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -1,5 +1,6 @@ import { Attributes } from "@opentelemetry/api"; import { + MachinePresetName, TaskRunContext, TaskRunError, TaskRunErrorCodes, @@ -8,9 +9,7 @@ import { TaskRunExecutionRetry, TaskRunFailedExecutionResult, TaskRunSuccessfulExecutionResult, - exceptionEventEnhancer, flattenAttributes, - internalErrorFromUnexpectedExit, isManualOutOfMemoryError, sanitizeError, shouldRetryError, @@ -32,8 +31,8 @@ import { CancelAttemptService } from "./cancelAttempt.server"; import { CreateCheckpointService } from "./createCheckpoint.server"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { RetryAttemptService } from "./retryAttempt.server"; -import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; import { getTaskEventStoreTableForRun } from "../taskEventStore.server"; +import { socketIo } from "../handleSocketIo.server"; type FoundAttempt = Awaited>; @@ -256,9 +255,12 @@ export class CompleteAttemptService extends BaseService { let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error)); let isOOMRetry = false; + let isOOMAttempt = isOOMError(completion.error); + let isOnMaxOOMMachine = false; + let oomMachine: MachinePresetName | undefined; - //OOM errors should retry (if an OOM machine is specified) - if (isOOMError(completion.error)) { + //OOM errors should retry (if an OOM machine is specified, and we're not already on it) + if (isOOMAttempt) { const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({ run: { ...taskRunAttempt.taskRun, @@ -268,10 +270,10 @@ export class CompleteAttemptService extends BaseService { execution, }); - if ( - retryConfig?.outOfMemory?.machine && - retryConfig.outOfMemory.machine !== taskRunAttempt.taskRun.machinePreset - ) { + oomMachine = retryConfig?.outOfMemory?.machine; + isOnMaxOOMMachine = oomMachine === taskRunAttempt.taskRun.machinePreset; + + if (oomMachine && !isOnMaxOOMMachine) { //we will retry isOOMRetry = true; retriableError = true; @@ -290,7 +292,7 @@ export class CompleteAttemptService extends BaseService { id: taskRunAttempt.taskRunId, }, data: { - machinePreset: retryConfig.outOfMemory.machine, + machinePreset: oomMachine, }, }); } @@ -309,11 +311,17 @@ export class CompleteAttemptService extends BaseService { environment, checkpoint, forceRequeue: isOOMRetry, + oomMachine, }); } // The attempt has failed and we won't retry + if (isOOMAttempt && isOnMaxOOMMachine && environment.type !== "DEVELOPMENT") { + // The attempt failed due to an OOM error but we're already on the machine we should retry on + exitRun(taskRunAttempt.taskRunId); + } + // Now we need to "complete" the task run event/span await eventRepository.completeEvent( getTaskEventStoreTableForRun(taskRunAttempt.taskRun), @@ -507,6 +515,11 @@ export class CompleteAttemptService extends BaseService { if (forceRequeue) { logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id }); + + // The run won't know it should shut down as we make the decision to force requeue here + // This also ensures that this change is backwards compatible with older workers + exitRun(run.id); + await retryViaQueue(); return; } @@ -544,6 +557,7 @@ export class CompleteAttemptService extends BaseService { environment, checkpoint, forceRequeue = false, + oomMachine, }: { execution: TaskRunExecution; executionRetry: TaskRunExecutionRetry; @@ -552,29 +566,38 @@ export class CompleteAttemptService extends BaseService { environment: AuthenticatedEnvironment; checkpoint?: CheckpointData; forceRequeue?: boolean; + /** Setting this will also alter the retry span message */ + oomMachine?: MachinePresetName; }) { const retryAt = new Date(executionRetry.timestamp); // Retry the task run - await eventRepository.recordEvent(`Retry #${execution.attempt.number} delay`, { - taskSlug: taskRunAttempt.taskRun.taskIdentifier, - environment, - attributes: { - metadata: this.#generateMetadataAttributesForNextAttempt(execution), - properties: { - retryAt: retryAt.toISOString(), - }, - runId: taskRunAttempt.taskRun.friendlyId, - style: { - icon: "schedule-attempt", + await eventRepository.recordEvent( + `Retry #${execution.attempt.number} delay${oomMachine ? " after OOM" : ""}`, + { + taskSlug: taskRunAttempt.taskRun.taskIdentifier, + environment, + attributes: { + metadata: this.#generateMetadataAttributesForNextAttempt(execution), + properties: { + retryAt: retryAt.toISOString(), + previousMachine: oomMachine + ? taskRunAttempt.taskRun.machinePreset ?? undefined + : undefined, + nextMachine: oomMachine, + }, + runId: taskRunAttempt.taskRun.friendlyId, + style: { + icon: "schedule-attempt", + }, + queueId: taskRunAttempt.queueId, + queueName: taskRunAttempt.taskRun.queue, }, - queueId: taskRunAttempt.queueId, - queueName: taskRunAttempt.taskRun.queue, - }, - context: taskRunAttempt.taskRun.traceContext as Record, - spanIdSeed: `retry-${taskRunAttempt.number + 1}`, - endTime: retryAt, - }); + context: taskRunAttempt.taskRun.traceContext as Record, + spanIdSeed: `retry-${taskRunAttempt.number + 1}`, + endTime: retryAt, + } + ); logger.debug("[CompleteAttemptService] Retrying", { taskRun: taskRunAttempt.taskRun.friendlyId, @@ -753,3 +776,10 @@ function isOOMError(error: TaskRunError) { return false; } + +function exitRun(runId: string) { + socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { + version: "v1", + runId, + }); +}