Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1625 Refactor ahead of quota retry #7432

Merged
merged 9 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import cromwell.backend.async.AsyncBackendJobExecutionActor._
import cromwell.backend.async._
import cromwell.backend.standard.StandardAdHocValue._
import cromwell.backend.standard.retry.memory.MemoryRetryResult
import cromwell.backend.validation._
import cromwell.core.io.{AsyncIoActorClient, DefaultIoCommandBuilder, IoCommandBuilder}
import cromwell.core.path.Path
Expand Down Expand Up @@ -896,6 +897,16 @@
/**
* Returns true if the status represents a completion.
*
* Select meanings by backend:
* - TES:
* `cromwell.backend.impl.tes.Complete` derived from "state": "COMPLETE"
* - Life Sciences:
* `com.google.api.services.genomics.v2alpha1.model.Operation.getDone` is true
* -- AND --
* `com.google.api.services.genomics.v2alpha1.model.Operation#getError` is empty
* - GCP Batch:
* `com.google.cloud.batch.v1.JobStatus.State` is `SUCCEEDED`
*
* @param runStatus The run status.
* @return True if the job is done.
*/
Expand Down Expand Up @@ -1054,7 +1065,7 @@
* @return The execution handle.
*/
def retryElseFail(backendExecutionStatus: Future[ExecutionHandle],
retryWithMoreMemory: Boolean = false
memoryRetry: MemoryRetryResult = MemoryRetryResult.none
): Future[ExecutionHandle] =
backendExecutionStatus flatMap {
case failedRetryableOrNonRetryable: FailedExecutionHandle =>
Expand All @@ -1069,35 +1080,48 @@
case None => Map.empty[String, KvPair]
}

val maxRetriesNotReachedYet = previousFailedRetries < maxRetries
failedRetryableOrNonRetryable match {
case failed: FailedNonRetryableExecutionHandle if maxRetriesNotReachedYet =>
(retryWithMoreMemory, memoryRetryFactor, previousMemoryMultiplier) match {
case (true, Some(retryFactor), Some(previousMultiplier)) =>
val nextMemoryMultiplier = previousMultiplier * retryFactor.value
saveAttrsAndRetry(failed,
kvsFromPreviousAttempt,
kvsForNextAttempt,
incFailedCount = true,
Option(nextMemoryMultiplier)
)
case (true, Some(retryFactor), None) =>
saveAttrsAndRetry(failed,
kvsFromPreviousAttempt,
kvsForNextAttempt,
incFailedCount = true,
Option(retryFactor.value)
)
case (_, _, _) =>
saveAttrsAndRetry(failed, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true)
}
case failedNonRetryable: FailedNonRetryableExecutionHandle => Future.successful(failedNonRetryable)
case failedNonRetryable: FailedNonRetryableExecutionHandle if previousFailedRetries < maxRetries =>
// The user asked us to retry finitely for them, possibly with a memory modification
evaluateFailureRetry(failedNonRetryable, kvsFromPreviousAttempt, kvsForNextAttempt, memoryRetry)
case failedNonRetryable: FailedNonRetryableExecutionHandle =>
// No reason to retry
Future.successful(failedNonRetryable)
case failedRetryable: FailedRetryableExecutionHandle =>
// Retry infinitely and unconditionally (!)
saveAttrsAndRetry(failedRetryable, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = false)
}
case _ => backendExecutionStatus
}

