-
Notifications
You must be signed in to change notification settings - Fork 62
Description
I am attempting to build workflows which compose shell command tasks together, some of which provide default output names via output_file_template. However, these trigger an exception at runtime if they are not explicitly set when running the workflow, which defeats their purpose in my opinion.
The following minimal example defines a toy backup function wrapping the cp command and using a templated output for the target file. The code works fine on the task alone, but fails when wrapped in a workflow, with:
future: <Task finished name='Task-2' coro=<ConcurrentFuturesWorker.exec_as_coro() done, defined at .../pydra/engine/workers.py:169> exception=Exception("target_file has to be str or bool, but LF('backup', 'target_file') set")>
I would expect both runs to be successful and yield the same result modulo the temporary directory prefix.
import os
import pathlib
import tempfile
import attrs
import pydra
class BackupFile(pydra.engine.ShellCommandTask):
    """Example task performing a file backup with cp.

    If no target file is provided, it uses a default file suffixed by copy.
    """

    @attrs.define(kw_only=True)
    class InputSpec(pydra.specs.ShellSpec):
        # File to copy; marked mandatory in the field metadata.
        source_file: os.PathLike = attrs.field(
            metadata={"help_string": "source file", "mandatory": True, "argstr": ""}
        )
        # Destination of the copy; carries an output_file_template derived
        # from source_file so that a default name can be produced.
        target_file: str = attrs.field(
            metadata={
                "help_string": "target file",
                "argstr": "",
                "output_file_template": "{source_file}_copy",
            }
        )

    input_spec = pydra.specs.SpecInfo(name="Input", bases=(InputSpec,))
    executable = "cp"
def backup(name="backup", **kwargs) -> pydra.Workflow:
    """Build a workflow wrapping a single BackupFile task.

    The workflow declares ``source_file`` and ``target_file`` as inputs,
    forwards both to the task, and exposes the task's ``target_file`` as its
    only output.

    Args:
        name: Name given to the workflow.
        **kwargs: Passed through to ``pydra.Workflow`` (e.g. input values).

    Returns:
        The assembled workflow.
    """
    wf = pydra.Workflow(input_spec=["source_file", "target_file"], name=name, **kwargs)

    # Both inputs are wired as lazy fields, including target_file when the
    # caller leaves it unset.
    wf.add(
        BackupFile(
            name="backup_file",
            source_file=wf.lzin.source_file,
            target_file=wf.lzin.target_file,
        )
    )

    wf.set_output({"target_file": wf.backup_file.lzout.target_file})

    return wf
if __name__ == "__main__":
    # Execute with standalone backup task.
    with tempfile.TemporaryDirectory() as td:
        source_file = pathlib.Path(td) / "source1.file"
        source_file.touch()

        task = BackupFile(source_file=source_file)
        result = task()

        print(result.output.target_file)

    # Execute backup task wrapped in a workflow.
    # target_file is deliberately not passed, mirroring the standalone run.
    with tempfile.TemporaryDirectory() as td:
        source_file = pathlib.Path(td) / "source2.file"
        source_file.touch()

        task = backup(source_file=source_file)
        result = task()

        print(result.output.target_file)