Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: feed all lifebit related changes to this version from 21.04.1 #21

Open
wants to merge 5 commits into
base: 22.11.1-edge
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Core analyses is the default owner for this repo.
* @lifebit-ai/tre-core
* @lifebit-ai/tre-batch-analysis
2 changes: 1 addition & 1 deletion buildSrc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ version = "1.0.1"
group = "io.nextflow"

dependencies {
implementation ('com.amazonaws:aws-java-sdk-s3:1.11.542')
implementation ('com.amazonaws:aws-java-sdk-s3:1.12.129')
implementation 'com.google.code.gson:gson:2.9.0'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ class BashWrapperBuilder {
binding.fix_ownership = fixOwnership() ? "[ \${NXF_OWNER:=''} ] && chown -fR --from root \$NXF_OWNER ${workDir}/{*,.*} || true" : null

binding.trace_script = isTraceRequired() ? getTraceScript(binding) : null
binding.temp_dir = "\${1:-${copyStrategy.getTempDir(workDir)}}"

return binding
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,9 @@ interface ScriptFileCopyStrategy {
*/
String getEnvScript(Map environment, boolean container)

/**
* @param targetDir The directory where output files need to be unstaged ie. stored
* @return the path string for the temp directory
*/
String getTempDir( Path targetDir )
}
Original file line number Diff line number Diff line change
Expand Up @@ -415,4 +415,9 @@ class SimpleFileCopyStrategy implements ScriptFileCopyStrategy {
}
}

@Override
String getTempDir( Path workDir ) {
return "/tmp"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,22 @@ region="${zone::-1}"
#
[[ '!{dockerPull}' ]] && for x in '!{dockerPull}'; do docker pull $x || true; done

#
# Mount fsx file systems if provided
#
mountCommandsString="!{fsxFileSystemsMountCommands}"
IFS=';' read -ra mountCommandsArray <<< "$mountCommandsString"
[[ '!{fsxFileSystemsMountCommands}' ]] && for fsxMountCommand in "${mountCommandsArray[@]}"; do $fsxMountCommand || true; done

#
# Install NEXTFLOW and launch it
#
version="v!{nextflow.version}"
curl -fsSL http://www.nextflow.io/releases/${version}/nextflow > $HOME/nextflow
chmod +x $HOME/nextflow
$HOME/nextflow -download
if [[ '!{customNextflowBinaryUrl}' ]]; then
curl -s https://get.nextflow.io --output $HOME/nextflow
NXF_PACK="all" NXF_URL="!{customNextflowBinaryUrl}" NXF_VER=${version} NXF_MODE="ignite" NXF_EXECUTOR="ignite" bash nextflow info
chmod +x $HOME/nextflow
$HOME/nextflow -download
fi

# pull the nextflow pipeline repo
[[ '!{nextflow.pull}' ]] && $HOME/nextflow pull '!{nextflow.pull}'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ nxf_kill() {
}

nxf_mktemp() {
local base=${1:-/tmp}
local base={{temp_dir}}
mkdir -p "$base"
if [[ $(uname) = Darwin ]]; then mktemp -d $base/nxf.XXXXXXXXXX
else TMPDIR="$base" mktemp -d -t nxf.XXXXXXXXXX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package nextflow.cloud.aws

import com.amazonaws.AmazonClientException
import com.amazonaws.auth.AWSCredentials
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.auth.BasicSessionCredentials
import com.amazonaws.regions.InstanceMetadataRegionProvider
import com.amazonaws.regions.Region
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.batch.AWSBatchClient
Expand All @@ -29,6 +31,8 @@ import com.amazonaws.services.ecs.AmazonECS
import com.amazonaws.services.ecs.AmazonECSClientBuilder
import com.amazonaws.services.logs.AWSLogs
import com.amazonaws.services.logs.AWSLogsAsyncClientBuilder
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -136,35 +140,19 @@ class AmazonClientFactory {
*/
protected String fetchIamRole() {
try {
def role = getUrl('http://169.254.169.254/latest/meta-data/iam/security-credentials/').readLines()
if( role.size() != 1 )
throw new IllegalArgumentException("Not a valid EC2 IAM role")
return role.get(0)
def stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
def roleArn = stsClient.getCallerIdentity(new GetCallerIdentityRequest()).getArn()
if(roleArn) {
return roleArn.split('/')[-2]
}
return null
}
catch( IOException e ) {
catch( AmazonClientException e ) {
log.trace "Unable to fetch IAM credentials -- Cause: ${e.message}"
return null
}
}

/**
* Fetch a remote URL resource text content
*
* @param path
* A valid http/https resource URL
* @param timeout
* Max connection timeout in millis
* @return
* The resource URL content
*/
protected String getUrl(String path, int timeout=150) {
final url = new URL(path)
final con = url.openConnection()
con.setConnectTimeout(timeout)
con.setReadTimeout(timeout)
return con.getInputStream().text.trim()
}

/**
* Retrieve the AWS region from the EC2 instance metadata.
* See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
Expand All @@ -175,10 +163,9 @@ class AmazonClientFactory {
*/
protected String fetchRegion() {
try {
def zone = getUrl('http://169.254.169.254/latest/meta-data/placement/availability-zone')
zone ? zone.substring(0,zone.length()-1) : null
return new InstanceMetadataRegionProvider().getRegion()
}
catch (IOException e) {
catch (AmazonClientException e) {
log.debug "Cannot fetch AWS region", e
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,13 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {

protected void validateWorkDir() {
/*
* make sure the work dir is a S3 bucket
* make sure the work dir is a S3 bucket and if we are not using lustre fsx
*/
if( !(workDir instanceof S3Path) ) {
def isUsingLustre = session.config.navigate('cloud.fsxFileSystemsMountCommands')
log.debug "Checking workdir validation, isUsingLustre $isUsingLustre"
if( !(workDir instanceof S3Path) && !isUsingLustre ) {
session.abort()
throw new AbortOperationException("When using `$name` executor an S3 bucket must be provided as working directory using either the `-bucket-dir` or `-work-dir` command line option")
throw new AbortOperationException("When using `$name` executor and we are not using Lustre storage a S3 bucket must be provided as working directory either using -bucket-dir or -work-dir command line option")
}
}

Expand Down Expand Up @@ -259,7 +261,6 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
@PackageScope
ThrottlingExecutor getReaper() { reaper }


CloudMachineInfo getMachineInfoByQueueAndTaskArn(String queue, String taskArn) {
try {
return helper?.getCloudInfoByQueueAndTaskArn(queue, taskArn)
Expand Down Expand Up @@ -299,13 +300,18 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg, )
}

}








String getInstanceIdByQueueAndTaskArn(String queue, String taskArn) {
try {
return helper?.getInstanceIdByQueueAndTaskArn(queue, taskArn)
}
catch ( AccessDeniedException e ) {
log.warn "Unable to retrieve AWS Batch instance Id | ${e.message}"
return null
}
catch( Exception e ) {
log.warn "Unable to retrieve AWS batch instance id for queue=$queue; task=$taskArn | ${e.message}", e
return null
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package nextflow.cloud.aws.batch

import nextflow.cloud.aws.util.LifebitUtils

import java.nio.file.Path

import groovy.transform.CompileStatic
Expand Down Expand Up @@ -70,7 +72,8 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
copy.remove('PATH')
// when a remote bin directory is provide managed it properly
if( opts.remoteBinDir ) {
result << "${opts.getAwsCli()} s3 cp --recursive --only-show-errors s3:/${opts.remoteBinDir} \$PWD/nextflow-bin\n"
final copyCommandWhenUsingS3 = "${opts.getAwsCli()} s3 cp --recursive --only-show-errors s3:/${opts.remoteBinDir} \$PWD/nextflow-bin\n"
result << (LifebitUtils.isUsingLustreFsx(opts) ? "cp -r ${opts.remoteBinDir} \$PWD/nextflow-bin\n" : copyCommandWhenUsingS3)
result << "chmod +x \$PWD/nextflow-bin/* || true\n"
result << "export PATH=\$PWD/nextflow-bin:\$PATH\n"
}
Expand All @@ -83,6 +86,10 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {

@Override
String getStageInputFilesScript(Map<String,Path> inputFiles) {
if( LifebitUtils.isUsingLustreFsx(opts) ) {
log.trace "[USING LUSTRE FSX] stage_inputs."
return super.getStageInputFilesScript(inputFiles) + '\n'
}
def result = 'downloads=(true)\n'
result += super.getStageInputFilesScript(inputFiles) + '\n'
result += 'nxf_parallel "${downloads[@]}"\n'
Expand All @@ -94,6 +101,9 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String stageInputFile( Path path, String targetName ) {
if( LifebitUtils.isUsingLustreFsx(opts) ) {
return "cp -r ${Escape.path(path)} ${Escape.path(targetName)}"
}
// third param should not be escaped, because it's used in the grep match rule
def stage_cmd = opts.maxTransferAttempts > 1 && !opts.retryMode
? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")"
Expand All @@ -106,7 +116,6 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String getUnstageOutputFilesScript(List<String> outputFiles, Path targetDir) {

final patterns = normalizeGlobStarPaths(outputFiles)
// create a bash script that will copy the out file to the working directory
log.trace "[AWS BATCH] Unstaging file path: $patterns"
Expand All @@ -118,6 +127,19 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
for( String it : patterns )
escape.add( Escape.path(it) )

if ( LifebitUtils.isUsingLustreFsx(opts) ) {
log.trace "[USING LUSTRE FSX] unstage_outputs."
return """\
uploads=()
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
uploads+=("cp -r '\$name' ${Escape.path(targetDir)}")
done
unset IFS
nxf_parallel "\${uploads[@]}"
""".stripIndent(true)
}

return """\
uploads=()
IFS=\$'\\n'
Expand All @@ -134,7 +156,8 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String touchFile( Path file ) {
"echo start | nxf_s3_upload - s3:/${Escape.path(file)}"
final touchCommandWhenUsingS3 = "echo start | nxf_s3_upload - s3:/${Escape.path(file)}"
return LifebitUtils.isUsingLustreFsx(opts) ? "echo start > ${Escape.path(file)}" : touchCommandWhenUsingS3
}

/**
Expand All @@ -150,7 +173,9 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String copyFile( String name, Path target ) {
"nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}"
final copyCommandWhenUsingLustre = "cp -r ${Escape.path(name)} ${Escape.path(target.getParent())}"
final copyCommandWhenUsingS3 = "nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}"
return LifebitUtils.isUsingLustreFsx(opts) ? copyCommandWhenUsingLustre : copyCommandWhenUsingS3
}

static String uploadCmd( String source, Path target ) {
Expand All @@ -161,7 +186,7 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
* {@inheritDoc}
*/
String exitFile( Path path ) {
"| nxf_s3_upload - s3:/${Escape.path(path)} || true"
return LifebitUtils.isUsingLustreFsx(opts) ? "> ${Escape.path(path)}" : "| nxf_s3_upload - s3:/${Escape.path(path)} || true"
}

/**
Expand All @@ -171,4 +196,10 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
String pipeInputFile( Path path ) {
" < ${Escape.path(path.getFileName())}"
}

@Override
String getTempDir( Path targetDir ) {
return LifebitUtils.isUsingLustreFsx(opts) ? "${Escape.path(targetDir)}" : super.getTempDir(targetDir)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,5 +219,20 @@ class AwsBatchHelper {
return result.toString()
}

private String getInstanceIdByClusterAndTaskArn(String clusterArn, String taskArn) {
final containerId = getContainerIdByClusterAndTaskArn(clusterArn, taskArn)
return containerId ? getInstanceIdByClusterAndContainerId(clusterArn, containerId) : null
}

String getInstanceIdByQueueAndTaskArn(String queue, String taskArn) {
final clusterArnList = getClusterArnByBatchQueue(queue)
for (String cluster : clusterArnList) {
final result = getInstanceIdByClusterAndTaskArn(cluster, taskArn)
if (result)
return result
}
return null
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package nextflow.cloud.aws.batch

import nextflow.cloud.aws.util.LifebitUtils

import static nextflow.cloud.aws.batch.AwsContainerOptionsMapper.*

import java.nio.file.Path
Expand Down Expand Up @@ -248,6 +250,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final job = describeJob(jobId)
final done = job?.status in ['SUCCEEDED', 'FAILED']
if( done ) {
log.trace "[AWS BATCH] Completed task: jobId=$jobId"
// finalize the task
task.exitStatus = readExitFile()
task.stdout = outputFile
Expand Down Expand Up @@ -622,7 +625,16 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : ''
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}"
final cmd = "trap \"{ ret=\$?; $aws ${TaskRun.CMD_LOG} s3:/${getLogFile()}||true; exit \$ret; }\" EXIT; $aws s3:/${getWrapperFile()} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}"
def logCopyCommand = LifebitUtils.isUsingLustreFsx(opts)
? "trap \"{ ret=\$?; cp ${TaskRun.CMD_LOG} ${getLogFile()} 2> /dev/null; exit \$ret; }\" EXIT; "
: "trap \"{ ret=\$?; $aws --request-payer ${TaskRun.CMD_LOG} s3:/${getLogFile()}||true; exit \$ret; }\" EXIT; "
// Note(ruben): Since we do not download the .command.run from s3 bucket and due the fact that is auto imported
// through the link capacity of fsx when mounting we have already access to the file. So, we just need to make it
// executable and run it
def runCopyCommand = LifebitUtils.isUsingLustreFsx(opts)
? "chmod +x ${getWrapperFile()}; ${getWrapperFile()} 2>&1 | tee ${TaskRun.CMD_LOG}"
: "$aws --request-payer s3:/${getWrapperFile()} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}"
def cmd = "${logCopyCommand}${runCopyCommand}"
return ['bash','-o','pipefail','-c', cmd.toString()]
}

Expand Down Expand Up @@ -777,7 +789,8 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
return machineInfo
if( queueName && taskArn && executor.awsOptions.fetchInstanceType ) {
machineInfo = executor.getMachineInfoByQueueAndTaskArn(queueName, taskArn)
log.trace "[AWS BATCH] jobId=$jobId; queue=$queueName; task=$taskArn => machineInfo=$machineInfo"
def instanceId = executor.getInstanceIdByQueueAndTaskArn(queueName, taskArn)
log.trace "[AWS BATCH] jobId=$jobId; queue=$queueName; task=$taskArn => machineInfo=$machineInfo; instanceId=$instanceId\""
}
return machineInfo
}
Expand Down
Loading
Loading