diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
index 2ef8b7bd5b7..9b21871b538 100644
--- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
+++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala
@@ -1265,16 +1265,70 @@ trait StandardAsyncExecutionActor
   def handleExecutionResult(status: StandardAsyncRunState,
                             oldHandle: StandardAsyncPendingExecutionHandle): Future[ExecutionHandle] = {
+    // get the memory retry code.
+    def memoryRetryRC: Future[Boolean] = {
+      // convert int to boolean
+      def returnCodeAsBoolean(codeAsOption: Option[String]): Boolean = {
+        codeAsOption match {
+          case Some(codeAsString) =>
+            Try(codeAsString.trim.toInt) match {
+              case Success(code) => code match {
+                case StderrContainsRetryKeysCode => true
+                case _ => false
+              }
+              case Failure(e) =>
+                log.error(s"'CheckingForMemoryRetry' action exited with code '$codeAsString' which couldn't be " +
+                  s"converted to an Integer. Task will not be retried with more memory. Error: ${ExceptionUtils.getMessage(e)}")
+                false
+            }
+          case None => false
+        }
+      }
+      // read if the file exists
+      def readMemoryRetryRCFile(fileExists: Boolean): Future[Option[String]] = {
+        if (fileExists)
+          asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false).map(Option(_))
+        else
+          Future.successful(None)
+      }
+      //finally : assign the yielded variable
+      for {
+        fileExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
+        retryCheckRCAsOption <- readMemoryRetryRCFile(fileExists)
+        retryWithMoreMemory = returnCodeAsBoolean(retryCheckRCAsOption)
+      } yield retryWithMoreMemory
+    }
+
+    // get the exit code of the job.
+    def JobExitCode: Future[String] = {
+
+      // read if the file exists
+      def readRCFile(fileExists: Boolean): Future[String] = {
+        if (fileExists)
+          asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
+        else {
+          jobLogger.warn("RC file not found. Setting job to failed & waiting 5m before retry.")
+          Thread.sleep(300000)
+          Future("1")
+        }
+      }
+      //finally : assign the yielded variable
+      for {
+        fileExists <- asyncIo.existsAsync(jobPaths.returnCode)
+        jobRC <- readRCFile(fileExists)
+      } yield jobRC
+    }
+
     // get path to sderr
     val stderr = jobPaths.standardPaths.error
     lazy val stderrAsOption: Option[Path] = Option(stderr)
 
-    // get the three needed variables, using helper functions below, or direct assignment.
+    // get the three needed variables, using functions above or direct assignment.
     val stderrSizeAndReturnCodeAndMemoryRetry = for {
       returnCodeAsString <- JobExitCode
       // Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
       // may fail due to race conditions on quickly-executing jobs.
       stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future.successful(0L)
-      retryWithMoreMemory <- memoryRetryRC(oldHandle.pendingJob)
+      retryWithMoreMemory <- memoryRetryRC
     } yield (stderrSize, returnCodeAsString, retryWithMoreMemory)
 
     stderrSizeAndReturnCodeAndMemoryRetry flatMap {
@@ -1283,36 +1337,25 @@ trait StandardAsyncExecutionActor
 
         if (isDone(status)) {
           tryReturnCodeAsInt match {
-            // stderr not empty : retry
             case Success(returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
               val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(StderrNonEmpty(jobDescriptor.key.tag, stderrSize, stderrAsOption), Option(returnCodeAsInt), None))
               retryElseFail(executionHandle)
-            // job was aborted (cancelled by user?)
-            // on AWS OOM kill are code 137 : check retryWithMoreMemory here
-            case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory =>
-              jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString}'")
+            case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
               Future.successful(AbortedExecutionHandle)
-            // job considered ok by accepted exit code
             case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
               handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
-            // job failed on out-of-memory : retry
             case Success(returnCodeAsInt) if retryWithMoreMemory =>
-              jobLogger.warn(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
               val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
               retryElseFail(executionHandle, retryWithMoreMemory)
-            // unaccepted return code : retry.
             case Success(returnCodeAsInt) =>
-              jobLogger.debug(s"Retrying with wrong exit code : '${returnCodeAsString}'")
               val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(WrongReturnCode(jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption), Option(returnCodeAsInt), None))
               retryElseFail(executionHandle)
             case Failure(_) =>
-              jobLogger.warn(s"General failure of job with exit code : '${returnCodeAsString}'")
               Future.successful(FailedNonRetryableExecutionHandle(ReturnCodeIsNotAnInt(jobDescriptor.key.tag, returnCodeAsString, stderrAsOption), kvPairsToSave = None))
           }
         } else {
           tryReturnCodeAsInt match {
             case Success(returnCodeAsInt) if retryWithMoreMemory && !continueOnReturnCode.continueFor(returnCodeAsInt) =>
-              jobLogger.debug(s"job not done but retrying already? : ${status.toString()}")
               val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
               retryElseFail(executionHandle, retryWithMoreMemory)
             case _ =>
@@ -1330,63 +1373,6 @@ trait StandardAsyncExecutionActor
     }
   }
 
