From c05c90fadcbb9f4a2e0f5a76d86b53b9383d93c9 Mon Sep 17 00:00:00 2001 From: Daniel Yoo Date: Wed, 28 Oct 2020 18:09:57 -0700 Subject: [PATCH 1/8] Add support for Amazon States Language "ResultSelector" in Task, Map and Parallel States. --- src/stepfunctions/inputs/__init__.py | 8 +- src/stepfunctions/inputs/placeholders.py | 45 +++++++--- src/stepfunctions/steps/compute.py | 4 + src/stepfunctions/steps/fields.py | 1 + src/stepfunctions/steps/service.py | 13 +++ src/stepfunctions/steps/states.py | 22 ++++- tests/unit/test_placeholders.py | 10 +-- tests/unit/test_placeholders_with_steps.py | 100 ++++++++++++++++++--- 8 files changed, 167 insertions(+), 36 deletions(-) diff --git a/src/stepfunctions/inputs/__init__.py b/src/stepfunctions/inputs/__init__.py index ffa01b0..81519d8 100644 --- a/src/stepfunctions/inputs/__init__.py +++ b/src/stepfunctions/inputs/__init__.py @@ -6,10 +6,10 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import absolute_import -from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput +from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput, StepResult diff --git a/src/stepfunctions/inputs/placeholders.py b/src/stepfunctions/inputs/placeholders.py index 3b7f2b6..45442f7 100644 --- a/src/stepfunctions/inputs/placeholders.py +++ b/src/stepfunctions/inputs/placeholders.py @@ -6,9 +6,9 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import absolute_import @@ -51,11 +51,11 @@ def __init__(self, schema=None, **kwargs): self._set_schema(schema) self._make_immutable() self.json_str_template = "{}" - + self.name = kwargs.get("name") self.type = kwargs.get("type") self.parent = kwargs.get("parent") - + def get(self, name, type): """ @@ -64,11 +64,11 @@ def get(self, name, type): Args: name (str): Name of the placeholder variable. type (type): Type of the placeholder variable. - + Raises: ValueError: If placeholder variable with the same name but different type already exists. ValueError: If placeholder variable does not fit into a previously specified schema for the placeholder collection. - + Returns: Placeholder: Placeholder variable. """ @@ -240,7 +240,7 @@ def _join_path(self, path): def to_jsonpath(self): """ Returns a JSON path representation of the placeholder variable to be used for step parameters. - + Returns: str: JSON path representation of the placeholder variable """ @@ -252,7 +252,7 @@ class ExecutionInput(Placeholder): """ Top-level class for execution input placeholders. """ - + def __init__(self, schema=None, **kwargs): super(ExecutionInput, self).__init__(schema, **kwargs) self.json_str_template = '$$.Execution.Input{}' @@ -268,7 +268,7 @@ def _create_variable(self, name, parent, type=None): return ExecutionInput(name=name, parent=parent, type=type) else: return ExecutionInput(name=name, parent=parent) - + class StepInput(Placeholder): @@ -279,7 +279,7 @@ class StepInput(Placeholder): def __init__(self, schema=None, **kwargs): super(StepInput, self).__init__(schema, **kwargs) self.json_str_template = '${}' - + def _create_variable(self, name, parent, type=None): """ Creates a placeholder variable for Step Input. @@ -291,3 +291,26 @@ def _create_variable(self, name, parent, type=None): return StepInput(name=name, parent=parent, type=type) else: return StepInput(name=name, parent=parent) + + +class StepResult(Placeholder): + + """ + Top-level class for step result placeholders. + """ + + def __init__(self, schema=None, **kwargs): + super(StepResult, self).__init__(schema, **kwargs) + self.json_str_template = '${}' + + def _create_variable(self, name, parent, type=None): + """ + Creates a placeholder variable for Step Result. + A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + """ + if self.immutable: + raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + if type: + return StepResult(name=name, parent=parent, type=type) + else: + return StepResult(name=name, parent=parent) diff --git a/src/stepfunctions/steps/compute.py b/src/stepfunctions/steps/compute.py index 203ed47..0f827ac 100644 --- a/src/stepfunctions/steps/compute.py +++ b/src/stepfunctions/steps/compute.py @@ -57,6 +57,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -98,6 +99,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -138,6 +140,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -178,6 +181,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ diff --git a/src/stepfunctions/steps/fields.py b/src/stepfunctions/steps/fields.py index 24c3949..9e17e8e 100644 --- a/src/stepfunctions/steps/fields.py +++ b/src/stepfunctions/steps/fields.py @@ -22,6 +22,7 @@ class Field(Enum): InputPath = 'input_path' OutputPath = 'output_path' Parameters = 'parameters' + ResultSelector = 'result_selector' ResultPath = 'result_path' Next = 'next' Retry = 'retry' diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 5c32a88..3df230a 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -70,6 +70,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -140,6 +141,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -170,6 +172,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -200,6 +203,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -231,6 +235,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -271,6 +276,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -309,6 +315,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -348,6 +355,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -387,6 +395,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -426,6 +435,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -456,6 +466,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -486,6 +497,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -516,6 +528,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index 9669a73..bec265b 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -17,7 +17,7 @@ from stepfunctions.exceptions import DuplicateStatesInChain from stepfunctions.steps.fields import Field -from stepfunctions.inputs import Placeholder, StepInput +from stepfunctions.inputs import Placeholder, StepInput, StepResult logger = logging.getLogger('stepfunctions.states') @@ -71,7 +71,7 @@ def to_dict(self): for k, v in self.fields.items(): if v is not None or k in fields_accepted_as_none: k = to_pascalcase(k) - if k == to_pascalcase(Field.Parameters.value): + if k == to_pascalcase(Field.Parameters.value) or k == to_pascalcase(Field.ResultSelector.value): result[k] = self._replace_placeholders(v) else: result[k] = v @@ -171,6 +171,7 @@ def __init__(self, state_id, state_type, output_schema=None, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -195,6 +196,7 @@ def allowed_fields(self): Field.InputPath, Field.OutputPath, Field.Parameters, + Field.ResultSelector, Field.ResultPath ] @@ -208,6 +210,16 @@ def update_parameters(self, params): if Field.Parameters in self.allowed_fields(): self.fields[Field.Parameters.value] = params + def update_result_selector(self, result_selector): + """ + Update `result_selector` field in the state, if supported. + + Args: + params (dict or list): The value of this field becomes the effective result of the state. + """ + if Field.ResultSelector in self.allowed_fields(): + self.fields[Field.ResultSelector.value] = result_selector + def next(self, next_step): """ Specify the next state or chain to transition to. @@ -494,6 +506,7 @@ def __init__(self, state_id, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -506,6 +519,7 @@ def allowed_fields(self): Field.InputPath, Field.OutputPath, Field.Parameters, + Field.ResultSelector, Field.ResultPath, Field.Retry, Field.Catch @@ -546,6 +560,7 @@ def __init__(self, state_id, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -566,6 +581,7 @@ def allowed_fields(self): Field.InputPath, Field.OutputPath, Field.Parameters, + Field.ResultSelector, Field.ResultPath, Field.Retry, Field.Catch, @@ -598,6 +614,7 @@ def __init__(self, state_id, **kwargs): comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ @@ -614,6 +631,7 @@ def allowed_fields(self): Field.InputPath, Field.OutputPath, Field.Parameters, + Field.ResultSelector, Field.ResultPath, Field.TimeoutSeconds, Field.TimeoutSecondsPath, diff --git a/tests/unit/test_placeholders.py b/tests/unit/test_placeholders.py index 456a7bf..4c0ee64 100644 --- a/tests/unit/test_placeholders.py +++ b/tests/unit/test_placeholders.py @@ -6,16 +6,16 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import absolute_import import pytest import json -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import ExecutionInput, StepInput, StepResult def test_placeholder_creation_with_subscript_operator(): step_input = StepInput() @@ -154,7 +154,7 @@ def test_placeholder_with_schema(): with pytest.raises(ValueError): workflow_input["A"]["B"]["D"] - + with pytest.raises(ValueError): workflow_input["A"]["B"].get("C", float) diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index 54d1543..d1d92fd 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -6,22 +6,22 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import absolute_import import pytest from stepfunctions.steps import Pass, Succeed, Fail, Wait, Choice, ChoiceRule, Parallel, Map, Task, Retry, Catch, Chain, Graph -from stepfunctions.inputs import ExecutionInput, StepInput +from stepfunctions.inputs import ExecutionInput, StepInput, StepResult def test_workflow_input_placeholder(): workflow_input = ExecutionInput() test_step = Pass( - state_id='StateOne', + state_id='StateOne', parameters={ 'ParamA': 'SampleValueA', 'ParamB': workflow_input, @@ -72,7 +72,7 @@ def test_step_input_placeholder(): assert test_step_02.to_dict() == expected_repr - + def test_workflow_with_placeholders(): workflow_input = ExecutionInput() @@ -139,7 +139,7 @@ def test_workflow_with_placeholders(): } assert result == expected_workflow_repr - + def test_step_input_order_validation(): workflow_input = ExecutionInput() @@ -176,8 +176,15 @@ def test_step_input_order_validation(): def test_map_state_with_placeholders(): workflow_input = ExecutionInput() + step_result = StepResult() - map_state = Map('MapState01') + map_state = Map( + state_id='MapState01', + result_selector={ + 'foo': step_result['foo'], + 'bar': step_result['bar1']['bar2'] + } + ) iterator_state = Pass( 'TrainIterator', parameters={ @@ -193,6 +200,10 @@ def test_map_state_with_placeholders(): "States": { "MapState01": { "Type": "Map", + "ResultSelector": { + "foo.$": "$['foo']", + "bar.$": "$['bar1']['bar2']" + }, "End": True, "Iterator": { "StartAt": "TrainIterator", @@ -216,9 +227,16 @@ def test_map_state_with_placeholders(): def test_parallel_state_with_placeholders(): workflow_input = ExecutionInput() + step_result = StepResult() + + parallel_state = Parallel( + state_id='ParallelState01', + result_selector={ + 'foo': step_result['foo'], + 'bar': step_result['bar1']['bar2'] + } + ) - parallel_state = Parallel('ParallelState01') - branch_A = Pass( 'Branch_A', parameters={ @@ -252,6 +270,10 @@ def test_parallel_state_with_placeholders(): "States": { "ParallelState01": { "Type": "Parallel", + "ResultSelector": { + "foo.$": "$['foo']", + "bar.$": "$['bar1']['bar2']" + }, "End": True, "Branches": [ { @@ -308,11 +330,11 @@ def test_choice_state_with_placeholders(): choice_state = Choice('Is Completed?') choice_state.add_choice( - ChoiceRule.BooleanEquals(choice_state.output()["Completed"], True), + ChoiceRule.BooleanEquals(choice_state.output()["Completed"], True), Succeed('Complete') ) choice_state.add_choice( - ChoiceRule.BooleanEquals(choice_state.output()["Completed"], False), + ChoiceRule.BooleanEquals(choice_state.output()["Completed"], False), retry ) @@ -360,7 +382,7 @@ def test_choice_state_with_placeholders(): assert result == expected_repr def test_schema_validation_for_step_input(): - + test_step_01 = Pass( state_id='StateOne', output_schema={ @@ -378,7 +400,7 @@ def test_schema_validation_for_step_input(): "ParamB": "SampleValueB" } ) - + with pytest.raises(ValueError): test_step_03 = Pass( state_id='StateTwo', @@ -387,3 +409,53 @@ def test_schema_validation_for_step_input(): "ParamB": "SampleValueB" } ) + +def test_step_result_placeholder(): + step_result = StepResult() + + test_step_01 = Task( + state_id='StateOne', + result_selector={ + 'ParamA': step_result["foo"], + "ParamC": "SampleValueC" + } + ) + + expected_repr = { + "Type": "Task", + "ResultSelector": { + "ParamA.$": "$['foo']", + "ParamC": "SampleValueC" + }, + "End": True + } + + assert test_step_01.to_dict() == expected_repr + +def test_schema_validation_for_step_result(): + + step_result = StepResult( + schema={ + "Payload": { + "Key01": str + } + } + ) + + with pytest.raises(ValueError): + test_step_01 = Task( + state_id='StateOne', + result_selector={ + 'ParamA': step_result["Payload"]["Key02"], + "ParamB": "SampleValueB" + } + ) + + with pytest.raises(ValueError): + test_step_02 = Task( + state_id='StateOne', + parameters={ + 'ParamA': step_result["Payload"].get("Key01", float), + "ParamB": "SampleValueB" + } + ) From 8667c37ea68bdbdbff07f20aaf48a4be55dc8ec1 Mon Sep 17 00:00:00 2001 From: Daniel Yoo Date: Thu, 29 Jul 2021 15:31:55 -0400 Subject: [PATCH 2/8] Update Placeholder doc and add more tests for step field 'result_selector' --- doc/placeholders.rst | 30 ++++- src/stepfunctions/steps/states.py | 10 -- tests/unit/test_placeholders.py | 127 +++++++++++---------- tests/unit/test_placeholders_with_steps.py | 19 +++ tests/unit/test_steps.py | 4 + 5 files changed, 113 insertions(+), 77 deletions(-) diff --git a/doc/placeholders.rst b/doc/placeholders.rst index fa87b1d..0e12ac9 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -3,9 +3,9 @@ Placeholders Once defined, a workflow is static unless you update it explicitly. But, you can pass input to workflow executions. You can have dynamic values -that you use in the **parameters** fields of the steps in your workflow. For this, +that you use in the **parameters** or **result_selector** fields of the steps in your workflow. For this, the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you -create your workflow. There are 2 mechanisms for passing dynamic values in a workflow. +create your workflow. There are 3 mechanisms for passing dynamic values in a workflow. The first mechanism is a global input to the workflow execution. This input is accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput` @@ -64,10 +64,10 @@ that returns the placeholder output for that step. parameters={ "FunctionName": "MakeApiCall", "Payload": { - "input": "20192312" - } + "input": "20192312" } - ) + } + ) lambda_state_second = LambdaStep( state_id="MySecondLambdaStep", @@ -81,7 +81,24 @@ that returns the placeholder output for that step. definition = Chain([lambda_state_first, lambda_state_second]) +The third mechanism is a placeholder for a step's result. The result of a step can be modified +with the **result_selector** field to replace the step's result. +.. code-block:: python + + lambda_result = StepResult( + schema={ + "Id": str, + } + ) + + lambda_state_first = LambdaStep( + state_id="MyFirstLambdaStep", + result_selector={ + "Output": lambda_result["Id"], + "Status": "Success" + } + ) .. autoclass:: stepfunctions.inputs.Placeholder @@ -90,3 +107,6 @@ that returns the placeholder output for that step. .. autoclass:: stepfunctions.inputs.StepInput :inherited-members: + +.. autoclass:: stepfunctions.inputs.StepResult + :inherited-members: diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index bec265b..2f44452 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -210,16 +210,6 @@ def update_parameters(self, params): if Field.Parameters in self.allowed_fields(): self.fields[Field.Parameters.value] = params - def update_result_selector(self, result_selector): - """ - Update `result_selector` field in the state, if supported. - - Args: - params (dict or list): The value of this field becomes the effective result of the state. - """ - if Field.ResultSelector in self.allowed_fields(): - self.fields[Field.ResultSelector.value] = result_selector - def next(self, next_step): """ Specify the next state or chain to transition to. diff --git a/tests/unit/test_placeholders.py b/tests/unit/test_placeholders.py index 4c0ee64..e544384 100644 --- a/tests/unit/test_placeholders.py +++ b/tests/unit/test_placeholders.py @@ -17,64 +17,64 @@ from stepfunctions.inputs import ExecutionInput, StepInput, StepResult -def test_placeholder_creation_with_subscript_operator(): - step_input = StepInput() - placeholder_variable = step_input["A"] +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_with_subscript_operator(placeholder): + placeholder_variable = placeholder["A"] assert placeholder_variable.name == "A" assert placeholder_variable.type is None -def test_placeholder_creation_with_type(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"]["b"].get("C", float) +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_with_type(placeholder): + placeholder_variable = placeholder["A"]["b"].get("C", float) assert placeholder_variable.name == "C" assert placeholder_variable.type == float -def test_placeholder_creation_with_int_key(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"][0] +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_with_int_key(placeholder): + placeholder_variable = placeholder["A"][0] assert placeholder_variable.name == 0 assert placeholder_variable.type == None -def test_placeholder_creation_with_invalid_key(): - step_input = StepInput() +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_with_invalid_key(placeholder): with pytest.raises(ValueError): - step_input["A"][1.3] + placeholder["A"][1.3] with pytest.raises(ValueError): - step_input["A"].get(1.2, str) + placeholder["A"].get(1.2, str) -def test_placeholder_creation_failure_with_type(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"]["b"].get("C", float) +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_creation_failure_with_type(placeholder): + placeholder_variable = placeholder["A"]["b"].get("C", float) with pytest.raises(ValueError): - workflow_input["A"]["b"].get("C", int) + placeholder["A"]["b"].get("C", int) -def test_placeholder_path(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"]["b"]["C"] +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_path(placeholder): + placeholder_variable = placeholder["A"]["b"]["C"] expected_path = ["A", "b", "C"] assert placeholder_variable._get_path() == expected_path -def test_placeholder_contains(): - step_input = StepInput() - var_one = step_input["Key01"] - var_two = step_input["Key02"]["Key03"] - var_three = step_input["Key01"]["Key04"] - var_four = step_input["Key05"] +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_contains(placeholder): + var_one = placeholder["Key01"] + var_two = placeholder["Key02"]["Key03"] + var_three = placeholder["Key01"]["Key04"] + var_four = placeholder["Key05"] - step_input_two = StepInput() - var_five = step_input_two["Key07"] + placeholder_two = StepInput() + var_five = placeholder_two["Key07"] - assert step_input.contains(var_three) == True - assert step_input.contains(var_five) == False - assert step_input_two.contains(var_three) == False + assert placeholder.contains(var_three) == True + assert placeholder.contains(var_five) == False + assert placeholder_two.contains(var_three) == False -def test_placeholder_schema_as_dict(): - workflow_input = ExecutionInput() - workflow_input["A"]["b"].get("C", float) - workflow_input["Message"] - workflow_input["Key01"]["Key02"] - workflow_input["Key03"] - workflow_input["Key03"]["Key04"] +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_schema_as_dict(placeholder): + placeholder["A"]["b"].get("C", float) + placeholder["Message"] + placeholder["Key01"]["Key02"] + placeholder["Key03"] + placeholder["Key03"]["Key04"] expected_schema = { "A": { @@ -91,14 +91,14 @@ def test_placeholder_schema_as_dict(): } } - assert workflow_input.get_schema_as_dict() == expected_schema + assert placeholder.get_schema_as_dict() == expected_schema -def test_placeholder_schema_as_json(): - step_input = StepInput() - step_input["Response"].get("StatusCode", int) - step_input["Hello"]["World"] - step_input["A"] - step_input["Hello"]["World"].get("Test", str) +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_schema_as_json(placeholder): + placeholder["Response"].get("StatusCode", int) + placeholder["Hello"]["World"] + placeholder["A"] + placeholder["Hello"]["World"].get("Test", str) expected_schema = { "Response": { @@ -112,29 +112,27 @@ def test_placeholder_schema_as_json(): "A": "str" } - assert step_input.get_schema_as_json() == json.dumps(expected_schema) + assert placeholder.get_schema_as_json() == json.dumps(expected_schema) -def test_placeholder_is_empty(): - workflow_input = ExecutionInput() - placeholder_variable = workflow_input["A"]["B"]["C"] +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_is_empty(placeholder): + placeholder_variable = placeholder["A"]["B"]["C"] assert placeholder_variable._is_empty() == True - workflow_input["A"]["B"]["C"]["D"] + placeholder["A"]["B"]["C"]["D"] assert placeholder_variable._is_empty() == False +@pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) +def test_placeholder_make_immutable(placeholder): + placeholder["A"]["b"].get("C", float) + placeholder["Message"] + placeholder["Key01"]["Key02"] + placeholder["Key03"] + placeholder["Key03"]["Key04"] -def test_placeholder_make_immutable(): - workflow_input = ExecutionInput() - workflow_input["A"]["b"].get("C", float) - workflow_input["Message"] - workflow_input["Key01"]["Key02"] - workflow_input["Key03"] - workflow_input["Key03"]["Key04"] - - assert check_immutable(workflow_input) == False - - workflow_input._make_immutable() - assert check_immutable(workflow_input) == True + assert check_immutable(placeholder) == False + placeholder._make_immutable() + assert check_immutable(placeholder) == True def test_placeholder_with_schema(): test_schema = { @@ -168,6 +166,11 @@ def test_step_input_jsonpath(): placeholder_variable = step_input["A"]["b"].get(0, float) assert placeholder_variable.to_jsonpath() == "$['A']['b'][0]" +def test_step_result_jsonpath(): + step_result = StepResult() + placeholder_variable = step_result["A"]["b"].get(0, float) + assert placeholder_variable.to_jsonpath() == "$['A']['b'][0]" + # UTILS def check_immutable(placeholder): diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index d1d92fd..4a6fd1e 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -459,3 +459,22 @@ def test_schema_validation_for_step_result(): "ParamB": "SampleValueB" } ) + +def test_schema_validation_success_for_step_result(): + step_result = StepResult( + schema={ + "Payload": { + "Key01": str + } + } + ) + + try: + test_step = Task( + state_id='StateOne', + result_selector={ + 'ParamA': step_result["Payload"]["Key01"] + } + ) + except: + pytest.fail("Step should fetch step result key successfully without raising an Exception") diff --git a/tests/unit/test_steps.py b/tests/unit/test_steps.py index 5c86279..8211155 100644 --- a/tests/unit/test_steps.py +++ b/tests/unit/test_steps.py @@ -32,6 +32,7 @@ def test_state_creation(): input_path='$.Input', output_path='$.Output', parameters={'Key': 'Value'}, + result_selector={'foo': 'bar'}, result_path='$.Result' ) @@ -43,6 +44,9 @@ def test_state_creation(): 'Parameters': { 'Key': 'Value' }, + 'ResultSelector': { + 'foo': 'bar' + }, 'ResultPath': '$.Result', 'End': True } From 851fc0e56858626c58d4465085e92f5d66aaca00 Mon Sep 17 00:00:00 2001 From: Daniel Yoo Date: Wed, 11 Aug 2021 13:38:59 -0400 Subject: [PATCH 3/8] Add result selector support for EventBridgePutEventsStep --- src/stepfunctions/steps/service.py | 1 + tests/unit/test_placeholders_with_steps.py | 118 ++++++++++----------- 2 files changed, 60 insertions(+), 59 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 3df230a..15283d6 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -101,6 +101,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index 4a6fd1e..d695b22 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -21,13 +21,13 @@ def test_workflow_input_placeholder(): workflow_input = ExecutionInput() test_step = Pass( - state_id='StateOne', + state_id="StateOne", parameters={ - 'ParamA': 'SampleValueA', - 'ParamB': workflow_input, - 'ParamC': workflow_input['Key01'], - 'ParamD': workflow_input['Key02']['Key03'], - 'ParamE': workflow_input['Key01']['Key03'], + "ParamA": "SampleValueA", + "ParamB": workflow_input, + "ParamC": workflow_input["Key01"], + "ParamD": workflow_input["Key02"]["Key03"], + "ParamE": workflow_input["Key01"]["Key03"], } ) @@ -48,14 +48,14 @@ def test_workflow_input_placeholder(): def test_step_input_placeholder(): test_step_01 = Pass( - state_id='StateOne' + state_id="StateOne" ) test_step_02 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamA': test_step_01.output(), - 'ParamB': test_step_01.output()["Response"]["Key02"], + "ParamA": test_step_01.output(), + "ParamB": test_step_01.output()["Response"]["Key02"], "ParamC": "SampleValueC" } ) @@ -77,26 +77,26 @@ def test_workflow_with_placeholders(): workflow_input = ExecutionInput() test_step_01 = Pass( - state_id='StateOne', + state_id="StateOne", parameters={ - 'ParamA': workflow_input['Key02']['Key03'], - 'ParamD': workflow_input['Key01']['Key03'], + "ParamA": workflow_input["Key02"]["Key03"], + "ParamD": workflow_input["Key01"]["Key03"], } ) test_step_02 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamC': workflow_input["Key05"], + "ParamC": workflow_input["Key05"], "ParamB": "SampleValueB", "ParamE": test_step_01.output()["Response"]["Key04"] } ) test_step_03 = Pass( - state_id='StateThree', + state_id="StateThree", parameters={ - 'ParamG': "SampleValueG", + "ParamG": "SampleValueG", "ParamF": workflow_input["Key06"], "ParamH": "SampleValueH" } @@ -144,26 +144,26 @@ def test_step_input_order_validation(): workflow_input = ExecutionInput() test_step_01 = Pass( - state_id='StateOne', + state_id="StateOne", parameters={ - 'ParamA': workflow_input['Key02']['Key03'], - 'ParamD': workflow_input['Key01']['Key03'], + "ParamA": workflow_input["Key02"]["Key03"], + "ParamD": workflow_input["Key01"]["Key03"], } ) test_step_02 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamC': workflow_input["Key05"], + "ParamC": workflow_input["Key05"], "ParamB": "SampleValueB", "ParamE": test_step_01.output()["Response"]["Key04"] } ) test_step_03 = Pass( - state_id='StateThree', + state_id="StateThree", parameters={ - 'ParamG': "SampleValueG", + "ParamG": "SampleValueG", "ParamF": workflow_input["Key06"], "ParamH": "SampleValueH" } @@ -179,17 +179,17 @@ def test_map_state_with_placeholders(): step_result = StepResult() map_state = Map( - state_id='MapState01', + state_id="MapState01", result_selector={ - 'foo': step_result['foo'], - 'bar': step_result['bar1']['bar2'] + "foo": step_result["foo"], + "bar": step_result["bar1"]["bar2"] } ) iterator_state = Pass( - 'TrainIterator', + "TrainIterator", parameters={ - 'ParamA': map_state.output()['X']["Y"], - 'ParamB': workflow_input["Key01"]["Key02"]["Key03"] + "ParamA": map_state.output()["X"]["Y"], + "ParamB": workflow_input["Key01"]["Key02"]["Key03"] }) map_state.attach_iterator(iterator_state) @@ -230,32 +230,32 @@ def test_parallel_state_with_placeholders(): step_result = StepResult() parallel_state = Parallel( - state_id='ParallelState01', + state_id="ParallelState01", result_selector={ - 'foo': step_result['foo'], - 'bar': step_result['bar1']['bar2'] + "foo": step_result["foo"], + "bar": step_result["bar1"]["bar2"] } ) branch_A = Pass( - 'Branch_A', + "Branch_A", parameters={ - 'ParamA': parallel_state.output()['A']["B"], - 'ParamB': workflow_input["Key01"] + "ParamA": parallel_state.output()["A"]["B"], + "ParamB": workflow_input["Key01"] }) branch_B = Pass( - 'Branch_B', + "Branch_B", parameters={ - 'ParamA': "TestValue", - 'ParamB': parallel_state.output()["Response"]["Key"]["State"] + "ParamA": "TestValue", + "ParamB": parallel_state.output()["Response"]["Key"]["State"] }) branch_C = Pass( - 'Branch_C', + "Branch_C", parameters={ - 'ParamA': parallel_state.output()['A']["B"].get("C", float), - 'ParamB': "HelloWorld" + "ParamA": parallel_state.output()["A"]["B"].get("C", float), + "ParamB": "HelloWorld" }) parallel_state.add_branch(branch_A) @@ -325,13 +325,13 @@ def test_parallel_state_with_placeholders(): def test_choice_state_with_placeholders(): - first_state = Task('FirstState', resource='arn:aws:lambda:us-east-1:1234567890:function:FirstState') - retry = Chain([Pass('Retry'), Pass('Cleanup'), first_state]) + first_state = Task("FirstState", resource="arn:aws:lambda:us-east-1:1234567890:function:FirstState") + retry = Chain([Pass("Retry"), Pass("Cleanup"), first_state]) - choice_state = Choice('Is Completed?') + choice_state = Choice("Is Completed?") choice_state.add_choice( ChoiceRule.BooleanEquals(choice_state.output()["Completed"], True), - Succeed('Complete') + Succeed("Complete") ) choice_state.add_choice( ChoiceRule.BooleanEquals(choice_state.output()["Completed"], False), @@ -384,7 +384,7 @@ def test_choice_state_with_placeholders(): def test_schema_validation_for_step_input(): test_step_01 = Pass( - state_id='StateOne', + state_id="StateOne", output_schema={ "Response": { "Key01": str @@ -394,18 +394,18 @@ def test_schema_validation_for_step_input(): with pytest.raises(ValueError): test_step_02 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamA': test_step_01.output()["Response"]["Key02"], + "ParamA": test_step_01.output()["Response"]["Key02"], "ParamB": "SampleValueB" } ) with pytest.raises(ValueError): test_step_03 = Pass( - state_id='StateTwo', + state_id="StateTwo", parameters={ - 'ParamA': test_step_01.output()["Response"].get("Key01", float), + "ParamA": test_step_01.output()["Response"].get("Key01", float), "ParamB": "SampleValueB" } ) @@ -414,9 +414,9 @@ def test_step_result_placeholder(): step_result = StepResult() test_step_01 = Task( - state_id='StateOne', + state_id="StateOne", result_selector={ - 'ParamA': step_result["foo"], + "ParamA": step_result["foo"], "ParamC": "SampleValueC" } ) @@ -444,18 +444,18 @@ def test_schema_validation_for_step_result(): with pytest.raises(ValueError): test_step_01 = Task( - state_id='StateOne', + state_id="StateOne", result_selector={ - 'ParamA': step_result["Payload"]["Key02"], + "ParamA": step_result["Payload"]["Key02"], "ParamB": "SampleValueB" } ) with pytest.raises(ValueError): test_step_02 = Task( - state_id='StateOne', + state_id="StateOne", parameters={ - 'ParamA': step_result["Payload"].get("Key01", float), + "ParamA": step_result["Payload"].get("Key01", float), "ParamB": "SampleValueB" } ) @@ -471,9 +471,9 @@ def test_schema_validation_success_for_step_result(): try: test_step = Task( - state_id='StateOne', + state_id="StateOne", result_selector={ - 'ParamA': step_result["Payload"]["Key01"] + "ParamA": step_result["Payload"]["Key01"] } ) except: From e67804bf55b1b2ef8a5cb0bf560021e647482fbb Mon Sep 17 00:00:00 2001 From: Daniel Yoo Date: Wed, 11 Aug 2021 14:06:54 -0400 Subject: [PATCH 4/8] Add result selector support for GlueDataBrewStartJobRunStep --- src/stepfunctions/steps/service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index ed6fe3f..2c08d20 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -240,6 +240,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) From 9df0b8d0c1ab882cb567ddc317cea0278e75e67d Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Wed, 8 Sep 2021 15:46:36 -0700 Subject: [PATCH 5/8] Added tests and updated placeholders documentation --- doc/placeholders.rst | 32 +++- src/stepfunctions/steps/service.py | 8 + tests/unit/test_compute_steps.py | 19 ++- tests/unit/test_service_steps.py | 254 +++++++++++++++++++---------- 4 files changed, 217 insertions(+), 96 deletions(-) diff --git a/doc/placeholders.rst b/doc/placeholders.rst index 0e12ac9..fb57549 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -1,16 +1,32 @@ Placeholders ============= + Once defined, a workflow is static unless you update it explicitly. But, you can pass input to workflow executions. You can have dynamic values that you use in the **parameters** or **result_selector** fields of the steps in your workflow. For this, the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you -create your workflow. There are 3 mechanisms for passing dynamic values in a workflow. +create your workflow. + +.. autoclass:: stepfunctions.inputs.Placeholder + +There are 3 mechanisms for passing dynamic values in a workflow: + +- `Execution Input <#execution-input>`__ +- `Step Input <#step-input>`__ + +- `Step Result <#step-result>`__ + +Execution Input +--------------- The first mechanism is a global input to the workflow execution. This input is accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput` to define the schema for this input, and to access the values in your workflow. +.. autoclass:: stepfunctions.inputs.ExecutionInput + :inherited-members: + .. code-block:: python # Create an instance of ExecutionInput class, and define a schema. Defining @@ -50,6 +66,9 @@ to define the schema for this input, and to access the values in your workflow. workflow.execute(inputs={'myDynamicInput': "WorldHello"}) + +Step Input +---------- The second mechanism is for passing dynamic values from one step to the next step. The output of one step becomes the input of the next step. The SDK provides the :py:meth:`stepfunctions.inputs.StepInput` class for this. @@ -57,6 +76,9 @@ The SDK provides the :py:meth:`stepfunctions.inputs.StepInput` class for this. By default, each step has an output method :py:meth:`stepfunctions.steps.states.State.output` that returns the placeholder output for that step. +.. autoclass:: stepfunctions.inputs.StepInput + :inherited-members: + .. code-block:: python lambda_state_first = LambdaStep( @@ -81,6 +103,9 @@ that returns the placeholder output for that step. definition = Chain([lambda_state_first, lambda_state_second]) + +Step Result +----------- The third mechanism is a placeholder for a step's result. The result of a step can be modified with the **result_selector** field to replace the step's result. @@ -100,13 +125,8 @@ with the **result_selector** field to replace the step's result. } ) -.. autoclass:: stepfunctions.inputs.Placeholder -.. autoclass:: stepfunctions.inputs.ExecutionInput - :inherited-members: -.. autoclass:: stepfunctions.inputs.StepInput - :inherited-members: .. autoclass:: stepfunctions.inputs.StepResult :inherited-members: diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 96890bb..6e3f050 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -250,6 +250,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -289,6 +290,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -328,6 +330,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -367,6 +370,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -406,6 +410,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -445,6 +450,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -484,6 +490,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) @@ -523,6 +530,7 @@ def __init__(self, state_id, **kwargs): heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') parameters (dict, optional): The value of this field becomes the effective input for the state. + result_selector (dict, optional): The value of this field becomes the effective result of the state. result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ diff --git a/tests/unit/test_compute_steps.py b/tests/unit/test_compute_steps.py index 368010a..69020e1 100644 --- a/tests/unit/test_compute_steps.py +++ b/tests/unit/test_compute_steps.py @@ -16,16 +16,24 @@ import boto3 from unittest.mock import patch + +from stepfunctions.inputs import StepResult from stepfunctions.steps.compute import LambdaStep, GlueStartJobRunStep, BatchSubmitJobStep, EcsRunTaskStep +STEP_RESULT = StepResult() +RESULT_SELECTOR = { + "OutputA": STEP_RESULT['A'] +} + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_lambda_step_creation(): - step = LambdaStep('Echo') + step = LambdaStep('Echo', result_selector=RESULT_SELECTOR) assert step.to_dict() == { 'Type': 'Task', 'Resource': 'arn:aws:states:::lambda:invoke', + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -51,11 +59,12 @@ def test_lambda_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_glue_start_job_run_step_creation(): - step = GlueStartJobRunStep('Glue Job', wait_for_completion=False) + step = GlueStartJobRunStep('Glue Job', wait_for_completion=False, result_selector=RESULT_SELECTOR) assert step.to_dict() == { 'Type': 'Task', 'Resource': 'arn:aws:states:::glue:startJobRun', + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -75,11 +84,12 @@ def test_glue_start_job_run_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_batch_submit_job_step_creation(): - step = BatchSubmitJobStep('Batch Job', wait_for_completion=False) + step = BatchSubmitJobStep('Batch Job', wait_for_completion=False, result_selector=RESULT_SELECTOR) assert step.to_dict() == { 'Type': 'Task', 'Resource': 'arn:aws:states:::batch:submitJob', + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -101,11 +111,12 @@ def test_batch_submit_job_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_ecs_run_task_step_creation(): - step = EcsRunTaskStep('Ecs Job', wait_for_completion=False) + step = EcsRunTaskStep('Ecs Job', wait_for_completion=False, result_selector=RESULT_SELECTOR) assert step.to_dict() == { 'Type': 'Task', 'Resource': 'arn:aws:states:::ecs:runTask', + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 25861ec..fd9d1ec 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -15,6 +15,8 @@ import boto3 from unittest.mock import patch + +from stepfunctions.inputs import StepResult from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep from stepfunctions.steps.service import ( EksCallStep, @@ -32,9 +34,15 @@ from stepfunctions.steps.service import GlueDataBrewStartJobRunStep +STEP_RESULT = StepResult() +RESULT_SELECTOR = { + "OutputA": STEP_RESULT['A'] +} + + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_sns_publish_step_creation(): - step = SnsPublishStep('Publish to SNS', parameters={ + step = SnsPublishStep('Publish to SNS', result_selector=RESULT_SELECTOR, parameters={ 'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic', 'Message': 'message', }) @@ -46,6 +54,7 @@ def test_sns_publish_step_creation(): 'TopicArn': 'arn:aws:sns:us-east-1:123456789012:myTopic', 'Message': 'message', }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -73,7 +82,7 @@ def test_sns_publish_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_sqs_send_message_step_creation(): - step = SqsSendMessageStep('Send to SQS', parameters={ + step = SqsSendMessageStep('Send to SQS', result_selector=RESULT_SELECTOR, parameters={ 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', 'MessageBody': 'Hello' }) @@ -85,6 +94,7 @@ def test_sqs_send_message_step_creation(): 'QueueUrl': 'https://sqs.us-east-1.amazonaws.com/123456789012/myQueue', 'MessageBody': 'Hello' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -111,7 +121,7 @@ def test_sqs_send_message_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eventbridge_put_events_step_creation(): - step = EventBridgePutEventsStep('Send to EventBridge', parameters={ + step = EventBridgePutEventsStep('Send to EventBridge', result_selector=RESULT_SELECTOR, parameters={ "Entries": [ { "Detail": { @@ -139,6 +149,7 @@ def test_eventbridge_put_events_step_creation(): } ] }, + 'ResultSelector': {'OutputA.$': "$['A']"}, "End": True } @@ -176,7 +187,7 @@ def test_eventbridge_put_events_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_get_item_step_creation(): - step = DynamoDBGetItemStep('Read Message From DynamoDB', parameters={ + step = DynamoDBGetItemStep('Read Message From DynamoDB', result_selector=RESULT_SELECTOR, parameters={ 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', 'Key': { 'MessageId': { @@ -196,13 +207,14 @@ def test_dynamodb_get_item_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_put_item_step_creation(): - step = DynamoDBPutItemStep('Add Message From DynamoDB', parameters={ + step = DynamoDBPutItemStep('Add Message From DynamoDB', result_selector=RESULT_SELECTOR, parameters={ 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', 'Item': { 'MessageId': { @@ -222,13 +234,14 @@ def test_dynamodb_put_item_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_delete_item_step_creation(): - step = DynamoDBDeleteItemStep('Delete Message From DynamoDB', parameters={ + step = DynamoDBDeleteItemStep('Delete Message From DynamoDB', result_selector=RESULT_SELECTOR, parameters={ 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', 'Key': { 'MessageId': { @@ -248,13 +261,14 @@ def test_dynamodb_delete_item_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_dynamodb_update_item_step_creation(): - step = DynamoDBUpdateItemStep('Update Message From DynamoDB', parameters={ + step = DynamoDBUpdateItemStep('Update Message From DynamoDB', result_selector=RESULT_SELECTOR, parameters={ 'TableName': 'TransferDataRecords-DDBTable-3I41R5L5EAGT', 'Key': { 'RecordId': { @@ -282,13 +296,14 @@ def test_dynamodb_update_item_step_creation(): ':val1': { 'S': '2' } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_create_cluster_step_creation(): - step = EmrCreateClusterStep('Create EMR cluster', parameters={ + step = EmrCreateClusterStep('Create EMR cluster', result_selector=RESULT_SELECTOR, parameters={ 'Name': 'MyWorkflowCluster', 'VisibleToAllUsers': True, 'ReleaseLabel': 'emr-5.28.0', @@ -368,6 +383,7 @@ def test_emr_create_cluster_step_creation(): ] } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -458,7 +474,7 @@ def test_emr_create_cluster_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_terminate_cluster_step_creation(): - step = EmrTerminateClusterStep('Terminate EMR cluster', parameters={ + step = EmrTerminateClusterStep('Terminate EMR cluster', result_selector=RESULT_SELECTOR, parameters={ 'ClusterId': 'MyWorkflowClusterId' }) @@ -468,6 +484,7 @@ def test_emr_terminate_cluster_step_creation(): 'Parameters': { 'ClusterId': 'MyWorkflowClusterId', }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -487,7 +504,7 @@ def test_emr_terminate_cluster_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_add_step_step_creation(): - step = EmrAddStepStep('Add step to EMR cluster', parameters={ + step = EmrAddStepStep('Add step to EMR cluster', result_selector=RESULT_SELECTOR, parameters={ 'ClusterId': 'MyWorkflowClusterId', 'Step': { 'Name': 'The first step', @@ -533,6 +550,7 @@ def test_emr_add_step_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @@ -588,7 +606,7 @@ def test_emr_add_step_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_cancel_step_step_creation(): - step = EmrCancelStepStep('Cancel step from EMR cluster', parameters={ + step = EmrCancelStepStep('Cancel step from EMR cluster', result_selector=RESULT_SELECTOR, parameters={ 'ClusterId': 'MyWorkflowClusterId', 'StepId': 'MyWorkflowStepId' }) @@ -600,16 +618,20 @@ def test_emr_cancel_step_step_creation(): 'ClusterId': 'MyWorkflowClusterId', 'StepId': 'MyWorkflowStepId' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_set_cluster_termination_protection_step_creation(): - step = EmrSetClusterTerminationProtectionStep('Set termination protection for EMR cluster', parameters={ - 'ClusterId': 'MyWorkflowClusterId', - 'TerminationProtected': True - }) + step = EmrSetClusterTerminationProtectionStep( + 'Set termination protection for EMR cluster', + result_selector=RESULT_SELECTOR, + parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'TerminationProtected': True + }) assert step.to_dict() == { 'Type': 'Task', @@ -618,20 +640,24 @@ def test_emr_set_cluster_termination_protection_step_creation(): 'ClusterId': 'MyWorkflowClusterId', 'TerminationProtected': True }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_modify_instance_fleet_by_name_step_creation(): - step = EmrModifyInstanceFleetByNameStep('Modify Instance Fleet by name for EMR cluster', parameters={ - 'ClusterId': 'MyWorkflowClusterId', - 'InstanceFleetName': 'MyCoreFleet', - 'InstanceFleet': { - 'TargetOnDemandCapacity': 8, - 'TargetSpotCapacity': 0 - } - }) + step = EmrModifyInstanceFleetByNameStep( + 'Modify Instance Fleet by name for EMR cluster', + result_selector=RESULT_SELECTOR, + parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'InstanceFleetName': 'MyCoreFleet', + 'InstanceFleet': { + 'TargetOnDemandCapacity': 8, + 'TargetSpotCapacity': 0 + } + }) assert step.to_dict() == { 'Type': 'Task', @@ -644,19 +670,23 @@ def test_emr_modify_instance_fleet_by_name_step_creation(): 'TargetSpotCapacity': 0 } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_emr_modify_instance_group_by_name_step_creation(): - step = EmrModifyInstanceGroupByNameStep('Modify Instance Group by name for EMR cluster', parameters={ - 'ClusterId': 'MyWorkflowClusterId', - 'InstanceGroupName': 'MyCoreGroup', - 'InstanceGroup': { - 'InstanceCount': 8 - } - }) + step = EmrModifyInstanceGroupByNameStep( + 'Modify Instance Group by name for EMR cluster', + result_selector=RESULT_SELECTOR, + parameters={ + 'ClusterId': 'MyWorkflowClusterId', + 'InstanceGroupName': 'MyCoreGroup', + 'InstanceGroup': { + 'InstanceCount': 8 + } + }) assert step.to_dict() == { 'Type': 'Task', @@ -668,15 +698,19 @@ def test_emr_modify_instance_group_by_name_step_creation(): 'InstanceCount': 8 } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_databrew_start_job_run_step_creation_sync(): - step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={ - "Name": "MyWorkflowJobRun" - }) + step = GlueDataBrewStartJobRunStep( + 'Start Glue DataBrew Job Run - Sync', + result_selector=RESULT_SELECTOR, + parameters={ + "Name": "MyWorkflowJobRun" + }) assert step.to_dict() == { 'Type': 'Task', @@ -684,15 +718,20 @@ def test_databrew_start_job_run_step_creation_sync(): 'Parameters': { 'Name': 'MyWorkflowJobRun' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_databrew_start_job_run_step_creation(): - step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={ - "Name": "MyWorkflowJobRun" - }) + step = GlueDataBrewStartJobRunStep( + 'Start Glue DataBrew Job Run', + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + "Name": "MyWorkflowJobRun" + }) assert step.to_dict() == { 'Type': 'Task', @@ -700,22 +739,27 @@ def test_databrew_start_job_run_step_creation(): 'Parameters': { 'Name': 'MyWorkflowJobRun' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_cluster_step_creation(): - step = EksCreateClusterStep("Create Eks cluster", wait_for_completion=False, parameters={ - 'Name': 'MyCluster', - 'ResourcesVpcConfig': { - 'SubnetIds': [ - 'subnet-00000000000000000', - 'subnet-00000000000000001' - ] - }, - 'RoleArn': 'arn:aws:iam::123456789012:role/MyEKSClusterRole' - }) + step = EksCreateClusterStep( + "Create Eks cluster", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'Name': 'MyCluster', + 'ResourcesVpcConfig': { + 'SubnetIds': [ + 'subnet-00000000000000000', + 'subnet-00000000000000001' + ] + }, + 'RoleArn': 'arn:aws:iam::123456789012:role/MyEKSClusterRole' + }) assert step.to_dict() == { 'Type': 'Task', @@ -730,13 +774,14 @@ def test_eks_create_cluster_step_creation(): }, 'RoleArn': 'arn:aws:iam::123456789012:role/MyEKSClusterRole' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_cluster_step_creation_sync(): - step = EksCreateClusterStep("Create Eks cluster sync", parameters={ + step = EksCreateClusterStep("Create Eks cluster sync", result_selector=RESULT_SELECTOR, parameters={ 'Name': 'MyCluster', 'ResourcesVpcConfig': { 'SubnetIds': [ @@ -760,15 +805,20 @@ def test_eks_create_cluster_step_creation_sync(): }, 'RoleArn': 'arn:aws:iam::123456789012:role/MyEKSClusterRole' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_cluster_step_creation(): - step = EksDeleteClusterStep("Delete Eks cluster", wait_for_completion=False, parameters={ - 'Name': 'MyCluster' - }) + step = EksDeleteClusterStep( + "Delete Eks cluster", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'Name': 'MyCluster' + }) assert step.to_dict() == { 'Type': 'Task', @@ -776,15 +826,19 @@ def test_eks_delete_cluster_step_creation(): 'Parameters': { 'Name': 'MyCluster' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_cluster_step_creation_sync(): - step = EksDeleteClusterStep("Delete Eks cluster sync", parameters={ - 'Name': 'MyCluster' - }) + step = EksDeleteClusterStep( + "Delete Eks cluster sync", + result_selector=RESULT_SELECTOR, + parameters={ + 'Name': 'MyCluster' + }) assert step.to_dict() == { 'Type': 'Task', @@ -792,21 +846,26 @@ def test_eks_delete_cluster_step_creation_sync(): 'Parameters': { 'Name': 'MyCluster' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_fargate_profile_step_creation(): - step = EksCreateFargateProfileStep("Create Fargate profile", wait_for_completion=False, parameters={ - 'ClusterName': 'MyCluster', - 'FargateProfileName': 'MyFargateProfile', - 'PodExecutionRoleArn': 'arn:aws:iam::123456789012:role/MyFargatePodExecutionRole', - 'Selectors': [{ - 'Namespace': 'my-namespace', - 'Labels': {'my-label': 'my-value'} - }] - }) + step = EksCreateFargateProfileStep( + "Create Fargate profile", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'ClusterName': 'MyCluster', + 'FargateProfileName': 'MyFargateProfile', + 'PodExecutionRoleArn': 'arn:aws:iam::123456789012:role/MyFargatePodExecutionRole', + 'Selectors': [{ + 'Namespace': 'my-namespace', + 'Labels': {'my-label': 'my-value'} + }] + }) assert step.to_dict() == { 'Type': 'Task', @@ -820,13 +879,14 @@ def test_eks_create_fargate_profile_step_creation(): 'Labels': {'my-label': 'my-value'} }] }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_fargate_profile_step_creation_sync(): - step = EksCreateFargateProfileStep("Create Fargate profile sync", parameters={ + step = EksCreateFargateProfileStep("Create Fargate profile sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'FargateProfileName': 'MyFargateProfile', 'PodExecutionRoleArn': 'arn:aws:iam::123456789012:role/MyFargatePodExecutionRole', @@ -848,16 +908,21 @@ def test_eks_create_fargate_profile_step_creation_sync(): 'Labels': {'my-label': 'my-value'} }] }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_fargate_profile_step_creation(): - step = EksDeleteFargateProfileStep("Delete Fargate profile", wait_for_completion=False, parameters={ - 'ClusterName': 'MyCluster', - 'FargateProfileName': 'MyFargateProfile' - }) + step = EksDeleteFargateProfileStep( + "Delete Fargate profile", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'ClusterName': 'MyCluster', + 'FargateProfileName': 'MyFargateProfile' + }) assert step.to_dict() == { 'Type': 'Task', @@ -866,13 +931,14 @@ def test_eks_delete_fargate_profile_step_creation(): 'ClusterName': 'MyCluster', 'FargateProfileName': 'MyFargateProfile' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_fargate_profile_step_creation_sync(): - step = EksDeleteFargateProfileStep("Delete Fargate profile sync", parameters={ + step = EksDeleteFargateProfileStep("Delete Fargate profile sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'FargateProfileName': 'MyFargateProfile' }) @@ -884,21 +950,26 @@ def test_eks_delete_fargate_profile_step_creation_sync(): 'ClusterName': 'MyCluster', 'FargateProfileName': 'MyFargateProfile' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_node_group_step_creation(): - step = EksCreateNodeGroupStep("Create Node Group", wait_for_completion=False, parameters={ - 'ClusterName': 'MyCluster', - 'NodegroupName': 'MyNodegroup', - 'NodeRole': 'arn:aws:iam::123456789012:role/MyNodeInstanceRole', - 'Subnets': [ - 'subnet-00000000000000000', - 'subnet-00000000000000001' - ] - }) + step = EksCreateNodeGroupStep( + "Create Node Group", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'ClusterName': 'MyCluster', + 'NodegroupName': 'MyNodegroup', + 'NodeRole': 'arn:aws:iam::123456789012:role/MyNodeInstanceRole', + 'Subnets': [ + 'subnet-00000000000000000', + 'subnet-00000000000000001' + ] + }) assert step.to_dict() == { 'Type': 'Task', @@ -912,12 +983,13 @@ def test_eks_create_node_group_step_creation(): 'subnet-00000000000000001' ], }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } def test_eks_create_node_group_step_creation_sync(): - step = EksCreateNodeGroupStep("Create Node Group sync", parameters={ + step = EksCreateNodeGroupStep("Create Node Group sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup', 'NodeRole': 'arn:aws:iam::123456789012:role/MyNodeInstanceRole', @@ -939,15 +1011,20 @@ def test_eks_create_node_group_step_creation_sync(): 'subnet-00000000000000001' ], }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_node_group_step_creation(): - step = EksDeleteNodegroupStep("Delete Node Group", wait_for_completion=False, parameters={ - 'ClusterName': 'MyCluster', - 'NodegroupName': 'MyNodegroup' - }) + step = EksDeleteNodegroupStep( + "Delete Node Group", + result_selector=RESULT_SELECTOR, + wait_for_completion=False, + parameters={ + 'ClusterName': 'MyCluster', + 'NodegroupName': 'MyNodegroup' + }) assert step.to_dict() == { 'Type': 'Task', @@ -956,13 +1033,14 @@ def test_eks_delete_node_group_step_creation(): 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_delete_node_group_step_creation_sync(): - step = EksDeleteNodegroupStep("Delete Node Group sync", parameters={ + step = EksDeleteNodegroupStep("Delete Node Group sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup' }) @@ -974,13 +1052,14 @@ def test_eks_delete_node_group_step_creation_sync(): 'ClusterName': 'MyCluster', 'NodegroupName': 'MyNodegroup' }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_run_job_step_creation(): - step = EksRunJobStep("Run Job", wait_for_completion=False, parameters={ + step = EksRunJobStep("Run Job", result_selector=RESULT_SELECTOR, wait_for_completion=False, parameters={ 'ClusterName': 'MyCluster', 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', @@ -1045,13 +1124,14 @@ def test_eks_run_job_step_creation(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_run_job_step_creation_sync(): - step = EksRunJobStep("Run Job sync", parameters={ + step = EksRunJobStep("Run Job sync", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'Endpoint': 'https://AKIAIOSFODNN7EXAMPLE.yl4.us-east-1.eks.amazonaws.com', @@ -1122,13 +1202,14 @@ def test_eks_run_job_step_creation_sync(): } } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_call_step_creation(): - step = EksCallStep("Call", parameters={ + step = EksCallStep("Call", result_selector=RESULT_SELECTOR, parameters={ 'ClusterName': 'MyCluster', 'CertificateAuthority': 'ANPAJ2UCCR6DPCEXAMPLE', 'Endpoint': 'https://444455556666.yl4.us-east-1.eks.amazonaws.com', @@ -1156,5 +1237,6 @@ def test_eks_call_step_creation(): ] } }, + 'ResultSelector': {'OutputA.$': "$['A']"}, 'End': True } From ca45e9e646cb974f9a72cbf0f6e819de877b6a8d Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Wed, 8 Sep 2021 16:21:50 -0700 Subject: [PATCH 6/8] Removed double negation from function description and added schema in error message --- doc/placeholders.rst | 9 ++++----- src/stepfunctions/inputs/placeholders.py | 18 ++++++++++++------ tests/unit/test_placeholders.py | 19 +++++++++++++++++-- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/doc/placeholders.rst b/doc/placeholders.rst index fb57549..bc6ccc3 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -107,7 +107,10 @@ that returns the placeholder output for that step. Step Result ----------- The third mechanism is a placeholder for a step's result. The result of a step can be modified -with the **result_selector** field to replace the step's result. +with the **result_selector** field to replace the step's result. + +.. autoclass:: stepfunctions.inputs.StepResult + :inherited-members: .. code-block:: python @@ -126,7 +129,3 @@ with the **result_selector** field to replace the step's result. ) - - -.. autoclass:: stepfunctions.inputs.StepResult - :inherited-members: diff --git a/src/stepfunctions/inputs/placeholders.py b/src/stepfunctions/inputs/placeholders.py index 45442f7..8e63325 100644 --- a/src/stepfunctions/inputs/placeholders.py +++ b/src/stepfunctions/inputs/placeholders.py @@ -260,10 +260,12 @@ def __init__(self, schema=None, **kwargs): def _create_variable(self, name, parent, type=None): """ Creates a placeholder variable for Workflow Input. - A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + A placeholder variable can only be created if the collection is mutable. + A collection is mutable if no pre-specified schema was defined at construction. """ if self.immutable: - raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:" + f" {self.schema}") if type: return ExecutionInput(name=name, parent=parent, type=type) else: @@ -283,10 +285,12 @@ def __init__(self, schema=None, **kwargs): def _create_variable(self, name, parent, type=None): """ Creates a placeholder variable for Step Input. - A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + A placeholder variable can only be created if the collection is mutable. + A collection is mutable if no pre-specified schema was defined at construction.. """ if self.immutable: - raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:" + f" {self.schema}") if type: return StepInput(name=name, parent=parent, type=type) else: @@ -306,10 +310,12 @@ def __init__(self, schema=None, **kwargs): def _create_variable(self, name, parent, type=None): """ Creates a placeholder variable for Step Result. - A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema. + A placeholder variable can only be created if the collection is mutable. + A collection is mutable if no pre-specified schema was defined at construction. """ if self.immutable: - raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.") + raise ValueError(f"Placeholder variable does not conform to schema set for the placeholder collection:" + f" {self.schema}") if type: return StepResult(name=name, parent=parent, type=type) else: diff --git a/tests/unit/test_placeholders.py b/tests/unit/test_placeholders.py index e544384..98fa898 100644 --- a/tests/unit/test_placeholders.py +++ b/tests/unit/test_placeholders.py @@ -17,24 +17,28 @@ from stepfunctions.inputs import ExecutionInput, StepInput, StepResult + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_creation_with_subscript_operator(placeholder): placeholder_variable = placeholder["A"] assert placeholder_variable.name == "A" assert placeholder_variable.type is None + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_creation_with_type(placeholder): placeholder_variable = placeholder["A"]["b"].get("C", float) assert placeholder_variable.name == "C" assert placeholder_variable.type == float + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_creation_with_int_key(placeholder): placeholder_variable = placeholder["A"][0] assert placeholder_variable.name == 0 assert placeholder_variable.type == None + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_creation_with_invalid_key(placeholder): with pytest.raises(ValueError): @@ -42,18 +46,21 @@ def test_placeholder_creation_with_invalid_key(placeholder): with pytest.raises(ValueError): placeholder["A"].get(1.2, str) + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_creation_failure_with_type(placeholder): placeholder_variable = placeholder["A"]["b"].get("C", float) with pytest.raises(ValueError): placeholder["A"]["b"].get("C", int) + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_path(placeholder): placeholder_variable = placeholder["A"]["b"]["C"] expected_path = ["A", "b", "C"] assert placeholder_variable._get_path() == expected_path + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_contains(placeholder): var_one = placeholder["Key01"] @@ -68,6 +75,7 @@ def test_placeholder_contains(placeholder): assert placeholder.contains(var_five) == False assert placeholder_two.contains(var_three) == False + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_schema_as_dict(placeholder): placeholder["A"]["b"].get("C", float) @@ -93,6 +101,7 @@ def test_placeholder_schema_as_dict(placeholder): assert placeholder.get_schema_as_dict() == expected_schema + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_schema_as_json(placeholder): placeholder["Response"].get("StatusCode", int) @@ -114,6 +123,7 @@ def test_placeholder_schema_as_json(placeholder): assert placeholder.get_schema_as_json() == json.dumps(expected_schema) + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_is_empty(placeholder): placeholder_variable = placeholder["A"]["B"]["C"] @@ -121,6 +131,7 @@ def test_placeholder_is_empty(placeholder): placeholder["A"]["B"]["C"]["D"] assert placeholder_variable._is_empty() == False + @pytest.mark.parametrize("placeholder", [StepInput(), StepResult(), ExecutionInput()]) def test_placeholder_make_immutable(placeholder): placeholder["A"]["b"].get("C", float) @@ -134,6 +145,7 @@ def test_placeholder_make_immutable(placeholder): placeholder._make_immutable() assert check_immutable(placeholder) == True + def test_placeholder_with_schema(): test_schema = { "A": { @@ -156,23 +168,26 @@ def test_placeholder_with_schema(): with pytest.raises(ValueError): workflow_input["A"]["B"].get("C", float) + def test_workflow_input_jsonpath(): workflow_input = ExecutionInput() placeholder_variable = workflow_input["A"]["b"].get("C", float) assert placeholder_variable.to_jsonpath() == "$$.Execution.Input['A']['b']['C']" + def test_step_input_jsonpath(): step_input = StepInput() placeholder_variable = step_input["A"]["b"].get(0, float) assert placeholder_variable.to_jsonpath() == "$['A']['b'][0]" + def test_step_result_jsonpath(): step_result = StepResult() placeholder_variable = step_result["A"]["b"].get(0, float) assert placeholder_variable.to_jsonpath() == "$['A']['b'][0]" -# UTILS +# UTILS def check_immutable(placeholder): if placeholder.immutable is True: if placeholder._is_empty(): @@ -181,4 +196,4 @@ def check_immutable(placeholder): for k, v in placeholder.store.items(): return check_immutable(v) else: - return False \ No newline at end of file + return False From a7016da74f5d7a550a09ba92defe9107a051627e Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Wed, 8 Sep 2021 17:43:05 -0700 Subject: [PATCH 7/8] Reformat documentation --- doc/placeholders.rst | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/doc/placeholders.rst b/doc/placeholders.rst index bc6ccc3..10da606 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -6,11 +6,18 @@ Once defined, a workflow is static unless you update it explicitly. But, you can input to workflow executions. You can have dynamic values that you use in the **parameters** or **result_selector** fields of the steps in your workflow. For this, the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you -create your workflow. +create your workflow. There are 3 mechanisms for passing dynamic values in a workflow. .. autoclass:: stepfunctions.inputs.Placeholder -There are 3 mechanisms for passing dynamic values in a workflow: +.. autoclass:: stepfunctions.inputs.ExecutionInput + :inherited-members: + +.. autoclass:: stepfunctions.inputs.StepInput + :inherited-members: + +.. autoclass:: stepfunctions.inputs.StepResult + :inherited-members: - `Execution Input <#execution-input>`__ @@ -24,9 +31,6 @@ The first mechanism is a global input to the workflow execution. This input is accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput` to define the schema for this input, and to access the values in your workflow. -.. autoclass:: stepfunctions.inputs.ExecutionInput - :inherited-members: - .. code-block:: python # Create an instance of ExecutionInput class, and define a schema. Defining @@ -76,9 +80,6 @@ The SDK provides the :py:meth:`stepfunctions.inputs.StepInput` class for this. By default, each step has an output method :py:meth:`stepfunctions.steps.states.State.output` that returns the placeholder output for that step. -.. autoclass:: stepfunctions.inputs.StepInput - :inherited-members: - .. code-block:: python lambda_state_first = LambdaStep( @@ -109,9 +110,6 @@ Step Result The third mechanism is a placeholder for a step's result. The result of a step can be modified with the **result_selector** field to replace the step's result. -.. autoclass:: stepfunctions.inputs.StepResult - :inherited-members: - .. code-block:: python lambda_result = StepResult( @@ -127,5 +125,3 @@ with the **result_selector** field to replace the step's result. "Status": "Success" } ) - - From 6594b1c7412095c0a24f2daf502dbe5d85e003d6 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Fri, 22 Oct 2021 19:26:33 -0700 Subject: [PATCH 8/8] Removed unused import, reformatted doc and modified tests --- doc/placeholders.rst | 37 ++++++++-------------- src/stepfunctions/steps/states.py | 2 +- tests/unit/test_placeholders_with_steps.py | 23 ++++---------- 3 files changed, 21 insertions(+), 41 deletions(-) diff --git a/doc/placeholders.rst b/doc/placeholders.rst index 10da606..103312b 100644 --- a/doc/placeholders.rst +++ b/doc/placeholders.rst @@ -8,25 +8,6 @@ that you use in the **parameters** or **result_selector** fields of the steps in the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you create your workflow. There are 3 mechanisms for passing dynamic values in a workflow. -.. autoclass:: stepfunctions.inputs.Placeholder - -.. autoclass:: stepfunctions.inputs.ExecutionInput - :inherited-members: - -.. autoclass:: stepfunctions.inputs.StepInput - :inherited-members: - -.. autoclass:: stepfunctions.inputs.StepResult - :inherited-members: - -- `Execution Input <#execution-input>`__ - -- `Step Input <#step-input>`__ - -- `Step Result <#step-result>`__ - -Execution Input ---------------- The first mechanism is a global input to the workflow execution. This input is accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput` to define the schema for this input, and to access the values in your workflow. @@ -71,8 +52,6 @@ to define the schema for this input, and to access the values in your workflow. workflow.execute(inputs={'myDynamicInput': "WorldHello"}) -Step Input ----------- The second mechanism is for passing dynamic values from one step to the next step. The output of one step becomes the input of the next step. The SDK provides the :py:meth:`stepfunctions.inputs.StepInput` class for this. @@ -105,10 +84,10 @@ that returns the placeholder output for that step. definition = Chain([lambda_state_first, lambda_state_second]) -Step Result ------------ + The third mechanism is a placeholder for a step's result. The result of a step can be modified with the **result_selector** field to replace the step's result. +The SDK provides :py:meth:`stepfunctions.inputs.StepResult` class for this. .. code-block:: python @@ -125,3 +104,15 @@ with the **result_selector** field to replace the step's result. "Status": "Success" } ) + + +.. autoclass:: stepfunctions.inputs.Placeholder + +.. autoclass:: stepfunctions.inputs.ExecutionInput + :inherited-members: + +.. autoclass:: stepfunctions.inputs.StepInput + :inherited-members: + +.. autoclass:: stepfunctions.inputs.StepResult + :inherited-members: diff --git a/src/stepfunctions/steps/states.py b/src/stepfunctions/steps/states.py index a9c41fb..bb7dfb3 100644 --- a/src/stepfunctions/steps/states.py +++ b/src/stepfunctions/steps/states.py @@ -17,7 +17,7 @@ from stepfunctions.exceptions import DuplicateStatesInChain from stepfunctions.steps.fields import Field -from stepfunctions.inputs import Placeholder, StepInput, StepResult +from stepfunctions.inputs import Placeholder, StepInput logger = logging.getLogger('stepfunctions.states') diff --git a/tests/unit/test_placeholders_with_steps.py b/tests/unit/test_placeholders_with_steps.py index d695b22..11ea80e 100644 --- a/tests/unit/test_placeholders_with_steps.py +++ b/tests/unit/test_placeholders_with_steps.py @@ -410,6 +410,7 @@ def test_schema_validation_for_step_input(): } ) + def test_step_result_placeholder(): step_result = StepResult() @@ -432,8 +433,8 @@ def test_step_result_placeholder(): assert test_step_01.to_dict() == expected_repr -def test_schema_validation_for_step_result(): +def test_schema_validation_for_step_result(): step_result = StepResult( schema={ "Payload": { @@ -460,21 +461,9 @@ def test_schema_validation_for_step_result(): } ) -def test_schema_validation_success_for_step_result(): - step_result = StepResult( - schema={ - "Payload": { - "Key01": str - } + test_step_03 = Task( + state_id="StateOne", + result_selector={ + "ParamA": step_result["Payload"]["Key01"] } ) - - try: - test_step = Task( - state_id="StateOne", - result_selector={ - "ParamA": step_result["Payload"]["Key01"] - } - ) - except: - pytest.fail("Step should fetch step result key successfully without raising an Exception")