21.04.1 List of changes done by Lifebit #19

Status: Open. Wants to merge 17 commits into base: 21.04.1-original. Changes from 8 commits.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
@@ -0,0 +1,2 @@
# Core analyses is the default owner for this repo.
* @lifebit-ai/core-analyses
2 changes: 1 addition & 1 deletion buildSrc/build.gradle
Author comment: These changes were done by @mageshwaran-lifebit.

@@ -11,7 +11,7 @@ version = "1.0.0"
group = "io.nextflow"

dependencies {
implementation ('com.amazonaws:aws-java-sdk-s3:1.11.542')
implementation ('com.amazonaws:aws-java-sdk-s3:1.12.129')
implementation 'com.google.code.gson:gson:2.8.6'
}

@@ -248,12 +248,6 @@ class BashWrapperBuilder {
binding.container_env = null
}

/*
* staging input files when required
*/
final stagingScript = copyStrategy.getStageInputFilesScript(inputFiles)
binding.stage_inputs = stagingScript ? "# stage input files\n${stagingScript}" : null

binding.stdout_file = TaskRun.CMD_OUTFILE
binding.stderr_file = TaskRun.CMD_ERRFILE
binding.trace_file = TaskRun.CMD_TRACE
@@ -262,6 +256,13 @@
binding.launch_cmd = getLaunchCommand(interpreter,env)
binding.stage_cmd = getStageCommand()
binding.unstage_cmd = getUnstageCommand()

Author comment: I don't think this change makes sense, so I'm dropping it.

Reply comment: What change did you drop? Also, I'm seeing the stagingScript prop being populated as well. This is needed because .getStageInputFilesScript can actually fetch the files from S3 or directly from a Lustre-mounted file system.

Author comment: @rubengomex I think lines 263-23 were only moved from 254-255, but this line (281) is newly added; that is what's considered for development.

/*
* staging input and unstage output files when required
*/
final stagingScript = copyStrategy.getStageInputFilesScript(inputFiles)
binding.stage_inputs = stagingScript ? "# stage input files\n${stagingScript}" : null

binding.unstage_controls = changeDir || shouldUnstageOutputs() ? getUnstageControls() : null

if( changeDir || shouldUnstageOutputs() ) {
@@ -277,7 +278,8 @@
binding.fix_ownership = fixOwnership() ? "[ \${NXF_OWNER:=''} ] && chown -fR --from root \$NXF_OWNER ${workDir}/{*,.*} || true" : null

binding.trace_script = isTraceRequired() ? getTraceScript(binding) : null

binding.temp_dir = "\${1:-${copyStrategy.getTempDir(workDir)}}"

return binding
}
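
To make the moved staging block concrete, here is a minimal Groovy sketch (the staging command is hypothetical; the names come from the diff above) of how stage_inputs lands in the wrapper template binding:

// Minimal sketch, assuming a copy strategy that emits one hypothetical download command.
def stagingScript = 'downloads+=("nxf_s3_download s3://bucket/data/in.txt in.txt")'
def binding = [:]
// A non-empty script gets a comment header; an empty one disables the section entirely.
binding.stage_inputs = stagingScript ? "# stage input files\n${stagingScript}" : null
assert binding.stage_inputs.toString().startsWith('# stage input files')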

@@ -41,6 +41,13 @@ interface ScriptFileCopyStrategy {
* @return A BASH snippet included in the wrapper script that un-stages the task output files
*/
String getUnstageOutputFilesScript(List<String> outputFiles, Path targetDir)

/**
* @param targetDir The directory where output files need to be unstaged, i.e. stored
* @return the path string for the temp directory
*/
String getTempDir( Path targetDir )


/**
* Command to 'touch' a file
@@ -154,6 +154,11 @@ class SimpleFileCopyStrategy implements ScriptFileCopyStrategy {
return cmd
}

@Override
String getTempDir( Path workDir ) {
return "/tmp"
}

/**
* Creates the script to unstage the result output files from the scratch directory
* to the shared working directory
@@ -43,7 +43,10 @@ class GitlabRepositoryProvider extends RepositoryProvider {
protected void auth( URLConnection connection ) {
if( config.token ) {
// set the token in the request header
connection.setRequestProperty("PRIVATE-TOKEN", config.token)
connection.setRequestProperty("Authorization", "Bearer ${config.token}")
} else if (config.password) {
// set the password as the token
connection.setRequestProperty("Authorization", "Bearer ${config.password}")
}
}
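
For illustration, a minimal sketch of the header swap (the token value is hypothetical). GitLab's API accepts both the PRIVATE-TOKEN header and the OAuth-style Authorization: Bearer header, so the new form also covers tokens issued via OAuth:

def token = 'glpat-xxxxxxxxxxxxxxxxxxxx'  // hypothetical personal access token
def connection = new URL('https://gitlab.com/api/v4/projects').openConnection()
// old form, personal access tokens only:
// connection.setRequestProperty('PRIVATE-TOKEN', token)
// new form, works for personal access tokens and OAuth tokens alike:
connection.setRequestProperty('Authorization', "Bearer ${token}")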

Review comment: We don't need this change, as this has been deprecated and is no longer used in the code.
v19.10.0: [screenshot]
v21.04.1: [screenshot failed to upload]

@@ -31,13 +31,22 @@ region="${zone::-1}"
#
[[ '!{dockerPull}' ]] && for x in '!{dockerPull}'; do docker pull $x || true; done

#
# Mount fsx file systems if provided
#
mountCommandsString="!{fsxFileSystemsMountCommands}"
IFS=';' read -ra mountCommandsArray <<< "$mountCommandsString"
[[ '!{fsxFileSystemsMountCommands}' ]] && for fsxMountCommand in "${mountCommandsArray[@]}"; do $fsxMountCommand || true; done

#
# Install NEXTFLOW and launch it
#
version="v!{nextflow.version}"
curl -fsSL http://www.nextflow.io/releases/${version}/nextflow > $HOME/nextflow
chmod +x $HOME/nextflow
$HOME/nextflow -download
if [[ '!{customNextflowBinaryUrl}' ]]; then
curl -s https://get.nextflow.io --output nextflow
NXF_PACK="all" NXF_URL="!{customNextflowBinaryUrl}" NXF_VER=${version} NXF_MODE="ignite" NXF_EXECUTOR="ignite" bash nextflow info
chmod +x $HOME/nextflow
$HOME/nextflow -download
fi

# pull the nextflow pipeline repo
[[ '!{nextflow.pull}' ]] && $HOME/nextflow pull '!{nextflow.pull}'
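
As a rough sketch of what feeds the mount loop above, the semicolon-separated commands would come from a Nextflow config entry along these lines (file system DNS name and mount name are hypothetical; the config key matches the cloud.fsxFileSystemsMountCommands lookup in the AwsBatchExecutor change below):

cloud {
    // split on ';' by the IFS=';' read in the launch script
    fsxFileSystemsMountCommands = 'sudo mkdir -p /fsx; sudo mount -t lustre fs-0123456789abcdef0.fsx.eu-west-1.amazonaws.com@tcp:/abcdefgh /fsx'
}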
@@ -64,7 +64,7 @@ nxf_kill() {
}

nxf_mktemp() {
local base=${1:-/tmp}
local base={{temp_dir}}
if [[ $(uname) = Darwin ]]; then mktemp -d $base/nxf.XXXXXXXXXX
else TMPDIR="$base" mktemp -d -t nxf.XXXXXXXXXX
fi
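
The {{temp_dir}} placeholder is rendered from the binding.temp_dir value added in BashWrapperBuilder above. A minimal sketch of the rendered result, assuming the SimpleFileCopyStrategy default of /tmp:

def copyStrategyTempDir = '/tmp'  // SimpleFileCopyStrategy.getTempDir()
def binding = [temp_dir: '${1:-' + copyStrategyTempDir + '}']
// After template rendering, nxf_mktemp begins with:
//   local base=${1:-/tmp}
// so an explicit first argument still overrides the strategy default.
assert binding.temp_dir == '${1:-/tmp}'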
4 changes: 2 additions & 2 deletions modules/nf-commons/src/main/nextflow/Const.groovy
@@ -58,12 +58,12 @@ class Const {
/**
Author comment: Not sure whether this is needed.

* The app build time as linux/unix timestamp
*/
static public final long APP_TIMESTAMP = 1621005654076
static public final long APP_TIMESTAMP = 1670410083094

/**
* The app build number
*/
static public final int APP_BUILDNUM = 5556
static public final int APP_BUILDNUM = 5566


/**
11 changes: 6 additions & 5 deletions plugins/nf-amazon/build.gradle
@@ -39,11 +39,12 @@ dependencies {
compileOnly 'org.pf4j:pf4j:3.4.1'

compile ('io.nextflow:nxf-s3fs:1.1.0') { transitive = false }
compile ('com.amazonaws:aws-java-sdk-s3:1.11.542')
Author comment: Packages are already using 1.12.129.

compile ('com.amazonaws:aws-java-sdk-ec2:1.11.542')
compile ('com.amazonaws:aws-java-sdk-batch:1.11.542')
compile ('com.amazonaws:aws-java-sdk-iam:1.11.542')
compile ('com.amazonaws:aws-java-sdk-ecs:1.11.542')
compile ('com.amazonaws:aws-java-sdk-s3:1.12.129')
compile ('com.amazonaws:aws-java-sdk-ec2:1.12.129')
compile ('com.amazonaws:aws-java-sdk-batch:1.12.129')
compile ('com.amazonaws:aws-java-sdk-iam:1.12.129')
compile ('com.amazonaws:aws-java-sdk-ecs:1.12.129')
compile ('com.amazonaws:aws-java-sdk-sts:1.12.129')

testImplementation(testFixtures(project(":nextflow")))
testImplementation project(':nextflow')
@@ -17,16 +17,20 @@

package nextflow.cloud.aws

import com.amazonaws.AmazonClientException
import com.amazonaws.auth.AWSCredentials
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.auth.BasicSessionCredentials
import com.amazonaws.regions.InstanceMetadataRegionProvider
import com.amazonaws.regions.Region
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.batch.AWSBatchClient
import com.amazonaws.services.ec2.AmazonEC2Client
import com.amazonaws.services.ecs.AmazonECS
import com.amazonaws.services.ecs.AmazonECSClientBuilder
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
@@ -132,37 +136,21 @@ class AmazonClientFactory {
* The IAM role name associated to this instance or {@code null} if no role is defined or
* it's not a EC2 instance
*/
protected String fetchIamRole() {
private String fetchIamRole() {

Review comment: nf-amazon-1.12.0 already has this.

try {
def role = getUrl('http://169.254.169.254/latest/meta-data/iam/security-credentials/').readLines()
if( role.size() != 1 )
throw new IllegalArgumentException("Not a valid EC2 IAM role")
return role.get(0)
def stsClient = AWSSecurityTokenServiceClientBuilder.defaultClient();
def roleArn = stsClient.getCallerIdentity(new GetCallerIdentityRequest()).getArn()
if(roleArn){
return roleArn.split('/')[-2]
}
return null
}
catch( IOException e ) {
catch( AmazonClientException e ) {
log.trace "Unable to fetch IAM credentials -- Cause: ${e.message}"
return null
}
}
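
To see what the split extracts, consider a hypothetical caller-identity ARN as returned for an EC2 instance profile:

// Hypothetical ARN; getCallerIdentity returns arn:aws:sts::<account>:assumed-role/<role>/<session>
def roleArn = 'arn:aws:sts::123456789012:assumed-role/my-batch-role/i-0abc123def4567890'
assert roleArn.split('/')[-2] == 'my-batch-role'  // [-2] picks the role name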

/**
* Fetch a remote URL resource text content
*
* @param path
* A valid http/https resource URL
* @param timeout
* Max connection timeout in millis
* @return
* The resource URL content
*/
protected String getUrl(String path, int timeout=150) {
final url = new URL(path)
final con = url.openConnection()
con.setConnectTimeout(timeout)
con.setReadTimeout(timeout)
return con.getInputStream().text.trim()
}

/**
* Retrieve the AWS region from the EC2 instance metadata.
* See http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
@@ -171,12 +159,11 @@
* The AWS region of the current EC2 instance eg. {@code eu-west-1} or
* {@code null} if it's not an EC2 instance.
*/
protected String fetchRegion() {
private String fetchRegion() {

Review comment: nf-amazon-1.12.0 already has this.

try {
def zone = getUrl('http://169.254.169.254/latest/meta-data/placement/availability-zone')
zone ? zone.substring(0,zone.length()-1) : null
return new InstanceMetadataRegionProvider().getRegion()
}
catch (IOException e) {
catch (AmazonClientException e) {
log.debug "Cannot fetch AWS region", e
return null
}
@@ -94,11 +94,13 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {

protected void validateWorkDir() {
/*
* make sure the work dir is a S3 bucket
* make sure the work dir is an S3 bucket when we are not using Lustre FSx
*/
if( !(workDir instanceof S3Path) ) {
def isUsingLustre = session.config.navigate('cloud.fsxFileSystemsMountCommands')
log.debug "Checking workdir validation, isUsingLustre $isUsingLustre"
if( !(workDir instanceof S3Path) && !isUsingLustre ) {
session.abort()
throw new AbortOperationException("When using `$name` executor a S3 bucket must be provided as working directory either using -bucket-dir or -work-dir command line option")
throw new AbortOperationException("When using `$name` executor and we are not using Lustre storage a S3 bucket must be provided as working directory either using -bucket-dir or -work-dir command line option")
}
}

@@ -251,6 +253,20 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
@PackageScope
ThrottlingExecutor getReaper() { reaper }

String getInstanceIdByQueueAndTaskArn(String queue, String taskArn) {

Review comment: This getInstanceIdByQueueAndTaskArn function is not required, as we are moving towards the nf-monitor plugin.

try {
return helper?.getInstanceIdByQueueAndTaskArn(queue, taskArn)
}
catch ( AccessDeniedException e ) {
log.warn "Unable to retrieve AWS Batch instance Id | ${e.message}"
return null
}
catch( Exception e ) {
log.warn "Unable to retrieve AWS batch instance id for queue=$queue; task=$taskArn | ${e.message}", e
return null
}
}


CloudMachineInfo getMachineInfoByQueueAndTaskArn(String queue, String taskArn) {
try {
@@ -267,14 +283,4 @@
return null
}
}

}









@@ -69,7 +69,12 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
copy.remove('PATH')
// when a remote bin directory is provided manage it properly
if( opts.remoteBinDir ) {
result << "${opts.getAwsCli()} s3 cp --recursive --only-show-errors s3:/${opts.remoteBinDir} \$PWD/nextflow-bin\n"
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
final copyCommandWhenUsingLustre = "cp -r ${opts.remoteBinDir} \$PWD/nextflow-bin\n"
final copyCommandWhenUsingS3 = "${opts.getAwsCli()} s3 cp --recursive --only-show-errors s3:/${opts.remoteBinDir} \$PWD/nextflow-bin\n"
final copyCommand = isUsingLustreFsx ? copyCommandWhenUsingLustre : copyCommandWhenUsingS3

result << copyCommand
result << "chmod +x \$PWD/nextflow-bin/*\n"
result << "export PATH=\$PWD/nextflow-bin:\$PATH\n"
}
@@ -82,6 +87,11 @@

@Override
String getStageInputFilesScript(Map<String,Path> inputFiles) {
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
if( isUsingLustreFsx ) {
log.trace "[USING LUSTRE FSX] stage_inputs."
return super.getStageInputFilesScript(inputFiles) + '\n'
}
def result = 'downloads=()\n'
result += super.getStageInputFilesScript(inputFiles) + '\n'
result += 'nxf_parallel "${downloads[@]}"\n'
@@ -93,6 +103,10 @@
*/
@Override
String stageInputFile( Path path, String targetName ) {
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
if( isUsingLustreFsx ) {
return "cp -r ${Escape.path(path)} ${Escape.path(targetName)}"
}
// third param should not be escaped, because it's used in the grep match rule
def stage_cmd = opts.maxTransferAttempts > 1
? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")"
@@ -117,6 +131,20 @@
for( String it : patterns )
escape.add( Escape.path(it) )

def isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()

if ( isUsingLustreFsx ) {
log.trace "[USING LUSTRE FSX] unstage_outputs."
return """\
uploads=()
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
uploads+=("cp -r '\$name' ${Escape.path(targetDir)}")
done
unset IFS
nxf_parallel "\${uploads[@]}"
""".stripIndent(true)
}
return """\
uploads=()
IFS=\$'\\n'
@@ -128,13 +156,23 @@
""".stripIndent(true)
}

@Override
String getTempDir( Path targetDir ) {
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
return isUsingLustreFsx ? "${Escape.path(targetDir)}" : super.getTempDir(targetDir)
}

/**
* {@inheritDoc}
*/
@Override
String touchFile( Path file ) {
final aws = opts.getAwsCli()
"echo start | $aws s3 cp --only-show-errors - s3:/${Escape.path(file)}"
def encryption = opts.storageEncryption ? "--sse $opts.storageEncryption " : ''
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
final touchCommandWhenUsingLustre = "echo start > ${Escape.path(file)}"
final touchCommandWhenUsingS3 = "echo start | $aws s3 cp --only-show-errors $encryption - s3:/${Escape.path(file)}"
return isUsingLustreFsx ? touchCommandWhenUsingLustre : touchCommandWhenUsingS3
}

/**
@@ -150,15 +188,22 @@
*/
@Override
String copyFile( String name, Path target ) {
"nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}"
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
final copyCommandWhenUsingLustre = "cp -r ${Escape.path(name)} ${Escape.path(target.getParent())}"
final copyCommandWhenUsingS3 = "nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}"
return isUsingLustreFsx ? copyCommandWhenUsingLustre : copyCommandWhenUsingS3
}

/**
* {@inheritDoc}
*/
String exitFile( Path path ) {
final aws = opts.getAwsCli()
"| $aws s3 cp --only-show-errors - s3:/${Escape.path(path)} || true"
def encryption = opts.storageEncryption ? "--sse $opts.storageEncryption " : ''
final isUsingLustreFsx = !opts.getFsxFileSystemsMountCommands().isEmpty()
final exitCommandWhenUsingLustre = "> ${Escape.path(path)}"
final exitCommandWhenUsingS3 = "| $aws s3 cp --only-show-errors $encryption - s3:/${Escape.path(path)} || true"
return isUsingLustreFsx ? exitCommandWhenUsingLustre : exitCommandWhenUsingS3
}
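
Taken together, these overrides replace S3 transfers with plain POSIX commands whenever Lustre mount commands are configured. A rough sketch of the two shapes touchFile produces (path and bucket are hypothetical):

def file = '/my-bucket/work/ab/cd/.command.begin'
// S3 mode ('s3:/' plus the absolute path yields an s3://my-bucket/... URI):
def s3Touch     = "echo start | aws s3 cp --only-show-errors - s3:/${file}"
// Lustre mode: the same marker written directly to the mounted file system:
def lustreTouch = "echo start > ${file}"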

/**