WX-1595 GCP Batch backend refactor to include the PAPI request manager #7412

Merged
merged 80 commits into from
May 29, 2024
Changes from 74 commits
ff8543c
Refactor GCP Batch request manager
AlexITC Jan 14, 2024
5cca13a
Rename PAPI stuff from BatchApiRequestManager
AlexITC Jan 14, 2024
0823351
Fix scalafmt
AlexITC Jan 14, 2024
0f5ab24
Temporary disable CI
AlexITC Jan 14, 2024
94d1aaf
Add missing test to GcpBatchAsyncBackendJobExecutionActorSpec
AlexITC Jan 15, 2024
af6b42d
Missing rename from PAPI to Batch in BatchApiRunCreationClient
AlexITC Jan 15, 2024
fdb5c24
draft
AlexITC Jan 23, 2024
ff23056
draft
AlexITC Jan 23, 2024
ecc28d1
Yet another draft
AlexITC Jan 27, 2024
2afef28
Code compiles! tests are pending to be fixed
AlexITC Jan 28, 2024
a249835
Implement abort operation
AlexITC Jan 28, 2024
7ef43da
Get has been implemented
AlexITC Jan 28, 2024
71daebe
Tests are compiling
AlexITC Jan 29, 2024
1e8bcef
Complete the PAPIv2 migration to batch
AlexITC Jan 29, 2024
ad51153
Fix GcpBatchGroupedRequests
AlexITC Jan 29, 2024
476a923
Enable tests
AlexITC Jan 29, 2024
f92974b
Clean up unnecessary code + batch abort request bugfix
AlexITC Feb 4, 2024
0bf7a90
Handle JobAbortedException
AlexITC Feb 12, 2024
94ce5cd
Handle GCP errors
AlexITC Feb 12, 2024
76bb726
Refactor to run batch requests in parallel through futures
AlexITC Feb 15, 2024
dbd6d24
Refactor GcpBatchGroupedRequests to be immutable
AlexITC Feb 15, 2024
e619ef0
Enable commented tests
AlexITC Mar 10, 2024
057ee14
Huge refactor on batch RunStatus
AlexITC Mar 11, 2024
6fe1518
Tag the PipelinesApiAsyncBackendJobExecutionActorSpec tests that are …
AlexITC Mar 11, 2024
0412a9d
Clean up + enable missing telemetry entries for batch
AlexITC Mar 18, 2024
d71751d
Add more tests
AlexITC Mar 19, 2024
3ccc34d
Add more tests that seem to reproduce a bug
AlexITC Mar 19, 2024
810a061
Minor fixes
AlexITC Mar 19, 2024
9acc240
Add TODO
AlexITC Mar 19, 2024
7a4db8c
Add note about google sdk on AbortRequestHandler
AlexITC Apr 2, 2024
2db7ce0
Add tests to LoadConfigSpec
AlexITC Apr 2, 2024
62129c9
Refactor the abort workflow
AlexITC Apr 2, 2024
26603c8
Refactor GcpBatchGroupedRequests to be a data holder only
AlexITC Apr 2, 2024
29f3c4f
Add missing files
AlexITC Apr 3, 2024
778c83a
Implement event list mapping
AlexITC Apr 3, 2024
7becc27
Clean up RunStatus from unnecessary data
AlexITC Apr 3, 2024
38eb051
Remove TODOs
AlexITC Apr 3, 2024
64510cf
Fix minor bug in BatchRequestExecutor
AlexITC Apr 4, 2024
be118f5
Clean up BatchApiRequestWorker
AlexITC Apr 4, 2024
b4a1b30
Further clean up
AlexITC Apr 4, 2024
e8a61f1
Migrate missing details from papi to batch
AlexITC Apr 11, 2024
d9c362a
Run centaurGcpBatch tests only
AlexITC Apr 11, 2024
11a02bb
Try setting requestsAbortAndDiesImmediately=true
AlexITC Apr 11, 2024
1e2661b
Draft
AlexITC Apr 15, 2024
290f065
Add debug option
AlexITC Apr 18, 2024
50517e4
Enable tests again
AlexITC Apr 18, 2024
a333f65
Remove unnecessary comments from GcpBatchAsyncBackendJobExecutionActo…
AlexITC Apr 23, 2024
bfd9dd1
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC Apr 26, 2024
54c152d
Fix scalafmt
AlexITC Apr 26, 2024
59c9675
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 2, 2024
83891cf
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 3, 2024
0cf2ba0
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 7, 2024
526447d
Clean up
AlexITC May 7, 2024
8d7c618
Fix scalafmt + add a new test
AlexITC May 7, 2024
fa18a3c
Add tests to BatchApiRequestWorker
AlexITC May 7, 2024
f872fee
Further clean up
AlexITC May 7, 2024
0e30684
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 9, 2024
c08b09b
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 10, 2024
23650c0
Further cleanup
AlexITC May 10, 2024
35dce53
More cleanup
AlexITC May 10, 2024
cee36d9
Minor tweaks
AlexITC May 10, 2024
6119047
Set requestsAbortAndDiesImmediately=false
AlexITC May 14, 2024
f4511b9
Revert temporal flag change
AlexITC May 14, 2024
b30be69
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 14, 2024
8f16dbc
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 15, 2024
28a62ef
Fix merge errors
AlexITC May 16, 2024
0e4ddcb
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 16, 2024
f1a83ff
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 21, 2024
dca1420
Set requestsAbortAndDiesImmediately=false
AlexITC May 21, 2024
eca754f
Improve abort job handler
AlexITC May 21, 2024
0095a87
Remove customPollStatusFailure
AlexITC May 21, 2024
837f86e
Fix compile errors
AlexITC May 21, 2024
dcd0efe
Fix abort from BatchApiRequestManager
AlexITC May 21, 2024
3c9d020
Try fixing preemption errors from GCP
AlexITC May 21, 2024
e261cb9
Rollback the preemption fixes
AlexITC May 24, 2024
1050850
Final cleanup
AlexITC May 25, 2024
75acee0
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 25, 2024
387e399
Yet another cleanup
AlexITC May 25, 2024
5f01bc9
Run scalafmt
AlexITC May 25, 2024
37e5fde
Merge branch 'develop' into gcp-batch-request-manager-refactor-v2
AlexITC May 29, 2024
4 changes: 4 additions & 0 deletions core/src/main/resources/reference.conf
@@ -625,6 +625,10 @@ load-control {
# Google requests to the Pipelines API are also queued and batched
papi-requests = 10000

## Backend specific ##
# Google requests to the Batch API are also queued and batched
batch-requests = 10000

## Misc. ##
# How often each actor should update its perceived load
monitoring-frequency = 5 seconds
1 change: 1 addition & 0 deletions core/src/main/scala/cromwell/core/LoadConfig.scala
@@ -19,4 +19,5 @@ object LoadConfig {
val MetadataWriteThreshold = conf.as[Int]("metadata-write")
val MonitoringFrequency = conf.as[FiniteDuration]("monitoring-frequency")
val PAPIThreshold = conf.as[Int]("papi-requests")
val BatchThreshold = conf.as[Int]("batch-requests")
}
1 change: 1 addition & 0 deletions core/src/test/scala/cromwell/core/LoadConfigSpec.scala
@@ -22,5 +22,6 @@ class LoadConfigSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers
LoadConfig.IoNormalWindowMaximum shouldBe 60.seconds
LoadConfig.MonitoringFrequency shouldBe 5.seconds
LoadConfig.PAPIThreshold shouldBe 10000
LoadConfig.BatchThreshold shouldBe 10000
}
}
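The new `batch-requests` threshold is only read and asserted in the hunks above; how it is consumed is not shown here. As a rough illustration, assuming the request manager compares its queue size against the threshold (the `LoadSketch`, `LoadLevel`, and `loadFor` names are hypothetical and not part of Cromwell), the check could be sketched as:

```scala
// Hypothetical sketch: none of these names come from the PR.
object LoadSketch {
  sealed trait LoadLevel
  case object NormalLoad extends LoadLevel
  case object HighLoad extends LoadLevel

  // Mirrors the default `batch-requests = 10000` added to reference.conf.
  val BatchThreshold: Int = 10000

  // Assumption: load counts as high once the queue holds more requests
  // than the configured threshold.
  def loadFor(queueSize: Int, threshold: Int = BatchThreshold): LoadLevel =
    if (queueSize > threshold) HighLoad else NormalLoad
}
```

With the default value, a queue of exactly 10000 requests is still treated as normal load in this sketch; only a strictly larger queue reports high load.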
@@ -2,26 +2,22 @@ package cromwell.backend.google.batch

import akka.actor.{ActorRef, Props}
import com.google.api.client.util.ExponentialBackOff
import com.google.api.gax.rpc.FixedHeaderProvider
import com.google.cloud.batch.v1.BatchServiceSettings
import com.google.common.collect.ImmutableMap
import com.typesafe.scalalogging.StrictLogging
import cromwell.backend._
import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory.{
preemptionCountKey,
robustBuildAttributes,
unexpectedRetryCountKey
}
import cromwell.backend.google.batch.actors._
import cromwell.backend.google.batch.api.{GcpBatchApiRequestHandler, GcpBatchRequestFactoryImpl}
import cromwell.backend.google.batch.models.{GcpBatchConfiguration, GcpBatchConfigurationAttributes}
import cromwell.backend.google.batch.api.request.{BatchRequestExecutor, RequestHandler}
import cromwell.backend.google.batch.callcaching.{BatchBackendCacheHitCopyingActor, BatchBackendFileHashingActor}
import cromwell.backend.google.batch.models.{GcpBatchConfiguration, GcpBatchConfigurationAttributes}
import cromwell.backend.standard._
import cromwell.backend.standard.callcaching.{StandardCacheHitCopyingActor, StandardFileHashingActor}
import cromwell.backend.{
BackendConfigurationDescriptor,
BackendInitializationData,
BackendWorkflowDescriptor,
Gcp,
JobExecutionMap,
Platform
}
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.core.CallOutputs
import wom.graph.CommandCallNode
@@ -70,7 +66,6 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String,

override def workflowFinalizationActorParams(workflowDescriptor: BackendWorkflowDescriptor,
ioActor: ActorRef,
// batchConfiguration: GcpBatchConfiguration,
calls: Set[CommandCallNode],
jobExecutionMap: JobExecutionMap,
workflowOutputs: CallOutputs,
@@ -93,10 +88,19 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String,
)

override def backendSingletonActorProps(serviceRegistryActor: ActorRef): Option[Props] = {
val requestHandler = new GcpBatchApiRequestHandler
val requestFactory = new GcpBatchRequestFactoryImpl()(batchConfiguration.batchAttributes.gcsTransferConfiguration)
implicit val requestHandler: RequestHandler = new RequestHandler

val batchSettings = BatchServiceSettings.newBuilder
.setHeaderProvider(FixedHeaderProvider.create(ImmutableMap.of("user-agent", "cromwell")))
.build

Option(
GcpBatchBackendSingletonActor.props(requestFactory, serviceRegistryActor = serviceRegistryActor)(requestHandler)
GcpBatchBackendSingletonActor.props(
qps = batchConfiguration.batchAttributes.qps,
requestWorkers = batchConfiguration.batchAttributes.requestWorkers,
serviceRegistryActor = serviceRegistryActor,
batchRequestExecutor = new BatchRequestExecutor.CloudImpl(batchSettings)
)(requestHandler)
)
}

@@ -2,21 +2,47 @@

import akka.actor.{Actor, ActorLogging, ActorRef}
import com.google.cloud.batch.v1.JobName
import cromwell.backend.google.batch.api.BatchApiRequestManager.{BatchAbortRequest, BatchApiAbortQueryFailed}
import cromwell.backend.google.batch.api.GcpBatchRequestFactory
import cromwell.backend.google.batch.monitoring.BatchInstrumentation
import cromwell.backend.standard.StandardAsyncJob
import cromwell.core.WorkflowId
import cromwell.core.logging.JobLogging

trait BatchApiAbortClient { this: Actor with ActorLogging with BatchInstrumentation =>
object BatchApiAbortClient {
sealed trait BatchAbortRequestSuccess
case class BatchAbortRequestSuccessful(jobId: String) extends BatchAbortRequestSuccess

def abortJob(jobName: JobName, backendSingletonActor: ActorRef): Unit =
backendSingletonActor ! GcpBatchBackendSingletonActor.Action.AbortJob(jobName)
// The operation is no longer running. Maybe it was already cancelled, maybe it finished on its own. We don't know
// the details and for abort they don't really matter.
case class BatchOperationIsAlreadyTerminal(jobId: String) extends BatchAbortRequestSuccess
}

trait BatchApiAbortClient { this: Actor with ActorLogging with JobLogging with BatchInstrumentation =>
import BatchApiAbortClient._

def abortJob(workflowId: WorkflowId,
jobName: JobName,
backendSingletonActor: ActorRef,
requestFactory: GcpBatchRequestFactory
): Unit =
backendSingletonActor ! BatchAbortRequest(
workflowId = workflowId,
requester = self,
httpRequest = requestFactory.abortRequest(jobName),
jobId = StandardAsyncJob(jobName.toString)
)

def abortActorClientReceive: Actor.Receive = {
case GcpBatchBackendSingletonActor.Event.JobAbortRequestSent(job) =>
log.info(s"Job aborted on GCP: ${job.getName}")
case BatchAbortRequestSuccessful(jobId) =>
abortSuccess()
jobLogger.info(s"Successfully requested cancellation of $jobId")

// In this case we could immediately return an aborted handle and spare ourselves a round of polling
case BatchOperationIsAlreadyTerminal(jobId) =>
jobLogger.info(s"Job $jobId has already finished")

Codecov / codecov/patch warning: added line #L43 was not covered by tests (supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchApiAbortClient.scala#L43)

case GcpBatchBackendSingletonActor.Event.ActionFailed(jobName, cause) =>
val msg = s"Failed to abort job ($jobName) from GCP"
log.error(cause, msg)
abortFailed()
case BatchApiAbortQueryFailed(query, e) =>
jobLogger.error(s"Could not request cancellation of job ${query.jobId}", e)

Codecov / codecov/patch warning: added line #L46 was not covered by tests (supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchApiAbortClient.scala#L46)
}
}
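The abort client above distinguishes three outcomes: the cancellation was requested, the job was already terminal, or the request failed. A minimal standalone sketch of that shape, using plain Scala without Akka, is shown below. The `AbortOutcomeSketch`, `AbortOutcome`, and `describe` names are hypothetical stand-ins rather than the PR's types; only the log wording is borrowed from the diff.

```scala
// Hypothetical sketch: a sealed hierarchy lets the compiler check that
// every abort outcome is handled in the match below.
object AbortOutcomeSketch {
  sealed trait AbortOutcome
  final case class AbortRequested(jobId: String) extends AbortOutcome
  // The job is no longer running; for abort purposes the reason is irrelevant.
  final case class AlreadyTerminal(jobId: String) extends AbortOutcome
  final case class AbortFailed(jobId: String, cause: Throwable) extends AbortOutcome

  // Maps each outcome to a log line similar to the ones in the diff.
  def describe(outcome: AbortOutcome): String = outcome match {
    case AbortRequested(id)  => s"Successfully requested cancellation of $id"
    case AlreadyTerminal(id) => s"Job $id has already finished"
    case AbortFailed(id, e)  => s"Could not request cancellation of job $id: ${e.getMessage}"
  }
}
```

Treating the already-terminal case as a success variant, as the PR does with `BatchAbortRequestSuccess`, keeps the failure branch reserved for requests that genuinely could not be sent.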

This file was deleted.

@@ -1,31 +1,34 @@
package cromwell.backend.google.batch.actors

import akka.actor.{Actor, ActorLogging, ActorRef}
import cromwell.backend.google.batch.api.BatchApiRequestManager.{
BatchApiRunCreationQueryFailed,
SystemBatchApiException
}
import cromwell.backend.google.batch.api.{BatchApiRequestManager, GcpBatchRequestFactory}
import cromwell.backend.google.batch.models.GcpBatchRequest
import cromwell.backend.google.batch.monitoring.BatchInstrumentation
import cromwell.backend.standard.StandardAsyncJob
import cromwell.core.logging.JobLogger

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

/**
* Handles the flow for submitting a single job to GCP, we can't do anything when that fails
* Handles the flow for submitting a single job to GCP
*/
trait BatchApiRunCreationClient { this: Actor with ActorLogging with BatchInstrumentation =>
private var runCreationClientPromise: Option[Promise[StandardAsyncJob]] = None

// handles messages produced from GcpBatchBackendSingletonActor
def runCreationClientReceive: Actor.Receive = {
case GcpBatchBackendSingletonActor.Event.JobSubmitted(job) =>
log.info(s"Job submitted to GCP: ${job.getName}")
case job: StandardAsyncJob =>
log.info(s"A job was submitted successfully: ${job.jobId}")
Collaborator:

I have no problem merging as-is with these logs in place, seeing as we'll probably have some more rounds of debugging. That said, we'll probably want to reduce the info ones eventually.

Collaborator Author:

For the time being, I have marked the noisy logs as debug and kept the rest as info; this will help debug some of the currently open issues.

runSuccess()
completePromise(Success(StandardAsyncJob(job.getName)))

case GcpBatchBackendSingletonActor.Event.ActionFailed(jobName, cause) =>
val msg = s"Failed to submit job ($jobName) to GCP"
log.error(cause, msg)
runFailed()
completePromise(Failure(cause))
completePromise(Success(job))
case BatchApiRunCreationQueryFailed(query, e) =>
log.error(e, s"Failed to submit job ${query.httpRequest.getJobId}: ${e.getMessage}")
completePromise(Failure(e))

Codecov / codecov/patch warning: added lines #L29 - L31 were not covered by tests (supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchApiRunCreationClient.scala#L29-L31)
}

private def completePromise(job: Try[StandardAsyncJob]): Unit = {
@@ -35,15 +38,37 @@
runCreationClientPromise = None
}

def runBatchJob(request: GcpBatchRequest, backendSingletonActor: ActorRef): Future[StandardAsyncJob] =
def runBatchJob(
request: GcpBatchRequest,
backendSingletonActor: ActorRef,
requestFactory: GcpBatchRequestFactory,
jobLogger: JobLogger
): Future[StandardAsyncJob] =
runCreationClientPromise match {
case Some(p) =>
p.future
case None =>
log.info(s"Asking singleton actor to submit a job: ${request.jobName}")
backendSingletonActor ! GcpBatchBackendSingletonActor.Action.SubmitJob(request)
jobLogger.info(s"Asking singleton actor to submit a job: ${request.jobName}")

backendSingletonActor ! BatchApiRequestManager.BatchRunCreationRequest(
request.workflowId,
self,
requestFactory.submitRequest(request)
)
val newPromise = Promise[StandardAsyncJob]()
runCreationClientPromise = Option(newPromise)
newPromise.future
}
}

object BatchApiRunCreationClient {

/**
* Exception used to represent the fact that a job was aborted before a creation attempt was made.
* Meaning it was in the queue when the abort request was made, so it was just removed from the queue.
*/
case object JobAbortedException
extends SystemBatchApiException(
new Exception("The job was removed from the queue before a Batch creation request was made")
)
}
@@ -0,0 +1,57 @@
package cromwell.backend.google.batch.actors

import akka.actor.{Actor, ActorLogging, ActorRef}
import com.google.cloud.batch.v1.JobName
import cromwell.backend.google.batch.api.BatchApiRequestManager.{BatchApiStatusQueryFailed, BatchStatusPollRequest}
import cromwell.backend.google.batch.api.GcpBatchRequestFactory
import cromwell.backend.google.batch.models.RunStatus
import cromwell.backend.google.batch.monitoring.BatchInstrumentation
import cromwell.backend.standard.StandardAsyncJob
import cromwell.core.WorkflowId

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

/**
* Allows fetching a job status
*/
trait BatchApiStatusRequestClient { this: Actor with ActorLogging with BatchInstrumentation =>

private var pollingActorClientPromise: Option[Promise[RunStatus]] = None

def pollingActorClientReceive: Actor.Receive = {
case status: RunStatus =>
log.debug(s"Polled status received: $status")
pollSuccess()
completePromise(Success(status))
case BatchApiStatusQueryFailed(query, e) =>
log.error(e, s"Poll status failed for job ${query.jobId}: ${e.getMessage}")
completePromise(Failure(e))

Codecov / codecov/patch warning: added lines #L27 - L29 were not covered by tests (supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchApiStatusRequestClient.scala#L27-L29)
}

private def completePromise(result: Try[RunStatus]): Unit = {
pollingActorClientPromise foreach { _.complete(result) }
pollingActorClientPromise = None
}

def pollStatus(
workflowId: WorkflowId,
jobName: JobName,
backendSingletonActor: ActorRef,
requestFactory: GcpBatchRequestFactory
): Future[RunStatus] =
pollingActorClientPromise match {
case Some(p) => p.future

Codecov / codecov/patch warning: added line #L44 was not covered by tests (supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchApiStatusRequestClient.scala#L44)
case None =>
backendSingletonActor ! BatchStatusPollRequest(
workflowId,
self,
requestFactory.queryRequest(jobName),
StandardAsyncJob(jobName.toString)
)

val newPromise = Promise[RunStatus]()
pollingActorClientPromise = Option(newPromise)
newPromise.future
}
}
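Both `runBatchJob` and `pollStatus` above keep at most one pending `Promise`: a repeated call while a request is in flight returns the same future, and the slot is cleared once a reply arrives. The following is a minimal sketch of that pattern outside of Akka, using only the standard library. `SingleFlight`, `request`, and `complete` are hypothetical names, and the class is not thread-safe on its own; in the PR the equivalent state lives inside an actor, which processes one message at a time.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Hypothetical sketch of the "one pending promise" pattern used by the clients.
final class SingleFlight[A] {
  private var pending: Option[Promise[A]] = None

  // Returns the in-flight future if there is one; otherwise stores a new
  // promise and invokes `send` to dispatch the request exactly once.
  def request(send: () => Unit): Future[A] =
    pending match {
      case Some(p) => p.future
      case None =>
        val p = Promise[A]()
        pending = Some(p)
        send()
        p.future
    }

  // Called when the reply (success or failure) arrives; completes the
  // pending promise, if any, and clears the slot for the next request.
  def complete(result: Try[A]): Unit = {
    pending.foreach(_.complete(result))
    pending = None
  }
}
```

A caller would invoke `request` with a function that sends the message to the singleton actor, and call `complete` from the receive block when the response or failure message comes back.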