Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1625 Quota retry #7439

Merged
merged 17 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional

The `genomics` configuration entry was renamed to `batch`, see [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/backends/GCPBatch/) for more information.

### Improved handling of Life Sciences API quota errors

Users reported cases where Life Sciences jobs failed due to insufficient quota, instead of queueing and waiting until
quota is available (which is the expected behavior). Cromwell will now retry under these conditions, which present with errors
such as "PAPI error code 9", "no available zones", and/or "quota too low".

## 87 Release Notes

### GCP Batch
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: quota_fail_retry
testFormat: workflowfailure
backends: [Papiv2]

files {
workflow: quota_fail_retry/quota_fail_retry.wdl
}

# Adapted from `preemptible_and_memory_retry.test`.
# I set `broad-dsde-cromwell-dev` to have super low CPU quota in `us-west3` (Salt Lake City) for this test
# This functionality is pretty married to PAPI, it doesn't run on `GCPBatch` backend.

metadata {
workflowName: sleepy_sleep
status: Failed
"failures.0.message": "Workflow failed"
"failures.0.causedBy.0.message": "Task sleepy_sleep.sleep:NA:3 failed. The job was stopped before the command finished. PAPI error code 9. Could not start instance custom-12-11264 due to insufficient quota. Cromwell retries exhausted, task failed. Backend info: Execution failed: allocating: selecting resources: selecting region and zone: no available zones: us-west3: 12 CPUS (10/10 available) quota too low"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had previously done the same thing for us-west3 when designing AwaitingCloudQuota

"sleepy_sleep.sleep.-1.1.executionStatus": "RetryableFailure"
"sleepy_sleep.sleep.-1.2.executionStatus": "RetryableFailure"
"sleepy_sleep.sleep.-1.3.executionStatus": "Failed"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version 1.0

workflow sleepy_sleep {

input {
Int sleep_seconds = 180
}

call sleep {
input: sleep_seconds = sleep_seconds
}

}

task sleep {

input {
Int sleep_seconds
}

meta {
volatile: true
}

# I set `broad-dsde-cromwell-dev` to have super low CPU quota in `us-west3` (Salt Lake City) for this test
runtime {
cpu: 12
docker: "ubuntu:latest"
zones: "us-west3-a us-west3-b us-west3-c"
}

command <<<
sleep ~{sleep_seconds};
ls -la
>>>
}
5 changes: 4 additions & 1 deletion cromwell.example.backends/PAPIv2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# of cromwell.example.backends/cromwell.examples.conf in the root of the repository.
# You should uncomment lines that you want to define, and read carefully to customize
# the file. If you have any questions, please open an issue at
# https://broadworkbench.atlassian.net/projects/BA/issues
# https://github.com/broadinstitute/cromwell/issues

# Documentation
# https://cromwell.readthedocs.io/en/stable/backends/Google/
Expand Down Expand Up @@ -61,6 +61,9 @@ backend {
# Defaults to 7 days; max 30 days
# pipeline-timeout = 7 days

# Cromwell will retry jobs that fail with a quota signature, such as "PAPI error code 9", "no available zones", and/or "quota too low".
quota-attempts = 20

genomics {
# A reference to an auth defined in the `google` stanza at the top. This auth is used to create
# Pipelines and manipulate auth JSONs.
Expand Down
16 changes: 16 additions & 0 deletions docs/backends/Google.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,22 @@ backend.providers.PAPIv2.config {
}
```

#### Quota retry

Typically, Life Sciences API is designed to accept all jobs sent to it and respond to quota exhaustion
by queueing jobs internally. However, there are cases when jobs fail instead of queueing, with messages such
as "PAPI error code 9", "no available zones", and/or "quota too low".

The following configuration permits Cromwell to detect and retry these failures. Proactively monitoring
and raising quota is still recommended.

```hocon
backend.providers.PAPIv2.config {
# Counts attempts (total jobs) not just retries after to the first
quota-attempts: 20
}
```

**Enabling FUSE capabilities**

*This is a community contribution and not officially supported by the Cromwell team.*
Expand Down
2 changes: 2 additions & 0 deletions src/ci/resources/gcp_batch_provider_config.inc.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ root = "gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci"
maximum-polling-interval = 600
concurrent-job-limit = 1000

quota-attempts: 3
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nope, good call. This is a leftover from when I was spamming the key everywhere because I couldn't get it to read and thought maybe my instance was reading the wrong config.


batch {
auth = "service_account"
location = "us-central1"
Expand Down
2 changes: 2 additions & 0 deletions src/ci/resources/papi_v2beta_provider_config.inc.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ root = "gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci"
maximum-polling-interval = 600
concurrent-job-limit = 1000

quota-attempts: 3

genomics {
auth = "service_account"
location = "us-central1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

object PipelinesApiAsyncBackendJobExecutionActor {
Expand Down Expand Up @@ -103,6 +104,7 @@
}

new Exception(s"Task $jobTag failed. $returnCodeMessage PAPI error code ${errorCode.getCode.value}. $message")
with NoStackTrace
}
}

Expand Down Expand Up @@ -165,7 +167,7 @@
override lazy val dockerImageUsed: Option[String] = Option(jobDockerImage)

override lazy val preemptible: Boolean = previousRetryReasons match {
case Valid(PreviousRetryReasons(p, _)) => p < maxPreemption
case Valid(PreviousRetryReasons(p, _, _)) => p < maxPreemption
case _ => false
}

Expand Down Expand Up @@ -891,6 +893,7 @@
runStatus match {
case preemptedStatus: RunStatus.Preempted if preemptible => handlePreemption(preemptedStatus, returnCode)
case _: RunStatus.Cancelled => AbortedExecutionHandle
case quotaFailedStatus: RunStatus.QuotaFailed => handleQuotaFailedStatus(quotaFailedStatus)
case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus, returnCode)
case unknown =>
throw new RuntimeException(
Expand All @@ -901,13 +904,23 @@
}
}

private def nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(p: Int, ur: Int): Seq[KvPair] =
/**
*
* @param p Preemption count
* @param ur Unexpected Retry count
* @param q Quota count
* @return KV sequence ready to be saved for the next attempt
*/
private def nextAttemptRetryKvPairs(p: Int, ur: Int, q: Int): Seq[KvPair] =
Seq(
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.unexpectedRetryCountKey),
ur.toString
),
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.preemptionCountKey),
p.toString
),
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.quotaRetryCountKey),
q.toString
)
)

Expand All @@ -917,11 +930,11 @@
): ExecutionHandle = {
val msg = s"Retrying. $errorMessage"
previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur)) =>
case Valid(PreviousRetryReasons(p, ur, q)) =>

Check warning on line 933 in supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala#L933

Added line #L933 was not covered by tests
val thisUnexpectedRetry = ur + 1
if (thisUnexpectedRetry <= maxUnexpectedRetries) {
val preemptionAndUnexpectedRetryCountsKvPairs =
nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(p, thisUnexpectedRetry)
nextAttemptRetryKvPairs(p, thisUnexpectedRetry, q)

Check warning on line 937 in supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala#L937

Added line #L937 was not covered by tests
// Increment unexpected retry count and preemption count stays the same
FailedRetryableExecutionHandle(
StandardException(errorCode, msg, jobTag, jobReturnCode, standardPaths.error),
Expand All @@ -944,19 +957,62 @@
}
}

private def handleQuotaFailedStatus(runStatus: RunStatus.QuotaFailed): ExecutionHandle = {

val machineType = runStatus.machineType.map(mt => s"$mt ").getOrElse("")
val baseMsg = s"Could not start instance ${machineType}due to insufficient quota."

previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur, q)) =>
val thisQuotaFailure = q + 1
val nextKvPairs = nextAttemptRetryKvPairs(p, ur, thisQuotaFailure)

if (thisQuotaFailure < pipelinesConfiguration.papiAttributes.quotaAttempts) {
val retryFlavor =
s"$baseMsg Cromwell will automatically retry the task. Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, retryFlavor, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedRetryableExecutionHandle(
exception,
None,
Option(nextKvPairs)
)
} else {
val nopeFlavor =
s"$baseMsg Cromwell retries exhausted, task failed. Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, nopeFlavor, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedNonRetryableExecutionHandle(
StandardException(runStatus.errorCode, nopeFlavor, jobTag, None, standardPaths.error),
None,
None
)
}
case Invalid(_) =>

Check warning on line 991 in supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala#L991

Added line #L991 was not covered by tests
val otherMsg = s"$baseMsg Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, otherMsg, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedNonRetryableExecutionHandle(

Check warning on line 995 in supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala#L993-L995

Added lines #L993 - L995 were not covered by tests
exception,
None,
None

Check warning on line 998 in supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala#L997-L998

Added lines #L997 - L998 were not covered by tests
)
}
}

private def handlePreemption(runStatus: RunStatus.Preempted, jobReturnCode: Option[Int]): ExecutionHandle = {
import common.numeric.IntegerUtil._

val errorCode: Status = runStatus.errorCode
val prettyPrintedError: String = runStatus.prettyPrintedError
previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur)) =>
case Valid(PreviousRetryReasons(p, ur, q)) =>
val thisPreemption = p + 1
val taskName = s"${workflowDescriptor.id}:${call.localName}"
val baseMsg = s"Task $taskName was preempted for the ${thisPreemption.toOrdinal} time."

val preemptionAndUnexpectedRetryCountsKvPairs =
nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(thisPreemption, ur)
nextAttemptRetryKvPairs(thisPreemption, ur, q)
if (thisPreemption < maxPreemption) {
// Increment preemption count and unexpectedRetryCount stays the same
val msg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.typesafe.scalalogging.StrictLogging
import cromwell.backend._
import cromwell.backend.google.pipelines.common.PipelinesApiBackendLifecycleActorFactory.{
preemptionCountKey,
quotaRetryCountKey,
robustBuildAttributes,
unexpectedRetryCountKey
}
Expand All @@ -32,7 +33,8 @@ abstract class PipelinesApiBackendLifecycleActorFactory(
protected def requiredBackendSingletonActor(serviceRegistryActor: ActorRef): Props
protected val jesConfiguration: PipelinesApiConfiguration

override val requestedKeyValueStoreKeys: Seq[String] = Seq(preemptionCountKey, unexpectedRetryCountKey)
override val requestedKeyValueStoreKeys: Seq[String] =
Seq(preemptionCountKey, unexpectedRetryCountKey, quotaRetryCountKey)

protected val googleConfig: GoogleConfiguration = GoogleConfiguration(configurationDescriptor.globalConfig)

Expand Down Expand Up @@ -125,6 +127,7 @@ abstract class PipelinesApiBackendLifecycleActorFactory(
object PipelinesApiBackendLifecycleActorFactory extends StrictLogging {
val preemptionCountKey = "PreemptionCount"
val unexpectedRetryCountKey = "UnexpectedRetryCount"
val quotaRetryCountKey = "QuotaRetryCount"

private[common] def robustBuildAttributes(buildAttributes: () => PipelinesApiConfigurationAttributes,
maxAttempts: Int = 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cromwell.backend.google.pipelines.common
import com.typesafe.config.Config
import cromwell.backend.BackendConfigurationDescriptor
import cromwell.backend.google.pipelines.common.api.PipelinesApiFactoryInterface
import cromwell.backend.google.pipelines.common.authentication.{PipelinesApiAuths, PipelinesApiDockerCredentials}
import cromwell.backend.google.pipelines.common.authentication.PipelinesApiDockerCredentials
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.core.BackendDockerConfiguration
import net.ceedubs.ficus.Ficus._
Expand All @@ -17,7 +17,6 @@ class PipelinesApiConfiguration(val configurationDescriptor: BackendConfiguratio
val papiAttributes: PipelinesApiConfigurationAttributes
) extends DefaultJsonProtocol {

val jesAuths: PipelinesApiAuths = papiAttributes.auths
val root: String = configurationDescriptor.backendConfig.getString("root")
val pipelineTimeout: FiniteDuration = papiAttributes.pipelineTimeout
val runtimeConfig: Option[Config] = configurationDescriptor.backendRuntimeAttributesConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ case class PipelinesApiConfigurationAttributes(
cacheHitDuplicationStrategy: PipelinesCacheHitDuplicationStrategy,
requestWorkers: Int Refined Positive,
pipelineTimeout: FiniteDuration,
quotaAttempts: Int,
logFlushPeriod: Option[FiniteDuration],
gcsTransferConfiguration: GcsTransferConfiguration,
virtualPrivateCloudConfiguration: VirtualPrivateCloudConfiguration,
Expand Down Expand Up @@ -86,6 +87,9 @@ object PipelinesApiConfigurationAttributes

val checkpointingIntervalKey = "checkpointing-interval"

/**
* Used to screen & warn about unexpected keys
*/
private val papiKeys = CommonBackendConfigurationAttributes.commonValidConfigurationAttributeKeys ++ Set(
"project",
"root",
Expand All @@ -108,6 +112,7 @@ object PipelinesApiConfigurationAttributes
"concurrent-job-limit",
"request-workers",
"pipeline-timeout",
"quota-attempts",
"batch-requests.timeouts.read",
"batch-requests.timeouts.connect",
"default-runtime-attributes.bootDiskSizeGb",
Expand Down Expand Up @@ -223,6 +228,8 @@ object PipelinesApiConfigurationAttributes

val pipelineTimeout: FiniteDuration = backendConfig.getOrElse("pipeline-timeout", 7.days)

val quotaAttempts: Int = backendConfig.as[Option[Int]]("quota-attempts").getOrElse(20)

val logFlushPeriod: Option[FiniteDuration] = backendConfig.as[Option[FiniteDuration]]("log-flush-period") match {
case Some(duration) if duration.isFinite => Option(duration)
// "Inf" disables upload
Expand Down Expand Up @@ -317,6 +324,7 @@ object PipelinesApiConfigurationAttributes
cacheHitDuplicationStrategy = cacheHitDuplicationStrategy,
requestWorkers = requestWorkers,
pipelineTimeout = pipelineTimeout,
quotaAttempts = quotaAttempts,
logFlushPeriod = logFlushPeriod,
gcsTransferConfiguration = gcsTransferConfiguration,
virtualPrivateCloudConfiguration = virtualPrivateCloudConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,36 @@
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.google.pipelines.common.PipelinesApiBackendLifecycleActorFactory.{
preemptionCountKey,
quotaRetryCountKey,
unexpectedRetryCountKey
}
import cromwell.services.keyvalue.KeyValueServiceActor._

import scala.util.{Failure, Success, Try}

case class PreviousRetryReasons(preempted: Int, unexpectedRetry: Int)
case class PreviousRetryReasons(preempted: Int, unexpectedRetry: Int, quota: Int)

object PreviousRetryReasons {

def tryApply(prefetchedKvEntries: Map[String, KvResponse], attemptNumber: Int): ErrorOr[PreviousRetryReasons] = {
val validatedPreemptionCount = validatedKvResponse(prefetchedKvEntries.get(preemptionCountKey), preemptionCountKey)
val validatedUnexpectedRetryCount =
validatedKvResponse(prefetchedKvEntries.get(unexpectedRetryCountKey), unexpectedRetryCountKey)
val validatedQuotaRetryCount = validatedKvResponse(prefetchedKvEntries.get(quotaRetryCountKey), quotaRetryCountKey)

(validatedPreemptionCount, validatedUnexpectedRetryCount) mapN PreviousRetryReasons.apply
(validatedPreemptionCount, validatedUnexpectedRetryCount, validatedQuotaRetryCount) mapN PreviousRetryReasons.apply
}

def apply(knownPreemptedCount: Int, knownUnexpectedRetryCount: Int, attempt: Int): PreviousRetryReasons = {
def apply(knownPreemptedCount: Int,
knownUnexpectedRetryCount: Int,
quotaCount: Int,
attempt: Int
): PreviousRetryReasons = {
// If we have anything unaccounted for, we can top up the unexpected retry count.
// NB: 'attempt' is 1-indexed, so, magic number:
// NB2: for sanity's sake, I won't let this unaccounted for drop below 0, just in case...
val unaccountedFor = Math.max(attempt - 1 - knownPreemptedCount - knownUnexpectedRetryCount, 0)
PreviousRetryReasons(knownPreemptedCount, knownUnexpectedRetryCount + unaccountedFor)
PreviousRetryReasons(knownPreemptedCount, knownUnexpectedRetryCount + unaccountedFor, quotaCount)

Check warning on line 37 in supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PreviousRetryReasons.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PreviousRetryReasons.scala#L37

Added line #L37 was not covered by tests
}

private def validatedKvResponse(r: Option[KvResponse], fromKey: String): ErrorOr[Int] = r match {
Expand Down
Loading
Loading