Merge pull request #18 from henriqueribeiro/revert-17-master
Revert "Extra failure handling for Batch"
henriqueribeiro authored Apr 4, 2022
2 parents 463116c + 22bc5d7 commit cb44d45
Showing 5 changed files with 68 additions and 183 deletions.
@@ -1265,16 +1265,70 @@ trait StandardAsyncExecutionActor
def handleExecutionResult(status: StandardAsyncRunState,
oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = {

// get the memory retry code.
def memoryRetryRC: Future[Boolean] = {
// convert int to boolean
def returnCodeAsBoolean(codeAsOption: Option[String]): Boolean = {
codeAsOption match {
case Some(codeAsString) =>
Try(codeAsString.trim.toInt) match {
case Success(code) => code match {
case StderrContainsRetryKeysCode => true
case _ => false
}
case Failure(e) =>
log.error(s"'CheckingForMemoryRetry' action exited with code '$codeAsString' which couldn't be " +
s"converted to an Integer. Task will not be retried with more memory. Error: ${ExceptionUtils.getMessage(e)}")
false
}
case None => false
}
}
// read if the file exists
def readMemoryRetryRCFile(fileExists: Boolean): Future[Option[String]] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false).map(Option(_))
else
Future.successful(None)
}
//finally : assign the yielded variable
for {
fileExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
retryCheckRCAsOption <- readMemoryRetryRCFile(fileExists)
retryWithMoreMemory = returnCodeAsBoolean(retryCheckRCAsOption)
} yield retryWithMoreMemory
}

// get the exit code of the job.
def JobExitCode: Future[String] = {

// read if the file exists
def readRCFile(fileExists: Boolean): Future[String] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
else {
jobLogger.warn("RC file not found. Setting job to failed & waiting 5m before retry.")
Thread.sleep(300000)
Future("1")
}
}
//finally : assign the yielded variable
for {
fileExists <- asyncIo.existsAsync(jobPaths.returnCode)
jobRC <- readRCFile(fileExists)
} yield jobRC
}

// get path to stderr
val stderr = jobPaths.standardPaths.error
lazy val stderrAsOption: Option[Path] = Option(stderr)
// get the three needed variables, using helper functions below, or direct assignment.
// get the three needed variables, using functions above or direct assignment.
val stderrSizeAndReturnCodeAndMemoryRetry = for {
returnCodeAsString <- JobExitCode
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
// may fail due to race conditions on quickly-executing jobs.
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
retryWithMoreMemory <- memoryRetryRC(oldHandle.pendingJob)
retryWithMoreMemory <- memoryRetryRC
} yield (stderrSize, returnCodeAsString, retryWithMoreMemory)

stderrSizeAndReturnCodeAndMemoryRetry flatMap {
@@ -1283,36 +1337,25 @@ trait StandardAsyncExecutionActor

if (isDone(status)) {
tryReturnCodeAsInt match {
// stderr not empty : retry
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
// job was aborted (cancelled by user?)
// on AWS, OOM kills exit with code 137 : check retryWithMoreMemory here
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory =>
jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString}'")
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
// job considered ok by accepted exit code
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// job failed on out-of-memory : retry
case Success(returnCodeAsInt) if retryWithMoreMemory =>
jobLogger.warn(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
// unaccepted return code : retry.
case Success(returnCodeAsInt) =>
jobLogger.debug(s"Retrying with wrong exit code : '${returnCodeAsString}'")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
case Failure(_) =>
jobLogger.warn(s"General failure of job with exit code : '${returnCodeAsString}'")
Future.successful(FailedNonRetryableExecutionHandle(ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption), kvPairsToSave = None))
}
} else {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if retryWithMoreMemory && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
jobLogger.debug(s"job not done but retrying already? : ${status.toString()}")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
case _ =>
@@ -1330,63 +1373,6 @@ trait StandardAsyncExecutionActor
}
}

// helper function for handleExecutionResult : get the exit code of the job.
def JobExitCode: Future[String] = {

// read if the file exists
def readRCFile(fileExists: Boolean): Future[String] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
else {
jobLogger.warn("RC file not found. Setting job to failed & retry.")
//Thread.sleep(300000)
Future("1")
}
}
//finally : assign the yielded variable
for {
fileExists <- asyncIo.existsAsync(jobPaths.returnCode)
jobRC <- readRCFile(fileExists)
} yield jobRC
}

// helper function for handleExecutionResult : get the memory retry code.
def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = {
// job is used in aws override version. use here to prevent compilation error.
log.debug(s"Looking for memoryRetry in job '${job.jobId}'")
// convert int to boolean
def returnCodeAsBoolean(codeAsOption: Option[String]): Boolean = {
codeAsOption match {
case Some(codeAsString) =>
Try(codeAsString.trim.toInt) match {
case Success(code) => code match {
case StderrContainsRetryKeysCode => true
case _ => false
}
case Failure(e) =>
log.error(s"'CheckingForMemoryRetry' action exited with code '$codeAsString' which couldn't be " +
s"converted to an Integer. Task will not be retried with more memory. Error: ${ExceptionUtils.getMessage(e)}")
false
}
case None => false
}
}
// read if the file exists
def readMemoryRetryRCFile(fileExists: Boolean): Future[Option[String]] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false).map(Option(_))
else
Future.successful(None)
}
//finally : assign the yielded variable
for {
fileExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
retryCheckRCAsOption <- readMemoryRetryRCFile(fileExists)
retryWithMoreMemory = returnCodeAsBoolean(retryCheckRCAsOption)
} yield retryWithMoreMemory
}


/**
* Send the job id of the running job to the key value store.
*
@@ -26,7 +26,7 @@ object RetryableRequestSupport {
case _: SocketException => true
case _: SocketTimeoutException => true
case ioE: IOException if Option(ioE.getMessage).exists(_.contains("Error getting access token for service account")) => true
case ioE: IOException => isGcs500(ioE) || isGcs503(ioE) || isGcs504(ioE) || isAws504(ioE)
case ioE: IOException => isGcs500(ioE) || isGcs503(ioE) || isGcs504(ioE)
case other =>
// Infinitely retryable is a subset of retryable
isInfinitelyRetryable(other)
@@ -86,28 +86,4 @@ object RetryableRequestSupport {
msg.contains("504 Gateway Timeout")
)
}
// AWS timeout error
def isAws504(failure: Throwable): Boolean = {
Option(failure.getMessage).exists(msg =>
(
// timeout in reading form s3.
msg.contains("Could not read from s3") &&
msg.contains("Timeout waiting for connection")
) || (
// reading in cromwell wdl (read_lines() etc)
msg.contains("Failed to evaluate") &&
msg.contains("s3://") &&
msg.contains("Timed out after")
)
)
}
// General AWS IO error : all items unreadable except rc.txt files (might be missing)
// => mainly for testing. will retry mis-specified s3 paths as well...
def isAwsIO(failure:Throwable): Boolean = {
Option(failure.getMessage).exists(msg =>
msg.contains("Could not read from s3") &&
! msg.contains("-rc.txt")
)
}

}
@@ -57,10 +57,7 @@ import cromwell.filesystems.s3.S3Path
import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
import cromwell.services.keyvalue.KvClient
import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.services.batch.BatchClient
//import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
import software.amazon.awssdk.services.batch.model._

import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
import wom.callable.Callable.OutputDefinition
import wom.core.FullyQualifiedName
import wom.expression.NoIoFunctionSet
@@ -187,13 +184,6 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
Option(configuration.awsAuth),
configuration.fsxMntPoint)
}

// setup batch client to query job container info
lazy val batchClient: BatchClient = {
val builder = BatchClient.builder()
configureClient(builder, batchJob.optAwsAuthMode, batchJob.configRegion)
}

/* Tries to abort the job in flight
*
* @param job A StandardAsyncJob object (has jobId value) to cancel
@@ -524,31 +514,6 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
} yield guaranteedAnswer
}

// new OOM detection
override def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = Future {
Log.debug(s"Looking for memoryRetry in job '${job.jobId}'")
val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build)
val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'"))
val nrAttempts = jobDetail.attempts.size
val lastattempt = jobDetail.attempts.get(nrAttempts-1)
val containerRC = lastattempt.container.exitCode
// if not zero => get reason, else set retry to false.
containerRC.toString() match {
case "0" =>
Log.debug("container exit code was zero")
false
case _ =>
val containerStatusReason = lastattempt.container.reason
Log.warn(s"Job failed with Container status reason : '${containerStatusReason}'")
val RetryMemoryKeys = memoryRetryErrorKeys.toList.flatten
val retry = RetryMemoryKeys.exists(containerStatusReason.contains)
Log.debug(s"Retry job based on provided keys : '${retry}'")
retry
}


}

// Despite being a "runtime" exception, BatchExceptions for 429 (too many requests) are *not* fatal:
override def isFatal(throwable: Throwable): Boolean = throwable match {
case be: BatchException => !be.getMessage.contains("Status Code: 429")
@@ -171,10 +171,10 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| fi
| # copy
| $awsCmd s3 cp --no-progress "$$s3_path" "$$destination" ||
| { echo "attempt $$i to copy $$s3_path failed" sleep $$((7 * "$$i")) && continue; }
| ( echo "attempt $$i to copy $$s3_path failed" sleep $$((7 * "$$i")) && continue)
| # check data integrity
| _check_data_integrity $$destination $$s3_path ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| (echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
| # copy succeeded
| break
| done
@@ -203,18 +203,18 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| destination=$${destination%/}
| # glob directory. do recursive copy
| $awsCmd s3 cp --no-progress $$local_path $$destination --recursive --exclude "cromwell_glob_control_file" ||
| { echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| ( echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue)
| # check integrity for each of the files
| for FILE in $$(cd $$local_path ; ls | grep -v cromwell_glob_control_file); do
| _check_data_integrity $$local_path/$$FILE $$destination/$$FILE ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2; }
| ( echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2)
| done
| else
| $awsCmd s3 cp --no-progress "$$local_path" "$$destination" ||
| { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| ( echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
| # check content length for data integrity
| _check_data_integrity $$local_path $$destination ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| ( echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
| fi
| # copy succeeded
| break
@@ -235,10 +235,10 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| exit 1
| fi
| s3_content_length=$$($awsCmd s3api head-object --bucket "$$bucket" --key "$$key" --query 'ContentLength') ||
| { echo "Attempt to get head of object failed for $$s3_path." && return 1 ; }
| ( echo "Attempt to get head of object failed for $$s3_path." && return 1 )
| # local
| local_content_length=$$(LC_ALL=C ls -dn -- "$$local_path" | awk '{print $$5; exit}' ) ||
| { echo "Attempt to get local content length failed for $$_local_path." && return 1; }
| ( echo "Attempt to get local content length failed for $$_local_path." && return 1 )
| # compare
| if [[ "$$s3_content_length" -eq "$$local_content_length" ]]; then
| true
@@ -277,7 +277,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
s"""
|touch ${output.name}
|_s3_delocalize_with_retry ${output.name} ${output.s3key}
|if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory ; fi""".stripMargin
|if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory "; fi""".stripMargin


case output: AwsBatchFileOutput if output.s3key.startsWith("s3://") && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString =>
@@ -133,48 +133,6 @@ Notes:
- local hashing means that all used containers are pulled. Make sure you have enough storage
- enable a database to make the cache persistent over cromwell restarts

### Retry with more memory

Cromwell can be configured to retry jobs with more allocated memory, under a defined set of conditions. To enable this, set the following parameters:

Cromwell configuration (`cromwell.config`):
```
// set the maximum number of retries:
// backend.providers.AWSBatch.config.default-runtime-attributes.maxRetries
backend {
  providers {
    AWSBatch {
      config {
        default-runtime-attributes {
          maxRetries: 6
        }
      }
    }
  }
}
// set the error keys that indicate an out-of-memory kill:
// system.io.memory-retry-error-keys
system {
  io {
    memory-retry-error-keys = ["OutOfMemory", "Killed"]
  }
}
```

Workflow-specific runtime options (`workflow_options.json`):
```
{
"memory_retry_multiplier" : 1.5
}
```

When this options file is provided at workflow submission, jobs that are terminated due to insufficient memory will be retried up to 6 times (the configured `maxRetries`), each time with the memory allocation multiplied by 1.5. For example: 4 GB => 6 GB => 9 GB => 13.5 GB => ...

Note: Retries of jobs using the `awsBatchRetryAttempts` counter do *not* increase memory allocation.
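For illustration only (this sketch is not part of the repository), the snippet below shows how a 1.5x multiplier compounds over the six configured retries; the object name and the 4 GB starting allocation are hypothetical.

```
// Hypothetical sketch: memory allocation growth when every OOM retry multiplies
// the previous request by memory_retry_multiplier (1.5) up to maxRetries (6).
object MemoryRetrySketch extends App {
  val initialMemoryGb = 4.0   // example starting memory request
  val multiplier      = 1.5   // "memory_retry_multiplier" from workflow_options.json
  val maxRetries      = 6     // default-runtime-attributes.maxRetries from cromwell.config

  // first element is the original allocation, followed by one entry per retry
  val allocations = Iterator.iterate(initialMemoryGb)(_ * multiplier).take(maxRetries + 1).toList
  println(allocations.map(gb => f"$gb%.2f GB").mkString(" => "))
  // 4.00 GB => 6.00 GB => 9.00 GB => 13.50 GB => 20.25 GB => 30.38 GB => 45.56 GB
}
```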



AWS Batch
---------

