handling of very rare early/late job killing #23

Merged · 4 commits · Aug 19, 2022
@@ -13,6 +13,10 @@ final case class WrongReturnCode(jobTag: String, returnCode: Int, stderrPath: Op
override def getMessage = s"Job $jobTag exited with return code $returnCode which has not been declared as a valid return code. See 'continueOnReturnCode' runtime attribute for more details."
}

final case class UnExpectedStatus(jobTag: String, returnCode: Int, jobStatus: String, stderrPath: Option[Path]) extends KnownJobFailureException {
override def getMessage = s"Job $jobTag exited with success code '$returnCode' but failed status '$jobStatus'. Suspecting spot kill and retrying."
}

final case class ReturnCodeIsNotAnInt(jobTag: String, returnCode: String, stderrPath: Option[Path]) extends KnownJobFailureException {
override def getMessage = {
if (returnCode.isEmpty)
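For reference, a hypothetical usage sketch (not part of the PR) of the new UnExpectedStatus failure added above; the job tag and return code are made-up values and stderrPath is simply left empty:

```scala
// Hypothetical values, for illustration only.
val failure = UnExpectedStatus(
  jobTag = "wf.task:shard-0:attempt-1",
  returnCode = 0,
  jobStatus = "Failed",
  stderrPath = None
)
println(failure.getMessage)
// Job wf.task:shard-0:attempt-1 exited with success code '0' but failed status 'Failed'. Suspecting spot kill and retrying.
```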
@@ -364,7 +364,7 @@ trait StandardAsyncExecutionActor
* to re-do this before sending the response.
*/
private var jobPathsUpdated: Boolean = false
private def updateJobPaths(): Unit = if (!jobPathsUpdated) {
def updateJobPaths(): Unit = if (!jobPathsUpdated) {
// .get's are safe on stdout and stderr after falling back to default names above.
jobPaths.standardPaths = StandardPaths(
output = hostPathFromContainerPath(executionStdout),
@@ -1162,7 +1162,7 @@ trait StandardAsyncExecutionActor
configurationDescriptor.slowJobWarningAfter foreach { duration => self ! WarnAboutSlownessAfter(handle.pendingJob.jobId, duration) }

tellKvJobId(handle.pendingJob) map { _ =>
if (logJobIds) jobLogger.info(s"job id: ${handle.pendingJob.jobId}")
if (logJobIds) jobLogger.debug(s"job id: ${handle.pendingJob.jobId}")
tellMetadata(Map(CallMetadataKeys.JobId -> handle.pendingJob.jobId))
/*
NOTE: Because of the async nature of the Scala Futures, there is a point in time where we have submitted this or
@@ -1281,7 +1281,7 @@ trait StandardAsyncExecutionActor
stderrSizeAndReturnCodeAndMemoryRetry flatMap {
case (stderrSize, returnCodeAsString, retryWithMoreMemory) =>
val tryReturnCodeAsInt = Try(returnCodeAsString.trim.toInt)

jobLogger.debug(s"Handling execution Result with status '${status.toString()}' and returnCode ${returnCodeAsString}")
if (isDone(status)) {
tryReturnCodeAsInt match {
// stderr not empty : retry
@@ -1293,12 +1293,17 @@
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) && !retryWithMoreMemory =>
jobLogger.debug(s"Job was aborted, code was : '${returnCodeAsString}'")
Future.successful(AbortedExecutionHandle)
// if instance killed after RC.txt creation : edge case with status == Failed AND returnCode == [accepted values] => retry.
case Success(returnCodeAsInt) if status.toString() == "Failed" && continueOnReturnCode.continueFor(returnCodeAsInt) =>
jobLogger.debug(s"Suspected spot kill due to status/RC mismatch")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(UnExpectedStatus(jobDescriptor.key.tag, returnCodeAsInt, status.toString(), stderrAsOption), Option(returnCodeAsInt), None))
retryElseFail(executionHandle)
// job considered ok by accepted exit code
case Success(returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
// job failed on out-of-memory : retry
case Success(returnCodeAsInt) if retryWithMoreMemory =>
jobLogger.warn(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
jobLogger.info(s"Retrying job due to OOM with exit code : '${returnCodeAsString}' ")
val executionHandle = Future.successful(FailedNonRetryableExecutionHandle(RetryWithMoreMemory(jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log), Option(returnCodeAsInt), None))
retryElseFail(executionHandle, retryWithMoreMemory)
// unaccepted return code : retry.
@@ -1339,8 +1344,7 @@ trait StandardAsyncExecutionActor
if (fileExists)
asyncIo.contentAsStringAsync(jobPaths.returnCode, None, failOnOverflow = false)
else {
jobLogger.warn("RC file not found. Setting job to failed & retry.")
//Thread.sleep(300000)
jobLogger.debug("RC file not found. Setting job to failed.")
Future("1")
}
}
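Taken together, the hunks above reorder the handling of the (status, return code) pair so that a Failed status combined with an otherwise accepted return code is treated as a suspected spot kill and routed through retryElseFail. A minimal sketch of that decision order, using hypothetical names rather than the actual Cromwell types (and omitting the non-empty-stderr branch), might look like:

```scala
// Illustrative only: Decision and decide are hypothetical, not Cromwell API.
sealed trait Decision
case object Abort extends Decision
case object RetrySuspectedSpotKill extends Decision
case object Succeed extends Decision
case object RetryWithMoreMemory extends Decision
case object FailOrRetry extends Decision

def decide(status: String,
           returnCode: Int,
           accepted: Int => Boolean, // continueOnReturnCode.continueFor
           aborted: Int => Boolean,  // isAbort
           oomDetected: Boolean): Decision =
  if (aborted(returnCode) && !oomDetected) Abort
  else if (status == "Failed" && accepted(returnCode)) RetrySuspectedSpotKill // RC written, then instance killed
  else if (accepted(returnCode)) Succeed
  else if (oomDetected) RetryWithMoreMemory
  else FailOrRetry // unaccepted return code
```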
@@ -262,7 +262,7 @@ class EngineJobExecutionActor(replyTo: ActorRef,
log.info(template, jobTag, data.failedCopyAttempts, callCachingParameters.maxFailedCopyAttempts, data.aggregatedHashString)
} else {
log.info(s"BT-322 {} cache hit copying nomatch: could not find a suitable cache hit.", jobTag)
workflowLogger.info("Could not copy a suitable cache hit for {}. No copy attempts were made.", arg = jobTag)
workflowLogger.debug("Could not copy a suitable cache hit for {}. No copy attempts were made.", arg = jobTag)
}

runJob(data)
@@ -37,28 +37,38 @@ import java.io.FileNotFoundException
import akka.actor.ActorRef
import akka.pattern.AskSupport
import akka.util.Timeout

import cats.implicits._

import common.exception.MessageAggregation
import common.collections.EnhancedCollections._
import common.util.StringUtil._
import common.validation.Validation._

import cromwell.backend._
import cromwell.backend.async.{ExecutionHandle, PendingExecutionHandle}
import cromwell.backend.async._ //{ExecutionHandle, PendingExecutionHandle}
import cromwell.backend.impl.aws.IntervalLimitedAwsJobSubmitActor.SubmitAwsJobRequest
import cromwell.backend.impl.aws.OccasionalStatusPollingActor.{NotifyOfStatus, WhatsMyStatus}
import cromwell.backend.impl.aws.RunStatus.{Initializing, TerminalRunStatus}
import cromwell.backend.impl.aws.io._
import cromwell.backend.io.DirectoryFunctions
import cromwell.backend.io.JobPaths
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.backend.OutputEvaluator._

import cromwell.core._
import cromwell.core.path.Path
import cromwell.core.path.{DefaultPathBuilder, Path, PathBuilder, PathFactory}
import cromwell.core.io.{DefaultIoCommandBuilder, IoCommandBuilder}
import cromwell.core.retry.SimpleExponentialBackoff

import cromwell.filesystems.s3.S3Path
import cromwell.filesystems.s3.batch.S3BatchCommandBuilder

import cromwell.services.keyvalue.KvClient

import org.slf4j.{Logger, LoggerFactory}
import software.amazon.awssdk.services.batch.BatchClient
//import software.amazon.awssdk.services.batch.model.{BatchException, SubmitJobResponse}
import software.amazon.awssdk.services.batch.model._

import wom.callable.Callable.OutputDefinition
@@ -67,7 +77,7 @@ import wom.expression.NoIoFunctionSet
import wom.types.{WomArrayType, WomSingleFileType}
import wom.values._

import scala.concurrent.{Future, Promise}
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NoStackTrace
@@ -510,7 +520,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
case NotifyOfStatus(_, _, Some(value)) =>
Future.successful(value)
case NotifyOfStatus(_, _, None) =>
jobLogger.info("Having to fall back to AWS query for status")
jobLogger.debug("Having to fall back to AWS query for status")
Future.fromTry(job.status(jobId))
case other =>
val message = s"Programmer Error (please report this): Received an unexpected message from the OccasionalPollingActor: $other"
@@ -536,8 +546,24 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
val describeJobsResponse = batchClient.describeJobs(DescribeJobsRequest.builder.jobs(job.jobId).build)
val jobDetail = describeJobsResponse.jobs.get(0) //OrElse(throw new RuntimeException(s"Could not get job details for job '${job.jobId}'"))
val nrAttempts = jobDetail.attempts.size
val lastattempt = jobDetail.attempts.get(nrAttempts-1)
var containerRC = lastattempt.container.exitCode
// if job is terminated/cancelled before starting, there are no attempts.
val lastattempt =
try {
jobDetail.attempts.get(nrAttempts-1)
} catch {
case _ : Throwable => null
}
if (lastattempt == null ) {
Log.info(s"No attempts were made for job '${job.jobId}'. no memory-related retry needed.")
false
}

var containerRC =
try {
lastattempt.container.exitCode
} catch {
case _ : Throwable => null
}
// if missing, set to failed.
if (containerRC == null ) {
Log.debug(s"No RC found for job '${job.jobId}', most likely a spot kill")
@@ -621,8 +647,9 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
override def getTerminalEvents(runStatus: RunStatus): Seq[ExecutionEvent] = {
runStatus match {
case successStatus: RunStatus.Succeeded => successStatus.eventList
case unknown =>
throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. Instead got $unknown")
case unknown => {
throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. Instead got $unknown")
}
}
}

@@ -642,4 +669,38 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}
)
}

override def handleExecutionSuccess(runStatus: StandardAsyncRunState,
handle: StandardAsyncPendingExecutionHandle,
returnCode: Int)(implicit ec: ExecutionContext): Future[ExecutionHandle] = {
evaluateOutputs() map {
case ValidJobOutputs(outputs) =>
// Need to make sure the paths are up to date before sending the detritus back in the response
updateJobPaths()
// If the instance is terminated while copying stdout/stderr : status is Failed while the job's outputs are ok
// => Retryable
if (runStatus.toString().equals("Failed")) {
jobLogger.warn("Got Failed RunStatus for success Execution")

val exception = new MessageAggregation {
override def exceptionContext: String = "Got Failed RunStatus for success Execution"
override def errorMessages: Traversable[String] = Array("Got Failed RunStatus for success Execution")
}
FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None)
} else {
SuccessfulExecutionHandle(outputs, returnCode, jobPaths.detritusPaths, getTerminalEvents(runStatus))
}
case InvalidJobOutputs(errors) =>
val exception = new MessageAggregation {
override def exceptionContext: String = "Failed to evaluate job outputs"
override def errorMessages: Traversable[String] = errors.toList
}
FailedNonRetryableExecutionHandle(exception, kvPairsToSave = None)
case JobOutputsEvaluationException(exception: Exception) if retryEvaluateOutputsAggregated(exception) =>
// Return the execution handle in this case to retry the operation
handle
case JobOutputsEvaluationException(ex) => FailedNonRetryableExecutionHandle(ex, kvPairsToSave = None)
}
}

}
@@ -195,7 +195,6 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|
| # get the multipart chunk size
| chunk_size=$$(_get_multipart_chunk_size $$local_path)
| echo "chunk size : $$chunk_size bytes"
| local MP_THRESHOLD=${mp_threshold}
| # then set them
| $awsCmd configure set default.s3.multipart_threshold $$MP_THRESHOLD
@@ -378,7 +377,8 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
//calls the client to submit the job
def callClient(definitionArn: String, awsBatchAttributes: AwsBatchAttributes): Aws[F, SubmitJobResponse] = {

Log.info(s"Submitting taskId: $taskId, job definition : $definitionArn, script: $batch_script")
Log.debug(s"Submitting taskId: $taskId, job definition : $definitionArn, script: $batch_script")
Log.info(s"Submitting taskId: $taskId, script: $batch_script")

val submit: F[SubmitJobResponse] =
async.delay(batchClient.submitJob(