Skip to content

Commit

Permalink
Feat: implements nextflow_version new parameter for job submission (#171
Browse files Browse the repository at this point in the history
)

* fix AMI

* adds nextflow-version

* system tools check

* changelog

* trailing /

* fix url

* fix pytests

* readme

* fix url

* messages

* messages
  • Loading branch information
dapineyro authored Nov 28, 2024
1 parent 5b875ad commit 25d76d1
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 14 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
## lifebit-ai/cloudos-cli: changelog

## v2.12.0 (2024-11-27)

### Feature

- Adds the new parameter `--nextflow-version` to select the Nextflow version for job submissions.
- Now `--cloudos-url` can also take URLs with a trailing `/`

## v2.11.2 (2024-11-6)

### Fix
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ Options:
to include.
--nextflow-profile TEXT A comma separated string indicating the
nextflow profile/s to use with your job.
--nextflow-version [22.10.8|24.04.4|latest]
Nextflow version to use when executing the
workflow in CloudOS. Please, note that
versions above 22.10.8 are only DSL2
compatible. Default=22.10.8.
--git-commit TEXT The exact whole 40 character commit hash to
run for the selected pipeline. If not
specified it defaults to the last commit of
Expand Down
48 changes: 40 additions & 8 deletions cloudos/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ def queue():
@click.option('--nextflow-profile',
help=('A comma separated string indicating the nextflow profile/s ' +
'to use with your job.'))
@click.option('--nextflow-version',
help=('Nextflow version to use when executing the workflow in CloudOS. ' +
'Please, note that versions above 22.10.8 are only DSL2 compatible. ' +
'Default=22.10.8.'),
type=click.Choice(['22.10.8', '24.04.4', 'latest']),
default='22.10.8')
@click.option('--git-commit',
help=('The exact whole 40 character commit hash to run for ' +
'the selected pipeline. ' +
Expand Down Expand Up @@ -237,6 +243,7 @@ def run(apikey,
ignite,
job_queue,
nextflow_profile,
nextflow_version,
instance_type,
instance_disk,
storage_mode,
Expand All @@ -255,11 +262,11 @@ def run(apikey,
disable_ssl_verification,
ssl_cert):
"""Submit a job to CloudOS."""
print('Executing run...')
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
if spot:
print('\n[Message] You have specified spot instances but they are no longer available ' +
'in CloudOS. Option ignored.\n')
print('[Message] You have specified spot instances but they are no longer available ' +
'in CloudOS. Option ignored.')
if do_not_save_logs:
save_logs = False
else:
Expand All @@ -275,13 +282,13 @@ def run(apikey,
batch = None
elif ignite:
batch = None
print('\n[Warning] You have specified ignite executor. Please, note that ignite is being ' +
print('[Warning] You have specified ignite executor. Please, note that ignite is being ' +
'removed from CloudOS, so the command may fail. Check ignite availability in your ' +
'CloudOS\n')
'CloudOS')
else:
batch = True
if execution_platform == 'hpc':
print('\nHPC execution platform selected')
print('\n[Message] HPC execution platform selected')
if hpc_id is None:
raise ValueError('Please, specify your HPC ID using --hpc parameter')
print('[Message] Please, take into account that HPC execution do not support ' +
Expand All @@ -304,7 +311,7 @@ def run(apikey,
raise ValueError(f'The workflow {workflow_name} is a WDL workflow. ' +
'WDL is not supported on HPC execution platform.')
if workflow_type == 'wdl':
print('\tWDL workflow detected\n')
print('[Message] WDL workflow detected')
if wdl_mainfile is None:
raise ValueError('Please, specify WDL mainFile using --wdl-mainfile <mainFile>.')
c_status = cl.get_cromwell_status(workspace_id, verify_ssl)
Expand Down Expand Up @@ -346,15 +353,29 @@ def run(apikey,
print('\t...Sending job to CloudOS\n')
if is_module:
if job_queue is not None:
print(f'\tIgnoring job queue "{job_queue}" for ' +
print(f'[Message] Ignoring job queue "{job_queue}" for ' +
f'Platform Workflow "{workflow_name}". Platform Workflows ' +
'use their own predetermined queues.')
job_queue_id = None
if nextflow_version != '22.10.8':
print(f'[Message] The selected worflow \'{workflow_name}\' ' +
'is a CloudOS module. CloudOS modules only work with ' +
'Nextflow version 22.10.8. Switching to use 22.10.8')
nextflow_version = '22.10.8'
else:
queue = Queue(cloudos_url=cloudos_url, apikey=apikey, cromwell_token=cromwell_token,
workspace_id=workspace_id, verify=verify_ssl)
job_queue_id = queue.fetch_job_queue_id(workflow_type=workflow_type, batch=batch,
job_queue=job_queue)
if nextflow_version == 'latest':
nextflow_version = '24.04.4'
print('[Message] You have specified Nextflow version \'latest\'. The workflow will use the ' +
f'latest version available on CloudOS: {nextflow_version}.')
if nextflow_version != '22.10.8':
print(f'[Warning] You have specified Nextflow version {nextflow_version}. This version requires the pipeline ' +
'to be written in DSL2 and does not support DSL1.')
print('\nExecuting run...')
print(f'\tNextflow version: {nextflow_version}')
j_id = j.send_job(job_config=job_config,
parameter=parameter,
git_commit=git_commit,
Expand All @@ -365,6 +386,7 @@ def run(apikey,
batch=batch,
job_queue_id=job_queue_id,
nextflow_profile=nextflow_profile,
nextflow_version=nextflow_version,
instance_type=instance_type,
instance_disk=instance_disk,
storage_mode=storage_mode,
Expand Down Expand Up @@ -515,6 +537,7 @@ def run_curated_examples(apikey,
NOTE that currently, only Nextflow workflows are supported.
"""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
cl = Cloudos(cloudos_url, apikey, None)
curated_workflows = cl.get_curated_workflow_list(workspace_id, verify=verify_ssl)
Expand Down Expand Up @@ -628,6 +651,7 @@ def job_status(apikey,
disable_ssl_verification,
ssl_cert):
"""Check job status in CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
print('Executing status...')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
if verbose:
Expand Down Expand Up @@ -702,6 +726,7 @@ def list_jobs(apikey,
disable_ssl_verification,
ssl_cert):
"""Collect all your jobs from a CloudOS workspace in CSV format."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
outfile = output_basename + '.' + output_format
print('Executing list...')
Expand Down Expand Up @@ -783,6 +808,7 @@ def list_workflows(apikey,
disable_ssl_verification,
ssl_cert):
"""Collect all workflows from a CloudOS workspace in CSV format."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
outfile = output_basename + '.' + output_format
print('Executing list...')
Expand Down Expand Up @@ -859,6 +885,7 @@ def import_workflows(apikey,
disable_ssl_verification,
ssl_cert):
"""Imports workflows to CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
print('Executing workflow import...\n')
print('\t[Message] Only Nextflow workflows are currently supported.\n')
Expand Down Expand Up @@ -920,6 +947,7 @@ def list_projects(apikey,
disable_ssl_verification,
ssl_cert):
"""Collect all projects from a CloudOS workspace in CSV format."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
outfile = output_basename + '.' + output_format
print('Executing list...')
Expand Down Expand Up @@ -985,6 +1013,7 @@ def cromwell_status(apikey,
disable_ssl_verification,
ssl_cert):
"""Check Cromwell server status in CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
if apikey is None and cromwell_token is None:
raise ValueError("Please, use one of the following tokens: '--apikey', '--cromwell_token'")
print('Executing status...')
Expand Down Expand Up @@ -1040,6 +1069,7 @@ def cromwell_restart(apikey,
disable_ssl_verification,
ssl_cert):
"""Restart Cromwell server in CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
if apikey is None and cromwell_token is None:
raise ValueError("Please, use one of the following tokens: '--apikey', '--cromwell_token'")
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
Expand Down Expand Up @@ -1112,6 +1142,7 @@ def cromwell_stop(apikey,
disable_ssl_verification,
ssl_cert):
"""Stop Cromwell server in CloudOS."""
cloudos_url = cloudos_url.rstrip('/')
if apikey is None and cromwell_token is None:
raise ValueError("Please, use one of the following tokens: '--apikey', '--cromwell_token'")
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
Expand Down Expand Up @@ -1172,6 +1203,7 @@ def list_queues(apikey,
disable_ssl_verification,
ssl_cert):
"""Collect all available job queues from a CloudOS workspace."""
cloudos_url = cloudos_url.rstrip('/')
verify_ssl = ssl_selector(disable_ssl_verification, ssl_cert)
outfile = output_basename + '.' + output_format
print('Executing list...')
Expand Down
2 changes: 1 addition & 1 deletion cloudos/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.11.2'
__version__ = '2.12.0'
12 changes: 10 additions & 2 deletions cloudos/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ def convert_nextflow_to_json(self,
batch,
job_queue_id,
nextflow_profile,
nextflow_version,
instance_type,
instance_disk,
storage_mode,
Expand Down Expand Up @@ -242,8 +243,10 @@ def convert_nextflow_to_json(self,
Job queue Id to use in the batch job.
nextflow_profile: string
A comma separated string with the profiles to be used.
nextflow_version: string
Nextflow version to use when executing the workflow in CloudOS.
instance_type : string
Name of the AMI to choose.
Name of the instance type to be used for the job master node, for example for AWS EC2 c5.xlarge
instance_disk : int
The disk space of the instance, in GB.
storage_mode : string
Expand Down Expand Up @@ -383,6 +386,7 @@ def convert_nextflow_to_json(self,
"project": project_id,
"workflow": workflow_id,
"name": job_name,
"nextflowVersion": nextflow_version,
"resumable": resumable,
"saveProcessLogs": save_logs,
"batch": {
Expand Down Expand Up @@ -425,6 +429,7 @@ def send_job(self,
batch=True,
job_queue_id=None,
nextflow_profile=None,
nextflow_version='22.10.8',
instance_type='c5.xlarge',
instance_disk=500,
storage_mode='regular',
Expand Down Expand Up @@ -467,8 +472,10 @@ def send_job(self,
Job queue Id to use in the batch job.
nextflow_profile: string
A comma separated string with the profiles to be used.
nextflow_version: string
Nextflow version to use when executing the workflow in CloudOS.
instance_type : string
Type of the AMI to choose.
Name of the instance type to be used for the job master node, for example for AWS EC2 c5.xlarge
instance_disk : int
The disk space of the instance, in GB.
storage_mode : string
Expand Down Expand Up @@ -520,6 +527,7 @@ def send_job(self,
batch,
job_queue_id,
nextflow_profile,
nextflow_version,
instance_type,
instance_disk,
storage_mode,
Expand Down
4 changes: 2 additions & 2 deletions cloudos/queue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ def fetch_job_queue_id(self, workflow_type, batch=True, job_queue=None):
default_queue_name = available_queues[-1]['label']
queue_as_default = 'most recent suitable'
if job_queue is None:
print(f'\tNo job_queue was specified, using the {queue_as_default} queue: ' +
print(f'[Message] No job_queue was specified, using the {queue_as_default} queue: ' +
f'{default_queue_name}.')
return default_queue_id
selected_queue = [q for q in available_queues if q['label'] == job_queue]
if len(selected_queue) == 0:
print(f'\tQueue \'{job_queue}\' you specified was not found, using the {queue_as_default} ' +
print(f'[Message] Queue \'{job_queue}\' you specified was not found, using the {queue_as_default} ' +
f'queue instead: {default_queue_name}.')
return default_queue_id
return selected_queue[0]['id']
2 changes: 1 addition & 1 deletion tests/test_data/convert_nextflow_to_json_params.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"parameters": [{"prefix": "--", "name": "reads", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data"}, {"prefix": "--", "name": "genome", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"}, {"prefix": "--", "name": "annot", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.bed.gff"}], "project": "6054754029b82f0112762b9c", "workflow": "60b0ca54303ee601a69b42d1", "name": "new_job", "resumable": true, "saveProcessLogs": true, "batch": {"dockerLogin": false, "enabled": false, "jobQueue": null}, "cromwellCloudResources": null, "executionPlatform": "aws", "hpc": null ,"storageSizeInGb": 500, "execution": {"computeCostLimit": -1, "optim": "test"}, "lusterFsxStorageSizeInGb": 1200, "storageMode": "regular", "revision": "", "profile": null, "instanceType": "c5.xlarge", "masterInstance": {"requestedInstance": {"type": "c5.xlarge", "asSpot": false}}}
{"parameters": [{"prefix": "--", "name": "reads", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data"}, {"prefix": "--", "name": "genome", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"}, {"prefix": "--", "name": "annot", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.bed.gff"}], "project": "6054754029b82f0112762b9c", "workflow": "60b0ca54303ee601a69b42d1", "name": "new_job","nextflowVersion": "22.10.8", "resumable": true, "saveProcessLogs": true, "batch": {"dockerLogin": false, "enabled": false, "jobQueue": null}, "cromwellCloudResources": null, "executionPlatform": "aws", "hpc": null ,"storageSizeInGb": 500, "execution": {"computeCostLimit": -1, "optim": "test"}, "lusterFsxStorageSizeInGb": 1200, "storageMode": "regular", "revision": "", "profile": null, "instanceType": "c5.xlarge", "masterInstance": {"requestedInstance": {"type": "c5.xlarge", "asSpot": false}}}
3 changes: 3 additions & 0 deletions tests/test_jobs/test_convert_nextflow_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"batch": False,
"job_queue_id": None,
"nextflow_profile": None,
"nextflow_version": '22.10.8',
"instance_type": "c5.xlarge",
"instance_disk": 500,
"storage_mode": 'regular',
Expand Down Expand Up @@ -46,6 +47,7 @@ def test_convert_nextflow_to_json_output_correct():
batch=param_dict["batch"],
job_queue_id=param_dict["job_queue_id"],
nextflow_profile=param_dict["nextflow_profile"],
nextflow_version=param_dict["nextflow_version"],
instance_type=param_dict["instance_type"],
instance_disk=param_dict["instance_disk"],
storage_mode=param_dict["storage_mode"],
Expand Down Expand Up @@ -78,6 +80,7 @@ def test_convert_nextflow_to_json_badly_formed_config():
batch=param_dict["batch"],
job_queue_id=param_dict["job_queue_id"],
nextflow_profile=param_dict["nextflow_profile"],
nextflow_version=param_dict["nextflow_version"],
instance_type=param_dict["instance_type"],
instance_disk=param_dict["instance_disk"],
storage_mode=param_dict["storage_mode"],
Expand Down

0 comments on commit 25d76d1

Please sign in to comment.