Merge pull request #23 from geertvandeweyer/master
Handling of the very rare case where a job is killed very early (before any attempt runs, so no RC file exists) or very late (after an acceptable return code has been written but before the job is marked as succeeded).
henriqueribeiro authored Aug 19, 2022
2 parents 1d78426 + 1b7defa commit 7b98f9a
Showing 5 changed files with 86 additions and 17 deletions.
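
The change guards against two rare races on spot instances: a job killed before any attempt starts (no attempt record, no RC file) and a job killed after it has written an acceptable return code but before AWS Batch marks it as succeeded (status Failed with a success RC). A minimal sketch of the resulting decision logic, using illustrative names only (classify and accepted are not from the codebase):

def classify(status: String, returnCode: Option[Int], accepted: Int => Boolean): String =
  (status, returnCode) match {
    // late kill: an acceptable RC was written, but the instance was reclaimed
    // before Batch could mark the job as succeeded -> suspect a spot kill and retry
    case ("Failed", Some(rc)) if accepted(rc) => "retry (suspected spot kill)"
    // early kill: no attempt ever ran, so no RC file exists -> treat as failed
    case (_, None)                            => "failed (no RC file)"
    case (_, Some(rc)) if accepted(rc)        => "succeeded"
    case (_, Some(rc))                        => s"failed (return code $rc)"
  }
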
@@ -13,6 +13,10 @@ final case class WrongReturnCode(jobTag: String, returnCode: Int, stderrPath: Op
override def getMessage = s"Job $jobTag exited with return code $returnCode which has not been declared as a valid return code. See 'continueOnReturnCode' runtime attribute for more details."
}

final case class UnExpectedStatus(jobTag: String, returnCode: Int, jobStatus: String, stderrPath: Option[Path]) extends KnownJobFailureException {
override def getMessage = s"Job $jobTag exited with success code '$returnCode' but failed status '$jobStatus'. Suspecting spot kill and retrying."
}

final case class ReturnCodeIsNotAnInt(jobTag: String, returnCode: String, stderrPath: Option[Path]) extends KnownJobFailureException {
override def getMessage = {
if (returnCode.isEmpty)
@@ -364,7 +364,7 @@ trait StandardAsyncExecutionActor
* to re-do this before sending the response.
*/
private var jobPathsUpdated: Boolean = false
private def updateJobPaths(): Unit = if (!jobPathsUpdated) {
def updateJobPaths(): Unit = if (!jobPathsUpdated) {
// .get's are safe on stdout and stderr after falling back to default names above.
jobPaths.standardPaths = StandardPaths(
output = hostPathFromContainerPath(executionStdout),
@@ -1162,7 +1162,7 @@ trait StandardAsyncExecutionActor
configurationDescriptor.slowJobWarningAfter foreach { duration => self ! WarnAboutSlownessAfter(handle.pendingJob.jobId, duration) }

tellKvJobId(handle.pendingJob) map { _ =>
if (logJobIds) jobLogger.info(s"job id: ${handle.pendingJob.jobId}")
if (logJobIds) jobLogger.debug(s"job id: ${handle.pendingJob.jobId}")
tellMetadata(Map(CallMetadataKeys.JobId -> handle.pendingJob.jobId))
/*
NOTE: Because of the async nature of the Scala Futures, there is a point in time where we have submitted this or
@@ -1281,7 +1281,7 @@ trait StandardAsyncExecutionActor
stderrSizeAndReturnCodeAndMemoryRetry flatMap {
case (stderrSize, returnCodeAsString, retryWithMoreMemory) =>
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)

jobLogger.debug(s"Handling execution Result with status '${status.toString()}' and returnCode ${returnCodeAsString}")
if (isDone(status)) {
tryReturnCodeAsInt match {
// stderr not empty : retry
@@ -1293,12 +1293,17 @@
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory =>
jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString}'")
Future.successful(AbortedExecutionHandle)
// if instance killed after RC.txt creation : edge case with status == Failed AND returnCode == [accepted values] => retry.
case Success(returnCodeAsInt) if status.toString() == "Failed" && continueOnReturnCode.continueFor(returnCodeAsInt) =>
jobLogger.debug(s"Suspected spot kill due to status/RC mismatch")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(UnExpectedStatus(jobDescriptor.key.tag, returnCodeAsInt, status.toString(), stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
// job considered ok by accepted exit code
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// job failed on out-of-memory : retry
case Success(returnCodeAsInt) if retryWithMoreMemory =>
jobLogger.warn(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
jobLogger.info(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
// unaccepted return code : retry.
@@ -1339,8 +1344,7 @@ trait StandardAsyncExecutionActor
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
else {
jobLogger.warn("RC file not found. Setting job to failed & retry.")
//Thread.sleep(300000)
jobLogger.debug("RC file not found. Setting job to failed.")
Future("1")
}
}
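
Two details in this hunk are easy to miss. The new Failed-status guard has to be matched before the generic continueOnReturnCode.continueFor success case, otherwise the generic case would match first and the mismatch would never reach the spot-kill branch; and when the RC file is missing entirely, the code now falls back to "1" so the result is handled as a plain failure downstream. A small self-contained sketch of the ordering point (Outcome, decide and accepted are illustrative, not Cromwell types):

sealed trait Outcome
case object Retry extends Outcome
case object Succeed extends Outcome

// Guards are tried top to bottom: listing the Failed-status check first is what
// turns "success RC but Failed status" into a retry instead of a success.
def decide(status: String, rc: Int, accepted: Int => Boolean): Outcome = rc match {
  case r if status == "Failed" && accepted(r) => Retry   // suspected spot kill
  case r if accepted(r)                       => Succeed // normal success path
  case _                                      => Retry   // unaccepted RC: retried elsewhere
}
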
@@ -262,7 +262,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
log.info(template, jobTag, data.failedCopyAttempts, callCachingParameters.maxFailedCopyAttempts, data.aggregatedHashString)
} else {
log.info(s"BT-322 {} cache hit copying nomatch: could not find a suitable cache hit.", jobTag)
workflowLogger.info("Could not copy a suitable cache hit for {}. No copy attempts were made.", arg = jobTag)
workflowLogger.debug("Could not copy a suitable cache hit for {}. No copy attempts were made.", arg = jobTag)
}

runJob(data)
@@ -37,28 +37,38 @@ import java.io.FileNotFoundException
import akka.actor.ActorRef
import akka.pattern.AskSupport
import akka.util.Timeout

import cats.implicits._

import common.exception.MessageAggregation
import common.collections.EnhancedCollections._
import common.util.StringUtil._
import common.validation.Validation._

import cromwell.backend._
import cromwell.backend.async.{ExecutionHandle, PendingExecutionHandle}
import cromwell.backend.async._ //{ExecutionHandle, PendingExecutionHandle}
import cromwell.backend.impl.aws.IntervalLimitedAwsJobSubmitActor.SubmitAwsJobRequest
import cromwell.backend.impl.aws.OccasionalStatusPollingActor.{NotifyOfStatus, WhatsMyStatus}
import cromwell.backend.impl.aws.RunStatus.{Initializing, TerminalRunStatus}
import cromwell.backend.impl.aws.io._
import cromwell.backend.io.DirectoryFunctions
import cromwell.backend.io.JobPaths
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.backend.OutputEvaluator._

import cromwell.core._
import cromwell.core.path.Path
import cromwell.core.path.{DefaultPathBuilder, Path, PathBuilder, PathFactory}
import cromwell.core.io.{DefaultIoCommandBuilder, IoCommandBuilder}
import cromwell.core.retry.SimpleExponentialBackoff

import cromwell.filesystems.s3.S3Path
import cromwell.filesystems.s3.batch.S3BatchCommandBuilder

import cromwell.services.keyvalue.KvClient

import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.services.batch.BatchClient
//import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
import software.amazon.awssdk.services.batch.model._

import wom.callable.Callable.OutputDefinition
@@ -67,7 +77,7 @@ import wom.expression.NoIoFunctionSet
import wom.types.{WomArrayType, WomSingleFileType}
import wom.values._

import scala.concurrent.{Future, Promise}
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NoStackTrace
@@ -510,7 +520,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case NotifyOfStatus(_, _, Some(value)) =>
Future.successful(value)
case NotifyOfStatus(_, _, None) =>
jobLogger.info("Having to fall back to AWS query for status")
jobLogger.debug("Having to fall back to AWS query for status")
Future.fromTry(job.status(jobId))
case other =>
val message = s"Programmer Error (please report this): Received an unexpected message from the OccasionalPollingActor: $other"
@@ -536,8 +546,24 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build)
val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'"))
val nrAttempts = jobDetail.attempts.size
val lastattempt = jobDetail.attempts.get(nrAttempts-1)
var containerRC = lastattempt.container.exitCode
// if job is terminated/cancelled before starting, there are no attempts.
val lastattempt =
try {
jobDetail.attempts.get(nrAttempts-1)
} catch {
case _ : Throwable => null
}
if (lastattempt == null ) {
Log.info(s"No attempts were made for job '${job.jobId}'. no memory-related retry needed.")
false
}

var containerRC =
try {
lastattempt.container.exitCode
} catch {
case _ : Throwable => null
}
// if missing, set to failed.
if (containerRC == null ) {
Log.debug(s"No RC found for job '${job.jobId}', most likely a spot kill")
@@ -621,8 +647,9 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
override def getTerminalEvents(runStatus: RunStatus): Seq[ExecutionEvent] = {
runStatus match {
case successStatus: RunStatus.Succeeded => successStatus.eventList
case unknown =>
throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. Instead got $unknown")
case unknown => {
throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. Instead got $unknown")
}
}
}

@@ -642,4 +669,38 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}
)
}

override def handleExecutionSuccess(runStatus: StandardAsyncRunState,
handle: StandardAsyncPendingExecutionHandle,
returnCode: Int)(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
evaluateOutputs() map {
case ValidJobOutputs(outputs) =>
// Need to make sure the paths are up to date before sending the detritus back in the response
updateJobPaths()
// If instance is terminated while copying stdout/stderr : status is failed while jobs outputs are ok
// => Retryable
if (runStatus.toString().equals("Failed")) {
jobLogger.warn("Got Failed RunStatus for success Execution")

val exception = new MessageAggregation {
override def exceptionContext: String = "Got Failed RunStatus for success Execution"
override def errorMessages: Traversable[String] = Array("Got Failed RunStatus for success Execution")
}
FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None)
} else {
SuccessfulExecutionHandle(outputs, returnCode, jobPaths.detritusPaths, getTerminalEvents(runStatus))
}
case InvalidJobOutputs(errors) =>
val exception = new MessageAggregation {
override def exceptionContext: String = "Failed to evaluate job outputs"
override def errorMessages: Traversable[String] = errors.toList
}
FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None)
case JobOutputsEvaluationException(exception: Exception) if retryEvaluateOutputsAggregated(exception) =>
// Return the execution handle in this case to retry the operation
handle
case JobOutputsEvaluationException(ex) => FailedNonRetryableExecutionHandle(ex, kvPairsToSave = None)
}
}

}
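
In the AWS Batch backend, jobDetail.attempts can be empty when a job is cancelled before any attempt starts, and lastattempt.container.exitCode can be null after a spot kill; the new code above handles both with try/catch and null checks. The same check could also be expressed with Option, sketched below on the assumption that jobDetail is the already-fetched software.amazon.awssdk.services.batch.model.JobDetail from describeJobs (lastExitCode is an illustrative name, not part of the commit):

import software.amazon.awssdk.services.batch.model.JobDetail

def lastExitCode(jobDetail: JobDetail): Option[Int] = {
  val attempts = jobDetail.attempts                    // java.util.List; empty if killed before any attempt
  if (attempts.isEmpty) None
  else
    Option(attempts.get(attempts.size - 1).container)  // container detail may be missing
      .flatMap(c => Option(c.exitCode))                // exit code is null after a spot kill
      .map(_.intValue)                                 // unbox java.lang.Integer
}
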
@@ -195,7 +195,6 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|
| # get the multipart chunk size
| chunk_size=$$(_get_multipart_chunk_size $$local_path)
| echo "chunk size : $$chunk_size bytes"
| local MP_THRESHOLD=${mp_threshold}
| # then set them
| $awsCmd configure set default.s3.multipart_threshold $$MP_THRESHOLD
@@ -378,7 +377,8 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
//calls the client to submit the job
def callClient(definitionArn: String, awsBatchAttributes: AwsBatchAttributes): Aws[F, SubmitJobResponse] = {

Log.info(s"Submitting taskId: $taskId, job definition : $definitionArn, script: $batch_script")
Log.debug(s"Submitting taskId: $taskId, job definition : $definitionArn, script: $batch_script")
Log.info(s"Submitting taskId: $taskId, script: $batch_script")

val submit: F[SubmitJobResponse] =
async.delay(batchClient.submitJob(
