1
1
import { Attributes } from "@opentelemetry/api" ;
2
2
import {
3
+ MachinePresetName ,
3
4
TaskRunContext ,
4
5
TaskRunError ,
5
6
TaskRunErrorCodes ,
8
9
TaskRunExecutionRetry ,
9
10
TaskRunFailedExecutionResult ,
10
11
TaskRunSuccessfulExecutionResult ,
11
- exceptionEventEnhancer ,
12
12
flattenAttributes ,
13
- internalErrorFromUnexpectedExit ,
14
13
isManualOutOfMemoryError ,
15
14
sanitizeError ,
16
15
shouldRetryError ,
@@ -32,8 +31,8 @@ import { CancelAttemptService } from "./cancelAttempt.server";
32
31
import { CreateCheckpointService } from "./createCheckpoint.server" ;
33
32
import { FinalizeTaskRunService } from "./finalizeTaskRun.server" ;
34
33
import { RetryAttemptService } from "./retryAttempt.server" ;
35
- import { updateMetadataService } from "~/services/metadata/updateMetadata.server" ;
36
34
import { getTaskEventStoreTableForRun } from "../taskEventStore.server" ;
35
+ import { socketIo } from "../handleSocketIo.server" ;
37
36
38
37
type FoundAttempt = Awaited < ReturnType < typeof findAttempt > > ;
39
38
@@ -256,9 +255,12 @@ export class CompleteAttemptService extends BaseService {
256
255
257
256
let retriableError = shouldRetryError ( taskRunErrorEnhancer ( completion . error ) ) ;
258
257
let isOOMRetry = false ;
258
+ let isOOMAttempt = isOOMError ( completion . error ) ;
259
+ let isOnMaxOOMMachine = false ;
260
+ let oomMachine : MachinePresetName | undefined ;
259
261
260
- //OOM errors should retry (if an OOM machine is specified)
261
- if ( isOOMError ( completion . error ) ) {
262
+ //OOM errors should retry (if an OOM machine is specified, and we're not already on it )
263
+ if ( isOOMAttempt ) {
262
264
const retryConfig = FailedTaskRunRetryHelper . getRetryConfig ( {
263
265
run : {
264
266
...taskRunAttempt . taskRun ,
@@ -268,10 +270,10 @@ export class CompleteAttemptService extends BaseService {
268
270
execution,
269
271
} ) ;
270
272
271
- if (
272
- retryConfig ?. outOfMemory ?. machine &&
273
- retryConfig . outOfMemory . machine !== taskRunAttempt . taskRun . machinePreset
274
- ) {
273
+ oomMachine = retryConfig ?. outOfMemory ?. machine ;
274
+ isOnMaxOOMMachine = oomMachine === taskRunAttempt . taskRun . machinePreset ;
275
+
276
+ if ( oomMachine && ! isOnMaxOOMMachine ) {
275
277
//we will retry
276
278
isOOMRetry = true ;
277
279
retriableError = true ;
@@ -290,7 +292,7 @@ export class CompleteAttemptService extends BaseService {
290
292
id : taskRunAttempt . taskRunId ,
291
293
} ,
292
294
data : {
293
- machinePreset : retryConfig . outOfMemory . machine ,
295
+ machinePreset : oomMachine ,
294
296
} ,
295
297
} ) ;
296
298
}
@@ -309,11 +311,17 @@ export class CompleteAttemptService extends BaseService {
309
311
environment,
310
312
checkpoint,
311
313
forceRequeue : isOOMRetry ,
314
+ oomMachine,
312
315
} ) ;
313
316
}
314
317
315
318
// The attempt has failed and we won't retry
316
319
320
+ if ( isOOMAttempt && isOnMaxOOMMachine && environment . type !== "DEVELOPMENT" ) {
321
+ // The attempt failed due to an OOM error but we're already on the machine we should retry on
322
+ exitRun ( taskRunAttempt . taskRunId ) ;
323
+ }
324
+
317
325
// Now we need to "complete" the task run event/span
318
326
await eventRepository . completeEvent (
319
327
getTaskEventStoreTableForRun ( taskRunAttempt . taskRun ) ,
@@ -507,6 +515,11 @@ export class CompleteAttemptService extends BaseService {
507
515
508
516
if ( forceRequeue ) {
509
517
logger . debug ( "[CompleteAttemptService] Forcing retry via queue" , { runId : run . id } ) ;
518
+
519
+ // The run won't know it should shut down as we make the decision to force requeue here
520
+ // This also ensures that this change is backwards compatible with older workers
521
+ exitRun ( run . id ) ;
522
+
510
523
await retryViaQueue ( ) ;
511
524
return ;
512
525
}
@@ -544,6 +557,7 @@ export class CompleteAttemptService extends BaseService {
544
557
environment,
545
558
checkpoint,
546
559
forceRequeue = false ,
560
+ oomMachine,
547
561
} : {
548
562
execution : TaskRunExecution ;
549
563
executionRetry : TaskRunExecutionRetry ;
@@ -552,29 +566,38 @@ export class CompleteAttemptService extends BaseService {
552
566
environment : AuthenticatedEnvironment ;
553
567
checkpoint ?: CheckpointData ;
554
568
forceRequeue ?: boolean ;
569
+ /** Setting this will also alter the retry span message */
570
+ oomMachine ?: MachinePresetName ;
555
571
} ) {
556
572
const retryAt = new Date ( executionRetry . timestamp ) ;
557
573
558
574
// Retry the task run
559
- await eventRepository . recordEvent ( `Retry #${ execution . attempt . number } delay` , {
560
- taskSlug : taskRunAttempt . taskRun . taskIdentifier ,
561
- environment,
562
- attributes : {
563
- metadata : this . #generateMetadataAttributesForNextAttempt( execution ) ,
564
- properties : {
565
- retryAt : retryAt . toISOString ( ) ,
566
- } ,
567
- runId : taskRunAttempt . taskRun . friendlyId ,
568
- style : {
569
- icon : "schedule-attempt" ,
575
+ await eventRepository . recordEvent (
576
+ `Retry #${ execution . attempt . number } delay${ oomMachine ? " after OOM" : "" } ` ,
577
+ {
578
+ taskSlug : taskRunAttempt . taskRun . taskIdentifier ,
579
+ environment,
580
+ attributes : {
581
+ metadata : this . #generateMetadataAttributesForNextAttempt( execution ) ,
582
+ properties : {
583
+ retryAt : retryAt . toISOString ( ) ,
584
+ previousMachine : oomMachine
585
+ ? taskRunAttempt . taskRun . machinePreset ?? undefined
586
+ : undefined ,
587
+ nextMachine : oomMachine ,
588
+ } ,
589
+ runId : taskRunAttempt . taskRun . friendlyId ,
590
+ style : {
591
+ icon : "schedule-attempt" ,
592
+ } ,
593
+ queueId : taskRunAttempt . queueId ,
594
+ queueName : taskRunAttempt . taskRun . queue ,
570
595
} ,
571
- queueId : taskRunAttempt . queueId ,
572
- queueName : taskRunAttempt . taskRun . queue ,
573
- } ,
574
- context : taskRunAttempt . taskRun . traceContext as Record < string , string | undefined > ,
575
- spanIdSeed : `retry-${ taskRunAttempt . number + 1 } ` ,
576
- endTime : retryAt ,
577
- } ) ;
596
+ context : taskRunAttempt . taskRun . traceContext as Record < string , string | undefined > ,
597
+ spanIdSeed : `retry-${ taskRunAttempt . number + 1 } ` ,
598
+ endTime : retryAt ,
599
+ }
600
+ ) ;
578
601
579
602
logger . debug ( "[CompleteAttemptService] Retrying" , {
580
603
taskRun : taskRunAttempt . taskRun . friendlyId ,
@@ -753,3 +776,10 @@ function isOOMError(error: TaskRunError) {
753
776
754
777
return false ;
755
778
}
779
+
780
+ function exitRun ( runId : string ) {
781
+ socketIo . coordinatorNamespace . emit ( "REQUEST_RUN_CANCELLATION" , {
782
+ version : "v1" ,
783
+ runId,
784
+ } ) ;
785
+ }
0 commit comments