From b1a8f4cb38c40a1eb17af55f8f8c03c699e01b5f Mon Sep 17 00:00:00 2001 From: mathiasg Date: Thu, 31 Mar 2022 16:30:57 -0400 Subject: [PATCH 1/4] TST: Decrease divergence between Task/Workflow _run_task --- pydra/engine/core.py | 122 ++++++++++++++++++++++--------------------- 1 file changed, 63 insertions(+), 59 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 47d9baeaf3..313af8741c 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -452,42 +452,53 @@ def __call__( res = self._run(rerun=rerun, **kwargs) return res + def prepare_run_task(self, rerun): + """ + Invoked immediately after the lockfile is generated, this function: + - does a lot of things... (TODO) + - Creates an empty Result and passes it along to be populated. + + Created as an attempt to simplify overlapping `Task`|`Workflow` behaviors. + """ + # retrieve cached results + if not (rerun or self.task_rerun): + result = self.result() + if result is not None and not result.errored: + return result + # adding info file with the checksum in case the task was cancelled + # and the lockfile has to be removed + with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile: + json.dump({"checksum": self.checksum}, jsonfile) + if not self.can_resume and self.output_dir.exists(): + shutil.rmtree(self.output_dir) + self.output_dir.mkdir(parents=False, exist_ok=self.can_resume) + if not is_workflow(self): + self._orig_inputs = attr.asdict(self.inputs, recurse=False) + map_copyfiles = copyfile_input(self.inputs, self.output_dir) + modified_inputs = template_update( + self.inputs, self.output_dir, map_copyfiles=map_copyfiles + ) + if modified_inputs: + self.inputs = attr.evolve(self.inputs, **modified_inputs) + self.audit.start_audit(odir=self.output_dir) + result = Result(output=None, runtime=None, errored=False) + self.hooks.pre_run_task(self) + return result + def _run(self, rerun=False, **kwargs): self.inputs = attr.evolve(self.inputs, **kwargs) self.inputs.check_fields_input_spec() - checksum = self.checksum - lockfile = self.cache_dir / (checksum + ".lock") + + lockfile = self.cache_dir / (self.checksum + ".lock") # Eagerly retrieve cached - see scenarios in __init__() self.hooks.pre_run(self) with SoftFileLock(lockfile): - if not (rerun or self.task_rerun): - result = self.result() - if result is not None and not result.errored: - return result - # adding info file with the checksum in case the task was cancelled - # and the lockfile has to be removed - with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile: - json.dump({"checksum": self.checksum}, jsonfile) - # Let only one equivalent process run - odir = self.output_dir - if not self.can_resume and odir.exists(): - shutil.rmtree(odir) cwd = os.getcwd() - odir.mkdir(parents=False, exist_ok=True if self.can_resume else False) - orig_inputs = attr.asdict(self.inputs, recurse=False) - map_copyfiles = copyfile_input(self.inputs, self.output_dir) - modified_inputs = template_update( - self.inputs, self.output_dir, map_copyfiles=map_copyfiles - ) - if modified_inputs: - self.inputs = attr.evolve(self.inputs, **modified_inputs) - self.audit.start_audit(odir) - result = Result(output=None, runtime=None, errored=False) - self.hooks.pre_run_task(self) + result = self.prepare_run_task(rerun) try: self.audit.monitor() self._run_task() - result.output = self._collect_outputs(output_dir=odir) + result.output = self._collect_outputs(output_dir=self.output_dir) except Exception: etype, eval, etr = sys.exc_info() traceback = format_exception(etype, eval, etr) @@ -497,15 +508,17 @@ def _run(self, rerun=False, **kwargs): finally: self.hooks.post_run_task(self, result) self.audit.finalize_audit(result) - save(odir, result=result, task=self) + save(self.output_dir, result=result, task=self) self.output_ = None # removing the additional file with the chcksum (self.cache_dir / f"{self.uid}_info.json").unlink() # # function etc. shouldn't change anyway, so removing - orig_inputs = dict( - (k, v) for (k, v) in orig_inputs.items() if not k.startswith("_") + self._orig_inputs = dict( + (k, v) for (k, v) in self._orig_inputs.items() if not k.startswith("_") ) - self.inputs = attr.evolve(self.inputs, **orig_inputs) + self.inputs = attr.evolve(self.inputs, **self._orig_inputs) + # no need to propagate this + del self._orig_inputs os.chdir(cwd) self.hooks.post_run(self, result) return result @@ -1038,38 +1051,13 @@ async def _run(self, submitter=None, rerun=False, **kwargs): raise ValueError( "Workflow output cannot be None, use set_output to define output(s)" ) - checksum = self.checksum # creating connections that were defined after adding tasks to the wf - for task in self.graph.nodes: - # if workflow has task_rerun=True and propagate_rerun=True, - # it should be passed to the tasks - if self.task_rerun and self.propagate_rerun: - task.task_rerun = self.task_rerun - # if the task is a wf, than the propagate_rerun should be also set - if is_workflow(task): - task.propagate_rerun = self.propagate_rerun - task.cache_locations = task._cache_locations + self.cache_locations - self.create_connections(task) - lockfile = self.cache_dir / (checksum + ".lock") + self.connect_and_propagate_to_tasks() + lockfile = self.cache_dir / (self.checksum + ".lock") self.hooks.pre_run(self) async with PydraFileLock(lockfile): - # retrieve cached results - if not (rerun or self.task_rerun): - result = self.result() - if result is not None and not result.errored: - return result - # adding info file with the checksum in case the task was cancelled - # and the lockfile has to be removed - with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile: - json.dump({"checksum": checksum}, jsonfile) - odir = self.output_dir - if not self.can_resume and odir.exists(): - shutil.rmtree(odir) cwd = os.getcwd() - odir.mkdir(parents=False, exist_ok=True if self.can_resume else False) - self.audit.start_audit(odir=odir) - result = Result(output=None, runtime=None, errored=False) - self.hooks.pre_run_task(self) + result = self.prepare_run_task(rerun) try: self.audit.monitor() await self._run_task(submitter, rerun=rerun) @@ -1084,7 +1072,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs): finally: self.hooks.post_run_task(self, result) self.audit.finalize_audit(result=result) - save(odir, result=result, task=self) + save(self.output_dir, result=result, task=self) # removing the additional file with the chcksum (self.cache_dir / f"{self.uid}_info.json").unlink() os.chdir(cwd) @@ -1226,6 +1214,22 @@ def create_dotfile(self, type="simple", export=None, name=None): formatted_dot.append(self.graph.export_graph(dotfile=dotfile, ext=ext)) return dotfile, formatted_dot + def connect_and_propagate_to_tasks(self): + """ + Visit each node in the graph and create the connections. + Additionally checks if all tasks should be rerun. + """ + for task in self.graph.nodes: + # if workflow has task_rerun=True and propagate_rerun=True, + # it should be passed to the tasks + if self.task_rerun and self.propagate_rerun: + task.task_rerun = self.task_rerun + # if the task is a wf, than the propagate_rerun should be also set + if is_workflow(task): + task.propagate_rerun = self.propagate_rerun + task.cache_locations = task._cache_locations + self.cache_locations + self.create_connections(task) + def is_task(obj): """Check whether an object looks like a task.""" From a456a6b06de5a4364b36ca58225eb5786c11db45 Mon Sep 17 00:00:00 2001 From: mathiasg Date: Fri, 1 Apr 2022 09:05:14 -0400 Subject: [PATCH 2/4] FIX: Ensure same logic --- pydra/engine/core.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 313af8741c..0d95a9a31b 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -463,8 +463,6 @@ def prepare_run_task(self, rerun): # retrieve cached results if not (rerun or self.task_rerun): result = self.result() - if result is not None and not result.errored: - return result # adding info file with the checksum in case the task was cancelled # and the lockfile has to be removed with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile: @@ -493,12 +491,17 @@ def _run(self, rerun=False, **kwargs): # Eagerly retrieve cached - see scenarios in __init__() self.hooks.pre_run(self) with SoftFileLock(lockfile): + if not (rerun or self.task_rerun): + result = self.result() + if result is not None and not result.errored: + return result cwd = os.getcwd() result = self.prepare_run_task(rerun) + orig_outdir = self.output_dir try: self.audit.monitor() self._run_task() - result.output = self._collect_outputs(output_dir=self.output_dir) + result.output = self._collect_outputs(output_dir=orig_outdir) except Exception: etype, eval, etr = sys.exc_info() traceback = format_exception(etype, eval, etr) @@ -508,14 +511,14 @@ def _run(self, rerun=False, **kwargs): finally: self.hooks.post_run_task(self, result) self.audit.finalize_audit(result) - save(self.output_dir, result=result, task=self) + save(orig_outdir, result=result, task=self) self.output_ = None # removing the additional file with the chcksum (self.cache_dir / f"{self.uid}_info.json").unlink() # # function etc. shouldn't change anyway, so removing - self._orig_inputs = dict( - (k, v) for (k, v) in self._orig_inputs.items() if not k.startswith("_") - ) + self._orig_inputs = { + k: v for k, v in self._orig_inputs.items() if not k.startswith("_") + } self.inputs = attr.evolve(self.inputs, **self._orig_inputs) # no need to propagate this del self._orig_inputs @@ -1056,8 +1059,13 @@ async def _run(self, submitter=None, rerun=False, **kwargs): lockfile = self.cache_dir / (self.checksum + ".lock") self.hooks.pre_run(self) async with PydraFileLock(lockfile): + if not (rerun or self.task_rerun): + result = self.result() + if result is not None and not result.errored: + return result cwd = os.getcwd() result = self.prepare_run_task(rerun) + orig_outdir = self.output_dir try: self.audit.monitor() await self._run_task(submitter, rerun=rerun) @@ -1072,7 +1080,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs): finally: self.hooks.post_run_task(self, result) self.audit.finalize_audit(result=result) - save(self.output_dir, result=result, task=self) + save(orig_outdir, result=result, task=self) # removing the additional file with the chcksum (self.cache_dir / f"{self.uid}_info.json").unlink() os.chdir(cwd) From 36ad2df1c7d111248c3d13445aa7b38cc7e66a3e Mon Sep 17 00:00:00 2001 From: mathiasg Date: Fri, 1 Apr 2022 11:18:20 -0400 Subject: [PATCH 3/4] FIX: Group logical parts together --- pydra/engine/core.py | 58 +++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 0d95a9a31b..e6000a056f 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -452,17 +452,26 @@ def __call__( res = self._run(rerun=rerun, **kwargs) return res - def prepare_run_task(self, rerun): + def _modify_inputs(self): + """Update and preserve a Task's original inputs""" + orig_inputs = attr.asdict(self.inputs, recurse=False) + map_copyfiles = copyfile_input(self.inputs, self.output_dir) + modified_inputs = template_update( + self.inputs, self.output_dir, map_copyfiles=map_copyfiles + ) + if modified_inputs: + self.inputs = attr.evolve(self.inputs, **modified_inputs) + return orig_inputs + + def _populate_filesystem(self): """ Invoked immediately after the lockfile is generated, this function: - - does a lot of things... (TODO) - - Creates an empty Result and passes it along to be populated. + - Creates the cache file + - Clears existing outputs if `can_resume` is False + - Generates a fresh output directory Created as an attempt to simplify overlapping `Task`|`Workflow` behaviors. """ - # retrieve cached results - if not (rerun or self.task_rerun): - result = self.result() # adding info file with the checksum in case the task was cancelled # and the lockfile has to be removed with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile: @@ -470,18 +479,6 @@ def prepare_run_task(self, rerun): if not self.can_resume and self.output_dir.exists(): shutil.rmtree(self.output_dir) self.output_dir.mkdir(parents=False, exist_ok=self.can_resume) - if not is_workflow(self): - self._orig_inputs = attr.asdict(self.inputs, recurse=False) - map_copyfiles = copyfile_input(self.inputs, self.output_dir) - modified_inputs = template_update( - self.inputs, self.output_dir, map_copyfiles=map_copyfiles - ) - if modified_inputs: - self.inputs = attr.evolve(self.inputs, **modified_inputs) - self.audit.start_audit(odir=self.output_dir) - result = Result(output=None, runtime=None, errored=False) - self.hooks.pre_run_task(self) - return result def _run(self, rerun=False, **kwargs): self.inputs = attr.evolve(self.inputs, **kwargs) @@ -496,8 +493,13 @@ def _run(self, rerun=False, **kwargs): if result is not None and not result.errored: return result cwd = os.getcwd() - result = self.prepare_run_task(rerun) + self._populate_filesystem() + orig_inputs = self._modify_inputs() + # the output dir can be changed by _run_task (but should it??) orig_outdir = self.output_dir + result = Result(output=None, runtime=None, errored=False) + self.hooks.pre_run_task(self) + self.audit.start_audit(odir=self.output_dir) try: self.audit.monitor() self._run_task() @@ -516,12 +518,10 @@ def _run(self, rerun=False, **kwargs): # removing the additional file with the chcksum (self.cache_dir / f"{self.uid}_info.json").unlink() # # function etc. shouldn't change anyway, so removing - self._orig_inputs = { - k: v for k, v in self._orig_inputs.items() if not k.startswith("_") + orig_inputs = { + k: v for k, v in orig_inputs.items() if not k.startswith("_") } - self.inputs = attr.evolve(self.inputs, **self._orig_inputs) - # no need to propagate this - del self._orig_inputs + self.inputs = attr.evolve(self.inputs, **orig_inputs) os.chdir(cwd) self.hooks.post_run(self, result) return result @@ -1055,7 +1055,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs): "Workflow output cannot be None, use set_output to define output(s)" ) # creating connections that were defined after adding tasks to the wf - self.connect_and_propagate_to_tasks() + self._connect_and_propagate_to_tasks() lockfile = self.cache_dir / (self.checksum + ".lock") self.hooks.pre_run(self) async with PydraFileLock(lockfile): @@ -1064,8 +1064,12 @@ async def _run(self, submitter=None, rerun=False, **kwargs): if result is not None and not result.errored: return result cwd = os.getcwd() - result = self.prepare_run_task(rerun) + self._populate_filesystem() + # the output dir can be changed by _run_task (but should it??) orig_outdir = self.output_dir + result = Result(output=None, runtime=None, errored=False) + self.hooks.pre_run_task(self) + self.audit.start_audit(odir=self.output_dir) try: self.audit.monitor() await self._run_task(submitter, rerun=rerun) @@ -1222,7 +1226,7 @@ def create_dotfile(self, type="simple", export=None, name=None): formatted_dot.append(self.graph.export_graph(dotfile=dotfile, ext=ext)) return dotfile, formatted_dot - def connect_and_propagate_to_tasks(self): + def _connect_and_propagate_to_tasks(self): """ Visit each node in the graph and create the connections. Additionally checks if all tasks should be rerun. From 5cbca66a1c75c19b42a2e548ec69543a62e5952f Mon Sep 17 00:00:00 2001 From: mathiasg Date: Fri, 1 Apr 2022 12:33:34 -0400 Subject: [PATCH 4/4] FIX: Ensure output_dir is not recalculated --- pydra/engine/core.py | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/pydra/engine/core.py b/pydra/engine/core.py index e6000a056f..4a7d21c7cf 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -463,7 +463,7 @@ def _modify_inputs(self): self.inputs = attr.evolve(self.inputs, **modified_inputs) return orig_inputs - def _populate_filesystem(self): + def _populate_filesystem(self, checksum, output_dir): """ Invoked immediately after the lockfile is generated, this function: - Creates the cache file @@ -475,16 +475,18 @@ def _populate_filesystem(self): # adding info file with the checksum in case the task was cancelled # and the lockfile has to be removed with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile: - json.dump({"checksum": self.checksum}, jsonfile) - if not self.can_resume and self.output_dir.exists(): - shutil.rmtree(self.output_dir) - self.output_dir.mkdir(parents=False, exist_ok=self.can_resume) + json.dump({"checksum": checksum}, jsonfile) + if not self.can_resume and output_dir.exists(): + shutil.rmtree(output_dir) + output_dir.mkdir(parents=False, exist_ok=self.can_resume) def _run(self, rerun=False, **kwargs): self.inputs = attr.evolve(self.inputs, **kwargs) self.inputs.check_fields_input_spec() - lockfile = self.cache_dir / (self.checksum + ".lock") + checksum = self.checksum + output_dir = self.output_dir + lockfile = self.cache_dir / (checksum + ".lock") # Eagerly retrieve cached - see scenarios in __init__() self.hooks.pre_run(self) with SoftFileLock(lockfile): @@ -493,27 +495,25 @@ def _run(self, rerun=False, **kwargs): if result is not None and not result.errored: return result cwd = os.getcwd() - self._populate_filesystem() + self._populate_filesystem(checksum, output_dir) orig_inputs = self._modify_inputs() - # the output dir can be changed by _run_task (but should it??) - orig_outdir = self.output_dir result = Result(output=None, runtime=None, errored=False) self.hooks.pre_run_task(self) - self.audit.start_audit(odir=self.output_dir) + self.audit.start_audit(odir=output_dir) try: self.audit.monitor() self._run_task() - result.output = self._collect_outputs(output_dir=orig_outdir) + result.output = self._collect_outputs(output_dir=output_dir) except Exception: etype, eval, etr = sys.exc_info() traceback = format_exception(etype, eval, etr) - record_error(self.output_dir, error=traceback) + record_error(output_dir, error=traceback) result.errored = True raise finally: self.hooks.post_run_task(self, result) self.audit.finalize_audit(result) - save(orig_outdir, result=result, task=self) + save(output_dir, result=result, task=self) self.output_ = None # removing the additional file with the chcksum (self.cache_dir / f"{self.uid}_info.json").unlink() @@ -1056,7 +1056,10 @@ async def _run(self, submitter=None, rerun=False, **kwargs): ) # creating connections that were defined after adding tasks to the wf self._connect_and_propagate_to_tasks() - lockfile = self.cache_dir / (self.checksum + ".lock") + + checksum = self.checksum + output_dir = self.output_dir + lockfile = self.cache_dir / (checksum + ".lock") self.hooks.pre_run(self) async with PydraFileLock(lockfile): if not (rerun or self.task_rerun): @@ -1064,12 +1067,10 @@ async def _run(self, submitter=None, rerun=False, **kwargs): if result is not None and not result.errored: return result cwd = os.getcwd() - self._populate_filesystem() - # the output dir can be changed by _run_task (but should it??) - orig_outdir = self.output_dir + self._populate_filesystem(checksum, output_dir) result = Result(output=None, runtime=None, errored=False) self.hooks.pre_run_task(self) - self.audit.start_audit(odir=self.output_dir) + self.audit.start_audit(odir=output_dir) try: self.audit.monitor() await self._run_task(submitter, rerun=rerun) @@ -1077,14 +1078,14 @@ async def _run(self, submitter=None, rerun=False, **kwargs): except Exception: etype, eval, etr = sys.exc_info() traceback = format_exception(etype, eval, etr) - record_error(self.output_dir, error=traceback) + record_error(output_dir, error=traceback) result.errored = True self._errored = True raise finally: self.hooks.post_run_task(self, result) self.audit.finalize_audit(result=result) - save(orig_outdir, result=result, task=self) + save(output_dir, result=result, task=self) # removing the additional file with the chcksum (self.cache_dir / f"{self.uid}_info.json").unlink() os.chdir(cwd)