TST: Decrease divergence between Task/Workflow _run_task #518

Merged
merged 4 commits into from Apr 1, 2022
Changes from 2 commits
116 changes: 64 additions & 52 deletions pydra/engine/core.py
@@ -452,42 +452,56 @@ def __call__(
res = self._run(rerun=rerun, **kwargs)
return res

def prepare_run_task(self, rerun):
"""
Invoked immediately after the lockfile is generated, this function:
- does a lot of things... (TODO)
- Creates an empty Result and passes it along to be populated.

Created as an attempt to simplify overlapping `Task`|`Workflow` behaviors.
"""
# retrieve cached results
if not (rerun or self.task_rerun):
result = self.result()
# adding info file with the checksum in case the task was cancelled
# and the lockfile has to be removed
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
json.dump({"checksum": self.checksum}, jsonfile)
if not self.can_resume and self.output_dir.exists():
shutil.rmtree(self.output_dir)
self.output_dir.mkdir(parents=False, exist_ok=self.can_resume)
if not is_workflow(self):
self._orig_inputs = attr.asdict(self.inputs, recurse=False)
map_copyfiles = copyfile_input(self.inputs, self.output_dir)
modified_inputs = template_update(
self.inputs, self.output_dir, map_copyfiles=map_copyfiles
)
if modified_inputs:
self.inputs = attr.evolve(self.inputs, **modified_inputs)
self.audit.start_audit(odir=self.output_dir)
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
return result

def _run(self, rerun=False, **kwargs):
self.inputs = attr.evolve(self.inputs, **kwargs)
self.inputs.check_fields_input_spec()
checksum = self.checksum
lockfile = self.cache_dir / (checksum + ".lock")

lockfile = self.cache_dir / (self.checksum + ".lock")
# Eagerly retrieve cached - see scenarios in __init__()
self.hooks.pre_run(self)
with SoftFileLock(lockfile):
if not (rerun or self.task_rerun):
result = self.result()
if result is not None and not result.errored:
return result
# adding info file with the checksum in case the task was cancelled
# and the lockfile has to be removed
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
json.dump({"checksum": self.checksum}, jsonfile)
# Let only one equivalent process run
odir = self.output_dir
if not self.can_resume and odir.exists():
shutil.rmtree(odir)
cwd = os.getcwd()
odir.mkdir(parents=False, exist_ok=True if self.can_resume else False)
orig_inputs = attr.asdict(self.inputs, recurse=False)
map_copyfiles = copyfile_input(self.inputs, self.output_dir)
modified_inputs = template_update(
self.inputs, self.output_dir, map_copyfiles=map_copyfiles
)
if modified_inputs:
self.inputs = attr.evolve(self.inputs, **modified_inputs)
self.audit.start_audit(odir)
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
result = self.prepare_run_task(rerun)
orig_outdir = self.output_dir
Contributor Author

Task.output_dir is changing after the try/except block...

# before
/private/var/folders/r6/mcdps6rs37x2ttd1fn032yy40000gp/T/pytest-of-mathiasg/pytest-11/test_shell_cmd_inputspec_copyf0/ShellCommandTask_076520a92438002271a499fd2d1856333600d914bccc24b73505b69c61b77afb
# after
/private/var/folders/r6/mcdps6rs37x2ttd1fn032yy40000gp/T/pytest-of-mathiasg/pytest-11/test_shell_cmd_inputspec_copyf0/ShellCommandTask_f1303b44e112742de4622e902e48725b39bef257efe55f3195af27378ab02b8b
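
This is presumably because output_dir is derived from the checksum, and the checksum hashes the current inputs, so restoring self._orig_inputs in the finally block changes the hash. A minimal sketch of that dependency (the checksum function and directory-naming scheme below are illustrative stand-ins, not pydra's actual hashing):

# Hypothetical illustration: the output directory is named after a hash of
# the current inputs, so swapping inputs in and out changes its path.
import hashlib
from pathlib import Path

def checksum(inputs: dict) -> str:
    # stand-in for pydra's input hashing
    return hashlib.sha256(repr(sorted(inputs.items())).encode()).hexdigest()

def output_dir(cache_dir: Path, name: str, inputs: dict) -> Path:
    # pydra-style naming: <cache_dir>/<TaskName>_<checksum>
    return cache_dir / f"{name}_{checksum(inputs)}"

cache = Path("/tmp/cache")
modified = {"in_file": "copied/file.txt"}   # inputs after copyfile_input/template_update
original = {"in_file": "source/file.txt"}   # inputs restored in the finally block
# different inputs -> different checksum -> different output_dir
assert output_dir(cache, "ShellCommandTask", modified) != output_dir(cache, "ShellCommandTask", original)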

try:
self.audit.monitor()
self._run_task()
result.output = self._collect_outputs(output_dir=odir)
result.output = self._collect_outputs(output_dir=orig_outdir)
except Exception:
etype, eval, etr = sys.exc_info()
traceback = format_exception(etype, eval, etr)
@@ -497,15 +511,17 @@ def _run(self, rerun=False, **kwargs):
finally:
self.hooks.post_run_task(self, result)
self.audit.finalize_audit(result)
save(odir, result=result, task=self)
save(orig_outdir, result=result, task=self)
self.output_ = None
# removing the additional file with the checksum
(self.cache_dir / f"{self.uid}_info.json").unlink()
# # function etc. shouldn't change anyway, so removing
orig_inputs = dict(
(k, v) for (k, v) in orig_inputs.items() if not k.startswith("_")
)
self.inputs = attr.evolve(self.inputs, **orig_inputs)
self._orig_inputs = {
k: v for k, v in self._orig_inputs.items() if not k.startswith("_")
}
self.inputs = attr.evolve(self.inputs, **self._orig_inputs)
# no need to propagate this
del self._orig_inputs
os.chdir(cwd)
self.hooks.post_run(self, result)
return result
@@ -1038,38 +1054,18 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
raise ValueError(
"Workflow output cannot be None, use set_output to define output(s)"
)
checksum = self.checksum
# creating connections that were defined after adding tasks to the wf
for task in self.graph.nodes:
# if workflow has task_rerun=True and propagate_rerun=True,
# it should be passed to the tasks
if self.task_rerun and self.propagate_rerun:
task.task_rerun = self.task_rerun
# if the task is a wf, then the propagate_rerun should also be set
if is_workflow(task):
task.propagate_rerun = self.propagate_rerun
task.cache_locations = task._cache_locations + self.cache_locations
self.create_connections(task)
lockfile = self.cache_dir / (checksum + ".lock")
self.connect_and_propagate_to_tasks()
lockfile = self.cache_dir / (self.checksum + ".lock")
self.hooks.pre_run(self)
async with PydraFileLock(lockfile):
# retrieve cached results
if not (rerun or self.task_rerun):
result = self.result()
if result is not None and not result.errored:
return result
# adding info file with the checksum in case the task was cancelled
# and the lockfile has to be removed
with open(self.cache_dir / f"{self.uid}_info.json", "w") as jsonfile:
json.dump({"checksum": checksum}, jsonfile)
odir = self.output_dir
if not self.can_resume and odir.exists():
shutil.rmtree(odir)
cwd = os.getcwd()
odir.mkdir(parents=False, exist_ok=True if self.can_resume else False)
self.audit.start_audit(odir=odir)
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
result = self.prepare_run_task(rerun)
orig_outdir = self.output_dir
try:
self.audit.monitor()
await self._run_task(submitter, rerun=rerun)
@@ -1084,7 +1080,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs):
finally:
self.hooks.post_run_task(self, result)
self.audit.finalize_audit(result=result)
save(odir, result=result, task=self)
save(orig_outdir, result=result, task=self)
# removing the additional file with the checksum
(self.cache_dir / f"{self.uid}_info.json").unlink()
os.chdir(cwd)
@@ -1226,6 +1222,22 @@ def create_dotfile(self, type="simple", export=None, name=None):
formatted_dot.append(self.graph.export_graph(dotfile=dotfile, ext=ext))
return dotfile, formatted_dot

def connect_and_propagate_to_tasks(self):
"""
Visit each node in the graph and create the connections.
Additionally checks if all tasks should be rerun.
"""
for task in self.graph.nodes:
# if workflow has task_rerun=True and propagate_rerun=True,
# it should be passed to the tasks
if self.task_rerun and self.propagate_rerun:
task.task_rerun = self.task_rerun
# if the task is a wf, then the propagate_rerun should also be set
if is_workflow(task):
task.propagate_rerun = self.propagate_rerun
task.cache_locations = task._cache_locations + self.cache_locations
self.create_connections(task)


def is_task(obj):
"""Check whether an object looks like a task."""