-  // helper function for handleExecutionResult : get the exit code of the job.
-  def JobExitCode: Future[String] = {
-
-    // read if the file exists
-    def readRCFile(fileExists: Boolean): Future[String] = {
-      if (fileExists)
-        asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
-      else {
-        jobLogger.warn("RC file not found. Setting job to failed & retry.")
-        //Thread.sleep(300000)
-        Future("1")
-      }
-    }
-    //finally : assign the yielded variable
-    for {
-      fileExists <- asyncIo.existsAsync(jobPaths.returnCode)
-      jobRC <- readRCFile(fileExists)
-    } yield jobRC
-  }
-
-  // helper function for handleExecutionResult : get the memory retry code.
-  def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = {
-    // job is used in aws override version. use here to prevent compilation error.
-    log.debug(s"Looking for memoryRetry in job '${job.jobId}'")
-    // convert int to boolean
-    def returnCodeAsBoolean(codeAsOption: Option[String]): Boolean = {
-      codeAsOption match {
-        case Some(codeAsString) =>
-          Try(codeAsString.trim.toInt) match {
-            case Success(code) => code match {
-              case StderrContainsRetryKeysCode => true
-              case _ => false
-            }
-            case Failure(e) =>
-              log.error(s"'CheckingForMemoryRetry' action exited with code '$codeAsString' which couldn't be " +
-                s"converted to an Integer. Task will not be retried with more memory. Error: ${ExceptionUtils.getMessage(e)}")
-              false
-          }
-        case None => false
-      }
-    }
-    // read if the file exists
-    def readMemoryRetryRCFile(fileExists: Boolean): Future[Option[String]] = {
-      if (fileExists)
-        asyncIo.contentAsStringAsync(jobPaths.memoryRetryRC, None, failOnOverflow = false).map(Option(_))
-      else
-        Future.successful(None)
-    }
-    //finally : assign the yielded variable
-    for {
-      fileExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
-      retryCheckRCAsOption <- readMemoryRetryRCFile(fileExists)
-      retryWithMoreMemory = returnCodeAsBoolean(retryCheckRCAsOption)
-    } yield retryWithMoreMemory
-  }
-
-
   /**
    * Send the job id of the running job to the key value store.
    *
diff --git a/engine/src/main/scala/cromwell/engine/io/RetryableRequestSupport.scala b/engine/src/main/scala/cromwell/engine/io/RetryableRequestSupport.scala
index 9efa3e7c767..03ef9224b7d 100644
--- a/engine/src/main/scala/cromwell/engine/io/RetryableRequestSupport.scala
+++ b/engine/src/main/scala/cromwell/engine/io/RetryableRequestSupport.scala
@@ -26,7 +26,7 @@ object RetryableRequestSupport {
     case _: SocketException => true
     case _: SocketTimeoutException => true
     case ioE: IOException if Option(ioE.getMessage).exists(_.contains("Error getting access token for service account")) => true
-    case ioE: IOException => isGcs500(ioE) || isGcs503(ioE) || isGcs504(ioE) || isAws504(ioE)
+    case ioE: IOException => isGcs500(ioE) || isGcs503(ioE) || isGcs504(ioE)
     case other =>
       // Infinitely retryable is a subset of retryable
       isInfinitelyRetryable(other)
@@ -86,28 +86,4 @@ object RetryableRequestSupport {
       msg.contains("504 Gateway Timeout")
     )
   }
-  // AWS timeout error
-  def isAws504(failure: Throwable): Boolean = {
-    Option(failure.getMessage).exists(msg =>
-      (
-        // timeout in reading form s3.
-        msg.contains("Could not read from s3") &&
-        msg.contains("Timeout waiting for connection")
-      ) || (
-        // reading in cromwell wdl (read_lines() etc)
-        msg.contains("Failed to evaluate") &&
-        msg.contains("s3://") &&
-        msg.contains("Timed out after")
-      )
-    )
-  }
-  // General AWS IO error : all items unreadable except rc.txt files (might be missing)
-  // => mainly for testing. will retry mis-specified s3 paths as well...
-  def isAwsIO(failure:Throwable): Boolean = {
-    Option(failure.getMessage).exists(msg =>
-      msg.contains("Could not read from s3") &&
-      ! msg.contains("-rc.txt")
-    )
-  }
-
 }
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala
index a534ae2b8da..34334449587 100755
--- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala
+++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala
@@ -57,10 +57,7 @@ import cromwell.filesystems.s3.S3Path
 import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
 import cromwell.services.keyvalue.KvClient
 import org.slf4j.{Logger, LoggerFactory}
-import software.amazon.awssdk.services.batch.BatchClient
-//import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
-import software.amazon.awssdk.services.batch.model._
-
+import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
 import wom.callable.Callable.OutputDefinition
 import wom.core.FullyQualifiedName
 import wom.expression.NoIoFunctionSet
@@ -187,13 +184,6 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
       Option(configuration.awsAuth),
       configuration.fsxMntPoint)
   }
-
-  // setup batch client to query job container info
-  lazy val batchClient: BatchClient = {
-    val builder = BatchClient.builder()
-    configureClient(builder, batchJob.optAwsAuthMode, batchJob.configRegion)
-  }
-
   /* Tries to abort the job in flight
    *
    * @param job A StandardAsyncJob object (has jobId value) to cancel
    *
@@ -524,31 +514,6 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
     } yield guaranteedAnswer
   }
 
-  // new OOM detection
-  override def memoryRetryRC(job: StandardAsyncJob): Future[Boolean] = Future {
-    Log.debug(s"Looking for memoryRetry in job '${job.jobId}'")
-    val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build)
-    val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'"))
-    val nrAttempts = jobDetail.attempts.size
-    val lastattempt = jobDetail.attempts.get(nrAttempts-1)
-    val containerRC = lastattempt.container.exitCode
-    // if not zero => get reason, else set retry to false.
-    containerRC.toString() match {
-      case "0" =>
-        Log.debug("container exit code was zero")
-        false
-      case _ =>
-        val containerStatusReason = lastattempt.container.reason
-        Log.warn(s"Job failed with Container status reason : '${containerStatusReason}'")
-        val RetryMemoryKeys = memoryRetryErrorKeys.toList.flatten
-        val retry = RetryMemoryKeys.exists(containerStatusReason.contains)
-        Log.debug(s"Retry job based on provided keys : '${retry}'")
-        retry
-    }
-
-
-  }
-
   // Despite being a "runtime" exception, BatchExceptions for 429 (too many requests) are *not* fatal:
   override def isFatal(throwable: Throwable): Boolean = throwable match {
     case be: BatchException => !be.getMessage.contains("Status Code: 429")
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
index 757c7ab7b87..e79a948377b 100755
--- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
+++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala
@@ -171,10 +171,10 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
         |  fi
         |  # copy
         |  $awsCmd s3 cp --no-progress "$$s3_path" "$$destination" ||
-        |  { echo "attempt $$i to copy $$s3_path failed" sleep $$((7 * "$$i")) && continue; }
+        |  ( echo "attempt $$i to copy $$s3_path failed" sleep $$((7 * "$$i")) && continue)
         |  # check data integrity
         |  _check_data_integrity $$destination $$s3_path ||
-        |  { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
+        |  (echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
         |  # copy succeeded
         |  break
         |done
@@ -203,18 +203,18 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
         |  destination=$${destination%/}
         |  # glob directory. do recursive copy
         |  $awsCmd s3 cp --no-progress $$local_path $$destination --recursive --exclude "cromwell_glob_control_file" ||
-        |  { echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
+        |  ( echo "attempt $$i to copy globDir $$local_path failed" && sleep $$((7 * "$$i")) && continue)
         |  # check integrity for each of the files
         |  for FILE in $$(cd $$local_path ; ls | grep -v cromwell_glob_control_file); do
         |    _check_data_integrity $$local_path/$$FILE $$destination/$$FILE ||
-        |    { echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2; }
+        |    ( echo "data content length difference detected in attempt $$i to copy $$local_path/$$FILE failed" && sleep $$((7 * "$$i")) && continue 2)
         |  done
         |else
         |  $awsCmd s3 cp --no-progress "$$local_path" "$$destination" ||
-        |  { echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
+        |  ( echo "attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
        |  # check content length for data integrity
         |  _check_data_integrity $$local_path $$destination ||
-        |  { echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue; }
+        |  ( echo "data content length difference detected in attempt $$i to copy $$local_path failed" && sleep $$((7 * "$$i")) && continue)
         |fi
         |# copy succeeded
         |break
@@ -235,10 +235,10 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
         |  exit 1
         |fi
         |s3_content_length=$$($awsCmd s3api head-object --bucket "$$bucket" --key "$$key" --query 'ContentLength') ||
-        |  { echo "Attempt to get head of object failed for $$s3_path." && return 1 ; }
+        |  ( echo "Attempt to get head of object failed for $$s3_path." && return 1 )
         |# local
         |local_content_length=$$(LC_ALL=C ls -dn -- "$$local_path" | awk '{print $$5; exit}' ) ||
-        |  { echo "Attempt to get local content length failed for $$_local_path." && return 1; }
+        |  ( echo "Attempt to get local content length failed for $$_local_path." && return 1 )
         |# compare
         |if [[ "$$s3_content_length" -eq "$$local_content_length" ]]; then
         |  true
@@ -277,7 +277,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
          s"""
             |touch ${output.name}
             |_s3_delocalize_with_retry ${output.name} ${output.s3key}
-            |if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory ; fi""".stripMargin
+            |if [ -e $globDirectory ]; then _s3_delocalize_with_retry $globDirectory $s3GlobOutDirectory "; fi""".stripMargin
 
       case output: AwsBatchFileOutput if output.s3key.startsWith("s3://") &&
         output.mount.mountPoint.pathAsString == AwsBatchWorkingDisk.MountPoint.pathAsString =>
diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md
index df83472bdc2..62a0495c1be 100644
--- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md
+++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/README.md
@@ -133,48 +133,6 @@ Notes:
  - local hashing means that all used containers are pulled. Make sure you have enough storage
  - enable a database to make the cache persistent over cromwell restarts
 
-### Retry with more memory
-
-Cromwell can be configured to retry jobs with more allocated memory, under a defined set of conditions. To enable this, set the following parameters:
-
-cromwell configuration: `cromwell.config`:
-```
-// set the maximal amount of retries.
-// backend.providers.AWSBatch.config.default-runtime-attribues.maxRetries
-backend {
-  providers {
-    AWSBatch {
-      config {
-        default-runtime-attributes {
-          maxRetries: 6
-        }
-      }
-    }
-  }
-}
-
-// set the keys for Out-Of-Memory killing.
-// system.io.memory-retry-error-keys
-system{
-  io{
-    memory-retry-error-keys = ["OutOfMemory","Killed"]
-  }
-}
-```
-
-Workflow specific runtime options : `workflow_options.json`:
-```
-{
-  "memory_retry_multiplier" : 1.5
-}
-```
-
-When providing the options.json file during workflow submission, jobs that were terminated due to insufficient memory will be retried 6 times, with increasing memory allocation. For example 4Gb => 6Gb => 9Gb => 13.5Gb => ...
-
-Note: Retries of jobs using the `awsBatchRetryAttempts` counter do *not* increase memory allocation.
-
-
 AWS Batch
 ---------