You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Syntax changes to enable static type-checking of workflows #670
Given that inputs are passed to the initialisation of the task in the canonical workflow construction pattern, in order for static type-checkers to follow what is happening, the task inputs would need to be attributes of the object added to the workflow. Therefore, I have been playing around with the idea of merging the input and output specs into a single class, along with the @shell.task("mycmd")
class MyShellTask(SpecBase["MyShellTask.Out"]):
in_file: Nifti = shell.arg(argstr="--in", position=0)
a_flag: bool = shell.arg("--aflag", ...)
class Out:
out_file: NiftiGz = shell.out(argstr="--out", file_template="{in_file}_out.nii.gz")
scalar: float = shell.out(callable=...) In this case, function tasks could still be created conveniently using the existing def myfunc(in_file: Nifti, y: float) -> float:
return x / y
class MyFuncTask(SpecBase["MyFuncTask.Out"]):
x: int = func.arg(allowed_values=[1, 2, 3])
y: float = func.arg()
class Out:
z: float = func.out() NB: Tasks defined in this form are then added to a workflow using the wf = Workflow(name="my workflow", input_spec=["a_file"])
my_shell = wf.add(
my_func = wf.add(
) The reason you would do it like this is so you can set the signature of the def add(self, spec: T[U]) -> TaskBase[T, U]:
... where class TaskBase(Generic[T, U]):
def inputs(self) -> T:
def lzout(self) -> U:
... Note that in this case typing |
Here is a prototype with mock classes that works with import typing as ty
from pathlib import Path
import inspect
from typing import Any
from copy import copy
from typing_extensions import dataclass_transform
import attrs
from fileformats.generic import File, Directory
from pydra.engine.task import TaskBase, FunctionTask, ShellCommandTask
from pydra.engine.core import Result
OutSpec = ty.TypeVar("OutSpec")
class SpecBase(ty.Generic[OutSpec]):
__pydra_task_class__: ty.Type[TaskBase]
__pydra_cmd__: ty.Union[str, ty.Callable]
def __call__(self, name: ty.Optional[str] = None, **kwargs) -> TaskBase:
return self.__pydra_task_class__(self.__pydra_cmd__, name=name, inputs=self, **kwargs)
InSpec = ty.TypeVar("InSpec", bound=SpecBase)
class LazyOut:
name: str
spec: type
def __getattr___(self, field_name):
field = self._fields[field_name]
except KeyError as e:
raise AttributeError(
f"Lazy output interface of {self.name} task does not include "
) from e
return LazyOutField[field.type](self.name, field_name, field.type)
def _fields(self):
return attrs.fields(self.spec)
class LazyOutField:
task_name: str
field_name: str
type: type
class LazyIn:
name: str
spec: type
def __getattr___(self, field_name):
field = self._fields[field_name]
except KeyError as e:
raise AttributeError(
f"Lazy output interface of {self.name} task does not include "
) from e
return LazyInField[field.type](self.name, field_name, field.type)
def _fields(self):
return attrs.fields(self.spec)
class LazyInField:
task_name: str
field_name: str
type: type
class Task(ty.Generic[InSpec, OutSpec]):
inputs: InSpec = attrs.field()
name: str = attrs.field()
def name_default(self):
return type(self).__name__.lower()
def lzout(self) -> OutSpec:
return ty.cast(OutSpec, LazyOut(self.name, self.inputs.__pydra_output_spec__))
class Workflow:
name: str
tasks: ty.List[Task] = attrs.field(factory=list)
connections: ty.List[ty.Tuple[str, LazyOutField]] = attrs.field(factory=list)
input_spec: ty.Dict[str, type] = attrs.field(factory=dict)
def add(
self, spec: SpecBase[OutSpec], name: ty.Optional[str] = None
) -> Task[SpecBase[OutSpec], OutSpec]:
task = Task[SpecBase[OutSpec], OutSpec](
spec, **({"name": name} if name else {})
return task
def set_output(self, connection):
def lzin(self):
return LazyIn(self.name, self.input_spec)
def shell_arg(
return attrs.field(
metadata={"argstr": argstr, "position": position, "help_string": help},
def shell_out(
return attrs.field(
"argstr": argstr,
"position": position,
"output_file_template": filename_template,
"help_string": help,
"callable": callable,
@dataclass_transform(kw_only_default=True, field_specifiers=(shell_arg,))
def shell_task(executable: str):
def decorator(klass):
klass.__pydra_cmd__ = executable
klass.__pydra_task_class__ = ShellCommandTask
klass.__annotations__["__pydra_executable__"] = str
return attrs.define(kw_only=True, auto_attrib=True, slots=False)(klass)
return decorator
@dataclass_transform(kw_only_default=True, field_specifiers=(shell_out,))
def shell_outputs(klass):
return attrs.define(kw_only=True, auto_attrib=True, slots=False)(klass)
def func_arg(
return attrs.field(
metadata={"factory": factory, "help_string": help},
@dataclass_transform(kw_only_default=True, field_specifiers=(func_arg,))
def func_task(function: ty.Callable):
def decorator(klass):
klass.__pydra_cmd__ = staticmethod(function)
klass.__pydra_task_class__ = FunctionTask
nested_out_specs = [
n for n, s in klass.__dict__.items() if hasattr(s, "__pydra_spec_class__")
if not nested_out_specs:
raise AttributeError(f"No nested output specs found in {klass}")
elif len(nested_out_specs) > 1:
raise AttributeError(
f"More than one output specs found in {klass}: {nested_out_specs}"
klass.__pydra_output_spec__ = nested_out_specs[0]
klass.__annotations__["__pydra_cmd__"] = ty.Callable
return attrs.define(kw_only=True, auto_attrib=False, slots=False)(klass)
return decorator
def func_out(
help: ty.Optional[str] = None,
return attrs.field(
"help_string": help,
@dataclass_transform(kw_only_default=True, field_specifiers=(func_out,))
def func_outputs(klass):
return attrs.define(kw_only=True, auto_attrib=True, slots=False)(klass) |
You would then define shell specs as pseudo-"dataclasses", e.g. @shell_task("mycmd")
class MyShellCmd(SpecBase["MyShellCmd.Out"]):
in_file: File = shell_arg(argstr="", position=0)
an_option: str = shell_arg(
help="an option to flag something",
converter=str, # Not necessary, just showing off a potential benefit of defining fields as full attrs.fields
out_file: ty.Optional[ty.Union[Path, str]] = None # Not necessary as would be automatically inserted from output spec, only included to allow static full type-checking (but output_file_template fields are not typically used in workflows in any case)
class Out:
out_file: File = shell_out(
out_str: str or alternatively @shell_outputs
class MyShellOut:
out_file: File = shell_out(
out_str: str
class MyShellCmd(Interface[MyShellOut]):
executable = "cmd"
in_file: File
an_option: str = shell_arg(argstr="--opt")
For function specs you could either use the longer form that enables static type-checking, help-strings and attrs converters/validators, i.e. def func(in_int: int, in_str: str) -> ty.Tuple[int, str]:
return in_int, in_str
class MyFunc(SpecBase["MyFunc.Out"]):
in_int: int = func_arg(help="a dummy input int", validator=attrs.validators.gt(0))
in_str: str = func_arg(help="a dummy input str", converter=str) # converter isn't necessary here, it just means that all inputs will be attempted to be converted
class Out:
out_int: int
out_str: str
or stick with the more concise decorator form for simple, self-explanatory function tasks: @pydra.mark.task(returns=["out_int", "out_str"])
def func(in_int: int, in_str: str) -> ty.Tuple[int, str]:
return in_int, in_str |
THIS IS OUTDATED, SEE SLIGHTLY UPDATED SYNTAX BELOW, #670 (comment) which would be used in a workflow like wf = Workflow(name="myworkflow", input_spec={"in_file": File})
mytask = wf.add(MyFunc(in_int=1, in_str="hi"))
mytask2 = wf.add(
in_int=mytask.lzout.out_int, # should be ok
in_str=mytask.lzout.out_int, # should show up as a mypy error because `in_str` expects a str not an int
name="mytask2", # Tasks can be optionally given a name to differentiate multiple tasks from the same spec class (otherwise defaults to name of spec class)
mytask3 = wf.add(
wf.set_output(("out_str", mytask2.lzout.out_str)) Note that |
The other main syntax difference for the end user (i.e. non-interface-designers) is that the function/shell wrappers are separated from the task classes, so calling them doesn't return tasks to be run, i.e. spec = MyFunc(in_int=1, in_str="hi") # is not a TaskBase object So I wasn't quite sure how best to support running tasks outside of a workflow. Option 1: Would it be best to have spec = MyFunc(in_int=1, in_str="hi") # is not a Task object
task = spec(name="mytask") # is a Task object
result = task(plugin="serial") # run the task object which would be conceptually clean but more verbose. Option 2: or should steps 2 and 3 be done automatically under the hood to keep behaviour close to what it is currently, i.e. spec = MyFunc(in_int=1, in_str="hi") # is not a Task object
result = spec(name="mytask", plugin="serial") # create and run a Task object in one step. The only issue with this second option is that the task object gets hidden and is hard to access afterwards, and the |
@tclose - thank you for all the work you have put into #662 and the thoughts here. this latter issue of wrapping functions is important to pydra, where the functions come from any package out there. one should be able to use an arbitrary function with as much ease as a fully decorated one. this intent was supposed to be true of shell commands as well (see the nextflow syntax of wrapping shell scripts). a goal of pydra was to provide efficient caching and parallelization to any script in eager mode and to any scripter who can put one together. this addressed one change that we made in nipype at the beginning (the separation of interfaces and nodes). in pydra once a task is defined it can be called with any inputs and any split/combine settings. the other intentional change was that a workflow behaves the same way as a task (again not the case in nipype). both allow "calling" them, running in parallel, and caching. and this brings me to all the back and forth we did between dataclasses and attrs and the main culprit: pickling. here were the principles/features we were considering (i think there is an early issue out there somewhere)
from my perspective the question is not so much whether static type checking or a dataclass-like syntax should be there, but more whether the simplest operations can be done in the simplest manner possible so anyone can use it as they would most functions in a notebook. thus, while thinking about the syntax of things, it may be useful to see how the pydra tutorial would change given the proposed changes, and what it would take for a "scripter" (not a python coder) to use pydra easily. some comments: regarding returning task vs workflow with on the
currently any task once defined can be executed with a function call, or in a distributed execution form. there is a fundamental difference in the way a |
Thanks for the detailed feedback @satra!
I had a look at the nextflow syntax, but wasn't quite sure which part you were referring to. In general, I agree that it is a really important feature, as being forced to create new interfaces in Nipype was a big barrier. I think there is scope to further streamline the use of arbitrary functions/shell commands through helper functions, which I was planning to include as part of this work.
The "eager mode" is the only feature that would be affected by what I have proposed, as discussed in my last post above.
I would agree that the unification of the task and workflow behaviour in combination with split/combine is very important as it enables dynamic iteration patterns that weren't possible with Nipype, this was definitely a big selling point.
I have found the
Only the issue raised above
I would argue the proposed dataclass syntax for task specs feels like a reduction in boilerplate. However, it is perhaps just a different, more standardised, boilerplate. But for coders familiar with dataclasses (or interested in becoming so), I think it would result in much more readable code.
What I am suggesting would split the interfaces and nodes again, but I'm not sure I'm seeing all the drawbacks with this. I would have thought that as long as nodes and workflows are both unified as "tasks" then you would get the benefits you are looking for with it.
Is this possible currently? Doesn't each task object have its own state, which prevents it from being reused? Initialising parameterised specs would actually facilitate this, if I understand what you mean.
These features wouldn't be impacted
For minimal impact on the tutorial, you could just change Cell 3 in the 2_intro_functiontask notebook (and similarly Cell 2 in 3_intro_functiontask_state) from task1 = add_var(a=4, b=5) to task1 = add_var(a=4, b=5)() or perhaps task1 = add_var(a=4, b=5)(cache_dir="/home/me/Desktop/pydra-work") to make it a bit more explicit that the second call is configuring the state whereas the first just parameterises it. Alternatively, if this feels too circuitous, you could go with Option 2, leave Cell 3 the way it is and just change Cells 8 & 10 to refer to the returned result rather than the
Isn't this already the case if a task is initialised before it is added, e.g. wf = Workflow(name="myworkflow", input_spec={"in_file": File})
mytask = MyShellCmd(in_file=wf.lzin.in_file, an_option="hi")
... If you wanted to avoid this you could have the wf = Workflow(name="myworkflow", input_spec={"in_file": File})
func_lz = wf.add(MyFunc(in_int=1, in_str="hi"))
func2_lz = wf.add(
... I would be quite attracted to this syntax, as I'm always forgetting to add the The only issue would be that you couldn't do the split/combine on the same line as the workflow add, as neither the return value nor the arg to wf = Workflow(name="myworkflow", input_spec={"in_file": File})
func_lz = wf.add(MyFunc(in_str="hi"), name="mytask")
wf.mytask.split(in_int=[1, 2, 3])
My plan to set it up so that The only reason to explicitly include
You are right,
I would argue that the proposed changes aren't fundamental; it is just that the creation of the task object is delayed a step after the parameterisation. I agree that it is not ideal, but I feel it is a bit of a trade-off in order to streamline some things down the line. |
EDIT: I realised that, by splitting parameterisation from configuration, workflow construction would be creating a light-weight object and @attrs.define
class Node(ty.Generic[In, Out]):
inputs: In # contains a reference to the task type to instantiate
lzout: Out
_splitter: ty.Union[list, tuple, str]
_workflow: Workflow
def split(...):
def combine(...):
... which would allow us to get very close to the existing syntax (only the return type of add changes from Workflow -> Node) wf = Workflow(input_spec={"in_files": list[File], "in_int": int}, output_spec={"out_str": str, "out_files": list[File]}) # defining the output spec is not necessary, but will cause the outputs to be type-checked
myfunc = wf.add(
MyFunc(in_int=wf.lzin.in_int, in_str="hi")
if blah:
myfunc.inputs.in_str = "hello"
myfunc2 = wf.add(
in_int=myfunc.lzout.out_int, # should be ok
in_str=myfunc.lzout.out_int, # should show up as a mypy error because `in_str` expects a str not an int
name="myfunc2", # Tasks can be optionally given a name to differentiate multiple tasks from the same spec class (otherwise defaults to name of spec class)
myshellcmd = wf.add(
another_option=wf.myfunc2.lzout.out_int, # can still access via wf.* if preferred
).split(in_file=wf.lzin.in_files) # Note method call on the outer parentheses, not inner like current
wf.lzout.out_str = myfunc2.out_str # This is an independent proposal I have just thrown in, which could be an alternative to set_output
wf.set_output(("out_files", myshellcmd.lzout.out_file)) This would also address @satra's concern about having dangling tasks that users would think they should be able to run |
EDIT: I'm leaning towards the more streamlined execution paradigm, where the task configuration (i.e. cache-locations, environments, etc...) is combined in the execution step as opposed to the 3-stage solution (noting that in the current syntax the configuration is combined with the "parameterisation" stage). The main issue I had with this paradigm was how to implement the split and combine methods, but I think these would work pretty well as kwargs during the configuration/execution stage (NB: they would still be defined using the Execution process# Parameterisation of the interface, only contains parameters
myfunc = MyFunc(in_int=1, in_str="hi")
# Configuration & execution of the interface in a single step
result = myfunc(cache_dir="/path/to/cache", environment=Docker("myfuncdocker:1.0"), plugin="cf") and # Workflow definition (typically in a different module)
preprocess_workflow = Workflow(input_spec=["in_files", "in_int"])
# Workflow parameterisation step
my_preprocess = preprocess_workflow(in_files=["/path/to/file1", "/path/to/file2"], in_int=2)
# Execution step
result = my_preprocess(cache_dir="/path/to/cache", plugin="cf") In order to be able to access the task instance (not to be confused with the instantiated print(result.task.output_dir) If you wanted to split or combine the task/workflow, you would provide the splitter/combiner as a kwarg at execution alternative_preprocess_workflow = Workflow(input_spec=["in_file", "threshold"])
# Workflow parameterisation step
my_preprocess = alternative_preprocess_workflow(threshold=0.5)
result = my_preprocess(plugin="cf", inputs={"in_file": ["/path/to/file1", "/path/to/file2"]}, split="in_file") You could also allow the following simplified syntax for simple outer-only splits (which I imagine would cover 99% of splits at the execution stage): result = my_preprocess(plugin="cf", split={"in_file": ["/path/to/file1", "/path/to/file2"]}) Alternative 3-stage process: Note that the alternative I was considering was the 3-step process from initialisation->configuration->execution, but I think this is a bit too verbose: myfunc = MyFunc(in_str="hi")
myfunc_task = myfunc(cache_dir="/path/to/cache", environment=Docker("myfuncdocker:1.0"))
myfunc_task.split(in_int=[1, 2, 3])
result = myfunc_task(plugin="cf") Note for consistency we would probably want to add another layer to configuring workflows, so the workflow above would be executed as my_preprocess = preprocess_workflow(in_files=["/path/to/file1", "/path/to/file2"])
preprocess_task = my_preprocess(cache_dir="/path/to/cache").split(in_int=[2, 3, 4])
result = preprocess_task(plugin="cf") |
Summary of proposed backwards-compatibility-breaking syntax changes — Interface design
Workflow construction
Task/workflow execution
Hi Tom, thanks for this thorough and thoughtful proposal. I have an overall gut reaction which is "How are we going to teach this?" I would really like to preserve (or create, where absent) the ability to progress in complexity, instead of making people learn a bunch of Pydra concepts to get going. I want to think through how a user might progress from a first task or workflow to a fully-fledged one, targeting the kind of well-defined structures you're proposing, but with minimal changes in syntax as each new idea gets added. I have predictably not been able to get through all of this in the hour I set myself, and my thoughts haven't coalesced into a direct response, but in case you're interested, I've been writing them up here: https://demo.hedgedoc.org/lu5bWq1mRxm14jSH9Is5bA# I'll need more time to figure out how far these are from what you're doing. Unfortunately, this is a bit of a scrambly time for me, so I'm not sure how much time I can devote to getting through this before next week. I'll try to find a couple more hours. |
This sounds like a good idea, I like how you are laying it out as a tutorial in your thoughts. A big part of my motivation for trying to get static type-checking/code-completion to work is to give users some more guard rails when designing workflows.
I think we probably just need to have a preliminary discussion about it next week. We can walk through my thinking and everyone's first thoughts then maybe agree on some general goals/acceptance criteria of any refactor.
No worries, for a while I won't really have any time to implement anything beyond what I have already committed to (finishing off the typing/hashing), so we can take our time. |
Same. Also, keep in mind that I am giving a lengthy tutorial in a few weeks and I would be quite upset if most of it turned out to be deprecated within the next releases :-/
The more I use Just the fact that you can define a record type with Now, something type annotations proved to be useful for is code generation. For instance,
I would very much favor this user-first approach to design. The first step may be to formalize what we assume our typical user should know before using and/or contributing to Pydra. For instance, I did a quick lunch survey at my lab recently and very few people use, know or even care about typing in Python. Besides, something we should definitely spend a decent amount of effort on is the sad path (what happens when things go wrong). Error messages like Guiding principles such as "make invalid state unrepresentable" and "avoid primitive obsession" may help us reach the appropriate level of abstraction. I believe that's where @tclose is leading us; I am just not sure betting everything on type checking is the right angle for it. Happy to discuss it further in the next meeting if I can make it.
I understand, although feedback from your tutorial series would be the perfect input into this discussion... ;)
Perhaps guard-rails is the wrong term, "guide rails" maybe? I'm imagining a significant fraction of users who won't necessarily write their own tool interfaces but will only put together or tweak existing workflows, for whom code-completion, if not full type-checking, would be a benefit.
Note that with the merge of my typing-PR, this won't be the case for Pydra from the next release. If types are specified for task input/output fields and they don't match what is connected to them, Pydra will raise errors (but not if they aren't specified).
👍 Not wanting to sound too "agilely" but we probably need to think about multiple personas when considering any changes.
Since Pydra utilises the typing system to distinguish between file and non-file types, my thinking is that they will need to become familiar with Python's typing syntax in any case, and once they have gone to that effort, they will want to see some more benefits.
Another one I would throw in there that has come up a fair bit in my thoughts about this, and recently in practice when working on the Mrtrix3 auto-gen package, is separating user and core namespaces as much as possible. For example, I was trying to work out why the
As I said no rush, we can always delay the discussion to a time that suits everyone. |
This is an example # Split out common arguments into separate attrs classes
@attrs.define(kw_only=True, slots=False)
class RecurseOption:
recurse: bool = shell_arg(
help_string="Recursively descend the directory tree",
# Define an interface for the `ls` command
class Ls:
executable = "ls"
class Inputs(RecurseOption):
directory: os.PathLike = shell_arg(
help_string="the directory to list the contents of",
hidden: bool = shell_arg(
help_string="display hidden FS objects",
class Outputs:
entries: list = shell_out(
help_string="list of entries returned by ls command",
) or alternatively as a function in order to dynamically generate interfaces Ls = shell_task(
"directory": {
"type": os.PathLike,
"help_string": "the directory to list the contents of",
"argstr": "",
"mandatory": True,
"position": -1,
"hidden": {
"type": bool,
"help_string": "display hidden FS objects",
"argstr": "-a",
"entries": {
"type": list,
"help_string": "list of entries returned by ls command",
"callable": list_entries,
) |
What would you like changed/added and why?
Static type-checkers, such as mypy, which analyse code at design time (i.e. before it runs), can be very useful in catching errors while you are writing. However, workflow construction in Pydra is currently too "magic" for them to be of much use, and they tend to throw up lots of false positives. Given that strong typing is a key feature of Pydra (particularly if #662 is accepted), it would be disappointing to users if this typing information couldn't be utilised by static type-checkers the way it is for typical code.
While it won't be possible to get type-checkers to parse Pydra tasks/workflows as they are currently, I think we could get them to work with a few minor-ish adaptations along the lines of what I have already proposed in #655.
What would be the benefit?
Pydra workflows could be type-checked at design time, e.g. by editor-integrated linters, giving designers immediate feedback if they have attached the wrong outputs together or have format mismatches between tools they are linking.
I will expand on the changes I think would be necessary/sufficient in the comments below
The text was updated successfully, but these errors were encountered: