Skip to content

Commit

Permalink
Engine: Make process inputs in launchers positional (#6202)
Browse files Browse the repository at this point in the history
So far the launcher functions, `run` and `submit`, had a signature where
the process class was the only positional argument and the inputs for
the process were defined through keyword arguments.

Recently, however, some fixed arguments were added to `submit`, namely
`wait` and `wait_interval`. A potential problem was missed in this
change as these new keywords now "shadow" a similarly named input port
for a process. If a `Process` defines an input port named `wait` or
`wait_interval`, when passed through the keyword arguments, it will now
be mistaken as the fixed argument for the `submit` function and won't be
passed to the process inputs.

The new functionality by `wait` and `wait_interval` is desirable and in
the future additional arguments may need to be added. Therefore, rather
than undoing the change, another solution is implemented. The `inputs`
positional argument is added and the `wait` and `wait_interval`
arguments are made keyword only. The process inputs should now be
defined as a dictionary for the `inputs` argument and no longer through
`**kwargs`. The `**kwargs` are still supported, as in certain situations
it is more legible and forcing to define an intermediate input dictionary
is unnecessary.

With this approach, backwards-compatibility is maintained and users will
be able to transition to passing inputs as a dictionary instead of
keywords. In addition, if a user actually gets hit by `wait` shadowing
the input port of a `Process`, they can now switch to specifying the
inputs through `inputs` and their input won't be blocked.
  • Loading branch information
sphuber authored Dec 20, 2023
1 parent bfdb282 commit 6d18ccb
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 33 deletions.
33 changes: 23 additions & 10 deletions aiida/engine/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from .processes.functions import FunctionProcess
from .processes.process import Process
from .runners import ResultAndPk
from .utils import instantiate_process, is_process_scoped # pylint: disable=no-name-in-module
from .utils import instantiate_process, is_process_scoped, prepare_inputs # pylint: disable=no-name-in-module

__all__ = ('run', 'run_get_pk', 'run_get_node', 'submit', 'await_processes')

Expand All @@ -33,7 +33,7 @@
LOGGER = AIIDA_LOGGER.getChild('engine.launch')


def run(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> dict[str, t.Any]:
def run(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any) -> dict[str, t.Any]:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class or process function to run
Expand All @@ -45,10 +45,12 @@ def run(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> dict[str, t
else:
runner = manager.get_manager().get_runner()

return runner.run(process, *args, **inputs)
return runner.run(process, inputs, **kwargs)


def run_get_node(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> tuple[dict[str, t.Any], ProcessNode]:
def run_get_node(process: TYPE_RUN_PROCESS,
inputs: dict[str, t.Any] | None = None,
**kwargs: t.Any) -> tuple[dict[str, t.Any], ProcessNode]:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class, instance, builder or function to run
Expand All @@ -60,10 +62,10 @@ def run_get_node(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> tu
else:
runner = manager.get_manager().get_runner()

return runner.run_get_node(process, *args, **inputs)
return runner.run_get_node(process, inputs, **kwargs)


def run_get_pk(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> ResultAndPk:
def run_get_pk(process: TYPE_RUN_PROCESS, inputs: dict[str, t.Any] | None = None, **kwargs: t.Any) -> ResultAndPk:
"""Run the process with the supplied inputs in a local runner that will block until the process is completed.
:param process: the process class, instance, builder or function to run
Expand All @@ -75,10 +77,17 @@ def run_get_pk(process: TYPE_RUN_PROCESS, *args: t.Any, **inputs: t.Any) -> Resu
else:
runner = manager.get_manager().get_runner()

return runner.run_get_pk(process, *args, **inputs)
return runner.run_get_pk(process, inputs, **kwargs)


def submit(process: TYPE_SUBMIT_PROCESS, wait: bool = False, wait_interval: int = 5, **inputs: t.Any) -> ProcessNode:
def submit(
process: TYPE_SUBMIT_PROCESS,
inputs: dict[str, t.Any] | None = None,
*,
wait: bool = False,
wait_interval: int = 5,
**kwargs: t.Any
) -> ProcessNode:
"""Submit the process with the supplied inputs to the daemon immediately returning control to the interpreter.
.. warning: this should not be used within another process. Instead, there one should use the ``submit`` method of
Expand All @@ -87,20 +96,24 @@ def submit(process: TYPE_SUBMIT_PROCESS, wait: bool = False, wait_interval: int
.. warning: submission of processes requires ``store_provenance=True``.
:param process: the process class, instance or builder to submit
:param inputs: the inputs to be passed to the process
:param inputs: the input dictionary to be passed to the process
:param wait: when set to ``True``, the submission will be blocking and wait for the process to complete at which
point the function returns the calculation node.
:param wait_interval: the number of seconds to wait between checking the state of the process when ``wait=True``.
:param kwargs: inputs to be passed to the process. This is deprecated and the inputs should instead be passed as a
dictionary to the ``inputs`` argument.
:return: the calculation node of the process
"""
inputs = prepare_inputs(inputs, **kwargs)

# Submitting from within another process requires ``self.submit``` unless it is a work function, in which case the
# current process in the scope should be an instance of ``FunctionProcess``.
if is_process_scoped() and not isinstance(Process.current(), FunctionProcess):
raise InvalidOperation('Cannot use top-level `submit` from within another process, use `self.submit` instead')

Check failure on line 112 in aiida/engine/launch.py

View workflow job for this annotation

GitHub Actions / tests (3.6)

Cannot use top-level `submit` from within another process, use `self.submit` instead

Check failure on line 112 in aiida/engine/launch.py

View workflow job for this annotation

GitHub Actions / tests (3.7)

Cannot use top-level `submit` from within another process, use `self.submit` instead

Check failure on line 112 in aiida/engine/launch.py

View workflow job for this annotation

GitHub Actions / tests (3.8)

Cannot use top-level `submit` from within another process, use `self.submit` instead

runner = manager.get_manager().get_runner()
assert runner.persister is not None, 'runner does not have a persister'
assert runner.controller is not None, 'runner does not have a persister'
assert runner.controller is not None, 'runner does not have a controller'

process_inited = instantiate_process(runner, process, **inputs)

Expand Down
35 changes: 23 additions & 12 deletions aiida/engine/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def instantiate_process(self, process: TYPE_RUN_PROCESS, **inputs):
from .utils import instantiate_process # pylint: disable=no-name-in-module
return instantiate_process(self, process, **inputs)

def submit(self, process: TYPE_SUBMIT_PROCESS, **inputs: Any):
def submit(self, process: TYPE_SUBMIT_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any):
"""
Submit the process with the supplied inputs to this runner immediately returning control to
the interpreter. The return value will be the calculation node of the submitted process
Expand All @@ -182,6 +182,7 @@ def submit(self, process: TYPE_SUBMIT_PROCESS, **inputs: Any):
assert not utils.is_process_function(process), 'Cannot submit a process function'
assert not self._closed

inputs = utils.prepare_inputs(inputs, **kwargs)
process_inited = self.instantiate_process(process, **inputs)

if not process_inited.metadata.store_provenance:
Expand All @@ -201,7 +202,9 @@ def submit(self, process: TYPE_SUBMIT_PROCESS, **inputs: Any):

return process_inited.node

def schedule(self, process: TYPE_SUBMIT_PROCESS, *args: Any, **inputs: Any) -> ProcessNode:
def schedule(
self, process: TYPE_SUBMIT_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any
) -> ProcessNode:
"""
Schedule a process to be executed by this runner
Expand All @@ -212,11 +215,15 @@ def schedule(self, process: TYPE_SUBMIT_PROCESS, *args: Any, **inputs: Any) -> P
assert not utils.is_process_function(process), 'Cannot submit a process function'
assert not self._closed

process_inited = self.instantiate_process(process, *args, **inputs)
inputs = utils.prepare_inputs(inputs, **kwargs)
process_inited = self.instantiate_process(process, **inputs)
self.loop.create_task(process_inited.step_until_terminated())
return process_inited.node

def _run(self, process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Tuple[Dict[str, Any], ProcessNode]:
def _run(self,
process: TYPE_RUN_PROCESS,
inputs: dict[str, Any] | None = None,
**kwargs: Any) -> Tuple[Dict[str, Any], ProcessNode]:
"""
Run the process with the supplied inputs in this runner that will block until the process is completed.
The return value will be the results of the completed process
Expand All @@ -227,12 +234,14 @@ def _run(self, process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Tuple[Di
"""
assert not self._closed

inputs = utils.prepare_inputs(inputs, **kwargs)

if utils.is_process_function(process):
result, node = process.run_get_node(*args, **inputs) # type: ignore[union-attr]
result, node = process.run_get_node(**inputs) # type: ignore[union-attr]
return result, node

with utils.loop_scope(self.loop):
process_inited = self.instantiate_process(process, *args, **inputs)
process_inited = self.instantiate_process(process, **inputs)

def kill_process(_num, _frame):
"""Send the kill signal to the process in the current scope."""
Expand All @@ -255,7 +264,7 @@ def kill_process(_num, _frame):

return process_inited.outputs, process_inited.node

def run(self, process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Dict[str, Any]:
def run(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any) -> Dict[str, Any]:
"""
Run the process with the supplied inputs in this runner that will block until the process is completed.
The return value will be the results of the completed process
Expand All @@ -264,10 +273,12 @@ def run(self, process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Dict[str,
:param inputs: the inputs to be passed to the process
:return: the outputs of the process
"""
result, _ = self._run(process, *args, **inputs)
result, _ = self._run(process, inputs, **kwargs)
return result

def run_get_node(self, process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> ResultAndNode:
def run_get_node(
self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any
) -> ResultAndNode:
"""
Run the process with the supplied inputs in this runner that will block until the process is completed.
The return value will be the results of the completed process
Expand All @@ -276,10 +287,10 @@ def run_get_node(self, process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) ->
:param inputs: the inputs to be passed to the process
:return: tuple of the outputs of the process and the calculation node
"""
result, node = self._run(process, *args, **inputs)
result, node = self._run(process, inputs, **kwargs)
return ResultAndNode(result, node)

def run_get_pk(self, process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> ResultAndPk:
def run_get_pk(self, process: TYPE_RUN_PROCESS, inputs: dict[str, Any] | None = None, **kwargs: Any) -> ResultAndPk:
"""
Run the process with the supplied inputs in this runner that will block until the process is completed.
The return value will be the results of the completed process
Expand All @@ -288,7 +299,7 @@ def run_get_pk(self, process: TYPE_RUN_PROCESS, *args: Any, **inputs: Any) -> Re
:param inputs: the inputs to be passed to the process
:return: tuple of the outputs of the process and process node pk
"""
result, node = self._run(process, *args, **inputs)
result, node = self._run(process, inputs, **kwargs)
return ResultAndPk(result, node.pk)

def call_on_process_finish(self, pk: int, callback: Callable[[], Any]) -> None:
Expand Down
22 changes: 22 additions & 0 deletions aiida/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
###########################################################################
# pylint: disable=invalid-name
"""Utilities for the workflow engine."""
from __future__ import annotations

import asyncio
import contextlib
from datetime import datetime
Expand All @@ -27,6 +29,26 @@
PROCESS_STATE_CHANGE_DESCRIPTION = 'The last time a process of type {}, changed state'


def prepare_inputs(inputs: dict[str, Any] | None = None, **kwargs: Any) -> dict[str, Any]:
"""Prepare inputs for launch of a process.
This is a utility function to pre-process inputs for the process that can be specified both through keyword
arguments as well as through the explicit ``inputs`` argument. When both are specified, a ``ValueError`` is raised.
:param inputs: Inputs dictionary.
:param kwargs: Inputs defined as keyword arguments.
:raises ValueError: If both ``kwargs`` and ``inputs`` are defined.
:returns: The dictionary of inputs for the process.
"""
if inputs is not None and kwargs:
raise ValueError('Cannot specify both `inputs` and `kwargs`.')

if kwargs:
inputs = kwargs

return inputs or {}


def instantiate_process(
runner: 'Runner', process: Union['Process', Type['Process'], 'ProcessBuilder'], **inputs
) -> 'Process':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
'x': orm.Int(1),
'y': orm.Int(2)
}
node = submit(ArithmeticAddCalculation, **inputs)
node = submit(ArithmeticAddCalculation, inputs)
16 changes: 13 additions & 3 deletions docs/source/topics/processes/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,19 @@ As the name suggest, the first three will 'run' the process and the latter will
Running means that the process will be executed in the same interpreter in which it is launched, blocking the interpreter, until the process is terminated.
Submitting to the daemon, in contrast, means that the process will be sent to the daemon for execution, and the interpreter is released straight away.

All functions have the exact same interface ``launch(process, **inputs)`` where:
All functions have the exact same interface ``launch(process, inputs)`` where:

* ``process`` is the process class or process function to launch
* ``inputs`` are the inputs as keyword arguments to pass to the process.
* ``inputs`` the inputs dictionary to pass to the process.

.. versionchanged:: 2.5

Before AiiDA v2.5, the inputs could only be passed as keyword arguments.
This behavior is still supported, e.g., one can launch a process as ``launch(process, **inputs)`` or ``launch(process, input_a=value_a, input_b=value_b)``.
However, the recommended approach is now to use an input dictionary passed as the second positional argument.
The reason is that certain launchers define arguments themselves which can overlap with inputs of the process.
For example, the ``submit`` method defines the ``wait`` keyword.
If the process being launched *also* defines an input named ``wait``, the launcher method cannot tell them apart.

What inputs can be passed depends on the exact process class that is to be launched.
For example, when we want to run an instance of the :py:class:`~aiida.calculations.arithmetic.add.ArithmeticAddCalculation` process, which takes two :py:class:`~aiida.orm.nodes.data.int.Int` nodes as inputs under the name ``x`` and ``y`` [#f1]_, we would do the following:
Expand All @@ -350,7 +359,8 @@ The function will submit the calculation to the daemon and immediately return co
.. warning::
For a process to be submittable, the class or function needs to be importable in the daemon environment by a) giving it an :ref:`associated entry point<how-to:plugin-codes:entry-points>` or b) :ref:`including its module path<how-to:faq:process-not-importable-daemon>` in the ``PYTHONPATH`` that the daemon workers will have.

.. tip::
.. versionadded:: 2.5

Use ``wait=True`` when calling ``submit`` to wait for the process to complete before returning the node.
This can be useful for tutorials and demos in interactive notebooks where the user should not continue before the process is done.
One could of course also use ``run`` (see below), but then the process would be lost if the interpreter gets accidentally shut down.
Expand Down
15 changes: 8 additions & 7 deletions tests/engine/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ def calc_done():
assert future.result()


def test_submit_args(runner):
"""Test that a useful exception is raised when the inputs are passed as a dictionary instead of expanded kwargs.
def test_submit(runner):
"""Test that inputs can be specified either as a positional dictionary or through keyword arguments."""
inputs = {'a': Str('input')}

Regression test for #3609. Before, it would throw the validation exception of the first port to be validated. If
a user accidentally forgot to expand the inputs with `**` it would be a misleading error.
"""
with pytest.raises(TypeError, match=r'takes 2 positional arguments but 3 were given'):
runner.submit(Proc, {'a': Str('input')})
with pytest.raises(ValueError, match='Cannot specify both `inputs` and `kwargs`.'):
runner.submit(Proc, inputs, **inputs)

runner.submit(Proc, inputs)
runner.submit(Proc, **inputs)


def test_run_return_value_cached(aiida_local_code_factory):
Expand Down

0 comments on commit 6d18ccb

Please sign in to comment.