Skip to content

Commit

Permalink
WX-1810 WX-1830 n1/n2/n2d machine types, cpuPlatform on GCPBATCH (bro…
Browse files Browse the repository at this point in the history
  • Loading branch information
mcovarr authored and javiergaitan committed Oct 4, 2024
1 parent c669ba0 commit 84ca918
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional
### GCP Batch

- The `genomics` configuration entry was renamed to `batch`, see [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/backends/GCPBatch/) for more information.
- Fixes a bug with not being able to recover jobs on Cromwell restart.
- Fixes machine type selection to match the Google Cloud Life Sciences backend, including defaulting to n1 non-shared-core machine types and correctly handling `cpuPlatform` to select n2 or n2d machine types as appropriate.
- Fixes preemption error handling: the correct error message is now printed, and the other potential exit codes are handled as well.
- Fixes pulling Docker image metadata from private GCR repositories.
- Fixes `google_project` and `google_compute_service_account` workflow options not taking effect when using the GCP Batch backend.
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ lazy val googlePipelinesV2Beta = (project in backendRoot / "google" / "pipelines

lazy val googleBatch = (project in backendRoot / "google" / "batch")
.withLibrarySettings("cromwell-google-batch-backend")
.dependsOn(core)
.dependsOn(backend)
.dependsOn(gcsFileSystem)
.dependsOn(drsFileSystem)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: papi_cpu_platform
testFormat: workflowsuccess
backends: [Papiv2]
backendsMode: any
backends: [Papiv2, GCPBATCH]

files {
workflow: papi_cpu_platform/papi_cpu_platform.wdl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trait BatchApiRunCreationClient { this: Actor with ActorLogging with BatchInstru
backendSingletonActor ! BatchApiRequestManager.BatchRunCreationRequest(
request.workflowId,
self,
requestFactory.submitRequest(request)
requestFactory.submitRequest(request, jobLogger)
)
val newPromise = Promise[StandardAsyncJob]()
runCreationClientPromise = Option(newPromise)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.VirtualPrivateCloudConfiguration
import cromwell.backend.google.batch.models._
import cromwell.backend.google.batch.monitoring.{CheckpointingConfiguration, MonitoringImage}
import cromwell.core.logging.JobLogger
import cromwell.core.path.Path
import wom.runtime.WomOutputRuntimeExtractor

import scala.concurrent.duration.FiniteDuration

trait GcpBatchRequestFactory {
def submitRequest(data: GcpBatchRequest): CreateJobRequest
def submitRequest(data: GcpBatchRequest, jobLogger: JobLogger): CreateJobRequest

def queryRequest(jobName: JobName): GetJobRequest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration
import cromwell.backend.google.batch.models.{GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
import cromwell.backend.google.batch.runnable._
import cromwell.backend.google.batch.util.BatchUtilityConversions
import cromwell.backend.google.batch.util.{BatchUtilityConversions, GcpBatchMachineConstraints}
import cromwell.core.logging.JobLogger

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -74,14 +75,16 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
private def createInstancePolicy(cpuPlatform: String,
spotModel: ProvisioningModel,
accelerators: Option[Accelerator.Builder],
attachedDisks: List[AttachedDisk]
attachedDisks: List[AttachedDisk],
machineType: String
): InstancePolicy.Builder = {

// set GPU count to 0 if not included in workflow
val gpuAccelerators = accelerators.getOrElse(Accelerator.newBuilder.setCount(0).setType("")) // TODO: Driver version

val instancePolicy = InstancePolicy.newBuilder
.setProvisioningModel(spotModel)
.setMachineType(machineType)
.addAllDisks(attachedDisks.asJava)
.setMinCpuPlatform(cpuPlatform)
.buildPartial()
Expand Down Expand Up @@ -154,7 +157,7 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
}
}

override def submitRequest(data: GcpBatchRequest): CreateJobRequest = {
override def submitRequest(data: GcpBatchRequest, jobLogger: JobLogger): CreateJobRequest = {

val runtimeAttributes = data.gcpBatchParameters.runtimeAttributes
val createParameters = data.createParameters
Expand Down Expand Up @@ -224,7 +227,14 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
val computeResource = createComputeResource(cpuCores, memory, gcpBootDiskSizeMb)
val taskSpec = createTaskSpec(sortedRunnables, computeResource, retryCount, durationInSeconds, allVolumes)
val taskGroup: TaskGroup = createTaskGroup(taskCount, taskSpec)
val instancePolicy = createInstancePolicy(cpuPlatform, spotModel, accelerators, allDisks)
val machineType = GcpBatchMachineConstraints.machineType(runtimeAttributes.memory,
runtimeAttributes.cpu,
cpuPlatformOption = runtimeAttributes.cpuPlatform,
googleLegacyMachineSelection = false,
jobLogger = jobLogger
)
val instancePolicy =
createInstancePolicy(cpuPlatform = cpuPlatform, spotModel, accelerators, allDisks, machineType = machineType)
val locationPolicy = LocationPolicy.newBuilder.addAllowedLocations(zones).build
val allocationPolicy =
createAllocationPolicy(data, locationPolicy, instancePolicy.build, networkPolicy, gcpSa, accelerators)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ object GcpBatchRuntimeAttributes {
private val cpuPlatformValidationInstance = new StringRuntimeAttributesValidation(CpuPlatformKey).optional
// via `gcloud compute zones describe us-central1-a`
val CpuPlatformIntelCascadeLakeValue = "Intel Cascade Lake"
val CpuPlatformIntelIceLakeValue = "Intel Ice Lake"
val CpuPlatformAMDRomeValue = "AMD Rome"

val UseDockerImageCacheKey = "useDockerImageCache"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import cromwell.backend.google.batch.models.{
N2CustomMachineType,
N2DCustomMachineType
}
import cromwell.core.logging.JobLogger
import eu.timepit.refined.api.Refined
import eu.timepit.refined.numeric.Positive
import org.slf4j.Logger
import wdl4s.parser.MemoryUnit
import wom.format.MemorySize

Expand All @@ -17,16 +17,17 @@ object GcpBatchMachineConstraints {
cpu: Int Refined Positive,
cpuPlatformOption: Option[String],
googleLegacyMachineSelection: Boolean,
jobLogger: Logger
jobLogger: JobLogger
): String =
if (googleLegacyMachineSelection) {
s"predefined-$cpu-${memory.to(MemoryUnit.MB).amount.intValue()}"
} else {
// If someone requests Intel Cascade Lake as their CPU platform then switch the machine type to n2.
// If someone requests Intel Cascade Lake or Intel Ice Lake as their CPU platform then switch the machine type to n2.
// Similarly, CPU platform of AMD Rome corresponds to the machine type n2d.
val customMachineType =
cpuPlatformOption match {
case Some(GcpBatchRuntimeAttributes.CpuPlatformIntelCascadeLakeValue) => N2CustomMachineType
case Some(GcpBatchRuntimeAttributes.CpuPlatformIntelIceLakeValue) => N2CustomMachineType
case Some(GcpBatchRuntimeAttributes.CpuPlatformAMDRomeValue) => N2DCustomMachineType
case _ => N1CustomMachineType
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
val runtimeAttributesBuilder = GcpBatchRuntimeAttributes.runtimeAttributesBuilder(configuration)

val requestFactory: GcpBatchRequestFactory = new GcpBatchRequestFactory {
override def submitRequest(data: GcpBatchRequest): CreateJobRequest = null
override def submitRequest(data: GcpBatchRequest, jobLogger: JobLogger): CreateJobRequest = null

override def queryRequest(jobName: JobName): GetJobRequest = null

Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
package cromwell.backend.google.batch.util

import common.assertion.CromwellTimeoutSpec
import common.mock.MockSugar.mock
import cromwell.backend.google.batch.models.GcpBatchRuntimeAttributes
import cromwell.core.logging.JobLogger
import eu.timepit.refined.numeric.Positive
import eu.timepit.refined.refineMV
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks._
import org.scalatest.prop.Tables.Table
import org.slf4j.helpers.NOPLogger
import wdl4s.parser.MemoryUnit
import wom.format.MemorySize

class GcpBatchMachineConstraintsSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers {
behavior of "MachineConstraints"

private val n2Option = Option(GcpBatchRuntimeAttributes.CpuPlatformIntelCascadeLakeValue)
private val n2OptionCascadeLake = Option(GcpBatchRuntimeAttributes.CpuPlatformIntelCascadeLakeValue)

private val n2dOption = Option(GcpBatchRuntimeAttributes.CpuPlatformAMDRomeValue)

private val n2OptionIceLake = Option(GcpBatchRuntimeAttributes.CpuPlatformIntelIceLakeValue)

it should "generate valid machine types" in {
val validTypes = Table(
("memory", "cpu", "cpuPlatformOption", "googleLegacyMachineSelection", "machineTypeString"),
Expand All @@ -41,7 +44,6 @@ class GcpBatchMachineConstraintsSpec extends AnyFlatSpec with CromwellTimeoutSpe

// Same tests as above but with legacy machine type selection: cpu and memory are used as specified. No 'custom
// machine requirement' adjustments are expected this time, except float->int.

(MemorySize(1024, MemoryUnit.MB), refineMV[Positive](1), None, true, "predefined-1-1024"),
(MemorySize(4, MemoryUnit.GB), refineMV[Positive](3), None, true, "predefined-3-4096"),
(MemorySize(1, MemoryUnit.GB), refineMV[Positive](1), None, true, "predefined-1-1024"),
Expand All @@ -53,15 +55,26 @@ class GcpBatchMachineConstraintsSpec extends AnyFlatSpec with CromwellTimeoutSpe
(MemorySize(2, MemoryUnit.GB), refineMV[Positive](33), None, true, "predefined-33-2048"),

// Same tests but with cascade lake (n2)
(MemorySize(1024, MemoryUnit.MB), refineMV[Positive](1), n2Option, false, "n2-custom-2-2048"),
(MemorySize(4, MemoryUnit.GB), refineMV[Positive](3), n2Option, false, "n2-custom-4-4096"),
(MemorySize(1, MemoryUnit.GB), refineMV[Positive](1), n2Option, false, "n2-custom-2-2048"),
(MemorySize(1, MemoryUnit.GB), refineMV[Positive](4), n2Option, false, "n2-custom-4-4096"),
(MemorySize(14, MemoryUnit.GB), refineMV[Positive](16), n2Option, false, "n2-custom-16-16384"),
(MemorySize(13.65, MemoryUnit.GB), refineMV[Positive](1), n2Option, false, "n2-custom-2-14080"),
(MemorySize(1520.96, MemoryUnit.MB), refineMV[Positive](1), n2Option, false, "n2-custom-2-2048"),
(MemorySize(1024.0, MemoryUnit.MB), refineMV[Positive](1), n2Option, false, "n2-custom-2-2048"),
(MemorySize(2, MemoryUnit.GB), refineMV[Positive](33), n2Option, false, "n2-custom-36-36864"),
(MemorySize(1024, MemoryUnit.MB), refineMV[Positive](1), n2OptionCascadeLake, false, "n2-custom-2-2048"),
(MemorySize(4, MemoryUnit.GB), refineMV[Positive](3), n2OptionCascadeLake, false, "n2-custom-4-4096"),
(MemorySize(1, MemoryUnit.GB), refineMV[Positive](1), n2OptionCascadeLake, false, "n2-custom-2-2048"),
(MemorySize(1, MemoryUnit.GB), refineMV[Positive](4), n2OptionCascadeLake, false, "n2-custom-4-4096"),
(MemorySize(14, MemoryUnit.GB), refineMV[Positive](16), n2OptionCascadeLake, false, "n2-custom-16-16384"),
(MemorySize(13.65, MemoryUnit.GB), refineMV[Positive](1), n2OptionCascadeLake, false, "n2-custom-2-14080"),
(MemorySize(1520.96, MemoryUnit.MB), refineMV[Positive](1), n2OptionCascadeLake, false, "n2-custom-2-2048"),
(MemorySize(1024.0, MemoryUnit.MB), refineMV[Positive](1), n2OptionCascadeLake, false, "n2-custom-2-2048"),
(MemorySize(2, MemoryUnit.GB), refineMV[Positive](33), n2OptionCascadeLake, false, "n2-custom-36-36864"),

// Same tests, but with ice lake. Should produce same results as cascade lake since they're both n2.
(MemorySize(1024, MemoryUnit.MB), refineMV[Positive](1), n2OptionIceLake, false, "n2-custom-2-2048"),
(MemorySize(4, MemoryUnit.GB), refineMV[Positive](3), n2OptionIceLake, false, "n2-custom-4-4096"),
(MemorySize(1, MemoryUnit.GB), refineMV[Positive](1), n2OptionIceLake, false, "n2-custom-2-2048"),
(MemorySize(1, MemoryUnit.GB), refineMV[Positive](4), n2OptionIceLake, false, "n2-custom-4-4096"),
(MemorySize(14, MemoryUnit.GB), refineMV[Positive](16), n2OptionIceLake, false, "n2-custom-16-16384"),
(MemorySize(13.65, MemoryUnit.GB), refineMV[Positive](1), n2OptionIceLake, false, "n2-custom-2-14080"),
(MemorySize(1520.96, MemoryUnit.MB), refineMV[Positive](1), n2OptionIceLake, false, "n2-custom-2-2048"),
(MemorySize(1024.0, MemoryUnit.MB), refineMV[Positive](1), n2OptionIceLake, false, "n2-custom-2-2048"),
(MemorySize(2, MemoryUnit.GB), refineMV[Positive](33), n2OptionIceLake, false, "n2-custom-36-36864"),

// Same tests but with AMD Rome (n2d). CPU counts > 16 are in increments of 16.
(MemorySize(1024, MemoryUnit.MB), refineMV[Positive](1), n2dOption, false, "n2d-custom-2-1024"),
Expand All @@ -83,7 +96,7 @@ class GcpBatchMachineConstraintsSpec extends AnyFlatSpec with CromwellTimeoutSpe
cpu = cpu,
cpuPlatformOption = cpuPlatformOption,
googleLegacyMachineSelection = googleLegacyMachineSelection,
jobLogger = NOPLogger.NOP_LOGGER
jobLogger = mock[JobLogger]
) shouldBe expected
}
}
Expand Down

0 comments on commit 84ca918

Please sign in to comment.