Replace collection elements with re-run output
This specifically addresses the problem where some jobs of a mapped-over
collection have failed. Instead of filtering the failed collection and
restarting the workflow at this position (which involves a lot of
copy-and-paste), the user can now limit the rerun to the problematic jobs,
and the workflow should resume from there.
Should fix galaxyproject#2235.

This is one possible implementation; it would also be feasible to leave the
original collection untouched and instead copy the HDCA, replace its
collection elements, and update all references for jobs that depend on the
HDCA, as we do for HDAs. This implementation seems simpler, but let me know
if you see problems with this approach.
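
For context, the user-facing flow looks roughly like this (a sketch mirroring
the functional test added below; `dataset_populator`, `input_hda_id`,
`failed_dataset` and `history_id` are placeholders):

    inputs = {
        "input1": {"values": [{"src": "hda", "id": input_hda_id}]},
        "failbool": "false",
        # Point the rerun at the failed job: its outputs then replace the
        # failed collection elements instead of becoming new datasets the
        # paused downstream jobs know nothing about.
        "rerun_remap_job_id": failed_dataset["creating_job"],
    }
    dataset_populator.run_tool(tool_id="fail_identifier", inputs=inputs,
                               history_id=history_id, assert_ok=True)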
mvdbeek committed Dec 31, 2017
1 parent 9fdf0a6 commit c0dbace
Showing 7 changed files with 94 additions and 5 deletions.
1 change: 1 addition & 0 deletions lib/galaxy/jobs/__init__.py
@@ -1030,6 +1030,7 @@ def pause(self, job=None, message=None):
dataset_assoc.dataset.dataset.state = dataset_assoc.dataset.dataset.states.PAUSED
dataset_assoc.dataset.info = message
self.sa_session.add(dataset_assoc.dataset)
log.debug("Pausing Job '%d', %s", job.id, message)
job.set_state(job.states.PAUSED)
self.sa_session.add(job)

7 changes: 7 additions & 0 deletions lib/galaxy/model/__init__.py
@@ -3139,6 +3139,13 @@ def copy(self, destination=None, element_destination=None):
object_session(self).flush()
return new_collection

def replace_failed_elements(self, replacements):
for element in self.elements:
if element.element_object in replacements:
if element.element_type == 'hda':
element.hda = replacements[element.element_object]
# TODO: handle the case where elements are collections

def set_from_dict(self, new_data):
# Nothing currently editable in this class.
return {}
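Taken in isolation, the new method's contract can be sketched as follows (a
hypothetical snippet: `old_hda` stands for a failed element's current
element_object, `new_hda` for the HDA produced by the rerun, and `hdca` for
the affected history collection):

    # replace_failed_elements rewires the collection in place: every element
    # whose element_object appears as a key is pointed at its replacement.
    # Only flat 'hda' elements are handled; nested collections are left
    # untouched for now (see the TODO above).
    replacements = {old_hda: new_hda}
    hdca.collection.replace_failed_elements(replacements)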
11 changes: 11 additions & 0 deletions lib/galaxy/tools/actions/__init__.py
@@ -509,6 +509,8 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current
# Duplicate PJAs before remap.
for pjaa in old_job.post_job_actions:
current_job.add_post_job_action(pjaa.post_job_action)
remapped_hdas = {}
input_hdcas = set()
for jtod in old_job.output_datasets:
for (job_to_remap, jtid) in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]:
if (trans.user is not None and job_to_remap.user_id == trans.user.id) or (
@@ -520,6 +522,9 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current
hda.state = hda.states.NEW
hda.info = None
input_values = dict([(p.name, json.loads(p.value)) for p in job_to_remap.parameters])
remapped_hdas[jtod.dataset] = out_data[jtod.name]
for jtidca in job_to_remap.input_dataset_collections:
input_hdcas.add(jtidca.dataset_collection)
old_dataset_id = jtod.dataset_id
new_dataset_id = out_data[jtod.name].id
input_values = update_dataset_ids(input_values, {old_dataset_id: new_dataset_id}, src='hda')
@@ -530,6 +535,12 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current
log.info('Job %s input HDA %s remapped to new HDA %s' % (job_to_remap.id, jtod.dataset.id, jtid.dataset.id))
trans.sa_session.add(job_to_remap)
trans.sa_session.add(jtid)
for hdca in input_hdcas:
hdca.collection.replace_failed_elements(remapped_hdas)
if hdca.implicit_collection_jobs:
for job in hdca.implicit_collection_jobs.jobs:
if job.job_id == old_job.id:
job.job_id = current_job.id
jtod.dataset.visible = False
trans.sa_session.add(jtod)
except Exception:
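The least obvious piece above is the final loop: after the failed elements
are swapped out, each affected input HDCA's implicit-collection job
association is repointed from the failed job to the rerun job, so the HDCA's
aggregate job state follows the rerun. An annotated restatement of those
lines (variable names simplified):

    for hdca in input_hdcas:
        # Point elements at the rerun outputs in place of the failed HDAs.
        hdca.collection.replace_failed_elements(remapped_hdas)
        # An implicitly created HDCA tracks the jobs that produced it; swap
        # the failed job's entry for the rerun job so downstream state is
        # computed from the new job.
        if hdca.implicit_collection_jobs:
            for ija in hdca.implicit_collection_jobs.jobs:
                if ija.job_id == old_job.id:
                    ija.job_id = current_job.id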
48 changes: 48 additions & 0 deletions test/api/test_workflows.py
@@ -723,6 +723,54 @@ def test_workflow_resume_from_failed_step_with_hdca_input(self):
assert_ok=False)
assert unpaused_dataset['state'] == 'ok'

@skip_without_tool("fail_identifier")
@skip_without_tool("identifier_multiple_in_conditional")
def test_workflow_resume_with_mapped_over_input(self):
with self.dataset_populator.test_history() as history_id:
job_summary = self._run_jobs("""
class: GalaxyWorkflow
steps:
- label: input_datasets
type: input_collection
- label: fail_identifier_1
tool_id: fail_identifier
state:
input1:
$link: input_datasets
failbool: true
- tool_id: identifier_collection
state:
input1:
$link: fail_identifier_1#out_file1
test_data:
input_datasets:
type: list
elements:
- identifier: fail
value: 1.fastq
type: File
- identifier: success
value: 1.fastq
type: File
""", history_id=history_id, assert_ok=False, wait=False)
self.wait_for_invocation_and_jobs(history_id, job_summary.workflow_id, job_summary.invocation_id, assert_ok=False)
history_contents = self.dataset_populator._get_contents_request(history_id=history_id).json()
paused_dataset = history_contents[-1]
failed_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, assert_ok=False)
assert paused_dataset['state'] == 'paused', paused_dataset
assert failed_dataset['state'] == 'error', failed_dataset
inputs = {"input1": {'values': [{'src': 'hda',
'id': history_contents[0]['id']}]
},
"failbool": "false",
"rerun_remap_job_id": failed_dataset['creating_job']}
self.dataset_populator.run_tool(tool_id='fail_identifier',
inputs=inputs,
history_id=history_id,
assert_ok=True)
unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=False)
assert unpaused_dataset['state'] == 'ok'

@skip_without_tool("collection_creates_pair")
def test_workflow_run_output_collection_mapping(self):
workflow_id = self._upload_yaml_workflow("""
10 changes: 5 additions & 5 deletions test/base/populators.py
@@ -275,19 +275,19 @@ def get_history_dataset_content(self, history_id, wait=True, filename=None, **kw
data = {}
if filename:
data["filename"] = filename
display_response = self.__get_contents_request(history_id, "/%s/display" % dataset_id, data=data)
display_response = self._get_contents_request(history_id, "/%s/display" % dataset_id, data=data)
assert display_response.status_code == 200, display_response.content
return display_response.content

def get_history_dataset_details(self, history_id, **kwds):
dataset_id = self.__history_content_id(history_id, **kwds)
details_response = self.__get_contents_request(history_id, "/datasets/%s" % dataset_id)
details_response = self._get_contents_request(history_id, "/datasets/%s" % dataset_id)
assert details_response.status_code == 200
return details_response.json()

def get_history_collection_details(self, history_id, **kwds):
hdca_id = self.__history_content_id(history_id, **kwds)
details_response = self.__get_contents_request(history_id, "/dataset_collections/%s" % hdca_id)
details_response = self._get_contents_request(history_id, "/dataset_collections/%s" % hdca_id)
assert details_response.status_code == 200, details_response.content
return details_response.json()

@@ -320,7 +320,7 @@ def __history_content_id(self, history_id, wait=True, **kwds):
history_content_id = kwds["dataset"]["id"]
else:
hid = kwds.get("hid", None) # If not hid, just grab last dataset
history_contents = self.__get_contents_request(history_id).json()
history_contents = self._get_contents_request(history_id).json()
if hid:
history_content_id = None
for history_item in history_contents:
@@ -333,7 +333,7 @@ def __history_content_id(self, history_id, wait=True, **kwds):
history_content_id = history_contents[-1]["id"]
return history_content_id

def __get_contents_request(self, history_id, suffix="", data={}):
def _get_contents_request(self, history_id, suffix="", data={}):
url = "histories/%s/contents" % history_id
if suffix:
url = "%s%s" % (url, suffix)
21 changes: 21 additions & 0 deletions test/functional/tools/fail_identifier.xml
@@ -0,0 +1,21 @@
<tool id="fail_identifier" name="Fail input with identifier that contains fail">
<command><![CDATA[
#if $failbool and 'fail' in $input1.element_identifier
sh -c "exit 127"
#else
cp '$input1' '$out_file1'
#end if
]]></command>
<inputs>
<param name="input1" type="data" label="An input file" />
<param name="failbool" type="boolean" label="The failure property" checked="false" />
</inputs>
<outputs>
<data name="out_file1" format="data"/>
</outputs>
<stdio>
<exit_code range="127" level="fatal" description="Failing exit code." />
</stdio>
<help>
</help>
</tool>
1 change: 1 addition & 0 deletions test/functional/tools/samples_tool_conf.xml
@@ -96,6 +96,7 @@
<tool file="identifier_multiple_in_repeat.xml" />
<tool file="identifier_collection.xml" />
<tool file="identifier_in_actions.xml" />
<tool file="fail_identifier.xml" />
<tool file="tool_directory.xml" />
<tool file="output_action_change_format.xml" />
<tool file="collection_paired_test.xml" />
