Skip to content

Commit

Permalink
EFS fixes needed because of the earlier broadinstitute#5468 s3 relate…
Browse files Browse the repository at this point in the history
…d changes
  • Loading branch information
Vanaja Narayanaswamy committed Nov 24, 2020
1 parent b46f51d commit 83cd9f1
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ import cromwell.backend.io.DirectoryFunctions
import cromwell.backend.io.JobPaths
import cromwell.backend.standard.{StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, StandardAsyncJob}
import cromwell.core._
import cromwell.core.path.{DefaultPathBuilder, Path, PathFactory, PathBuilder}
import cromwell.core.io.DefaultIoCommandBuilder
import cromwell.core.path.{DefaultPathBuilder, Path, PathBuilder, PathFactory}
import cromwell.core.io.{DefaultIoCommandBuilder, IoCommandBuilder}
import cromwell.core.retry.SimpleExponentialBackoff
import cromwell.filesystems.s3.S3Path
import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
Expand Down Expand Up @@ -123,6 +123,11 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

override lazy val dockerImageUsed: Option[String] = Option(jobDockerImage)

private lazy val execScript =
s"""|#!$jobShell
|${jobPaths.script.pathWithoutScheme}
|""".stripMargin


/* Batch job object (see AwsBatchJob). This has the configuration necessary
* to perform all operations with the AWS Batch infrastructure. This is
Expand Down Expand Up @@ -156,12 +161,18 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
* needs to push stuff out to S3. This is why we will eventually need
* commandScriptContents here
*/

lazy val cmdScript = configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => commandScriptContents.toEither.right.get
case _ => execScript
}

