inject study_type in EBI and improvements to current automatic processing pipeline #3023

Merged
merged 2 commits on Aug 4, 2020
5 changes: 5 additions & 0 deletions qiita_ware/ebi.py
@@ -356,6 +356,11 @@ def generate_study_xml(self):
study_title = ET.SubElement(descriptor, 'STUDY_TITLE')
study_title.text = escape(clean_whitespace(self.study_title))

# study type is deprecated and not displayed anywhere on EBI-ENA;
# however, it is still required for submission, so we simply inject it as 'Other'
ET.SubElement(
descriptor, 'STUDY_TYPE', {'existing_study_type': 'Other'})

study_abstract = ET.SubElement(descriptor, 'STUDY_ABSTRACT')
study_abstract.text = clean_whitespace(escape(self.study_abstract))

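For reference, a minimal standalone sketch (not part of this PR) of the XML fragment that call emits via xml.etree.ElementTree:

import xml.etree.ElementTree as ET

descriptor = ET.Element('DESCRIPTOR')
# same pattern as in generate_study_xml: an empty STUDY_TYPE element whose
# only content is the mandatory existing_study_type attribute
ET.SubElement(descriptor, 'STUDY_TYPE', {'existing_study_type': 'Other'})
print(ET.tostring(descriptor, encoding='unicode'))
# should print:
# <DESCRIPTOR><STUDY_TYPE existing_study_type="Other" /></DESCRIPTOR>
# which matches the expected string added to the test below
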
1 change: 1 addition & 0 deletions qiita_ware/test/test_ebi.py
@@ -1314,6 +1314,7 @@ def test_parse_EBI_reply(self):
<STUDY_TITLE>
Identification of the Microbiomes for Cannabis Soils
</STUDY_TITLE>
<STUDY_TYPE existing_study_type="Other" />
<STUDY_ABSTRACT>
This is a preliminary study to examine the microbiota associated with \
the Cannabis plant. Soils samples from the bulk soil, soil associated with \
77 changes: 55 additions & 22 deletions scripts/qiita-auto-processing
@@ -33,6 +33,7 @@ user = User('antoniog@ucsd.edu')
# 'version': the version of the plugin,
# 'cmd_name': the command we want to run,
# 'input_name': the name of the input parameter of that command
# 'ignore_parameters': list of parameters to ignore, for example: threads
# 'parent_artifact_name': name of the parent output, input for this command
# 'parameters_names': list of the names of the parameter sets we want to run
# }
@@ -41,21 +42,24 @@ full_pipelines = [
'data_type': ['Metagenomic'],
'artifact_type': 'per_sample_FASTQ',
'previous-step': None,
'requirements': dict(),
'steps': [
{'previous-step': None,
'plugin': 'qp-shogun',
'version': '012020',
'cmd_name': 'Atropos v1.1.24',
'input_name': 'input',
'ignore_parameters': ['Number of threads used'],
'parent_artifact_name': None,
'parameters_names': ['KAPA HyperPlus with iTru']},
{'previous-step': 'Atropos v1.1.24',
'plugin': 'qp-shogun',
'version': '012020',
'cmd_name': 'Shogun v1.0.7',
'version': '072020',
'cmd_name': 'Shogun v1.0.8',
'input_name': 'input',
'ignore_parameters': ['Number of threads'],
'parent_artifact_name': 'Adapter trimmed files',
'parameters_names': ['wol_bowtie2', 'rep94_bowtie2']}
'parameters_names': ['wol_bowtie2', 'rep200_bowtie2']}
]},
{'name': 'Target Gene Processing',
'data_type': ['16S', '18S', 'ITS'],
@@ -73,6 +77,7 @@ full_pipelines = [
'version': '1.9.1',
'cmd_name': 'Trimming',
'input_name': 'input_data',
'ignore_parameters': [],
'parent_artifact_name': None,
'parameters_names': ['90 base pairs',
'100 base pairs',
@@ -83,13 +88,15 @@
'version': '1.9.1',
'cmd_name': 'Pick closed-reference OTUs',
'input_name': 'input_data',
'ignore_parameters': [],
'parent_artifact_name': 'Trimmed Demultiplexed',
'parameters_names': ['Defaults - parallel']},
{'previous-step': 'Trimming',
'plugin': 'deblur',
'version': '1.1.0',
'cmd_name': 'Deblur',
'input_name': 'Demultiplexed sequences',
'ignore_parameters': [],
'parent_artifact_name': 'Trimmed Demultiplexed',
'parameters_names': ['Defaults']}
]},
@@ -122,6 +129,22 @@ def _check_requirements(requirements, template):
return satisfied


def _check_parameters(jobs, cmd):
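# normalize each job's parameters to {name: str(value)} and drop the keys
# listed in cmd['ignore_parameters'] (e.g. thread counts) so that runs that
# only differ in ignored values compare as equal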
params = [{k: str(v) for k, v in j.parameters.values.items()
if k not in cmd['ignore_parameters']} for j in jobs]
return params


def _submit_workflows(artifact_process):
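# submit each artifact's workflow whose first job is still 'in_construction';
# artifacts without a workflow are skipped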
for artifact in artifact_process:
if artifact['workflow'] is None:
continue
# nodes will return in position [0] the first job created
first_job = list(artifact['workflow'].graph.nodes())[0]
if first_job.status == 'in_construction':
artifact['workflow'].submit()


# Step 1. Loop over the full_pipelines to process each step
for pipeline in full_pipelines:
# Step 2. From the steps generate the list of commands to add to the
@@ -149,6 +172,7 @@ for pipeline in full_pipelines:
'previous-step': step['previous-step'],
'parent_artifact_name': step['parent_artifact_name'],
'input_name': step['input_name'],
'ignore_parameters': step['ignore_parameters'],
'parameters': parameters})

# Step 2. - for children. Get their commands. We currently only support
@@ -161,7 +185,9 @@
if c['previous-step'] == commands[0]['command-name']]

# Step 3. Find all preparations/artifacts that we can add the pipeline to
artifacts_all = [a for study in Study.iter()
# ... as a first pass we will only process study 10317 (AGP) ...
# artifacts_all = [a for study in Study.iter()
artifacts_all = [a for study in [Study(10317)]
# loop over all artifacts of artifact_type within the study
for a in study.artifacts(
artifact_type=pipeline['artifact_type'])
@@ -172,7 +198,10 @@
artifacts_compliant = []
for a in artifacts_all:
st = a.study.sample_template
pt = a.prep_templates[0]
pts = a.prep_templates
if not pts:
continue
pt = pts[0]

# {'sandbox', 'awaiting_approval', 'private', 'public'}
if a.visibility in ('sandbox', 'awaiting_approval'):
@@ -194,23 +223,29 @@
# of Step 4 but for debugging it makes sense to separate
artifact_process = []
children_compliant = []
cmd = commands[0]
for a in artifacts_compliant:
cmd = commands[0]
# getting all jobs, including hidden ones, in case a job failed
jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
params = [j.parameters.values for j in jobs]
params = _check_parameters(jobs, cmd)

# checking that all required parameters of this command exist
missing_parameters = []
for p in cmd['parameters']:
p = p['values']
p.update({cmd['input_name']: str(a.id)})
if p not in params:
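# compare without the ignored parameters (already stripped from `params` by
# _check_parameters) so that an existing job differing only in, e.g., thread
# count is not scheduled again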
p_to_compare = p.copy()
for k in cmd['ignore_parameters']:
del p_to_compare[k]
if p_to_compare not in params:
missing_parameters.append(p)
else:
for c in a.children:
if c.processing_parameters.values == p:
children_compliant.append(c)
cpp = c.processing_parameters
if cpp.command.name == cmd['command-name']:
cparams = _check_parameters([cpp], cmd)
if cparams == p_to_compare:
children_compliant.append(c)
if missing_parameters:
# note that we are building a dict for each artifact so we can
# save the workflow id, useful for when we run this in a terminal
@@ -224,14 +259,18 @@
for cmd_id, cmd in enumerate(children_cmds):
# getting all jobs, including hidden ones, in case a job failed
jobs = a.jobs(cmd=cmd['command'], show_hidden=True)
params = [j.parameters.values for j in jobs]
params = _check_parameters(jobs, cmd)

# checking that all required parameters of this command exist
missing_parameters = []
for p in cmd['parameters']:
p = p['values']
p.update({cmd['input_name']: str(c.id)})
if p not in params:
p.update({cmd['input_name']: str(a.id)})
p_to_compare = p.copy()
for k in cmd['ignore_parameters']:
del p_to_compare[k]

if p_to_compare not in params:
missing_parameters.append(p)
if missing_parameters:
artifact_process.append(
@@ -266,9 +305,9 @@
# now we can add the rest of the parameters to the workflow for
# the first command
for params in artifact['missing_parameters'][1:]:
params.update({cmd['input_name']: str(a.id)})
job_params = Parameters.load(cmd['command'], values_dict=params)
artifact['workflow'].add(job_params)
artifact['workflow'].add(
job_params, req_params={cmd['input_name']: str(a.id)})

for cmd in commands[cmd_id + 1:]:
# get jobs from the workflow to which we can add this new command
Expand All @@ -286,10 +325,4 @@ for pipeline in full_pipelines:
cmd['parent_artifact_name']: cmd['input_name']}})

# Step 7. submit the workflows!
for artifact in artifact_process:
if artifact['workflow'] is None:
continue
# nodes will return in position [0] the first job created
first_job = list(artifact['workflow'].graph.nodes())[0]
if first_job.status == 'in_construction':
artifact['workflow'].submit()
_submit_workflows(artifact_process)
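
To illustrate the new ignore_parameters matching outside of Qiita, a minimal sketch with stand-in objects (SimpleNamespace instead of real processing jobs; the names below are only for illustration):

from types import SimpleNamespace

def _check_parameters(jobs, cmd):
    # same body as the helper added in scripts/qiita-auto-processing
    return [{k: str(v) for k, v in j.parameters.values.items()
             if k not in cmd['ignore_parameters']} for j in jobs]

cmd = {'ignore_parameters': ['Number of threads']}
old_job = SimpleNamespace(parameters=SimpleNamespace(
    values={'input': 5, 'Number of threads': 4}))

print(_check_parameters([old_job], cmd))
# -> [{'input': '5'}]
# a new parameter set that only differs in 'Number of threads' now matches
# this existing job, so the pipeline will not re-run it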