From 5a04619b4cadab44f670d87e973fbf811bd4af52 Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Tue, 20 Feb 2018 15:01:56 +0100 Subject: [PATCH 01/13] Add expose_* functions where the exposed ports are saved in a nested dict The exposing is stored as _exposed_inputs / _exposed_outputs, which is a dict of [namespace][process_class]. --- aiida/work/process_spec.py | 85 +++++++++++++++++++++++++++++++++++++- aiida/work/processes.py | 39 ++++++++++++++++- 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/aiida/work/process_spec.py b/aiida/work/process_spec.py index 7bd8a81c8a..3e0c1da2af 100644 --- a/aiida/work/process_spec.py +++ b/aiida/work/process_spec.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +from collections import defaultdict + import plumpy import voluptuous @@ -15,7 +17,7 @@ def __call__(self, value): """ Call this to validate the value against the schema. - :param value: a regular dictionary or a ParameterData instance + :param value: a regular dictionary or a ParameterData instance :return: tuple (success, msg). success is True if the value is valid and False otherwise, in which case msg will contain information about the validation failure. @@ -55,4 +57,83 @@ class ProcessSpec(plumpy.ProcessSpec): PORT_NAMESPACE_TYPE = PortNamespace def __init__(self): - super(ProcessSpec, self).__init__() \ No newline at end of file + super(ProcessSpec, self).__init__() + self._exposed_inputs = defaultdict(lambda: defaultdict(list)) + + def expose_inputs(self, process_class, namespace=None, exclude=(), include=None): + """ + This method allows one to automatically add the inputs from another + Process to this ProcessSpec. The optional namespace argument can be + used to group the exposed inputs in a separated PortNamespace + + :param process_class: the Process class whose inputs to expose + :param namespace: a namespace in which to place the exposed inputs + :param exclude: list or tuple of input keys to exclude from being exposed + """ + self._expose_ports( + process_class=process_class, + source=process_class.spec().inputs, + destination=self.inputs, + expose_memory=self._exposed_inputs, + namespace=namespace, + exclude=exclude, + include=include + ) + + def expose_outputs(self, process_class, namespace=None, exclude=(), include=None): + """ + This method allows one to automatically add the otuputs from another + Process to this ProcessSpec. 
The optional namespace argument can be + used to group the exposed outputs in a separated PortNamespace + + :param process_class: the Process class whose inputs to expose + :param namespace: a namespace in which to place the exposed inputs + :param exclude: list or tuple of input keys to exclude from being exposed + """ + self._expose_ports( + process_class=process_class, + source=process_class.spec().outputs, + destination=self.outputs, + expose_memory=self._exposed_outputs, + namespace=namespace, + exclude=exclude, + include=include + ) + + def _expose_ports( + self, + process_class, + source, + destination, + expose_memory, + namespace, + exclude, + include + ): + if namespace: + port_namespace = destination.create_port_namespace(namespace) + else: + port_namespace = destination + exposed_list = expose_memory[namespace][process_class] + + for name, port in self._filter_names( + source.iteritems(), + exclude=exclude, + include=include + ): + port_namespace[name] = port + exposed_list.append(name) + + @staticmethod + def _filter_names(items, exclude, include): + if exclude and include is not None: + raise ValueError('exclude and include are mutually exclusive') + + for name, port in items: + if include is not None: + if name not in include: + continue + else: + if name in exclude: + continue + yield name, port diff --git a/aiida/work/processes.py b/aiida/work/processes.py index a0fdfda083..3a03f6efb5 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -273,7 +273,7 @@ def _create_and_setup_db_record(self): @override def encode_input_args(self, inputs): - """ + """ Encode input arguments such that they may be saved in a Bundle :param inputs: A mapping of the inputs as passed to the process @@ -286,7 +286,7 @@ def decode_input_args(self, encoded): """ Decode saved input arguments as they came from the saved instance state Bundle - :param encoded: + :param encoded: :return: The decoded input args """ return deserialize_data(encoded) @@ -405,6 +405,41 @@ def _use_cache_enabled(self): ) + def exposed_inputs(self, process_class, namespace=None, agglomerate=True): + """ + Gather a dictionary of the inputs that were exposed for a given Process + class under an optional namespace. 
+ + :param process_class: Process class whose inputs to try and retrieve + :param namespace: PortNamespace in which to look for the inputs + """ + exposed_inputs = {} + namespaces = [namespace] + + # If inputs are to be agglomerated, we prepend the lower lying namespace + if agglomerate: + namespaces.insert(0, None) + + for namespace in namespaces: + exposed_inputs_list = self.spec()._exposed_inputs[namespace][process_class] + # The namespace None indicates the base level namespace + if namespace is None: + inputs = self.inputs + port_namespace = self.spec().inputs + else: + inputs = self.inputs[namespace] + try: + port_namespace = self.spec().get_input(namespace) + except KeyError: + raise ValueError('this process does not contain the "{}" input namespace'.format(namespace)) + + for name, port in port_namespace.ports.iteritems(): + if name in inputs and name in exposed_inputs_list: + exposed_inputs[name] = inputs[name] + + return exposed_inputs + + class FunctionProcess(Process): _func_args = None _calc_node_class = WorkCalculation From 15740de601fa510ee43c8748cd7a1d60d8ef620e Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Tue, 20 Feb 2018 16:01:49 +0100 Subject: [PATCH 02/13] Add test for expose_inputs --- aiida/backends/tests/work/work_chain.py | 85 ++++++++++++++++++++++++- aiida/work/processes.py | 6 +- 2 files changed, 88 insertions(+), 3 deletions(-) diff --git a/aiida/backends/tests/work/work_chain.py b/aiida/backends/tests/work/work_chain.py index 15b8da0a40..bd626d893c 100644 --- a/aiida/backends/tests/work/work_chain.py +++ b/aiida/backends/tests/work/work_chain.py @@ -16,7 +16,7 @@ from aiida.backends.testbase import AiidaTestCase from aiida.common.links import LinkType from aiida.daemon.workflowmanager import execute_steps -from aiida.orm.data.base import Int, Str, Bool +from aiida.orm.data.base import Int, Str, Bool, Float from aiida.work.utils import ProcessStack from aiida.workflows.wf_demo import WorkflowDemo from aiida import work @@ -790,3 +790,86 @@ def step_two(self): x = Int(1) y = Int(2) work.launch.run(Wf, subspace={'one': Int(1), 'two': Int(2)}) + +class ParentExposeWorkChain(work.WorkChain): + @classmethod + def define(cls, spec): + super(ParentExposeWorkChain, cls).define(spec) + + spec.expose_inputs(ChildExposeWorkChain, include=['a']) + spec.expose_inputs( + ChildExposeWorkChain, + exclude=['a'], + namespace='sub_1', + ) + spec.expose_inputs( + ChildExposeWorkChain, + exclude=['a'], + namespace='sub_2.sub_3', + ) + + spec.outline( + cls.start_children, + cls.finalize + ) + + def start_children(self): + child_1 = self.submit( + ChildExposeWorkChain, + **self.exposed_inputs(ChildExposeWorkChain, namespace='sub_1') + ) + child_2 = self.submit( + ChildExposeWorkChain, + a=self.exposed_inputs(ChildExposeWorkChain)['a'], + **self.exposed_inputs( + ChildExposeWorkChain, + namespace='sub_2.sub_3', + agglomerate=False + ) + ) + return ToContext(child_1=child_1, child_2=child_2) + + def finalize(self): + pass + +class ChildExposeWorkChain(work.WorkChain): + @classmethod + def define(cls, spec): + super(ChildExposeWorkChain, cls).define(spec) + + spec.input('a', valid_type=Int) + spec.input('b', valid_type=Float) + spec.input('c', valid_type=Bool) + + spec.outline(cls.do_run) + + spec.output('o', valid_type=Float) + + def do_run(self): + self.out('o', self.inputs.b) + +class TestWorkChainExpose(AiidaTestCase): + """ + Test the expose inputs / outputs functionality + """ + + def setUp(self): + super(TestWorkChainExpose, self).setUp() + 
self.assertEquals(len(ProcessStack.stack()), 0) + self.runner = utils.create_test_runner() + + def tearDown(self): + super(TestWorkChainExpose, self).tearDown() + work.set_runner(None) + self.runner.close() + self.runner = None + self.assertEquals(len(ProcessStack.stack()), 0) + + def test_expose(self): + res = work.launch.run( + ParentExposeWorkChain, + a=Int(1), + sub_1={'b': Float(2.3), 'c': Bool(True)}, + sub_2={'sub_3': {'b': Float(1.2), 'c': Bool(False)}}, + ) + # self.assertEquals(res['o'], 2.3) diff --git a/aiida/work/processes.py b/aiida/work/processes.py index 3a03f6efb5..912a627feb 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -427,9 +427,11 @@ class under an optional namespace. inputs = self.inputs port_namespace = self.spec().inputs else: - inputs = self.inputs[namespace] + inputs = self.inputs + for ns in namespace.split('.'): + inputs = inputs[ns] try: - port_namespace = self.spec().get_input(namespace) + port_namespace = self.spec().inputs.get_port(namespace) except KeyError: raise ValueError('this process does not contain the "{}" input namespace'.format(namespace)) From 515bced786a33cfa1487f367f210ae9fede2db4b Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Tue, 20 Feb 2018 17:07:41 +0100 Subject: [PATCH 03/13] Check all intermediate namespaces in agglomerating --- aiida/backends/tests/work/work_chain.py | 14 +++++++++----- aiida/work/processes.py | 22 ++++++++++++++++------ 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/aiida/backends/tests/work/work_chain.py b/aiida/backends/tests/work/work_chain.py index bd626d893c..f2c7ca266b 100644 --- a/aiida/backends/tests/work/work_chain.py +++ b/aiida/backends/tests/work/work_chain.py @@ -804,7 +804,12 @@ def define(cls, spec): ) spec.expose_inputs( ChildExposeWorkChain, - exclude=['a'], + include=['b'], + namespace='sub_2', + ) + spec.expose_inputs( + ChildExposeWorkChain, + include=['c'], namespace='sub_2.sub_3', ) @@ -816,15 +821,14 @@ def define(cls, spec): def start_children(self): child_1 = self.submit( ChildExposeWorkChain, - **self.exposed_inputs(ChildExposeWorkChain, namespace='sub_1') + a=self.exposed_inputs(ChildExposeWorkChain)['a'], + **self.exposed_inputs(ChildExposeWorkChain, namespace='sub_1', agglomerate=False) ) child_2 = self.submit( ChildExposeWorkChain, - a=self.exposed_inputs(ChildExposeWorkChain)['a'], **self.exposed_inputs( ChildExposeWorkChain, namespace='sub_2.sub_3', - agglomerate=False ) ) return ToContext(child_1=child_1, child_2=child_2) @@ -870,6 +874,6 @@ def test_expose(self): ParentExposeWorkChain, a=Int(1), sub_1={'b': Float(2.3), 'c': Bool(True)}, - sub_2={'sub_3': {'b': Float(1.2), 'c': Bool(False)}}, + sub_2={'b': Float(1.2), 'sub_3': {'c': Bool(False)}}, ) # self.assertEquals(res['o'], 2.3) diff --git a/aiida/work/processes.py b/aiida/work/processes.py index 912a627feb..090db3bfa8 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -414,13 +414,9 @@ class under an optional namespace. 
:param namespace: PortNamespace in which to look for the inputs """ exposed_inputs = {} - namespaces = [namespace] - # If inputs are to be agglomerated, we prepend the lower lying namespace - if agglomerate: - namespaces.insert(0, None) - - for namespace in namespaces: + namespace_list = self._get_namespace_list(namespace=namespace, agglomerate=agglomerate) + for namespace in namespace_list: exposed_inputs_list = self.spec()._exposed_inputs[namespace][process_class] # The namespace None indicates the base level namespace if namespace is None: @@ -441,6 +437,20 @@ class under an optional namespace. return exposed_inputs + @staticmethod + def _get_namespace_list(namespace=None, agglomerate=True): + if not agglomerate: + return [namespace] + else: + namespace_list = [None] + if namespace is not None: + split_ns = namespace.split('.') + namespace_list.extend([ + '.'.join(split_ns[:i]) + for i in range(1, len(split_ns) + 1) + ]) + return namespace_list + class FunctionProcess(Process): _func_args = None From 52522e5df6b8a56362e21329810b718da4f697d4 Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Tue, 20 Feb 2018 19:47:09 +0100 Subject: [PATCH 04/13] Add expose_outputs test and implementation --- aiida/backends/tests/work/work_chain.py | 50 ++++++++++++++++++++++--- aiida/work/process_spec.py | 1 + aiida/work/processes.py | 46 +++++++++++++++++++++++ 3 files changed, 92 insertions(+), 5 deletions(-) diff --git a/aiida/backends/tests/work/work_chain.py b/aiida/backends/tests/work/work_chain.py index f2c7ca266b..4b76dc2635 100644 --- a/aiida/backends/tests/work/work_chain.py +++ b/aiida/backends/tests/work/work_chain.py @@ -813,6 +813,23 @@ def define(cls, spec): namespace='sub_2.sub_3', ) + spec.expose_outputs(ChildExposeWorkChain, include=['a']) + spec.expose_outputs( + ChildExposeWorkChain, + exclude=['a'], + namespace='sub_1' + ) + spec.expose_outputs( + ChildExposeWorkChain, + include=['b'], + namespace='sub_2' + ) + spec.expose_outputs( + ChildExposeWorkChain, + include=['c'], + namespace='sub_2.sub_3' + ) + spec.outline( cls.start_children, cls.finalize @@ -834,7 +851,19 @@ def start_children(self): return ToContext(child_1=child_1, child_2=child_2) def finalize(self): - pass + exposed_1 = self.exposed_outputs( + self.ctx.child_1, + ChildExposeWorkChain, + namespace='sub_1', + agglomerate=False + ) + self.out_many(exposed_1) + exposed_2 = self.exposed_outputs( + self.ctx.child_2, + ChildExposeWorkChain, + namespace='sub_2.sub_3' + ) + self.out_many(exposed_2) class ChildExposeWorkChain(work.WorkChain): @classmethod @@ -845,12 +874,16 @@ def define(cls, spec): spec.input('b', valid_type=Float) spec.input('c', valid_type=Bool) - spec.outline(cls.do_run) + spec.output('a', valid_type=Float) + spec.output('b', valid_type=Float) + spec.output('c', valid_type=Bool) - spec.output('o', valid_type=Float) + spec.outline(cls.do_run) def do_run(self): - self.out('o', self.inputs.b) + self.out('a', self.inputs.a + self.inputs.b) + self.out('b', self.inputs.b) + self.out('c', self.inputs.c) class TestWorkChainExpose(AiidaTestCase): """ @@ -876,4 +909,11 @@ def test_expose(self): sub_1={'b': Float(2.3), 'c': Bool(True)}, sub_2={'b': Float(1.2), 'sub_3': {'c': Bool(False)}}, ) - # self.assertEquals(res['o'], 2.3) + self.assertEquals( + res, + { + 'a': Float(2.2), + 'sub_1.b': Float(2.3), 'sub_1.c': Bool(True), + 'sub_2.b': Float(1.2), 'sub_2.sub_3.c': Bool(False) + } + ) diff --git a/aiida/work/process_spec.py b/aiida/work/process_spec.py index 3e0c1da2af..7fda46c29d 100644 --- 
a/aiida/work/process_spec.py +++ b/aiida/work/process_spec.py @@ -59,6 +59,7 @@ class ProcessSpec(plumpy.ProcessSpec): def __init__(self): super(ProcessSpec, self).__init__() self._exposed_inputs = defaultdict(lambda: defaultdict(list)) + self._exposed_outputs = defaultdict(lambda: defaultdict(list)) def expose_inputs(self, process_class, namespace=None, exclude=(), include=None): """ diff --git a/aiida/work/processes.py b/aiida/work/processes.py index 090db3bfa8..44a4c495a0 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -16,6 +16,7 @@ import plumpy import uuid import traceback +from collections import defaultdict from plumpy import ProcessState from aiida.common import exceptions @@ -168,6 +169,10 @@ def out(self, output_port, value=None): else: return super(Process, self).out(output_port, value) + def out_many(self, out_dict): + for key, value in out_dict.items(): + self.out(key, value) + # region Process messages @override def on_entering(self, state): @@ -437,6 +442,24 @@ class under an optional namespace. return exposed_inputs + def exposed_outputs(self, process_instance, process_class, namespace=None, agglomerate=True): + output_key_map = {} + process_outputs_dict = process_instance.get_outputs_dict() + for ns in self._get_namespace_list(namespace=namespace, agglomerate=agglomerate): + for key in process_outputs_dict: + if key in self.spec()._exposed_outputs[ns][process_class]: + output_key_map[key] = ns + + result = {} + namespace_separator = self.spec().namespace_separator + for key, ns in output_key_map.items(): + if ns is None: + result[key] = process_outputs_dict[key] + else: + result[ns + namespace_separator + key] = process_outputs_dict[key] + return result + + @staticmethod def _get_namespace_list(namespace=None, agglomerate=True): if not agglomerate: @@ -451,6 +474,29 @@ def _get_namespace_list(namespace=None, agglomerate=True): ]) return namespace_list + # def _flatten_output_dict(self, output_dict): + # res = {} + # for key, value in output_dict.items(): + # if not isinstance(value, Data): + # for sub_key, sub_value in self._flatten_output_dict(value).items(): + # res[key + '.' + sub_key] = sub_value + # else: + # res[key] = value + # return res + + # def _wrap_outputs_dict(self, flat_outputs_dict): + # sub_dicts = defaultdict(dict) + # res = {} + # for key, value in flat_outputs_dict.items(): + # if '.' in key: + # namespace, sub_key = key.split('.', 1) + # sub_dicts[namespace][sub_key] = value + # else: + # res[key] = value + # for namespace, sub_dict in sub_dicts.items(): + # res[namespace] = self._wrap_outputs_dict(sub_dict) + # return res + class FunctionProcess(Process): _func_args = None From a84e4e574e0f477d5b58e3d88475990b1ab8ee4a Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Tue, 20 Feb 2018 19:47:44 +0100 Subject: [PATCH 05/13] Remove commented dict flatten / wrap functions --- aiida/work/processes.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/aiida/work/processes.py b/aiida/work/processes.py index 44a4c495a0..7266a787c7 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -474,30 +474,6 @@ def _get_namespace_list(namespace=None, agglomerate=True): ]) return namespace_list - # def _flatten_output_dict(self, output_dict): - # res = {} - # for key, value in output_dict.items(): - # if not isinstance(value, Data): - # for sub_key, sub_value in self._flatten_output_dict(value).items(): - # res[key + '.' 
+ sub_key] = sub_value - # else: - # res[key] = value - # return res - - # def _wrap_outputs_dict(self, flat_outputs_dict): - # sub_dicts = defaultdict(dict) - # res = {} - # for key, value in flat_outputs_dict.items(): - # if '.' in key: - # namespace, sub_key = key.split('.', 1) - # sub_dicts[namespace][sub_key] = value - # else: - # res[key] = value - # for namespace, sub_dict in sub_dicts.items(): - # res[namespace] = self._wrap_outputs_dict(sub_dict) - # return res - - class FunctionProcess(Process): _func_args = None _calc_node_class = WorkCalculation From c39fca8287e86c864d4601f915c4f776ac8b0dbb Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Wed, 21 Feb 2018 09:19:48 +0100 Subject: [PATCH 06/13] Add docstrings to expose_* and exposed_* functions --- aiida/work/process_spec.py | 4 ++-- aiida/work/processes.py | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/aiida/work/process_spec.py b/aiida/work/process_spec.py index 7fda46c29d..0b4508a6dd 100644 --- a/aiida/work/process_spec.py +++ b/aiida/work/process_spec.py @@ -83,9 +83,9 @@ def expose_inputs(self, process_class, namespace=None, exclude=(), include=None) def expose_outputs(self, process_class, namespace=None, exclude=(), include=None): """ - This method allows one to automatically add the otuputs from another + This method allows one to automatically add the ouputs from another Process to this ProcessSpec. The optional namespace argument can be - used to group the exposed outputs in a separated PortNamespace + used to group the exposed outputs in a separated PortNamespace. :param process_class: the Process class whose inputs to expose :param namespace: a namespace in which to place the exposed inputs diff --git a/aiida/work/processes.py b/aiida/work/processes.py index 7266a787c7..16ef9c0ab4 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -412,11 +412,15 @@ def _use_cache_enabled(self): def exposed_inputs(self, process_class, namespace=None, agglomerate=True): """ - Gather a dictionary of the inputs that were exposed for a given Process - class under an optional namespace. + Gather a dictionary of the inputs that were exposed for a given Process class under an optional namespace. :param process_class: Process class whose inputs to try and retrieve + :param namespace: PortNamespace in which to look for the inputs + :type namespace: str + + :param agglomerate: If set to true, all parent namespaces of the given ``namespace`` will also be searched for inputs. Inputs in lower-lying namespaces take precedence. + :type agglomerate: bool """ exposed_inputs = {} @@ -443,6 +447,15 @@ class under an optional namespace. return exposed_inputs def exposed_outputs(self, process_instance, process_class, namespace=None, agglomerate=True): + """ + Gather the outputs which were exposed from the ``process_class`` and emitted by the specific ``process_instance`` in a dictionary. + + :param namespace: Namespace in which to search for exposed outputs. + :type namespace: str + + :param agglomerate: If set to true, all parent namespaces of the given ``namespace`` will also be searched for outputs. Outputs in lower-lying namespaces take precedence. 
+ :type agglomerate: bool + """ output_key_map = {} process_outputs_dict = process_instance.get_outputs_dict() for ns in self._get_namespace_list(namespace=namespace, agglomerate=agglomerate): From 6ddcddc2d537d063f2637d99b0ae3d14cfb5f48a Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Wed, 21 Feb 2018 10:27:11 +0100 Subject: [PATCH 07/13] Add example workchains for expose inputs/outputs --- docs/source/concepts/expose_examples/child.py | 22 ++++++ .../expose_examples/complex_parent.py | 71 +++++++++++++++++++ .../concepts/expose_examples/run_complex.py | 18 +++++ .../concepts/expose_examples/run_simple.py | 16 +++++ .../concepts/expose_examples/simple_parent.py | 27 +++++++ docs/source/concepts/workflows.rst | 20 +++++- 6 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 docs/source/concepts/expose_examples/child.py create mode 100644 docs/source/concepts/expose_examples/complex_parent.py create mode 100755 docs/source/concepts/expose_examples/run_complex.py create mode 100755 docs/source/concepts/expose_examples/run_simple.py create mode 100644 docs/source/concepts/expose_examples/simple_parent.py diff --git a/docs/source/concepts/expose_examples/child.py b/docs/source/concepts/expose_examples/child.py new file mode 100644 index 0000000000..5ac134b886 --- /dev/null +++ b/docs/source/concepts/expose_examples/child.py @@ -0,0 +1,22 @@ +from aiida.orm.data.base import Float, Int, Bool +from aiida.work import WorkChain + +class ChildWorkChain(WorkChain): + @classmethod + def define(cls, spec): + super(ChildWorkChain, cls).define(spec) + + spec.input('a', valid_type=Int) + spec.input('b', valid_type=Float) + spec.input('c', valid_type=Bool) + + spec.output('d', valid_type=Int) + spec.output('e', valid_type=Float) + spec.output('f', valid_type=Bool) + + spec.outline(cls.do_run) + + def do_run(self): + self.out('d', self.inputs.a) + self.out('e', self.inputs.b) + self.out('f', self.inputs.c) diff --git a/docs/source/concepts/expose_examples/complex_parent.py b/docs/source/concepts/expose_examples/complex_parent.py new file mode 100644 index 0000000000..dfef902cd5 --- /dev/null +++ b/docs/source/concepts/expose_examples/complex_parent.py @@ -0,0 +1,71 @@ +from aiida.work import ToContext, WorkChain, run + +from child import ChildWorkChain + +class ComplexParentWorkChain(WorkChain): + @classmethod + def define(cls, spec): + super(ComplexParentWorkChain, cls).define(spec) + + spec.expose_inputs(ChildWorkChain, include=['a']) + spec.expose_inputs( + ChildWorkChain, + namespace='child_1', + exclude=['a'] + ) + spec.expose_inputs( + ChildWorkChain, + namespace='child_2', + exclude=['a'] + ) + + spec.expose_outputs(ChildWorkChain, include=['d']) + spec.expose_outputs( + ChildWorkChain, + namespace='child_1', + exclude=['d'], + ) + spec.expose_outputs( + ChildWorkChain, + namespace='child_2', + exclude=['d'], + ) + + spec.outline(cls.run_children, cls.finalize) + + def run_children(self): + return ToContext( + child_1=self.submit( + ChildWorkChain, + **self.exposed_inputs( + ChildWorkChain, + namespace='child_1' + ) + ), + child_2=self.submit( + ChildWorkChain, + a=self.inputs.a, + **self.exposed_inputs( + ChildWorkChain, + namespace='child_2', + agglomerate=False + ) + ) + ) + + def finalize(self): + self.out_many( + self.exposed_outputs( + self.ctx.child_1, + ChildWorkChain, + namespace='child_1' + ) + ) + self.out_many( + self.exposed_outputs( + self.ctx.child_2, + ChildWorkChain, + namespace='child_2', + agglomerate=False + ) + ) diff --git 
a/docs/source/concepts/expose_examples/run_complex.py b/docs/source/concepts/expose_examples/run_complex.py new file mode 100755 index 0000000000..4815392c51 --- /dev/null +++ b/docs/source/concepts/expose_examples/run_complex.py @@ -0,0 +1,18 @@ +#!/usr/bin/env runaiida + +from __future__ import print_function + +from aiida.orm.data.base import Int, Float, Bool +from aiida.work import run + +from complex_parent import ComplexParentWorkChain + +if __name__ == '__main__': + print('complex parent:', + run( + ComplexParentWorkChain, + a=Int(1), + child_1=dict(b=Float(1.2), c=Bool(True)), + child_2=dict(b=Float(2.3), c=Bool(False)) + ) + ) diff --git a/docs/source/concepts/expose_examples/run_simple.py b/docs/source/concepts/expose_examples/run_simple.py new file mode 100755 index 0000000000..2f1ca006fb --- /dev/null +++ b/docs/source/concepts/expose_examples/run_simple.py @@ -0,0 +1,16 @@ +#!/usr/bin/env runaiida + +from __future__ import print_function + +from aiida.orm.data.base import Int, Float, Bool +from aiida.work import run + +from simple_parent import SimpleParentWorkChain + +if __name__ == '__main__': + print('simple parent:', + run( + SimpleParentWorkChain, + a=Int(1), b=Float(1.2), c=Bool(True) + ) + ) diff --git a/docs/source/concepts/expose_examples/simple_parent.py b/docs/source/concepts/expose_examples/simple_parent.py new file mode 100644 index 0000000000..5bf7da8038 --- /dev/null +++ b/docs/source/concepts/expose_examples/simple_parent.py @@ -0,0 +1,27 @@ +from aiida.work import ToContext, WorkChain, run + +from child import ChildWorkChain + +class SimpleParentWorkChain(WorkChain): + @classmethod + def define(cls, spec): + super(SimpleParentWorkChain, cls).define(spec) + + spec.expose_inputs(ChildWorkChain) + spec.expose_outputs(ChildWorkChain) + + spec.outline(cls.run_child, cls.finalize) + + def run_child(self): + return ToContext(child=self.submit( + ChildWorkChain, + **self.exposed_inputs(ChildWorkChain) + )) + + def finalize(self): + self.out_many( + self.exposed_outputs( + self.ctx.child, + ChildWorkChain + ) + ) diff --git a/docs/source/concepts/workflows.rst b/docs/source/concepts/workflows.rst index 3159fc76ab..985ded6f31 100644 --- a/docs/source/concepts/workflows.rst +++ b/docs/source/concepts/workflows.rst @@ -37,4 +37,22 @@ ISSUE#1129 Workflow development ==================== -ISSUE#1130 \ No newline at end of file +ISSUE#1130 + +Exposing inputs and outputs +--------------------------- + +.. include:: expose_examples/child.py + :code: python + +.. include:: expose_examples/simple_parent.py + :code: python + +.. include:: expose_examples/run_simple.py + :code: python + +.. include:: expose_examples/complex_parent.py + :code: python + +.. include:: expose_examples/run_complex.py + :code: python From 9a363c1f1783a48bdb2fe8653d545e729981dfa2 Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Wed, 21 Feb 2018 11:26:49 +0100 Subject: [PATCH 08/13] Add description for simple expose workchain --- aiida/work/process_spec.py | 3 +++ aiida/work/processes.py | 3 +++ docs/source/concepts/workflows.rst | 24 ++++++++++++++++++++++-- docs/source/work/dev.rst | 5 ++++- 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/aiida/work/process_spec.py b/aiida/work/process_spec.py index 0b4508a6dd..05b8f85a41 100644 --- a/aiida/work/process_spec.py +++ b/aiida/work/process_spec.py @@ -52,6 +52,9 @@ def _get_template(self, dict): class ProcessSpec(plumpy.ProcessSpec): + """ + Contains the inputs, outputs and outline of a process. 
+ """ INPUT_PORT_TYPE = InputPort PORT_NAMESPACE_TYPE = PortNamespace diff --git a/aiida/work/processes.py b/aiida/work/processes.py index 16ef9c0ab4..f1b04a1153 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -170,6 +170,9 @@ def out(self, output_port, value=None): return super(Process, self).out(output_port, value) def out_many(self, out_dict): + """ + Add all values given in ``out_dict`` to the outputs. The keys of the dictionary will be used as output names. + """ for key, value in out_dict.items(): self.out(key, value) diff --git a/docs/source/concepts/workflows.rst b/docs/source/concepts/workflows.rst index 985ded6f31..b253807191 100644 --- a/docs/source/concepts/workflows.rst +++ b/docs/source/concepts/workflows.rst @@ -39,18 +39,38 @@ Workflow development ISSUE#1130 -Exposing inputs and outputs ---------------------------- +.. _expose_inputs_outputs: + +Modular workflows: exposing inputs and outputs +---------------------------------------------- + +When creating complex workflows, it is a good idea to split them up into smaller, modular parts. At the lowest level, each workflow should perform exactly one task. These workflows can then be wrapped together by a "parent" workflow to create a larger logical unit. + +In order to make this approach manageable, it needs to be as simple as possible to glue together multiple workflows in a larger parent workflow. For this reason, AiiDA provides the *expose* functionality, which will be explained here. + +Consider the following example workchain, which simply takes a few inputs and returns them again as outputs: .. include:: expose_examples/child.py :code: python +As a first example, we will implement a thin wrapper workflow, which simply forwards its inputs to ``ChildWorkChain``, and forwards the outputs of the child to its outputs: + .. include:: expose_examples/simple_parent.py :code: python +In the ``define`` method of this simple parent workchain, we use the :meth:`.expose_inputs` and :meth:`.expose_outputs`. This creates the corresponding input and output ports in the parent workchain. + +Additionally, AiiDA remembers which inputs and outputs were exposed from that particular workchain class. This is used when calling the child in the ``run_child`` method. The :meth:`.Process.exposed_inputs` method returns a dictionary of inputs that the parent received which were exposed from the child, and so it can be used to pass these on to the child. + +Finally, in the ``finalize`` method, we use :meth:`.Process.exposed_outputs` to retrieve the outputs of the child which were exposed to the parent. Using :meth:`.out_many`, these outputs are added to the outputs of the parent workchain. + +This workchain can now be run in exactly the same way as the child itself: + .. include:: expose_examples/run_simple.py :code: python + + .. include:: expose_examples/complex_parent.py :code: python diff --git a/docs/source/work/dev.rst b/docs/source/work/dev.rst index e97761ef04..8a2817b059 100644 --- a/docs/source/work/dev.rst +++ b/docs/source/work/dev.rst @@ -27,6 +27,9 @@ This section describes the different classes related to workflows, workfunctions .. automodule:: aiida.work.process_builder :members: +.. automodule:: aiida.work.process_spec + :members: + .. automodule:: aiida.work.processes :members: @@ -43,4 +46,4 @@ This section describes the different classes related to workflows, workfunctions :members: .. 
automodule:: aiida.work.workfunctions - :members: \ No newline at end of file + :members: From e625225ad28bf3641b09fc9dfdc022687a734356 Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Wed, 21 Feb 2018 12:02:18 +0100 Subject: [PATCH 09/13] Add description for more complex expose workchain --- .../expose_examples/complex_parent.py | 6 +++--- .../concepts/expose_examples/run_complex.py | 20 +++++++++++-------- .../concepts/expose_examples/run_simple.py | 11 +++++----- docs/source/concepts/workflows.rst | 18 +++++++++++++---- 4 files changed, 34 insertions(+), 21 deletions(-) diff --git a/docs/source/concepts/expose_examples/complex_parent.py b/docs/source/concepts/expose_examples/complex_parent.py index dfef902cd5..6ec933553e 100644 --- a/docs/source/concepts/expose_examples/complex_parent.py +++ b/docs/source/concepts/expose_examples/complex_parent.py @@ -19,16 +19,16 @@ def define(cls, spec): exclude=['a'] ) - spec.expose_outputs(ChildWorkChain, include=['d']) + spec.expose_outputs(ChildWorkChain, include=['e']) spec.expose_outputs( ChildWorkChain, namespace='child_1', - exclude=['d'], + exclude=['e'], ) spec.expose_outputs( ChildWorkChain, namespace='child_2', - exclude=['d'], + exclude=['e'], ) spec.outline(cls.run_children, cls.finalize) diff --git a/docs/source/concepts/expose_examples/run_complex.py b/docs/source/concepts/expose_examples/run_complex.py index 4815392c51..a5b5036335 100755 --- a/docs/source/concepts/expose_examples/run_complex.py +++ b/docs/source/concepts/expose_examples/run_complex.py @@ -8,11 +8,15 @@ from complex_parent import ComplexParentWorkChain if __name__ == '__main__': - print('complex parent:', - run( - ComplexParentWorkChain, - a=Int(1), - child_1=dict(b=Float(1.2), c=Bool(True)), - child_2=dict(b=Float(2.3), c=Bool(False)) - ) - ) + print(run( + ComplexParentWorkChain, + a=Int(1), + child_1=dict(b=Float(1.2), c=Bool(True)), + child_2=dict(b=Float(2.3), c=Bool(False)) + )) + # Result: + # { + # u'e': 1.2, + # u'child_1.d': 1, u'child_1.f': True, + # u'child_2.d': 1, u'child_2.f': False + # } diff --git a/docs/source/concepts/expose_examples/run_simple.py b/docs/source/concepts/expose_examples/run_simple.py index 2f1ca006fb..1f7c6ed4ac 100755 --- a/docs/source/concepts/expose_examples/run_simple.py +++ b/docs/source/concepts/expose_examples/run_simple.py @@ -8,9 +8,8 @@ from simple_parent import SimpleParentWorkChain if __name__ == '__main__': - print('simple parent:', - run( - SimpleParentWorkChain, - a=Int(1), b=Float(1.2), c=Bool(True) - ) - ) + print(run( + SimpleParentWorkChain, + a=Int(1), b=Float(1.2), c=Bool(True) + )) + # Result: {u'e': 1.2, u'd': 1, u'f': True} diff --git a/docs/source/concepts/workflows.rst b/docs/source/concepts/workflows.rst index b253807191..710f02aa49 100644 --- a/docs/source/concepts/workflows.rst +++ b/docs/source/concepts/workflows.rst @@ -41,8 +41,8 @@ ISSUE#1130 .. _expose_inputs_outputs: -Modular workflows: exposing inputs and outputs ----------------------------------------------- +Exposing inputs and outputs +--------------------------- When creating complex workflows, it is a good idea to split them up into smaller, modular parts. At the lowest level, each workflow should perform exactly one task. These workflows can then be wrapped together by a "parent" workflow to create a larger logical unit. @@ -69,10 +69,20 @@ This workchain can now be run in exactly the same way as the child itself: .. 
include:: expose_examples/run_simple.py
     :code: python
 
+Next, we will see how a more complex parent workchain can be created by using the additional features of the expose functionality. The following workchain launches two children. These children share the input ``a``, but have different ``b`` and ``c``. The output ``e`` will be taken only from the first child, whereas ``d`` and ``f`` are taken from both children. In order to avoid name conflicts, we need to create a *namespace* for each of the two children, in which the inputs and outputs that are not shared are stored. Our goal is that the workflow can be called as follows:
+.. include:: expose_examples/run_complex.py
+    :code: python
+
+This is achieved by the following workflow. Below, we will explain each of the steps.
 
 .. include:: expose_examples/complex_parent.py
     :code: python
 
-.. include:: expose_examples/run_complex.py
-    :code: python
+First of all, we want to expose the ``a`` input and the ``e`` output at the top level. For this, we again use :meth:`.expose_inputs` and :meth:`.expose_outputs`, but with the optional keyword ``include``. This specifies a list of keys, and only inputs or outputs which are in that list will be exposed. So by passing ``include=['a']`` to :meth:`.expose_inputs`, only the input ``a`` is exposed.
+
+Additionally, we want to expose the inputs ``b`` and ``c`` (outputs ``d`` and ``f``), but in a namespace specific to each of the two children. For this purpose, we pass the ``namespace`` parameter to the expose functions. However, since we now shouldn't expose ``a`` (``e``) again, we use the ``exclude`` keyword, which specifies a list of keys that will not be exposed.
+
+When calling the children, we again use the :meth:`.Process.exposed_inputs` method to forward the exposed inputs. Since the inputs ``b`` and ``c`` are now in a specific namespace, we need to pass this namespace as an additional parameter. By default, :meth:`.exposed_inputs` will search through all the parent namespaces of the given namespace for inputs, as shown in the call to ``child_1``. If the same input key exists in multiple namespaces, the input in the lowest namespace takes precedence. It's also possible to disable this behavior, and instead search only in the explicit namespace that was passed. This is done by setting ``agglomerate=False``, as shown in the call to ``child_2``. Of course, we then need to explicitly pass the input ``a``.
+
+Finally, we use :meth:`.exposed_outputs` and :meth:`.out_many` to forward the outputs of the children to the outputs of the parent. Again, the ``namespace`` and ``agglomerate`` options can be used to select which outputs are returned by the :meth:`.exposed_outputs` method.
From 72de7673c3eca555ca35beabcf98f47eadc5f144 Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Wed, 21 Feb 2018 12:08:47 +0100 Subject: [PATCH 10/13] Remove unused defaultdict import --- aiida/work/processes.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aiida/work/processes.py b/aiida/work/processes.py index f1b04a1153..4c4d4468bf 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -16,7 +16,6 @@ import plumpy import uuid import traceback -from collections import defaultdict from plumpy import ProcessState from aiida.common import exceptions From 9c549364b2d0c81a390a671056a0b07243258998 Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Wed, 21 Feb 2018 13:08:35 +0100 Subject: [PATCH 11/13] Add a test with nested exposing --- aiida/backends/tests/work/work_chain.py | 44 +++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/aiida/backends/tests/work/work_chain.py b/aiida/backends/tests/work/work_chain.py index 4b76dc2635..6b928fe8a7 100644 --- a/aiida/backends/tests/work/work_chain.py +++ b/aiida/backends/tests/work/work_chain.py @@ -791,6 +791,32 @@ def step_two(self): y = Int(2) work.launch.run(Wf, subspace={'one': Int(1), 'two': Int(2)}) +class GrandParentExposeWorkChain(work.WorkChain): + @classmethod + def define(cls, spec): + super(GrandParentExposeWorkChain, cls).define(spec) + + spec.expose_inputs(ParentExposeWorkChain, namespace='sub') + spec.expose_outputs(ParentExposeWorkChain, namespace='sub') + + spec.outline(cls.do_run, cls.finalize) + + def do_run(self): + # raise ValueError(self.exposed_inputs(ParentExposeWorkChain, namespace='sub')) + return ToContext(child=self.submit( + ParentExposeWorkChain, + **self.exposed_inputs(ParentExposeWorkChain, namespace='sub') + )) + + def finalize(self): + self.out_many( + self.exposed_outputs( + self.ctx.child, + ParentExposeWorkChain, + namespace='sub' + ) + ) + class ParentExposeWorkChain(work.WorkChain): @classmethod def define(cls, spec): @@ -917,3 +943,21 @@ def test_expose(self): 'sub_2.b': Float(1.2), 'sub_2.sub_3.c': Bool(False) } ) + + def test_nested_expose(self): + res = work.launch.run( + GrandParentExposeWorkChain, + sub=dict( + a=Int(1), + sub_1={'b': Float(2.3), 'c': Bool(True)}, + sub_2={'b': Float(1.2), 'sub_3': {'c': Bool(False)}}, + ) + ) + self.assertEquals( + res, + { + 'sub.a': Float(2.2), + 'sub.sub_1.b': Float(2.3), 'sub.sub_1.c': Bool(True), + 'sub.sub_2.b': Float(1.2), 'sub.sub_2.sub_3.c': Bool(False) + } + ) From f794aabea30bba7402619dd8cb5be6d91cb93acd Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Wed, 21 Feb 2018 14:57:33 +0100 Subject: [PATCH 12/13] Fix exposed_outputs implementation to get all outputs from an exposed namespace --- aiida/backends/tests/work/work_chain.py | 23 +++++++++-------- aiida/work/processes.py | 34 +++++++++++++++++-------- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/aiida/backends/tests/work/work_chain.py b/aiida/backends/tests/work/work_chain.py index 6b928fe8a7..c3c5c2cbf5 100644 --- a/aiida/backends/tests/work/work_chain.py +++ b/aiida/backends/tests/work/work_chain.py @@ -796,16 +796,15 @@ class GrandParentExposeWorkChain(work.WorkChain): def define(cls, spec): super(GrandParentExposeWorkChain, cls).define(spec) - spec.expose_inputs(ParentExposeWorkChain, namespace='sub') - spec.expose_outputs(ParentExposeWorkChain, namespace='sub') + spec.expose_inputs(ParentExposeWorkChain, namespace='sub.sub') + spec.expose_outputs(ParentExposeWorkChain, namespace='sub.sub') spec.outline(cls.do_run, cls.finalize) def 
do_run(self): - # raise ValueError(self.exposed_inputs(ParentExposeWorkChain, namespace='sub')) return ToContext(child=self.submit( ParentExposeWorkChain, - **self.exposed_inputs(ParentExposeWorkChain, namespace='sub') + **self.exposed_inputs(ParentExposeWorkChain, namespace='sub.sub') )) def finalize(self): @@ -813,7 +812,7 @@ def finalize(self): self.exposed_outputs( self.ctx.child, ParentExposeWorkChain, - namespace='sub' + namespace='sub.sub' ) ) @@ -948,16 +947,18 @@ def test_nested_expose(self): res = work.launch.run( GrandParentExposeWorkChain, sub=dict( - a=Int(1), - sub_1={'b': Float(2.3), 'c': Bool(True)}, - sub_2={'b': Float(1.2), 'sub_3': {'c': Bool(False)}}, + sub=dict( + a=Int(1), + sub_1={'b': Float(2.3), 'c': Bool(True)}, + sub_2={'b': Float(1.2), 'sub_3': {'c': Bool(False)}}, + ) ) ) self.assertEquals( res, { - 'sub.a': Float(2.2), - 'sub.sub_1.b': Float(2.3), 'sub.sub_1.c': Bool(True), - 'sub.sub_2.b': Float(1.2), 'sub.sub_2.sub_3.c': Bool(False) + 'sub.sub.a': Float(2.2), + 'sub.sub.sub_1.b': Float(2.3), 'sub.sub.sub_1.c': Bool(True), + 'sub.sub.sub_2.b': Float(1.2), 'sub.sub.sub_2.sub_3.c': Bool(False) } ) diff --git a/aiida/work/processes.py b/aiida/work/processes.py index 4c4d4468bf..77d19b8c41 100644 --- a/aiida/work/processes.py +++ b/aiida/work/processes.py @@ -458,20 +458,34 @@ def exposed_outputs(self, process_instance, process_class, namespace=None, agglo :param agglomerate: If set to true, all parent namespaces of the given ``namespace`` will also be searched for outputs. Outputs in lower-lying namespaces take precedence. :type agglomerate: bool """ + namespace_separator = self.spec().namespace_separator + output_key_map = {} - process_outputs_dict = process_instance.get_outputs_dict() + # maps the exposed name to all outputs that belong to it + top_namespace_map = collections.defaultdict(list) + process_outputs_dict = { + k: v for k, v in process_instance.get_outputs(also_labels=True, link_type=LinkType.RETURN) + } + + for port_name in process_outputs_dict: + top_namespace = port_name.split(namespace_separator)[0] + top_namespace_map[top_namespace].append(port_name) + for ns in self._get_namespace_list(namespace=namespace, agglomerate=agglomerate): - for key in process_outputs_dict: - if key in self.spec()._exposed_outputs[ns][process_class]: - output_key_map[key] = ns + # only the top-level key is stored in _exposed_outputs + for top_name in top_namespace_map: + if top_name in self.spec()._exposed_outputs[ns][process_class]: + output_key_map[top_name] = ns result = {} - namespace_separator = self.spec().namespace_separator - for key, ns in output_key_map.items(): - if ns is None: - result[key] = process_outputs_dict[key] - else: - result[ns + namespace_separator + key] = process_outputs_dict[key] + + for top_name, ns in output_key_map.items(): + # collect all outputs belonging to the given top_name + for port_name in top_namespace_map[top_name]: + if ns is None: + result[port_name] = process_outputs_dict[port_name] + else: + result[ns + namespace_separator + port_name] = process_outputs_dict[port_name] return result From ebd3f0d679b1cf3e46fae3e0656000c8642db937 Mon Sep 17 00:00:00 2001 From: Dominik Gresch Date: Fri, 23 Feb 2018 13:39:39 +0100 Subject: [PATCH 13/13] Update plumpy requirement --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index d834946b64..006f85d987 100644 --- a/requirements.txt +++ b/requirements.txt @@ -144,5 +144,5 @@ wcwidth==0.1.7 Werkzeug==0.14.1 wrapt==1.10.11 
 yapf==0.19.0
--e git://github.com/muhrin/plumpy.git@c9b8a79bd3b0fef4c27a5793e9f3393525c0762b#egg=plumpy
+-e git://github.com/muhrin/plumpy.git@6be7b2d9b6d2d6704bc605c97432512ca38f181f#egg=plumpy
 -e git://github.com/muhrin/kiwipy.git@249fe038f109424b6cdd007092d6f3535346aa7f#egg=kiwipy