private def evaluateFailureRetry(handle: FailedNonRetryableExecutionHandle,
kvsFromPreviousAttempt: Map[String, KvPair],
kvsForNextAttempt: Map[String, KvPair],
memoryRetry: MemoryRetryResult
): Future[FailedRetryableExecutionHandle] =
(memoryRetry.oomDetected, memoryRetry.factor, memoryRetry.previousMultiplier) match {
case (true, Some(retryFactor), Some(previousMultiplier)) =>
// Subsequent memory retry attempt
val nextMemoryMultiplier = previousMultiplier * retryFactor.value
saveAttrsAndRetry(handle,

Check warning on line 1106 in backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala#L1105-L1106

Added lines #L1105 - L1106 were not covered by tests
kvsFromPreviousAttempt,
kvsForNextAttempt,
incFailedCount = true,
Option(nextMemoryMultiplier)

Check warning on line 1110 in backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala#L1109-L1110

Added lines #L1109 - L1110 were not covered by tests
)
case (true, Some(retryFactor), None) =>
// First memory retry attempt
saveAttrsAndRetry(handle,

Check warning on line 1114 in backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala#L1114

Added line #L1114 was not covered by tests
kvsFromPreviousAttempt,
kvsForNextAttempt,
incFailedCount = true,
Option(retryFactor.value)

Check warning on line 1118 in backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala#L1117-L1118

Added lines #L1117 - L1118 were not covered by tests
)
case (_, _, _) =>
// Not an OOM
saveAttrsAndRetry(handle, kvsFromPreviousAttempt, kvsForNextAttempt, incFailedCount = true)
}

private def saveAttrsAndRetry(failedExecHandle: FailedExecutionHandle,
kvPrev: Map[String, KvPair],
kvNext: Map[String, KvPair],
Expand Down Expand Up @@ -1400,7 +1424,9 @@
None
)
)
retryElseFail(executionHandle, outOfMemoryDetected)
retryElseFail(executionHandle,
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)

Check warning on line 1428 in backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala#L1427-L1428

Added lines #L1427 - L1428 were not covered by tests
)
case Success(returnCodeAsInt) if isAbort(returnCodeAsInt) =>
Future.successful(AbortedExecutionHandle)
case Success(returnCodeAsInt) =>
Expand Down Expand Up @@ -1430,7 +1456,9 @@
None
)
)
retryElseFail(executionHandle, outOfMemoryDetected)
retryElseFail(executionHandle,
MemoryRetryResult(outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)

Check warning on line 1460 in backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala#L1459-L1460

Added lines #L1459 - L1460 were not covered by tests
)
case _ =>
val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
retryElseFail(failureStatus)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cromwell.backend.standard.retry.memory

import common.validation.Validation.MemoryRetryMultiplierRefined

/**
* Result of evaluating an attempt as a memory-retry candidate, encapsulating instructions
* for configuring the next attempt if applicable.
*
* @param oomDetected Did the previous attempt OOM?
* @param factor User-configured factor
* @param previousMultiplier Multiplier used for the previous attempt
*/
case class MemoryRetryResult(oomDetected: Boolean,
factor: Option[MemoryRetryMultiplierRefined],
previousMultiplier: Option[Double]
)

case object MemoryRetryResult {
def none: MemoryRetryResult = MemoryRetryResult(oomDetected = false, None, None)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cromwell.backend.google.pipelines.common

package object errors {

private def quotaMessages = List(
"A resource limit has delayed the operation",
"usage too high",
"no available zones",
"resource_exhausted",
"quota too low"
)

def isQuotaMessage(msg: String): Boolean =
quotaMessages.exists(msg.contains)

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import cromwell.backend.google.pipelines.common.api.RunStatus.{
Success,
UnsuccessfulRunStatus
}
import cromwell.backend.google.pipelines.common.errors.isQuotaMessage
import cromwell.backend.google.pipelines.v2beta.PipelinesConversions._
import cromwell.backend.google.pipelines.v2beta.api.Deserialization._
import cromwell.backend.google.pipelines.v2beta.api.request.ErrorReporter._
Expand Down Expand Up @@ -197,16 +198,9 @@ trait GetRequestHandler { this: RequestHandler =>
private def isQuotaDelayed(events: List[Event]): Boolean =
events.sortBy(_.getTimestamp).reverse.headOption match {
case Some(event) =>
quotaMessages.exists(event.getDescription.contains)
isQuotaMessage(event.getDescription)
case None =>
// If the events list is empty, we're not waiting for quota yet
false
}

private val quotaMessages = List(
"A resource limit has delayed the operation",
"usage too high",
"no available zones",
"resource_exhausted"
)
}
Loading