lazy val batchJob: AwsBatchJob = {
AwsBatchJob(
jobDescriptor,
runtimeAttributes,
instantiatedCommand.commandString,
commandScriptContents.toEither.right.get,
cmdScript,
rcPath.toString, executionStdout, executionStderr,
generateAwsBatchInputs(jobDescriptor),
generateAwsBatchOutputs(jobDescriptor),
Expand Down Expand Up @@ -348,7 +359,14 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar

// used by generateAwsBatchOutputs, could potentially move this def within that function
private def generateAwsBatchSingleFileOutputs(womFile: WomSingleFile): List[AwsBatchFileOutput] = {
val destination = callRootPath.resolve(womFile.value.stripPrefix("/")).pathAsString
val destination = configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => callRootPath.resolve(womFile.value.stripPrefix("/")).pathAsString
case _ => DefaultPathBuilder.get(womFile.valueString) match {
case p if !p.isAbsolute => callRootPath.resolve(womFile.value.stripPrefix("/")).pathAsString
case p => p.pathAsString
}

}
val (relpath, disk) = relativePathAndVolume(womFile.value, runtimeAttributes.disks)
val output = AwsBatchFileOutput(makeSafeAwsBatchReferenceName(womFile.value), destination, relpath, disk)
List(output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import akka.actor.ActorRef
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import cromwell.filesystems.s3.batch.S3BatchCommandBuilder
import cromwell.backend.standard.{StandardInitializationActor, StandardInitializationActorParams, StandardValidatedRuntimeAttributesBuilder}
//import cromwell.backend.{BackendConfigurationDescriptor, BackendWorkflowDescriptor, BackendInitializationData}
import cromwell.backend.{BackendConfigurationDescriptor, BackendWorkflowDescriptor}
import cromwell.core.io.DefaultIoCommandBuilder
import cromwell.core.io.AsyncIoActorClient
Expand Down Expand Up @@ -69,6 +70,18 @@ class AwsBatchInitializationActor(params: AwsBatchInitializationActorParams)
private val configuration = params.configuration
override implicit val system = context.system

/*override def beforeAll(): Future[Option[BackendInitializationData]] = {
configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => super.beforeAll
case _ => {
initializationData map { data =>
publishWorkflowRoot(data.workflowPaths.workflowRoot.pathAsString)
Option(data)
}
}
}
}*/

override lazy val runtimeAttributesBuilder: StandardValidatedRuntimeAttributesBuilder =
AwsBatchRuntimeAttributes.runtimeAttributesBuilder(configuration)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,31 +213,44 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|}
|""".stripMargin
}
private def batch_file_s3_url(scriptBucketName: String, scriptKeyPrefix: String, scriptKey: String): String = runtimeAttributes.fileSystem match {
case AWSBatchStorageSystems.s3 => s"s3://${runtimeAttributes.scriptS3BucketName}/$scriptKeyPrefix$scriptKey"
case _ => ""
}

private def generateEnvironmentKVPairs(scriptBucketName: String, scriptKeyPrefix: String, scriptKey: String): List[KeyValuePair] = {
List(buildKVPair(AWS_MAX_ATTEMPTS, AWS_MAX_ATTEMPTS_DEFAULT_VALUE),
buildKVPair(AWS_RETRY_MODE, AWS_RETRY_MODE_DEFAULT_VALUE),
buildKVPair("BATCH_FILE_TYPE", "script"),
buildKVPair("BATCH_FILE_S3_URL",
s"s3://$scriptBucketName/$scriptKeyPrefix$scriptKey"))
buildKVPair("BATCH_FILE_S3_URL",batch_file_s3_url(scriptBucketName,scriptKeyPrefix,scriptKey)))
}

def submitJob[F[_]]()( implicit timer: Timer[F], async: Async[F]): Aws[F, SubmitJobResponse] = {

val taskId = jobDescriptor.key.call.fullyQualifiedName + "-" + jobDescriptor.key.index + "-" + jobDescriptor.key.attempt

//find or create the script in s3 to execute
val scriptKey = findOrCreateS3Script(reconfiguredScript, runtimeAttributes.scriptS3BucketName)
//find or create the script in s3 to execute for s3 fileSystem
val scriptKey = runtimeAttributes.fileSystem match {
case AWSBatchStorageSystems.s3 => findOrCreateS3Script(reconfiguredScript, runtimeAttributes.scriptS3BucketName)
case _ => ""
}

if(runtimeAttributes.fileSystem == AWSBatchStorageSystems.s3) {
val regex = "s3://([^/]*)/(.*)".r
val regex(bucketName, key) = jobPaths.callExecutionRoot.toString
writeReconfiguredScriptForAudit(reconfiguredScript, bucketName, key+"/reconfigured-script.sh")
}

val regex = "s3://([^/]*)/(.*)".r
val regex(bucketName, key) = jobPaths.callExecutionRoot.toString

writeReconfiguredScriptForAudit(reconfiguredScript, bucketName, key+"/reconfigured-script.sh")
val batch_script = runtimeAttributes.fileSystem match {
case AWSBatchStorageSystems.s3 => s"s3://${runtimeAttributes.scriptS3BucketName}/$scriptKeyPrefix$scriptKey"
case _ => commandScript
}

//calls the client to submit the job
def callClient(definitionArn: String, awsBatchAttributes: AwsBatchAttributes): Aws[F, SubmitJobResponse] = {

Log.info(s"Submitting taskId: $taskId, job definition : $definitionArn, script: s3://${runtimeAttributes.scriptS3BucketName}/$scriptKeyPrefix$scriptKey")
Log.info(s"Submitting taskId: $taskId, job definition : $definitionArn, script: $batch_script")

val submit: F[SubmitJobResponse] =
async.delay(batchClient.submitJob(
Expand All @@ -249,6 +262,7 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
.containerOverrides(
ContainerOverrides.builder
.environment(

generateEnvironmentKVPairs(runtimeAttributes.scriptS3BucketName, scriptKeyPrefix, scriptKey): _*
)
.memory(runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trait AwsBatchJobCachingActorHelper extends StandardCachingActorHelper {
// TODO: Determine if call paths are relevant
lazy val callPaths: AwsBatchJobPaths = jobPaths.asInstanceOf[AwsBatchJobPaths]

lazy val runtimeAttributes: AwsBatchRuntimeAttributes = AwsBatchRuntimeAttributes(validatedRuntimeAttributes, configuration.runtimeConfig)
lazy val runtimeAttributes: AwsBatchRuntimeAttributes = AwsBatchRuntimeAttributes(validatedRuntimeAttributes, configuration.runtimeConfig, configuration.fileSystem)

lazy val workingDisk: AwsBatchVolume = runtimeAttributes.disks.find(x => configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => x.name == AwsBatchWorkingDisk.Name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,11 @@ trait AwsBatchJobDefinitionBuilder {
}



val packedCommand = packCommand("/bin/bash", "-c", "/var/scratch/fetch_and_run.sh")
val cmdName = context.runtimeAttributes.fileSystem match {
case AWSBatchStorageSystems.s3 => "/var/scratch/fetch_and_run.sh"
case _ => context.commandText
}
val packedCommand = packCommand("/bin/bash", "-c", cmdName)
val volumes = buildVolumes( context.runtimeAttributes.disks )
val mountPoints = buildMountPoints( context.runtimeAttributes.disks)
val jobDefinitionName = buildName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,19 @@ object AwsBatchRuntimeAttributes {

def runtimeAttributesBuilder(configuration: AwsBatchConfiguration): StandardValidatedRuntimeAttributesBuilder = {
val runtimeConfig = configuration.runtimeConfig
StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation(
def validationsS3backend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation(
cpuValidation(runtimeConfig),
cpuMinValidation(runtimeConfig),
disksValidation(runtimeConfig),
zonesValidation(runtimeConfig),
memoryValidation(runtimeConfig),
memoryMinValidation(runtimeConfig),
noAddressValidation(runtimeConfig),
dockerValidation,
queueArnValidation(runtimeConfig),
scriptS3BucketNameValidation(runtimeConfig)
)
def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation(
cpuValidation(runtimeConfig),
cpuMinValidation(runtimeConfig),
disksValidation(runtimeConfig),
Expand All @@ -145,12 +157,17 @@ object AwsBatchRuntimeAttributes {
memoryMinValidation(runtimeConfig),
noAddressValidation(runtimeConfig),
dockerValidation,
queueArnValidation(runtimeConfig),
scriptS3BucketNameValidation(runtimeConfig)
queueArnValidation(runtimeConfig)
)

configuration.fileSystem match {
case AWSBatchStorageSystems.s3 => validationsS3backend

case _ => validationsLocalBackend
}
}

def apply(validatedRuntimeAttributes: ValidatedRuntimeAttributes, runtimeAttrsConfig: Option[Config]): AwsBatchRuntimeAttributes = {
def apply(validatedRuntimeAttributes: ValidatedRuntimeAttributes, runtimeAttrsConfig: Option[Config], fileSystem:String): AwsBatchRuntimeAttributes = {
val cpu: Int Refined Positive = RuntimeAttributesValidation.extract(cpuValidation(runtimeAttrsConfig), validatedRuntimeAttributes)
val zones: Vector[String] = RuntimeAttributesValidation.extract(ZonesValidation, validatedRuntimeAttributes)
val memory: MemorySize = RuntimeAttributesValidation.extract(memoryValidation(runtimeAttrsConfig), validatedRuntimeAttributes)
Expand All @@ -160,7 +177,11 @@ object AwsBatchRuntimeAttributes {
val failOnStderr: Boolean = RuntimeAttributesValidation.extract(failOnStderrValidation(runtimeAttrsConfig), validatedRuntimeAttributes)
val continueOnReturnCode: ContinueOnReturnCode = RuntimeAttributesValidation.extract(continueOnReturnCodeValidation(runtimeAttrsConfig), validatedRuntimeAttributes)
val noAddress: Boolean = RuntimeAttributesValidation.extract(noAddressValidation(runtimeAttrsConfig), validatedRuntimeAttributes)
val scriptS3BucketName = RuntimeAttributesValidation.extract(scriptS3BucketNameValidation(runtimeAttrsConfig) , validatedRuntimeAttributes)
val scriptS3BucketName = fileSystem match {
case AWSBatchStorageSystems.s3 => RuntimeAttributesValidation.extract(scriptS3BucketNameValidation(runtimeAttrsConfig) , validatedRuntimeAttributes)
case _ => ""
}


new AwsBatchRuntimeAttributes(
cpu,
Expand All @@ -172,7 +193,8 @@ object AwsBatchRuntimeAttributes {
failOnStderr,
continueOnReturnCode,
noAddress,
scriptS3BucketName
scriptS3BucketName,
fileSystem
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout
val defaultedAttributes = RuntimeAttributeDefinition.addDefaultsToAttributes(
staticRuntimeAttributeDefinitions, workflowOptions)(runtimeAttributes)
val validatedRuntimeAttributes = runtimeAttributesBuilder.build(defaultedAttributes, NOPLogger.NOP_LOGGER)
AwsBatchRuntimeAttributes(validatedRuntimeAttributes, configuration.runtimeConfig)
AwsBatchRuntimeAttributes(validatedRuntimeAttributes, configuration.runtimeConfig,configuration.fileSystem)
}

private val emptyWorkflowOptions = WorkflowOptions.fromMap(Map.empty).get
Expand Down

0 comments on commit 83cd9f1

Please sign in to comment.