Skip to content

Commit

Permalink
WX-1625 Quota retry (#7439)
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored May 20, 2024
1 parent f111395 commit f2b2c30
Show file tree
Hide file tree
Showing 17 changed files with 226 additions and 16 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional

The `genomics` configuration entry was renamed to `batch`, see [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/backends/GCPBatch/) for more information.

### Improved handling of Life Sciences API quota errors

Users reported cases where Life Sciences jobs failed due to insufficient quota, instead of queueing and waiting until
quota is available (which is the expected behavior). Cromwell will now retry under these conditions, which present with errors
such as "PAPI error code 9", "no available zones", and/or "quota too low".

## 87 Release Notes

### GCP Batch
Expand Down
21 changes: 21 additions & 0 deletions centaur/src/main/resources/standardTestCases/quota_fail_retry.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: quota_fail_retry
testFormat: workflowfailure
backends: [Papiv2]

files {
workflow: quota_fail_retry/quota_fail_retry.wdl
}

# Adapted from `preemptible_and_memory_retry.test`.
# I set `broad-dsde-cromwell-dev` to have super low CPU quota in `us-west3` (Salt Lake City) for this test
# This functionality is pretty married to PAPI, it doesn't run on `GCPBatch` backend.

metadata {
workflowName: sleepy_sleep
status: Failed
"failures.0.message": "Workflow failed"
"failures.0.causedBy.0.message": "Task sleepy_sleep.sleep:NA:3 failed. The job was stopped before the command finished. PAPI error code 9. Could not start instance custom-12-11264 due to insufficient quota. Cromwell retries exhausted, task failed. Backend info: Execution failed: allocating: selecting resources: selecting region and zone: no available zones: us-west3: 12 CPUS (10/10 available) quota too low"
"sleepy_sleep.sleep.-1.1.executionStatus": "RetryableFailure"
"sleepy_sleep.sleep.-1.2.executionStatus": "RetryableFailure"
"sleepy_sleep.sleep.-1.3.executionStatus": "Failed"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version 1.0

workflow sleepy_sleep {

input {
Int sleep_seconds = 180
}

call sleep {
input: sleep_seconds = sleep_seconds
}

}

task sleep {

input {
Int sleep_seconds
}

meta {
volatile: true
}

# I set `broad-dsde-cromwell-dev` to have super low CPU quota in `us-west3` (Salt Lake City) for this test
runtime {
cpu: 12
docker: "ubuntu:latest"
zones: "us-west3-a us-west3-b us-west3-c"
}

command <<<
sleep ~{sleep_seconds};
ls -la
>>>
}
5 changes: 4 additions & 1 deletion cromwell.example.backends/PAPIv2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# of cromwell.example.backends/cromwell.examples.conf in the root of the repository.
# You should uncomment lines that you want to define, and read carefully to customize
# the file. If you have any questions, please open an issue at
# https://broadworkbench.atlassian.net/projects/BA/issues
# https://github.com/broadinstitute/cromwell/issues

# Documentation
# https://cromwell.readthedocs.io/en/stable/backends/Google/
Expand Down Expand Up @@ -61,6 +61,9 @@ backend {
# Defaults to 7 days; max 30 days
# pipeline-timeout = 7 days

# Cromwell will retry jobs that fail with a quota signature, such as "PAPI error code 9", "no available zones", and/or "quota too low".
quota-attempts = 20

genomics {
# A reference to an auth defined in the `google` stanza at the top. This auth is used to create
# Pipelines and manipulate auth JSONs.
Expand Down
16 changes: 16 additions & 0 deletions docs/backends/Google.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,22 @@ backend.providers.PAPIv2.config {
}
```

#### Quota retry

Typically, Life Sciences API is designed to accept all jobs sent to it and respond to quota exhaustion
by queueing jobs internally. However, there are cases when jobs fail instead of queueing, with messages such
as "PAPI error code 9", "no available zones", and/or "quota too low".

The following configuration permits Cromwell to detect and retry these failures. Proactively monitoring
and raising quota is still recommended.

```hocon
backend.providers.PAPIv2.config {
# Counts attempts (total jobs) not just retries after to the first
quota-attempts: 20
}
```

**Enabling FUSE capabilities**

*This is a community contribution and not officially supported by the Cromwell team.*
Expand Down
2 changes: 2 additions & 0 deletions src/ci/resources/papi_v2beta_provider_config.inc.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ root = "gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci"
maximum-polling-interval = 600
concurrent-job-limit = 1000

quota-attempts: 3

genomics {
auth = "service_account"
location = "us-central1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import wom.values._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

object PipelinesApiAsyncBackendJobExecutionActor {
Expand Down Expand Up @@ -103,6 +104,7 @@ object PipelinesApiAsyncBackendJobExecutionActor {
}

new Exception(s"Task $jobTag failed. $returnCodeMessage PAPI error code ${errorCode.getCode.value}. $message")
with NoStackTrace
}
}

Expand Down Expand Up @@ -165,7 +167,7 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
override lazy val dockerImageUsed: Option[String] = Option(jobDockerImage)

override lazy val preemptible: Boolean = previousRetryReasons match {
case Valid(PreviousRetryReasons(p, _)) => p < maxPreemption
case Valid(PreviousRetryReasons(p, _, _)) => p < maxPreemption
case _ => false
}

Expand Down Expand Up @@ -891,6 +893,7 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
runStatus match {
case preemptedStatus: RunStatus.Preempted if preemptible => handlePreemption(preemptedStatus, returnCode)
case _: RunStatus.Cancelled => AbortedExecutionHandle
case quotaFailedStatus: RunStatus.QuotaFailed => handleQuotaFailedStatus(quotaFailedStatus)
case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus, returnCode)
case unknown =>
throw new RuntimeException(
Expand All @@ -901,13 +904,23 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
}
}

private def nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(p: Int, ur: Int): Seq[KvPair] =
/**
*
* @param p Preemption count
* @param ur Unexpected Retry count
* @param q Quota count
* @return KV sequence ready to be saved for the next attempt
*/
private def nextAttemptRetryKvPairs(p: Int, ur: Int, q: Int): Seq[KvPair] =
Seq(
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.unexpectedRetryCountKey),
ur.toString
),
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.preemptionCountKey),
p.toString
),
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.quotaRetryCountKey),
q.toString
)
)

Expand All @@ -917,11 +930,11 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
): ExecutionHandle = {
val msg = s"Retrying. $errorMessage"
previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur)) =>
case Valid(PreviousRetryReasons(p, ur, q)) =>
val thisUnexpectedRetry = ur + 1
if (thisUnexpectedRetry <= maxUnexpectedRetries) {
val preemptionAndUnexpectedRetryCountsKvPairs =
nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(p, thisUnexpectedRetry)
nextAttemptRetryKvPairs(p, thisUnexpectedRetry, q)
// Increment unexpected retry count and preemption count stays the same
FailedRetryableExecutionHandle(
StandardException(errorCode, msg, jobTag, jobReturnCode, standardPaths.error),
Expand All @@ -944,19 +957,62 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
}
}

private def handleQuotaFailedStatus(runStatus: RunStatus.QuotaFailed): ExecutionHandle = {

val machineType = runStatus.machineType.map(mt => s"$mt ").getOrElse("")
val baseMsg = s"Could not start instance ${machineType}due to insufficient quota."

previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur, q)) =>
val thisQuotaFailure = q + 1
val nextKvPairs = nextAttemptRetryKvPairs(p, ur, thisQuotaFailure)

if (thisQuotaFailure < pipelinesConfiguration.papiAttributes.quotaAttempts) {
val retryFlavor =
s"$baseMsg Cromwell will automatically retry the task. Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, retryFlavor, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedRetryableExecutionHandle(
exception,
None,
Option(nextKvPairs)
)
} else {
val nopeFlavor =
s"$baseMsg Cromwell retries exhausted, task failed. Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, nopeFlavor, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedNonRetryableExecutionHandle(
StandardException(runStatus.errorCode, nopeFlavor, jobTag, None, standardPaths.error),
None,
None
)
}
case Invalid(_) =>
val otherMsg = s"$baseMsg Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, otherMsg, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedNonRetryableExecutionHandle(
exception,
None,
None
)
}
}

private def handlePreemption(runStatus: RunStatus.Preempted, jobReturnCode: Option[Int]): ExecutionHandle = {
import common.numeric.IntegerUtil._

val errorCode: Status = runStatus.errorCode
val prettyPrintedError: String = runStatus.prettyPrintedError
previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur)) =>
case Valid(PreviousRetryReasons(p, ur, q)) =>
val thisPreemption = p + 1
val taskName = s"${workflowDescriptor.id}:${call.localName}"
val baseMsg = s"Task $taskName was preempted for the ${thisPreemption.toOrdinal} time."

val preemptionAndUnexpectedRetryCountsKvPairs =
nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(thisPreemption, ur)
nextAttemptRetryKvPairs(thisPreemption, ur, q)
if (thisPreemption < maxPreemption) {
// Increment preemption count and unexpectedRetryCount stays the same
val msg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.typesafe.scalalogging.StrictLogging
import cromwell.backend._
import cromwell.backend.google.pipelines.common.PipelinesApiBackendLifecycleActorFactory.{
preemptionCountKey,
quotaRetryCountKey,
robustBuildAttributes,
unexpectedRetryCountKey
}
Expand All @@ -32,7 +33,8 @@ abstract class PipelinesApiBackendLifecycleActorFactory(
protected def requiredBackendSingletonActor(serviceRegistryActor: ActorRef): Props
protected val jesConfiguration: PipelinesApiConfiguration

override val requestedKeyValueStoreKeys: Seq[String] = Seq(preemptionCountKey, unexpectedRetryCountKey)
override val requestedKeyValueStoreKeys: Seq[String] =
Seq(preemptionCountKey, unexpectedRetryCountKey, quotaRetryCountKey)

protected val googleConfig: GoogleConfiguration = GoogleConfiguration(configurationDescriptor.globalConfig)

Expand Down Expand Up @@ -125,6 +127,7 @@ abstract class PipelinesApiBackendLifecycleActorFactory(
object PipelinesApiBackendLifecycleActorFactory extends StrictLogging {
val preemptionCountKey = "PreemptionCount"
val unexpectedRetryCountKey = "UnexpectedRetryCount"
val quotaRetryCountKey = "QuotaRetryCount"

private[common] def robustBuildAttributes(buildAttributes: () => PipelinesApiConfigurationAttributes,
maxAttempts: Int = 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cromwell.backend.google.pipelines.common
import com.typesafe.config.Config
import cromwell.backend.BackendConfigurationDescriptor
import cromwell.backend.google.pipelines.common.api.PipelinesApiFactoryInterface
import cromwell.backend.google.pipelines.common.authentication.{PipelinesApiAuths, PipelinesApiDockerCredentials}
import cromwell.backend.google.pipelines.common.authentication.PipelinesApiDockerCredentials
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.core.BackendDockerConfiguration
import net.ceedubs.ficus.Ficus._
Expand All @@ -17,7 +17,6 @@ class PipelinesApiConfiguration(val configurationDescriptor: BackendConfiguratio
val papiAttributes: PipelinesApiConfigurationAttributes
) extends DefaultJsonProtocol {

val jesAuths: PipelinesApiAuths = papiAttributes.auths
val root: String = configurationDescriptor.backendConfig.getString("root")
val pipelineTimeout: FiniteDuration = papiAttributes.pipelineTimeout
val runtimeConfig: Option[Config] = configurationDescriptor.backendRuntimeAttributesConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ case class PipelinesApiConfigurationAttributes(
cacheHitDuplicationStrategy: PipelinesCacheHitDuplicationStrategy,
requestWorkers: Int Refined Positive,
pipelineTimeout: FiniteDuration,
quotaAttempts: Int,
logFlushPeriod: Option[FiniteDuration],
gcsTransferConfiguration: GcsTransferConfiguration,
virtualPrivateCloudConfiguration: VirtualPrivateCloudConfiguration,
Expand Down Expand Up @@ -86,6 +87,9 @@ object PipelinesApiConfigurationAttributes

val checkpointingIntervalKey = "checkpointing-interval"

/**
* Used to screen & warn about unexpected keys
*/
private val papiKeys = CommonBackendConfigurationAttributes.commonValidConfigurationAttributeKeys ++ Set(
"project",
"root",
Expand All @@ -108,6 +112,7 @@ object PipelinesApiConfigurationAttributes
"concurrent-job-limit",
"request-workers",
"pipeline-timeout",
"quota-attempts",
"batch-requests.timeouts.read",
"batch-requests.timeouts.connect",
"default-runtime-attributes.bootDiskSizeGb",
Expand Down Expand Up @@ -223,6 +228,8 @@ object PipelinesApiConfigurationAttributes

val pipelineTimeout: FiniteDuration = backendConfig.getOrElse("pipeline-timeout", 7.days)

val quotaAttempts: Int = backendConfig.as[Option[Int]]("quota-attempts").getOrElse(20)

val logFlushPeriod: Option[FiniteDuration] = backendConfig.as[Option[FiniteDuration]]("log-flush-period") match {
case Some(duration) if duration.isFinite => Option(duration)
// "Inf" disables upload
Expand Down Expand Up @@ -317,6 +324,7 @@ object PipelinesApiConfigurationAttributes
cacheHitDuplicationStrategy = cacheHitDuplicationStrategy,
requestWorkers = requestWorkers,
pipelineTimeout = pipelineTimeout,
quotaAttempts = quotaAttempts,
logFlushPeriod = logFlushPeriod,
gcsTransferConfiguration = gcsTransferConfiguration,
virtualPrivateCloudConfiguration = virtualPrivateCloudConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,36 @@ import cats.syntax.validated._
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.google.pipelines.common.PipelinesApiBackendLifecycleActorFactory.{
preemptionCountKey,
quotaRetryCountKey,
unexpectedRetryCountKey
}
import cromwell.services.keyvalue.KeyValueServiceActor._

import scala.util.{Failure, Success, Try}

case class PreviousRetryReasons(preempted: Int, unexpectedRetry: Int)
case class PreviousRetryReasons(preempted: Int, unexpectedRetry: Int, quota: Int)

object PreviousRetryReasons {

def tryApply(prefetchedKvEntries: Map[String, KvResponse], attemptNumber: Int): ErrorOr[PreviousRetryReasons] = {
val validatedPreemptionCount = validatedKvResponse(prefetchedKvEntries.get(preemptionCountKey), preemptionCountKey)
val validatedUnexpectedRetryCount =
validatedKvResponse(prefetchedKvEntries.get(unexpectedRetryCountKey), unexpectedRetryCountKey)
val validatedQuotaRetryCount = validatedKvResponse(prefetchedKvEntries.get(quotaRetryCountKey), quotaRetryCountKey)

(validatedPreemptionCount, validatedUnexpectedRetryCount) mapN PreviousRetryReasons.apply
(validatedPreemptionCount, validatedUnexpectedRetryCount, validatedQuotaRetryCount) mapN PreviousRetryReasons.apply
}

def apply(knownPreemptedCount: Int, knownUnexpectedRetryCount: Int, attempt: Int): PreviousRetryReasons = {
def apply(knownPreemptedCount: Int,
knownUnexpectedRetryCount: Int,
quotaCount: Int,
attempt: Int
): PreviousRetryReasons = {
// If we have anything unaccounted for, we can top up the unexpected retry count.
// NB: 'attempt' is 1-indexed, so, magic number:
// NB2: for sanity's sake, I won't let this unaccounted for drop below 0, just in case...
val unaccountedFor = Math.max(attempt - 1 - knownPreemptedCount - knownUnexpectedRetryCount, 0)
PreviousRetryReasons(knownPreemptedCount, knownUnexpectedRetryCount + unaccountedFor)
PreviousRetryReasons(knownPreemptedCount, knownUnexpectedRetryCount + unaccountedFor, quotaCount)
}

private def validatedKvResponse(r: Option[KvResponse], fromKey: String): ErrorOr[Int] = r match {
Expand Down
Loading

0 comments on commit f2b2c30

Please sign in to comment.