Revert "Extra failure handling for Batch" #18

Merged
merged 1 commit into from
Apr 4, 2022
Merged
@@ -1265,16 +1265,70 @@ trait StandardAsyncExecutionActor
def handleExecutionResult(status: StandardAsyncRunState,
oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = {

// get the memory retry code.
def memoryRetryRC: Future[Boolean] = {
// convert int to boolean
def returnCodeAsBoolean(codeAsOption: Option[String]): Boolean = {
codeAsOption match {
case Some(codeAsString) =>
Try(codeAsString.trim.toInt) match {
case Success(code) => code match {
case StderrContainsRetryKeysCode => true
case _ => false
}
case Failure(e) =>
log.error(s"'CheckingForMemoryRetry' action exited with code '$codeAsString' which couldn't be " +
s"converted to an Integer. Task will not be retried with more memory. Error: ${ExceptionUtils.getMessage(e)}")
false
}
case None => false
}
}
// read if the file exists
def readMemoryRetryRCFile(fileExists: Boolean): Future[Option[String]] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false).map(Option(_))
else
Future.successful(None)
}
//finally : assign the yielded variable
for {
fileExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
retryCheckRCAsOption <- readMemoryRetryRCFile(fileExists)
retryWithMoreMemory = returnCodeAsBoolean(retryCheckRCAsOption)
} yield retryWithMoreMemory
}

// get the exit code of the job.
def JobExitCode: Future[String] = {

// read if the file exists
def readRCFile(fileExists: Boolean): Future[String] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
else {
jobLogger.warn("RC file not found. Setting job to failed & waiting 5m before retry.")
Thread.sleep(300000)
Future("1")
}
}
//finally : assign the yielded variable
for {
fileExists <- asyncIo.existsAsync(jobPaths.returnCode)
jobRC <- readRCFile(fileExists)
} yield jobRC
}

// get path to stderr
val stderr = jobPaths.standardPaths.error
lazy val stderrAsOption: Option[Path] = Option(stderr)
// get the three needed variables, using helper functions below, or direct assignment.
// get the three needed variables, using functions above or direct assignment.
val stderrSizeAndReturnCodeAndMemoryRetry = for {
returnCodeAsString <- JobExitCode
// Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
// may fail due to race conditions on quickly-executing jobs.
stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
retryWithMoreMemory <- memoryRetryRC(oldHandle.pendingJob)
retryWithMoreMemory <- memoryRetryRC
} yield (stderrSize, returnCodeAsString, retryWithMoreMemory)

stderrSizeAndReturnCodeAndMemoryRetry flatMap {
@@ -1283,36 +1337,25 @@ trait StandardAsyncExecutionActor

if (isDone(status)) {
tryReturnCodeAsInt match {
// stderr not empty : retry
case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
// job was aborted (cancelled by user?)
// on AWS OOM kill are code 137 : check retryWithMoreMemory here
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory =>
jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString}'")
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
// job considered ok by accepted exit code
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// job failed on out-of-memory : retry
case Success(returnCodeAsInt) if retryWithMoreMemory =>
jobLogger.warn(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
// unaccepted return code : retry.
case Success(returnCodeAsInt) =>
jobLogger.debug(s"Retrying with wrong exit code : '${returnCodeAsString}'")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
case Failure(_) =>
jobLogger.warn(s"General failure of job with exit code : '${returnCodeAsString}'")
Future.successful(FailedNonRetryableExecutionHandle(ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption), kvPairsToSave = None))
}
} else {
tryReturnCodeAsInt match {
case Success(returnCodeAsInt) if retryWithMoreMemory && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
jobLogger.debug(s"job not done but retrying already? : ${status.toString()}")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
case _ =>
@@ -1330,63 +1373,6 @@
}
}

// helper function for handleExecutionResult : get the exit code of the job.
def JobExitCode: Future[String] = {

// read if the file exists
def readRCFile(fileExists: Boolean): Future[String] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
else {
jobLogger.warn("RC file not found. Setting job to failed & retry.")
//Thread.sleep(300000)
Future("1")
}
}
//finally : assign the yielded variable
for {
fileExists <- asyncIo.existsAsync(jobPaths.returnCode)
jobRC <- readRCFile(fileExists)
} yield jobRC
}

// helper function for handleExecutionResult : get the memory retry code.
def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = {
// job is used in aws override version. use here to prevent compilation error.
log.debug(s"Looking for memoryRetry in job '${job.jobId}'")
// convert int to boolean
def returnCodeAsBoolean(codeAsOption: Option[String]): Boolean = {
codeAsOption match {
case Some(codeAsString) =>
Try(codeAsString.trim.toInt) match {
case Success(code) => code match {
case StderrContainsRetryKeysCode => true
case _ => false
}
case Failure(e) =>
log.error(s"'CheckingForMemoryRetry' action exited with code '$codeAsString' which couldn't be " +
s"converted to an Integer. Task will not be retried with more memory. Error: ${ExceptionUtils.getMessage(e)}")
false
}
case None => false
}
}
// read if the file exists
def readMemoryRetryRCFile(fileExists: Boolean): Future[Option[String]] = {
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false).map(Option(_))
else
Future.successful(None)
}
//finally : assign the yielded variable
for {
fileExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
retryCheckRCAsOption <- readMemoryRetryRCFile(fileExists)
retryWithMoreMemory = returnCodeAsBoolean(retryCheckRCAsOption)
} yield retryWithMoreMemory
}


/**
* Send the job id of the running job to the key value store.
*
@@ -26,7 +26,7 @@ object RetryableRequestSupport {
case _: SocketException => true
case _: SocketTimeoutException => true
case ioE: IOException if Option(ioE.getMessage).exists(_.contains("Error getting access token for service account")) => true
case ioE: IOException => isGcs500(ioE) || isGcs503(ioE) || isGcs504(ioE) || isAws504(ioE)
case ioE: IOException => isGcs500(ioE) || isGcs503(ioE) || isGcs504(ioE)
case other =>
// Infinitely retryable is a subset of retryable
isInfinitelyRetryable(other)
@@ -86,28 +86,4 @@ object RetryableRequestSupport {
msg.contains("504 Gateway Timeout")
)
}
// AWS timeout error
def isAws504(failure: Throwable): Boolean = {
Option(failure.getMessage).exists(msg =>
(
// timeout in reading from s3.
msg.contains("Could not read from s3") &&
msg.contains("Timeout waiting for connection")
) || (
// reading in cromwell wdl (read_lines() etc)
msg.contains("Failed to evaluate") &&
msg.contains("s3://") &&
msg.contains("Timed out after")
)
)
}
// General AWS IO error : all items unreadable except rc.txt files (might be missing)
// => mainly for testing. will retry mis-specified s3 paths as well...
def isAwsIO(failure:Throwable): Boolean = {
Option(failure.getMessage).exists(msg =>
msg.contains("Could not read from s3") &&
! msg.contains("-rc.txt")
)
}

}
@@ -57,10 +57,7 @@ import cromwell.filesystems.s3.S3Path
import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
import cromwell.services.keyvalue.KvClient
import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.services.batch.BatchClient
//import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
import software.amazon.awssdk.services.batch.model._

import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
import wom.callable.Callable.OutputDefinition
import wom.core.FullyQualifiedName
import wom.expression.NoIoFunctionSet
@@ -187,13 +184,6 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
Option(configuration.awsAuth),
configuration.fsxMntPoint)
}

// setup batch client to query job container info
lazy val batchClient: BatchClient = {
val builder = BatchClient.builder()
configureClient(builder, batchJob.optAwsAuthMode, batchJob.configRegion)
}

/* Tries to abort the job in flight
*
* @param job A StandardAsyncJob object (has jobId value) to cancel
@@ -524,31 +514,6 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
} yield guaranteedAnswer
}

// new OOM detection
override def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = Future {
Log.debug(s"Looking for memoryRetry in job '${job.jobId}'")
val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build)
val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'"))
val nrAttempts = jobDetail.attempts.size
val lastattempt = jobDetail.attempts.get(nrAttempts-1)
val containerRC = lastattempt.container.exitCode
// if not zero => get reason, else set retry to false.
containerRC.toString() match {
case "0" =>
Log.debug("container exit code was zero")
false
case _ =>
val containerStatusReason = lastattempt.container.reason
Log.warn(s"Job failed with Container status reason : '${containerStatusReason}'")
val RetryMemoryKeys = memoryRetryErrorKeys.toList.flatten
val retry = RetryMemoryKeys.exists(containerStatusReason.contains)
Log.debug(s"Retry job based on provided keys : '${retry}'")
retry
}


}

// Despite being a "runtime" exception, BatchExceptions for 429 (too many requests) are *not* fatal:
override def isFatal(throwable: Throwable): Boolean = throwable match {
case be: BatchException => !be.getMessage.contains("Status Code: 429")
@@ -171,10 +171,10 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| fi
| # copy
| $awsCmd s3 cp --no-progress "$$s3_path" "$$destination" ||
| { echo "attempt $$i to copy $$s3_path failed" sleep $$((7 * "$$i")) && continue; }
| ( echo "attempt $$i to copy $$s3_path failed" sleep $$((7 * "$$i")) && continue)
| # check data integrity
| _check_data_integrity $$destination $$s3_path ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| (echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
| # copy succeeded
| break
| done
@@ -203,18 +203,18 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| destination=$${destination%/}
| # glob directory. do recursive copy
| $awsCmd s3 cp --no-progress $$local_path $$destination --recursive --exclude "cromwell_glob_control_file" ||
| { echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| ( echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue)
| # check integrity for each of the files
| for FILE in $$(cd $$local_path ; ls | grep -v cromwell_glob_control_file); do
| _check_data_integrity $$local_path/$$FILE $$destination/$$FILE ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2; }
| ( echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2)
| done
| else
| $awsCmd s3 cp --no-progress "$$local_path" "$$destination" ||
| { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| ( echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
| # check content length for data integrity
| _check_data_integrity $$local_path $$destination ||
| { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
| ( echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
| fi
| # copy succeeded
| break
@@ -235,10 +235,10 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
| exit 1
| fi
| s3_content_length=$$($awsCmd s3api head-object --bucket "$$bucket" --key "$$key" --query 'ContentLength') ||
| { echo "Attempt to get head of object failed for $$s3_path." && return 1 ; }
| ( echo "Attempt to get head of object failed for $$s3_path." && return 1 )
| # local
| local_content_length=$$(LC_ALL=C ls -dn -- "$$local_path" | awk '{print $$5; exit}' ) ||
| { echo "Attempt to get local content length failed for $$_local_path." && return 1; }
| ( echo "Attempt to get local content length failed for $$_local_path." && return 1 )
| # compare
| if [[ "$$s3_content_length" -eq "$$local_content_length" ]]; then
| true
@@ -277,7 +277,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
s"""
|touch ${output.name}
|_s3_delocalize_with_retry ${output.name} ${output.s3key}
|if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory ; fi""".stripMargin
|if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory "; fi""".stripMargin


case output: AwsBatchFileOutput if output.s3key.startsWith("s3://") && output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString =>
@@ -133,48 +133,6 @@ Notes:
- local hashing means that all used containers are pulled; make sure you have enough storage
- enable a database to make the cache persistent across Cromwell restarts

### Retry with more memory

Cromwell can be configured to retry jobs with more allocated memory, under a defined set of conditions. To enable this, set the following parameters:

cromwell configuration: `cromwell.config`:
```
// set the maximum number of retries.
// backend.providers.AWSBatch.config.default-runtime-attributes.maxRetries
backend {
  providers {
    AWSBatch {
      config {
        default-runtime-attributes {
          maxRetries: 6
        }
      }
    }
  }
}

// set the keys for Out-Of-Memory killing.
// system.io.memory-retry-error-keys
system {
  io {
    memory-retry-error-keys = ["OutOfMemory","Killed"]
  }
}
```

Workflow specific runtime options : `workflow_options.json`:
```
{
"memory_retry_multiplier" : 1.5
}
```

When the options.json file is provided at workflow submission, jobs that were terminated due to insufficient memory will be retried up to 6 times (the `maxRetries` value set above), each time with the memory allocation multiplied by 1.5. For example: 4 GB => 6 GB => 9 GB => 13.5 GB => ...
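
For illustration, the progression above is simply the previous allocation multiplied by `memory_retry_multiplier` on every retry, up to `maxRetries` attempts. A minimal sketch, not part of Cromwell itself; the 4 GB starting value is assumed from the example above:

```
// Hypothetical helper, for illustration only: reproduces the
// 4 GB => 6 GB => 9 GB => 13.5 GB progression described above.
object MemoryRetryProgression extends App {
  val initialGb  = 4.0  // original memory request of the task (assumed)
  val multiplier = 1.5  // workflow_options.json: memory_retry_multiplier
  val maxRetries = 6    // cromwell.config: default-runtime-attributes.maxRetries

  // allocation for the initial attempt plus each of the maxRetries retries
  val allocations = Iterator.iterate(initialGb)(_ * multiplier).take(maxRetries + 1).toList
  println(allocations.map(gb => f"$gb%.1f GB").mkString(" => "))
  // prints: 4.0 GB => 6.0 GB => 9.0 GB => 13.5 GB => 20.3 GB => 30.4 GB => 45.6 GB
}
```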

Note: Retries of jobs using the `awsBatchRetryAttempts` counter do *not* increase memory allocation.



AWS Batch
---------
