Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Amazon States Language "ResultSelector" in Task, Map … #102

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions doc/placeholders.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ Placeholders

Once defined, a workflow is static unless you update it explicitly. But, you can pass
input to workflow executions. You can have dynamic values
that you use in the **parameters** fields of the steps in your workflow. For this,
that you use in the **parameters** or **result_selector** fields of the steps in your workflow. For this,
the AWS Step Functions Data Science SDK provides a way to define placeholders to pass around when you
create your workflow. There are 2 mechanisms for passing dynamic values in a workflow.
create your workflow. There are 3 mechanisms for passing dynamic values in a workflow.
ca-nguyen marked this conversation as resolved.
Show resolved Hide resolved

The first mechanism is a global input to the workflow execution. This input is
accessible to all the steps in the workflow. The SDK provides :py:meth:`stepfunctions.inputs.ExecutionInput`
Expand Down Expand Up @@ -64,10 +64,10 @@ that returns the placeholder output for that step.
parameters={
"FunctionName": "MakeApiCall",
"Payload": {
"input": "20192312"
}
"input": "20192312"
}
)
}
)

lambda_state_second = LambdaStep(
state_id="MySecondLambdaStep",
Expand All @@ -81,7 +81,24 @@ that returns the placeholder output for that step.

definition = Chain([lambda_state_first, lambda_state_second])

The third mechanism is a placeholder for a step's result. The result of a step can be modified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still wondering if it's really necessary to introduce StepResult when StepInput could also be used to achieve the same thing. I left that comment earlier but it didn't get an answer. Literally the only difference is the class name. Thoughts @shivlaks?

We can always add classes later, but removing it afterwards breaks back-compat.

lambda_result = StepInput(
    schema={
        "Id": str,
    }
)

lambda_state_first = LambdaStep(
    state_id="MyFirstLambdaStep",
    result_selector={
        "Output": lambda_result["Id"],
        "Status": "Success"
    }
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question - I missed that comment earlier

While it does make sense to use the same object, I think the name StepInput could bring confusion since, in this case, we are not using the step's input, but the step's result (or output)

To be backwards compatible, we can't rename StepInput to a more general term. Alternatively, we could make StepResult inherit from StepInput instead of Placeholder and avoid code duplication.

with the **result_selector** field to replace the step's result.

.. code-block:: python

lambda_result = StepResult(
schema={
"Id": str,
}
)

lambda_state_first = LambdaStep(
state_id="MyFirstLambdaStep",
result_selector={
"Output": lambda_result["Id"],
"Status": "Success"
}
)

.. autoclass:: stepfunctions.inputs.Placeholder

Expand All @@ -90,3 +107,6 @@ that returns the placeholder output for that step.

.. autoclass:: stepfunctions.inputs.StepInput
:inherited-members:

.. autoclass:: stepfunctions.inputs.StepResult
:inherited-members:
8 changes: 4 additions & 4 deletions src/stepfunctions/inputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import absolute_import

from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput
from stepfunctions.inputs.placeholders import Placeholder, ExecutionInput, StepInput, StepResult
45 changes: 34 additions & 11 deletions src/stepfunctions/inputs/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import absolute_import

Expand Down Expand Up @@ -51,11 +51,11 @@ def __init__(self, schema=None, **kwargs):
self._set_schema(schema)
self._make_immutable()
self.json_str_template = "{}"

self.name = kwargs.get("name")
self.type = kwargs.get("type")
self.parent = kwargs.get("parent")


def get(self, name, type):
"""
Expand All @@ -64,11 +64,11 @@ def get(self, name, type):
Args:
name (str): Name of the placeholder variable.
type (type): Type of the placeholder variable.

Raises:
ValueError: If placeholder variable with the same name but different type already exists.
ValueError: If placeholder variable does not fit into a previously specified schema for the placeholder collection.

Returns:
Placeholder: Placeholder variable.
"""
Expand Down Expand Up @@ -240,7 +240,7 @@ def _join_path(self, path):
def to_jsonpath(self):
"""
Returns a JSON path representation of the placeholder variable to be used for step parameters.

Returns:
str: JSON path representation of the placeholder variable
"""
Expand All @@ -252,7 +252,7 @@ class ExecutionInput(Placeholder):
"""
Top-level class for execution input placeholders.
"""

def __init__(self, schema=None, **kwargs):
super(ExecutionInput, self).__init__(schema, **kwargs)
self.json_str_template = '$$.Execution.Input{}'
Expand All @@ -268,7 +268,7 @@ def _create_variable(self, name, parent, type=None):
return ExecutionInput(name=name, parent=parent, type=type)
else:
return ExecutionInput(name=name, parent=parent)


class StepInput(Placeholder):

Expand All @@ -279,7 +279,7 @@ class StepInput(Placeholder):
def __init__(self, schema=None, **kwargs):
super(StepInput, self).__init__(schema, **kwargs)
self.json_str_template = '${}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Step Input.
Expand All @@ -291,3 +291,26 @@ def _create_variable(self, name, parent, type=None):
return StepInput(name=name, parent=parent, type=type)
else:
return StepInput(name=name, parent=parent)


class StepResult(Placeholder):
ca-nguyen marked this conversation as resolved.
Show resolved Hide resolved

"""
Top-level class for step result placeholders.
"""

def __init__(self, schema=None, **kwargs):
super(StepResult, self).__init__(schema, **kwargs)
self.json_str_template = '${}'

def _create_variable(self, name, parent, type=None):
"""
Creates a placeholder variable for Step Result.
A placeholder variable can only be created if the collection is not immutable due to a pre-specified schema.
ca-nguyen marked this conversation as resolved.
Show resolved Hide resolved
"""
if self.immutable:
raise ValueError("Placeholder variable does not conform to schema set for the placeholder collection.")
ca-nguyen marked this conversation as resolved.
Show resolved Hide resolved
if type:
return StepResult(name=name, parent=parent, type=type)
else:
return StepResult(name=name, parent=parent)
4 changes: 4 additions & 0 deletions src/stepfunctions/steps/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(self, state_id, wait_for_callback=False, **kwargs):
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_selector (dict, optional): The value of this field becomes the effective result of the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
Expand Down Expand Up @@ -98,6 +99,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_selector (dict, optional): The value of this field becomes the effective result of the state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maintaining these docstrings for every inheriting class is pretty annoying. I wonder if there anything we can do to make that easier.

result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
Expand Down Expand Up @@ -138,6 +140,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_selector (dict, optional): The value of this field becomes the effective result of the state.
ca-nguyen marked this conversation as resolved.
Show resolved Hide resolved
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
Expand Down Expand Up @@ -178,6 +181,7 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
comment (str, optional): Human-readable comment or description. (default: None)
input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$')
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_selector (dict, optional): The value of this field becomes the effective result of the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
Expand Down
1 change: 1 addition & 0 deletions src/stepfunctions/steps/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Field(Enum):
InputPath = 'input_path'
OutputPath = 'output_path'
Parameters = 'parameters'
ResultSelector = 'result_selector'
ResultPath = 'result_path'
Next = 'next'
Retry = 'retry'
Expand Down
Loading