Skip to content

Commit

Permalink
Add not_found error management
Browse files Browse the repository at this point in the history
Signed-off-by: jorgee <jorge.ejarque@seqera.io>
  • Loading branch information
jorgee committed Jan 21, 2025
1 parent caf8dbd commit e60d4b9
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package nextflow.cloud.google.batch

import com.google.api.gax.rpc.NotFoundException
import com.google.cloud.batch.v1.Task

import java.nio.file.Path
import java.util.regex.Pattern

Expand Down Expand Up @@ -459,23 +462,45 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
final now = System.currentTimeMillis()
final delta = now - timestamp;
if( !taskState || delta >= 1_000) {
final status = client.getTaskStatus(jobId, taskId)
final newState = status?.state as String
if( newState ) {
log.trace "[GOOGLE BATCH] Get job=$jobId task=$taskId state=$newState"
taskState = newState
timestamp = now
}
if( newState == 'PENDING' ) {
final eventsCount = status.getStatusEventsCount()
final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null
if( lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED') )
log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}"
try {
final status = client.getTaskStatus(jobId, taskId)
inspectTaskStatus(status)
}catch (NotFoundException e) {
manageNotFound(tasks)
}
}
return taskState
}

private void inspectTaskStatus(com.google.cloud.batch.v1.TaskStatus status) {
final newState = status?.state as String
if (newState) {
log.trace "[GOOGLE BATCH] Get job=$jobId task=$taskId state=$newState"
taskState = newState
timestamp = System.currentTimeMillis()
}
if (newState == 'PENDING') {
final eventsCount = status.getStatusEventsCount()
final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null
if (lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED'))
log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}"
}
}

protected String manageNotFound( Iterable<Task> tasks) {
// If task is array, check if the in the task list
if (tasks.size() > 1) {
for (Task t in tasks) {
if (t.name == client.generateTaskName(jobId, taskId)) {
inspectTaskStatus(t.status)
return taskState
}
}
}
// if not array or it task is not in the list, check job status.
checkJobStatus()
}

protected String checkJobStatus() {
final jobStatus = client.getJobStatus(jobId)
final newState = jobStatus?.state as String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class BatchClient {
}

Task describeTask(String jobId, String taskId) {
final name = TaskName.of(projectId, location, jobId, 'group0', taskId)
final name = generateTaskName(jobId, taskId)
return apply(()-> batchServiceClient.getTask(name))
}

Expand Down Expand Up @@ -141,6 +141,10 @@ class BatchClient {
return location
}

String generateTaskName(String jobId, String taskId) {
TaskName.of(projectId, location, jobId, 'group0', taskId)
}

/**
* Creates a retry policy using the configuration specified by {@link BatchRetryConfig}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package nextflow.cloud.google.batch

import com.google.api.gax.grpc.GrpcStatusCode
import com.google.api.gax.rpc.NotFoundException
import com.google.api.gax.rpc.StatusCode
import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.Task
import io.grpc.Status

import java.nio.file.Path

Expand Down Expand Up @@ -619,4 +623,36 @@ class GoogleBatchTaskHandlerTest extends Specification {
handler.getTaskState() == "FAILED"
handler.getJobError().message == message
}

def 'should manage not found when getting task state '() {
given:
def jobId = '1'
def taskId = '1'
def client = Mock(BatchClient)
def task = Mock(TaskRun) {
lazyName() >> 'foo (1)'
}
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task))

when:
client.generateTaskName(jobId, taskId) >> "$jobId/group0/$taskId"
//Force errors
client.getTaskStatus(jobId, taskId) >> { throw new NotFoundException(new Exception("Error"), GrpcStatusCode.of(Status.Code.NOT_FOUND), false) }
client.listTasks(jobId) >> TASK_LIST
client.getJobStatus(jobId) >> makeJobStatus(JOB_STATUS, "")
then:
handler.getTaskState() == EXPECTED

where:
EXPECTED | JOB_STATUS | TASK_LIST
"FAILED" | JobStatus.State.FAILED | {[ makeTask("1/group0/2", TaskStatus.State.PENDING), makeTask("1/group0/3", TaskStatus.State.PENDING) ].iterator() } // Task not in the list, get from job
"SUCCEEDED" | JobStatus.State.FAILED | {[ makeTask("1/group0/1", TaskStatus.State.SUCCEEDED), makeTask("1/group0/2", TaskStatus.State.PENDING)].iterator() } //Task in the list, get from task status
}

def makeTask(String name, TaskStatus.State state){
Task.newBuilder().setName(name)
.setStatus(TaskStatus.newBuilder().setState(state).build())
.build()

}
}

0 comments on commit e60d4b9

Please sign in to comment.