From c0dbace7bbf33da15ba1fdb9f69777a3c881cecd Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Sat, 30 Dec 2017 19:42:24 +0200
Subject: [PATCH] Replace collection elements with re-run output

This specifically addresses the problem where some jobs of a mapped-over
collection have failed. Instead of filtering the failed collection and
restarting the workflow at this position (involving a lot of copy-paste ...),
the user can now limit the re-run to the problematic jobs, and the workflow
should resume from there.

Should fix https://github.com/galaxyproject/galaxy/issues/2235.

This is one possible implementation; it would also be feasible not to
manipulate the original collection, but instead to copy the HDCA, replace
the collection elements, and update all references for jobs that depend on
the HDCA, as we do for HDAs. This implementation seems simpler, but let me
know if you see problems with this approach.
---
 lib/galaxy/jobs/__init__.py                 |  1 +
 lib/galaxy/model/__init__.py                |  7 +++
 lib/galaxy/tools/actions/__init__.py        | 11 +++++
 test/api/test_workflows.py                  | 48 +++++++++++++++++++++
 test/base/populators.py                     | 10 ++---
 test/functional/tools/fail_identifier.xml   | 21 +++++++++
 test/functional/tools/samples_tool_conf.xml |  1 +
 7 files changed, 94 insertions(+), 5 deletions(-)
 create mode 100644 test/functional/tools/fail_identifier.xml
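The core of the in-place approach is a small helper on the collection model
that swaps each failed element's HDA for the output of the re-run. A minimal
standalone sketch of that idea (FakeElement and FakeCollection are simplified
stand-ins for the SQLAlchemy models, not Galaxy's actual classes):

    # Sketch of the in-place element swap this patch implements.
    class FakeElement:
        def __init__(self, hda, element_type='hda'):
            self.element_object = hda  # the HDA currently backing this element
            self.element_type = element_type
            self.hda = hda

    class FakeCollection:
        def __init__(self, elements):
            self.elements = elements

        def replace_failed_elements(self, replacements):
            # `replacements` maps a failed HDA to the HDA the re-run produced.
            for element in self.elements:
                if element.element_object in replacements:
                    if element.element_type == 'hda':
                        element.hda = replacements[element.element_object]
                    # Nested collections would need recursion (TODO in the diff).

    # After re-running the failed job, swap its old output for the new one.
    failed, healthy, rerun_output = object(), object(), object()
    collection = FakeCollection([FakeElement(failed), FakeElement(healthy)])
    collection.replace_failed_elements({failed: rerun_output})
    assert collection.elements[0].hda is rerun_output
    assert collection.elements[1].hda is healthy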
diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index 82107f160940..7329f95546f9 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -1030,6 +1030,7 @@ def pause(self, job=None, message=None):
             dataset_assoc.dataset.dataset.state = dataset_assoc.dataset.dataset.states.PAUSED
             dataset_assoc.dataset.info = message
             self.sa_session.add(dataset_assoc.dataset)
+        log.debug("Pausing Job '%d', %s", job.id, message)
         job.set_state(job.states.PAUSED)
         self.sa_session.add(job)

diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index 225c82054997..68610b8eabd3 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -3139,6 +3139,13 @@ def copy(self, destination=None, element_destination=None):
         object_session(self).flush()
         return new_collection
 
+    def replace_failed_elements(self, replacements):
+        for element in self.elements:
+            if element.element_object in replacements:
+                if element.element_type == 'hda':
+                    element.hda = replacements[element.element_object]
+                # TODO: handle the case where elements are collections
+
     def set_from_dict(self, new_data):
         # Nothing currently editable in this class.
         return {}

diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py
index 737ca4658044..a0088e8344e3 100644
--- a/lib/galaxy/tools/actions/__init__.py
+++ b/lib/galaxy/tools/actions/__init__.py
@@ -509,6 +509,8 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current
             # Duplicate PJAs before remap.
             for pjaa in old_job.post_job_actions:
                 current_job.add_post_job_action(pjaa.post_job_action)
+            remapped_hdas = {}
+            input_hdcas = set()
             for jtod in old_job.output_datasets:
                 for (job_to_remap, jtid) in [(jtid.job, jtid) for jtid in jtod.dataset.dependent_jobs]:
                     if (trans.user is not None and job_to_remap.user_id == trans.user.id) or (
@@ -520,6 +522,9 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current
                                hda.state = hda.states.NEW
                                hda.info = None
                        input_values = dict([(p.name, json.loads(p.value)) for p in job_to_remap.parameters])
+                        remapped_hdas[jtod.dataset] = out_data[jtod.name]
+                        for jtidca in job_to_remap.input_dataset_collections:
+                            input_hdcas.add(jtidca.dataset_collection)
                         old_dataset_id = jtod.dataset_id
                         new_dataset_id = out_data[jtod.name].id
                         input_values = update_dataset_ids(input_values, {old_dataset_id: new_dataset_id}, src='hda')
@@ -530,6 +535,12 @@ def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current
                         log.info('Job %s input HDA %s remapped to new HDA %s' % (job_to_remap.id, jtod.dataset.id, jtid.dataset.id))
                         trans.sa_session.add(job_to_remap)
                         trans.sa_session.add(jtid)
+                for hdca in input_hdcas:
+                    hdca.collection.replace_failed_elements(remapped_hdas)
+                    if hdca.implicit_collection_jobs:
+                        for job in hdca.implicit_collection_jobs.jobs:
+                            if job.job_id == old_job.id:
+                                job.job_id = current_job.id
                 jtod.dataset.visible = False
                 trans.sa_session.add(jtod)
         except Exception:
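To summarize the control flow the hunks above add to _remap_job_on_rerun: the
re-run first records which failed output HDAs map to which fresh outputs and
which input HDCAs fed the dependent jobs, then patches both the collections
and the implicit-collection job records. A schematic (hypothetical
plain-Python stand-ins for the model rows, not the actual SQLAlchemy code):

    # Schematic of the new collection bookkeeping during a job re-run.
    def remap_collection_inputs(old_job, current_job, out_data):
        remapped_hdas = {}   # failed output HDA -> replacement HDA
        input_hdcas = set()  # HDCAs feeding the downstream jobs being remapped
        for jtod in old_job.output_datasets:
            remapped_hdas[jtod.dataset] = out_data[jtod.name]
            for jtid in jtod.dataset.dependent_jobs:
                for jtidca in jtid.job.input_dataset_collections:
                    input_hdcas.add(jtidca.dataset_collection)
        for hdca in input_hdcas:
            # Point each failed element at the re-run's output in place ...
            hdca.collection.replace_failed_elements(remapped_hdas)
            # ... and re-associate the implicit collection job with the new
            # job, so the HDCA reflects the re-run, not the original failure.
            if hdca.implicit_collection_jobs:
                for icj_assoc in hdca.implicit_collection_jobs.jobs:
                    if icj_assoc.job_id == old_job.id:
                        icj_assoc.job_id = current_job.id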
diff --git a/test/api/test_workflows.py b/test/api/test_workflows.py
index 5f4700714927..edf397cedf2f 100644
--- a/test/api/test_workflows.py
+++ b/test/api/test_workflows.py
@@ -723,6 +723,54 @@ def test_workflow_resume_from_failed_step_with_hdca_input(self):
                                               assert_ok=False)
         assert unpaused_dataset['state'] == 'ok'
 
+    @skip_without_tool("fail_identifier")
+    @skip_without_tool("identifier_collection")
+    def test_workflow_resume_with_mapped_over_input(self):
+        with self.dataset_populator.test_history() as history_id:
+            job_summary = self._run_jobs("""
+class: GalaxyWorkflow
+steps:
+  - label: input_datasets
+    type: input_collection
+  - label: fail_identifier_1
+    tool_id: fail_identifier
+    state:
+      input1:
+        $link: input_datasets
+      failbool: true
+  - tool_id: identifier_collection
+    state:
+      input1:
+        $link: fail_identifier_1#out_file1
+test_data:
+  input_datasets:
+    type: list
+    elements:
+      - identifier: fail
+        value: 1.fastq
+        type: File
+      - identifier: success
+        value: 1.fastq
+        type: File
+""", history_id=history_id, assert_ok=False, wait=False)
+            self.wait_for_invocation_and_jobs(history_id, job_summary.workflow_id, job_summary.invocation_id, assert_ok=False)
+            history_contents = self.dataset_populator._get_contents_request(history_id=history_id).json()
+            paused_dataset = history_contents[-1]
+            failed_dataset = self.dataset_populator.get_history_dataset_details(history_id, hid=5, assert_ok=False)
+            assert paused_dataset['state'] == 'paused', paused_dataset
+            assert failed_dataset['state'] == 'error', failed_dataset
+            inputs = {"input1": {'values': [{'src': 'hda',
+                                             'id': history_contents[0]['id']}]
+                                 },
+                      "failbool": "false",
+                      "rerun_remap_job_id": failed_dataset['creating_job']}
+            self.dataset_populator.run_tool(tool_id='fail_identifier',
+                                            inputs=inputs,
+                                            history_id=history_id,
+                                            assert_ok=True)
+            unpaused_dataset = self.dataset_populator.get_history_dataset_details(history_id, wait=True, assert_ok=False)
+            assert unpaused_dataset['state'] == 'ok'
+
     @skip_without_tool("collection_creates_pair")
     def test_workflow_run_output_collection_mapping(self):
         workflow_id = self._upload_yaml_workflow("""

diff --git a/test/base/populators.py b/test/base/populators.py
index c046a78bf2ae..c8388d06bcc2 100644
--- a/test/base/populators.py
+++ b/test/base/populators.py
@@ -275,19 +275,19 @@ def get_history_dataset_content(self, history_id, wait=True, filename=None, **kw
         data = {}
         if filename:
             data["filename"] = filename
-        display_response = self.__get_contents_request(history_id, "/%s/display" % dataset_id, data=data)
+        display_response = self._get_contents_request(history_id, "/%s/display" % dataset_id, data=data)
         assert display_response.status_code == 200, display_response.content
         return display_response.content
 
     def get_history_dataset_details(self, history_id, **kwds):
         dataset_id = self.__history_content_id(history_id, **kwds)
-        details_response = self.__get_contents_request(history_id, "/datasets/%s" % dataset_id)
+        details_response = self._get_contents_request(history_id, "/datasets/%s" % dataset_id)
         assert details_response.status_code == 200
         return details_response.json()
 
     def get_history_collection_details(self, history_id, **kwds):
         hdca_id = self.__history_content_id(history_id, **kwds)
-        details_response = self.__get_contents_request(history_id, "/dataset_collections/%s" % hdca_id)
+        details_response = self._get_contents_request(history_id, "/dataset_collections/%s" % hdca_id)
         assert details_response.status_code == 200, details_response.content
         return details_response.json()
 
@@ -320,7 +320,7 @@ def __history_content_id(self, history_id, wait=True, **kwds):
             history_content_id = kwds["dataset"]["id"]
         else:
             hid = kwds.get("hid", None)  # If not hid, just grab last dataset
-            history_contents = self.__get_contents_request(history_id).json()
+            history_contents = self._get_contents_request(history_id).json()
             if hid:
                 history_content_id = None
                 for history_item in history_contents:
@@ -333,7 +333,7 @@ def __history_content_id(self, history_id, wait=True, **kwds):
             history_content_id = history_contents[-1]["id"]
         return history_content_id
 
-    def __get_contents_request(self, history_id, suffix="", data={}):
+    def _get_contents_request(self, history_id, suffix="", data={}):
         url = "histories/%s/contents" % history_id
         if suffix:
             url = "%s%s" % (url, suffix)

diff --git a/test/functional/tools/fail_identifier.xml b/test/functional/tools/fail_identifier.xml
new file mode 100644
index 000000000000..c9f4aac2aa44
--- /dev/null
+++ b/test/functional/tools/fail_identifier.xml
@@ -0,0 +1,21 @@
+<tool id="fail_identifier" name="Fail Identifier" version="1.0.0">
+    <command><![CDATA[
+        echo '$input1.element_identifier' > '$out_file1';
+        #if $failbool:
+            exit 1
+        #end if
+    ]]></command>
+    <inputs>
+        <param name="input1" type="data" label="Input dataset"/>
+        <param name="failbool" type="boolean" checked="false" label="Fail this job"/>
+    </inputs>
+    <outputs>
+        <data name="out_file1" format="txt"/>
+    </outputs>
+
+    <help>
+        Writes the element identifier of input1 to out_file1 and exits
+        non-zero when failbool is set, to exercise re-running failed
+        jobs in mapped-over collections.
+    </help>
+</tool>

diff --git a/test/functional/tools/samples_tool_conf.xml b/test/functional/tools/samples_tool_conf.xml
index 5bd62a7cbb8b..a92448627e5c 100644
--- a/test/functional/tools/samples_tool_conf.xml
+++ b/test/functional/tools/samples_tool_conf.xml
@@ -96,6 +96,7 @@
+    <tool file="fail_identifier.xml" />
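One detail worth calling out from the populators.py hunks: the rename from
__get_contents_request to _get_contents_request is what lets the new workflow
test call the helper from outside the populator class, because
double-underscore attributes are name-mangled per defining class. A quick
illustration with a toy class (not Galaxy code):

    # Double leading underscores trigger name mangling; a single leading
    # underscore is only a convention and stays callable by its plain name.
    class Populator:
        def __get(self):       # mangled to _Populator__get
            return "contents"

        def _get(self):
            return "contents"

    p = Populator()
    assert p._get() == "contents"             # works after the rename
    assert p._Populator__get() == "contents"  # only external route before
    try:
        p.__get()              # no mangling at module level -> AttributeError
    except AttributeError:
        pass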