WX-1625 Quota retry #7439

Merged May 20, 2024 (17 commits)
Changes from 13 commits
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,12 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional

The `genomics` configuration entry was renamed to `batch`, see [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/backends/GCPBatch/) for more information.

### Improved handling of Life Sciences API quota errors

Users reported cases where Life Sciences jobs failed due to insufficient quota instead of being queued until
quota freed up (the expected behavior). Cromwell now retries jobs under these conditions, which present with errors
such as "PAPI error code 9", "no available zones", and/or "quota too low".

## 87 Release Notes

### GCP Batch
@@ -0,0 +1,21 @@
name: quota_fail_retry
testFormat: workflowfailure
backends: [Papiv2]

files {
workflow: quota_fail_retry/quota_fail_retry.wdl
}

# Adapted from `preemptible_and_memory_retry.test`.
# I set `broad-dsde-cromwell-dev` to have super low CPU quota in `us-west3` (Salt Lake City) for this test
# This functionality is tightly coupled to PAPI; it does not run on the `GCPBatch` backend.

metadata {
workflowName: sleepy_sleep
status: Failed
"failures.0.message": "Workflow failed"
"failures.0.causedBy.0.message": "Task sleepy_sleep.sleep:NA:3 failed. The job was stopped before the command finished. PAPI error code 9. Could not start instance custom-12-11264 due to insufficient quota. Cromwell retries exhausted, task failed. Backend info: Execution failed: allocating: selecting resources: selecting region and zone: no available zones: us-west3: 12 CPUS (10/10 available) quota too low"
Review comment (Collaborator, Author):

I had previously done the same thing for us-west3 when designing AwaitingCloudQuota

"sleepy_sleep.sleep.-1.1.executionStatus": "RetryableFailure"
"sleepy_sleep.sleep.-1.2.executionStatus": "RetryableFailure"
"sleepy_sleep.sleep.-1.3.executionStatus": "Failed"
}
@@ -0,0 +1,36 @@
version 1.0

workflow sleepy_sleep {

input {
Int sleep_seconds = 180
}

call sleep {
input: sleep_seconds = sleep_seconds
}

}

task sleep {

input {
Int sleep_seconds
}

meta {
volatile: true
}

# I set `broad-dsde-cromwell-dev` to have super low CPU quota in `us-west3` (Salt Lake City) for this test
runtime {
cpu: 12
docker: "ubuntu:latest"
zones: "us-west3-a us-west3-b us-west3-c"
}

command <<<
sleep ~{sleep_seconds};
ls -la
>>>
}
5 changes: 4 additions & 1 deletion cromwell.example.backends/PAPIv2.conf
@@ -4,7 +4,7 @@
# of cromwell.example.backends/cromwell.examples.conf in the root of the repository.
# You should uncomment lines that you want to define, and read carefully to customize
# the file. If you have any questions, please open an issue at
# https://broadworkbench.atlassian.net/projects/BA/issues
# https://github.com/broadinstitute/cromwell/issues

# Documentation
# https://cromwell.readthedocs.io/en/stable/backends/Google/
@@ -61,6 +61,9 @@ backend {
# Defaults to 7 days; max 30 days
# pipeline-timeout = 7 days

# Cromwell will retry jobs that fail with a quota signature, such as "PAPI error code 9", "no available zones", and/or "quota too low".
quota-attempts = 20

genomics {
# A reference to an auth defined in the `google` stanza at the top. This auth is used to create
# Pipelines and manipulate auth JSONs.
16 changes: 16 additions & 0 deletions docs/backends/Google.md
@@ -238,6 +238,22 @@ backend.providers.PAPIv2.config {
}
```

#### Quota retry

The Life Sciences API is designed to accept all jobs sent to it and to respond to quota exhaustion
by queueing jobs internally. However, in some cases jobs fail instead of being queued, with messages such
as "PAPI error code 9", "no available zones", and/or "quota too low".

The following configuration permits Cromwell to detect and retry these failures. Proactively monitoring
and raising quota is still recommended.

```hocon
backend.providers.PAPIv2.config {
# Counts total attempts (all jobs), not just retries after the first
quota-attempts: 20
}
```
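
The counting rule can be illustrated with a small stand-alone model. This is only a sketch: `QuotaAttemptsSketch`, `Outcome`, and `onQuotaFailure` are hypothetical names, not Cromwell's API. It assumes the comparison used in this change (`thisQuotaFailure < quotaAttempts`) and a job that hits the quota error on every attempt.

```scala
import scala.annotation.tailrec

// Hypothetical stand-alone model; the names here are illustrative, not Cromwell's API.
object QuotaAttemptsSketch {
  sealed trait Outcome
  case object RetryableFailure extends Outcome
  case object Failed extends Outcome

  /** previousQuotaFailures is the count carried over from earlier attempts (0 on the first attempt). */
  def onQuotaFailure(previousQuotaFailures: Int, quotaAttempts: Int): Outcome = {
    val thisQuotaFailure = previousQuotaFailures + 1
    if (thisQuotaFailure < quotaAttempts) RetryableFailure else Failed
  }

  /** Per-attempt outcomes for a job that fails with a quota error every time. */
  def outcomesWhenAlwaysFailing(quotaAttempts: Int): List[Outcome] = {
    @tailrec
    def loop(previous: Int, acc: List[Outcome]): List[Outcome] =
      onQuotaFailure(previous, quotaAttempts) match {
        case Failed           => (Failed :: acc).reverse
        case RetryableFailure => loop(previous + 1, RetryableFailure :: acc)
      }
    loop(0, Nil)
  }
}
```

Under these assumptions, `quota-attempts: 3` gives two retryable failures followed by a final failure, which is the pattern the `quota_fail_retry` test metadata expects for attempts 1 to 3.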

**Enabling FUSE capabilities**

*This is a community contribution and not officially supported by the Cromwell team.*
2 changes: 2 additions & 0 deletions src/ci/resources/gcp_batch_provider_config.inc.conf
@@ -3,6 +3,8 @@ root = "gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci"
maximum-polling-interval = 600
concurrent-job-limit = 1000

quota-attempts: 3
Review comment (Contributor):

Do we need this?

Reply (Collaborator, Author):

Ah, nope, good call. This is a leftover from when I was spamming the key everywhere because I couldn't get it to read and thought maybe my instance was reading the wrong config.


batch {
auth = "service_account"
location = "us-central1"
2 changes: 2 additions & 0 deletions src/ci/resources/papi_v2beta_provider_config.inc.conf
@@ -3,6 +3,8 @@ root = "gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/ci"
maximum-polling-interval = 600
concurrent-job-limit = 1000

quota-attempts: 3

genomics {
auth = "service_account"
location = "us-central1"
supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala
@@ -68,6 +68,7 @@
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

object PipelinesApiAsyncBackendJobExecutionActor {
@@ -103,6 +104,7 @@
}

new Exception(s"Task $jobTag failed. $returnCodeMessage PAPI error code ${errorCode.getCode.value}. $message")
with NoStackTrace
}
}

@@ -165,7 +167,7 @@
override lazy val dockerImageUsed: Option[String] = Option(jobDockerImage)

override lazy val preemptible: Boolean = previousRetryReasons match {
case Valid(PreviousRetryReasons(p, _)) => p < maxPreemption
case Valid(PreviousRetryReasons(p, _, _)) => p < maxPreemption
case _ => false
}

@@ -891,6 +893,7 @@
runStatus match {
case preemptedStatus: RunStatus.Preempted if preemptible => handlePreemption(preemptedStatus, returnCode)
case _: RunStatus.Cancelled => AbortedExecutionHandle
case quotaFailedStatus: RunStatus.QuotaFailed => handleQuotaFailedStatus(quotaFailedStatus)

case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus, returnCode)
case unknown =>
throw new RuntimeException(
@@ -901,13 +904,23 @@
}
}

private def nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(p: Int, ur: Int): Seq[KvPair] =
/**
*
* @param p Preemption count
* @param ur Unexpected Retry count
* @param q Quota count
* @return KV sequence ready to be saved for the next attempt
*/
private def nextAttemptRetryKvPairs(p: Int, ur: Int, q: Int): Seq[KvPair] =
Seq(
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.unexpectedRetryCountKey),
ur.toString
),
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.preemptionCountKey),
p.toString
),
KvPair(ScopedKey(workflowId, futureKvJobKey, PipelinesApiBackendLifecycleActorFactory.quotaRetryCountKey),
q.toString

)
)

@@ -917,11 +930,11 @@
): ExecutionHandle = {
val msg = s"Retrying. $errorMessage"
previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur)) =>
case Valid(PreviousRetryReasons(p, ur, q)) =>

val thisUnexpectedRetry = ur + 1
if (thisUnexpectedRetry <= maxUnexpectedRetries) {
val preemptionAndUnexpectedRetryCountsKvPairs =
nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(p, thisUnexpectedRetry)
nextAttemptRetryKvPairs(p, thisUnexpectedRetry, q)

// Increment unexpected retry count and preemption count stays the same
FailedRetryableExecutionHandle(
StandardException(errorCode, msg, jobTag, jobReturnCode, standardPaths.error),
@@ -944,19 +957,62 @@
}
}

private def handleQuotaFailedStatus(runStatus: RunStatus.QuotaFailed): ExecutionHandle = {

val machineType = runStatus.machineType.map(mt => s"$mt ").getOrElse("")

val baseMsg = s"Could not start instance ${machineType}due to insufficient quota."

previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur, q)) =>
val thisQuotaFailure = q + 1
val nextKvPairs = nextAttemptRetryKvPairs(p, ur, thisQuotaFailure)


if (thisQuotaFailure < pipelinesConfiguration.papiAttributes.quotaAttempts) {

val retryFlavor =
s"$baseMsg Cromwell will automatically retry the task. Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, retryFlavor, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedRetryableExecutionHandle(

exception,
None,
Option(nextKvPairs)

)
} else {

val nopeFlavor =
s"$baseMsg Cromwell retries exhausted, task failed. Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, nopeFlavor, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedNonRetryableExecutionHandle(
StandardException(runStatus.errorCode, nopeFlavor, jobTag, None, standardPaths.error),
None,
None

)
}
case Invalid(_) =>

val otherMsg = s"$baseMsg Backend info: ${runStatus.prettyPrintedError}"
val exception = StandardException(runStatus.errorCode, otherMsg, jobTag, None, standardPaths.error)
jobLogger.info(exception.getMessage)
FailedNonRetryableExecutionHandle(

exception,
None,
None

)
}
}

private def handlePreemption(runStatus: RunStatus.Preempted, jobReturnCode: Option[Int]): ExecutionHandle = {
import common.numeric.IntegerUtil._

val errorCode: Status = runStatus.errorCode
val prettyPrintedError: String = runStatus.prettyPrintedError
previousRetryReasons match {
case Valid(PreviousRetryReasons(p, ur)) =>
case Valid(PreviousRetryReasons(p, ur, q)) =>

val thisPreemption = p + 1
val taskName = s"${workflowDescriptor.id}:${call.localName}"
val baseMsg = s"Task $taskName was preempted for the ${thisPreemption.toOrdinal} time."

val preemptionAndUnexpectedRetryCountsKvPairs =
nextAttemptPreemptedAndUnexpectedRetryCountsToKvPairs(thisPreemption, ur)
nextAttemptRetryKvPairs(thisPreemption, ur, q)

if (thisPreemption < maxPreemption) {
// Increment preemption count and unexpectedRetryCount stays the same
val msg =
supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiBackendLifecycleActorFactory.scala
@@ -6,6 +6,7 @@
import cromwell.backend._
import cromwell.backend.google.pipelines.common.PipelinesApiBackendLifecycleActorFactory.{
preemptionCountKey,
quotaRetryCountKey,
robustBuildAttributes,
unexpectedRetryCountKey
}
@@ -32,7 +33,8 @@
protected def requiredBackendSingletonActor(serviceRegistryActor: ActorRef): Props
protected val jesConfiguration: PipelinesApiConfiguration

override val requestedKeyValueStoreKeys: Seq[String] = Seq(preemptionCountKey, unexpectedRetryCountKey)
override val requestedKeyValueStoreKeys: Seq[String] =
Seq(preemptionCountKey, unexpectedRetryCountKey, quotaRetryCountKey)


protected val googleConfig: GoogleConfiguration = GoogleConfiguration(configurationDescriptor.globalConfig)

@@ -125,6 +127,7 @@
object PipelinesApiBackendLifecycleActorFactory extends StrictLogging {
val preemptionCountKey = "PreemptionCount"
val unexpectedRetryCountKey = "UnexpectedRetryCount"
val quotaRetryCountKey = "QuotaRetryCount"


private[common] def robustBuildAttributes(buildAttributes: () => PipelinesApiConfigurationAttributes,
maxAttempts: Int = 3,
@@ -3,7 +3,7 @@ package cromwell.backend.google.pipelines.common
import com.typesafe.config.Config
import cromwell.backend.BackendConfigurationDescriptor
import cromwell.backend.google.pipelines.common.api.PipelinesApiFactoryInterface
import cromwell.backend.google.pipelines.common.authentication.{PipelinesApiAuths, PipelinesApiDockerCredentials}
import cromwell.backend.google.pipelines.common.authentication.PipelinesApiDockerCredentials
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.core.BackendDockerConfiguration
import net.ceedubs.ficus.Ficus._
@@ -17,7 +17,6 @@ class PipelinesApiConfiguration(val configurationDescriptor: BackendConfiguratio
val papiAttributes: PipelinesApiConfigurationAttributes
) extends DefaultJsonProtocol {

val jesAuths: PipelinesApiAuths = papiAttributes.auths
val root: String = configurationDescriptor.backendConfig.getString("root")
val pipelineTimeout: FiniteDuration = papiAttributes.pipelineTimeout
val runtimeConfig: Option[Config] = configurationDescriptor.backendRuntimeAttributesConfig
supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiConfigurationAttributes.scala
@@ -51,6 +51,7 @@
cacheHitDuplicationStrategy: PipelinesCacheHitDuplicationStrategy,
requestWorkers: Int Refined Positive,
pipelineTimeout: FiniteDuration,
quotaAttempts: Int,
logFlushPeriod: Option[FiniteDuration],
gcsTransferConfiguration: GcsTransferConfiguration,
virtualPrivateCloudConfiguration: VirtualPrivateCloudConfiguration,
@@ -86,6 +87,9 @@

val checkpointingIntervalKey = "checkpointing-interval"

/**
* Used to screen & warn about unexpected keys
*/
private val papiKeys = CommonBackendConfigurationAttributes.commonValidConfigurationAttributeKeys ++ Set(
"project",
"root",
@@ -108,6 +112,7 @@
"concurrent-job-limit",
"request-workers",
"pipeline-timeout",
"quota-attempts",

"batch-requests.timeouts.read",
"batch-requests.timeouts.connect",
"default-runtime-attributes.bootDiskSizeGb",
@@ -223,6 +228,8 @@

val pipelineTimeout: FiniteDuration = backendConfig.getOrElse("pipeline-timeout", 7.days)

val quotaAttempts: Int = backendConfig.as[Option[Int]]("quota-attempts").getOrElse(20)


val logFlushPeriod: Option[FiniteDuration] = backendConfig.as[Option[FiniteDuration]]("log-flush-period") match {
case Some(duration) if duration.isFinite => Option(duration)
// "Inf" disables upload
@@ -317,6 +324,7 @@
cacheHitDuplicationStrategy = cacheHitDuplicationStrategy,
requestWorkers = requestWorkers,
pipelineTimeout = pipelineTimeout,
quotaAttempts = quotaAttempts,
logFlushPeriod = logFlushPeriod,
gcsTransferConfiguration = gcsTransferConfiguration,
virtualPrivateCloudConfiguration = virtualPrivateCloudConfiguration,
supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PreviousRetryReasons.scala
@@ -5,30 +5,36 @@
import common.validation.ErrorOr.ErrorOr
import cromwell.backend.google.pipelines.common.PipelinesApiBackendLifecycleActorFactory.{
preemptionCountKey,
quotaRetryCountKey,
unexpectedRetryCountKey
}
import cromwell.services.keyvalue.KeyValueServiceActor._

import scala.util.{Failure, Success, Try}

case class PreviousRetryReasons(preempted: Int, unexpectedRetry: Int)
case class PreviousRetryReasons(preempted: Int, unexpectedRetry: Int, quota: Int)

object PreviousRetryReasons {

def tryApply(prefetchedKvEntries: Map[String, KvResponse], attemptNumber: Int): ErrorOr[PreviousRetryReasons] = {
val validatedPreemptionCount = validatedKvResponse(prefetchedKvEntries.get(preemptionCountKey), preemptionCountKey)
val validatedUnexpectedRetryCount =
validatedKvResponse(prefetchedKvEntries.get(unexpectedRetryCountKey), unexpectedRetryCountKey)
val validatedQuotaRetryCount = validatedKvResponse(prefetchedKvEntries.get(quotaRetryCountKey), quotaRetryCountKey)


(validatedPreemptionCount, validatedUnexpectedRetryCount) mapN PreviousRetryReasons.apply
(validatedPreemptionCount, validatedUnexpectedRetryCount, validatedQuotaRetryCount) mapN PreviousRetryReasons.apply

}

def apply(knownPreemptedCount: Int, knownUnexpectedRetryCount: Int, attempt: Int): PreviousRetryReasons = {
def apply(knownPreemptedCount: Int,
knownUnexpectedRetryCount: Int,
quotaCount: Int,
attempt: Int
): PreviousRetryReasons = {
// If we have anything unaccounted for, we can top up the unexpected retry count.
// NB: 'attempt' is 1-indexed, so, magic number:
// NB2: for sanity's sake, I won't let this unaccounted for drop below 0, just in case...
val unaccountedFor = Math.max(attempt - 1 - knownPreemptedCount - knownUnexpectedRetryCount, 0)
PreviousRetryReasons(knownPreemptedCount, knownUnexpectedRetryCount + unaccountedFor)
PreviousRetryReasons(knownPreemptedCount, knownUnexpectedRetryCount + unaccountedFor, quotaCount)

}

private def validatedKvResponse(r: Option[KvResponse], fromKey: String): ErrorOr[Int] = r match {
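
The top-up arithmetic in the four-argument `PreviousRetryReasons.apply` above can be tried in isolation with a small sketch. `RetryTopUpSketch`, `RetryCounts`, and `topUp` are hypothetical names used only for illustration. The sketch mirrors the diff as shown, where the quota count is passed through unchanged and is not subtracted when computing the unaccounted-for attempts.

```scala
// Illustrative model only: RetryCounts and topUp are hypothetical names, not Cromwell's API.
object RetryTopUpSketch {
  final case class RetryCounts(preempted: Int, unexpectedRetry: Int, quota: Int)

  /**
   * `attempt` is 1-indexed, so `attempt - 1` earlier attempts exist.
   * Any earlier attempt not explained by a known preemption or unexpected retry
   * is added to the unexpected-retry count; the value is clamped at 0.
   * The quota count is carried through unchanged.
   */
  def topUp(knownPreempted: Int, knownUnexpected: Int, quotaCount: Int, attempt: Int): RetryCounts = {
    val unaccountedFor = math.max(attempt - 1 - knownPreempted - knownUnexpected, 0)
    RetryCounts(knownPreempted, knownUnexpected + unaccountedFor, quotaCount)
  }
}
```

For example, `topUp(1, 1, 0, 4)` yields `RetryCounts(1, 2, 0)`: three earlier attempts exist, two are explained, and the remaining one is counted as an unexpected retry.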