-
Notifications
You must be signed in to change notification settings - Fork 88
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support placeholders for processing step #155
Support placeholders for processing step #155
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this helps illustrate the idea you have in mind - thank you again for shrinking down the PR size.
Wanted to post the comments so far so you can consider them. I still have to take a closer look at the sagemaker.py
class as well as your open questions.
src/stepfunctions/steps/constants.py
Outdated
|
||
# Path to SageMaker placeholder parameters | ||
placeholder_paths = { | ||
# Paths taken from https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateProcessingJob.html |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question:
- what does the comment mean when it says "taken" from the documentation link?
- can all properties be represented by placeholders or is it only some?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1: I took the location to the CreateProcessingJob request from there to save in placeholder_paths for each arg
2: all can
src/stepfunctions/steps/constants.py
Outdated
Field.Role.value: ['RoleArn'], | ||
Field.ImageUri.value: ['AppSpecification', 'ImageUri'], | ||
Field.InstanceCount.value: ['ProcessingResources', 'ClusterConfig', 'InstanceCount'], | ||
Field.InstanceType.value: ['ProcessingResources', 'ClusterConfig', 'InstanceType'], | ||
Field.Entrypoint.value: ['AppSpecification', 'ContainerEntrypoint'], | ||
Field.VolumeSizeInGB.value: ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'], | ||
Field.VolumeKMSKey.value: ['ProcessingResources', 'ClusterConfig', 'VolumeKmsKeyId'], | ||
Field.Env.value: ['Environment'], | ||
Field.Tags.value: ['Tags'], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thought: is there a way to read these from the SDK or automate it? having this hand-rolled can be problematic for a few reasons:
- drift if the API signatures expand
- prone to error, since it's reliant on everything being hand rolled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the same thought - doing this by hand can introduce errors and is not easily maintainable. Doing it this way allowed less code redundancy.
Another option would be, for each SageMaker property, to call the function that adds it to the Parameters in the Sagemaker code instead of manually getting the path from the API docs
That would mean:
- Each property will call a different function (if existing) in sagemaker in order to add it to Parameters
- Some properties will not have an existing function in sagemaker that adds them to Parameters and we will have to do it by hand using the
placeholder_paths
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Read @wong-a 's proposed solution after posting the previous comment - will go with that since it removes the need to map each args with placeholder_paths
src/stepfunctions/steps/constants.py
Outdated
Field.VolumeSizeInGB.value: ['ProcessingResources', 'ClusterConfig', 'VolumeSizeInGB'], | ||
Field.VolumeKMSKey.value: ['ProcessingResources', 'ClusterConfig', 'VolumeKmsKeyId'], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious: haven't played much with this, but do all nested properties (i.e. VolumeSizeInGb) support placeholders to supply their value? - Admittedly, Ive only supplied top level properties and haven't tinkered deep enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are when passed to parameters
The placeholders are all replaced in parameters(included nested values) here
src/stepfunctions/steps/fields.py
Outdated
# Sagemaker step fields | ||
# Processing Step: Processor | ||
Role = 'role' | ||
ImageUri = 'image_uri' | ||
InstanceCount = 'instance_count' | ||
InstanceType = 'instance_type' | ||
Entrypoint = 'entrypoint' | ||
VolumeSizeInGB = 'volume_size_in_gb' | ||
VolumeKMSKey = 'volume_kms_key' | ||
OutputKMSKey = 'output_kms_key' | ||
MaxRuntimeInSeconds = 'max_runtime_in_seconds' | ||
Env = 'env' | ||
Tags = 'tags' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thought: is this the right place for storing these properties? everything else in this file is specific to states and ASL, but this introduces properties specific to a service integration's API.
some properties will also be duplicated across APIs/Service Integrations (things like role, tags, etc are probably used in multiple APIs)
another thing to think about:
as a customer, would it be more intuitive to have something like Placeholders.SagemakerProcessingStep.blah
than Fields.blah
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love what you are proposing! Separating the SageMaker property fields and the state and ASL specific fields will definitely make it more intuitive to the customer
Changes will be made in next commit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. This class is for ASL fields, not parameters of specific service integrations
src/stepfunctions/steps/sagemaker.py
Outdated
@@ -473,13 +576,19 @@ def __init__(self, state_id, tuner, job_name, data, wait_for_completion=True, ta | |||
super(TuningStep, self).__init__(state_id, **kwargs) | |||
|
|||
|
|||
class ProcessingStep(Task): | |||
class ProcessingStep(SageMakerTask): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
breaking change??
the class constructor used to take in a Task but now that's been changed. when customers upgrade versions, won't their existing code fail?
we cannot make breaking changes as we need to follow semantic versioning while releasing minor version updates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This changes ProcessingStep's base class, but not the constructor arguments. With this change, instead of calling Task's constructor directly in init(), we call SageMakerTask's constructor which in turn calls Task's constructor.
Before:
- ProcessingtStep.init()
- Task.init()
After:
- ProcessingtStep.init()
- SageMakerTask.init()
- Task.init()
- SageMakerTask.init()
src/stepfunctions/steps/sagemaker.py
Outdated
in the processing job (compatible with Placeholders): role, image_uri, instance_count, instance_type, | ||
volume_size_in_gb, volume_kms_key, output_kms_key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious: what's the source of truth here? how did we verify that these properties are the ones supported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ones that are made to be placeholder compatible are :
- The args that are documented as being placeholder compatible in the Args section (for ex: job_name)
- The ones that are included in placeholder_paths (src/stepfunctions/steps/constants.py)
Since we are replacing the Placeholders with the ExecutionInput when starting the job, when the Sagemaker job starts, all placeholders are replaced. If some args that we configured to hold placeholder in our state machine were not replaced, this should trigger an error.
I can add a test to confirm the behaviour in the next commit
Thanks for bringing this up - this confirms that this documentation is not clear and we might want to switch to the Alternative solution where we would add all Placeholder compatible properties as optional args in the step constructor, making it clearer to the customer which are Placeholder compatible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is docstring is out of date with the new implementation
tests/integ/test_sagemaker_steps.py
Outdated
|
||
# Cleanup | ||
state_machine_delete_wait(sfn_client, workflow.state_machine_arn) | ||
# End of Cleanup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: i think the code is self explanatory. we can drop this comment 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right! i'll remove the comments :)
They are included in all the other tests - will do a cleanup for the other tests in another PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you forget to remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - will remove it in the next commit!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, only dicts, str and number objects were made placeholder compatible to facilitate testing purposes. Is there a need for other object types to be dynamically passed down to the Processor?
Ideally any field can be specified at runtime. In the end, all objects and Classes we accept get serialized to a dict/JSON so we can handle this. It is possible to specify the value of any key in Parameters using JSONPath.
Both proposed solutions require the stepfunctions SDK to maintain a mapping of each argument's location in the final CreateProcessingJob payload. I'm not sure we can derive this from the AWS SDK. Writing it by hand is more prone to mistakes and may lag behind if SageMaker adds new parameters to their APIs.
Here's a simpler solution with two parts that's more or less future-proof:
1. Each constructor argument we already accept today that is a primitive type can also be a Placeholder
We can hand-roll the Parameters substitution similar to #142 . We can be selective about which fields we want to support here.
2. Allow customers to use parameters
to specify a dict containing static or placeholder values.
In each SageMaker Task constructor, we accept parameters
argument which is a dict
containing static or Placeholder
values. At the end of the constructor, we merge the autogenerated parameters from SageMaker classes and other constructor args with parameters
. Adding this bit lets the customer use Placholders or static values for any field, including nested fields in SageMaker Python classes.
All non-SageMaker Tasks already accept a parameters
arguemnt which automatically substitutes the keys for Placeholders
. For SageMaker steps, we override this.
src/stepfunctions/steps/fields.py
Outdated
# Sagemaker step fields | ||
# Processing Step: Processor | ||
Role = 'role' | ||
ImageUri = 'image_uri' | ||
InstanceCount = 'instance_count' | ||
InstanceType = 'instance_type' | ||
Entrypoint = 'entrypoint' | ||
VolumeSizeInGB = 'volume_size_in_gb' | ||
VolumeKMSKey = 'volume_kms_key' | ||
OutputKMSKey = 'output_kms_key' | ||
MaxRuntimeInSeconds = 'max_runtime_in_seconds' | ||
Env = 'env' | ||
Tags = 'tags' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. This class is for ASL fields, not parameters of specific service integrations
Thank you both for your review! This makes for a simpler and more effective solution and makes it the customer's responsibility to use a |
…meters received in args
Field.Role.value: str, | ||
Field.VolumeSizeInGB.value: int, | ||
Field.MaxRuntimeInSeconds.value: int | ||
'image_uri': str, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're only using these values for test purposes, using the direct string values for better code readability
…meters received in args
…thub.com/ca-nguyen/aws-step-functions-data-science-sdk-python into support-placeholders-for-processing-step
src/stepfunctions/steps/utils.py
Outdated
@@ -45,3 +46,24 @@ def get_aws_partition(): | |||
return cur_partition | |||
|
|||
return cur_partition | |||
|
|||
|
|||
def merge_dicts(first, second, first_name, second_name): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could also be used to merge the hyperparameters in TrainingStep - will make the changes in another PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: First and second don't describe the side effects and which dict gets merged into what. Borrowing from JavaScript's Object.assign
:
def merge_dicts(first, second, first_name, second_name): | |
def merge_dicts(target, source): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 - I also like to push for doc strings where behaviour is not entirely intuitive. i.e. what happens if there are clashes, are overwrites allowed, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking close to finished with the new solution. Provided some minor comments for documentation and readability.
src/stepfunctions/steps/sagemaker.py
Outdated
in the processing job (compatible with Placeholders): role, image_uri, instance_count, instance_type, | ||
volume_size_in_gb, volume_kms_key, output_kms_key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is docstring is out of date with the new implementation
src/stepfunctions/steps/utils.py
Outdated
@@ -45,3 +46,24 @@ def get_aws_partition(): | |||
return cur_partition | |||
|
|||
return cur_partition | |||
|
|||
|
|||
def merge_dicts(first, second, first_name, second_name): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: First and second don't describe the side effects and which dict gets merged into what. Borrowing from JavaScript's Object.assign
:
def merge_dicts(first, second, first_name, second_name): | |
def merge_dicts(target, source): |
src/stepfunctions/steps/utils.py
Outdated
logger.info( | ||
f"{first_name} property: <{key}> with value: <{first[key]}>" | ||
f" will be overwritten with value provided in {second_name} : <{value}>") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Do we think this is useful? If not, can just use Python's built-in dict.update
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The built in update() does not take into account nested dictionary values - for ex:
d1 = {'a': {'aa': 1, 'bb': 2, 'c': 3}}
d2 = {'a': {'bb': 1}}
d1.update(d2)
print(d1)
Will have following output: {'a': {'bb': 1}}
Since we would expect to get {'a': {'aa': 1, 'bb': 1, 'c': 3}}
, we can't use the update() function in our case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initially added them to facilitate troubleshooting, but I'm open to remove the logs if we deem them not useful enough or too noisy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the expected behaviour is well documented it seems unnecessary. Since the method only exists for logging, if we get rid of it there's less code to maintain. What do you think, @shivlaks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The built in update() does not take into account nested dictionary values
Missed this comment. Since we need a deep merge, dict.update is not going to work here
Co-authored-by: Adam Wong <55506708+wong-a@users.noreply.github.com>
…riting dict values
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this approach as it's much closer to what we do in the CDK for property bags vs. maintaining hand-rolled properties, which has proven to be untenable in the past. In this case, it was also becoming unwieldy due to the number of properties in these APIs.
had a couple nits and a question or two, but overall it looks good!
The summary indicates that this closes #85 but that one doesn't seem to be an issue for processing step. please amend if needed before you merge.
src/stepfunctions/exceptions.py
Outdated
class DuplicateStatesInChain(Exception): | ||
pass No newline at end of file | ||
pass | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: keep files with style changes out of the PR for clarity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point - will remove it from this PR
src/stepfunctions/steps/utils.py
Outdated
@@ -45,3 +46,24 @@ def get_aws_partition(): | |||
return cur_partition | |||
|
|||
return cur_partition | |||
|
|||
|
|||
def merge_dicts(first, second, first_name, second_name): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 - I also like to push for doc strings where behaviour is not entirely intuitive. i.e. what happens if there are clashes, are overwrites allowed, etc.
@@ -30,6 +30,7 @@ | |||
|
|||
SAGEMAKER_SERVICE_NAME = "sagemaker" | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice to see us embracing pep8 in files we touch 🙌
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙌🙌🙌
for key, value in source.items(): | ||
if key in target: | ||
if isinstance(target[key], dict) and isinstance(source[key], dict): | ||
merge_dicts(target[key], source[key]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice to see recursion being used :)
tests/integ/test_sagemaker_steps.py
Outdated
def test_processing_step_with_placeholders(sklearn_processor_fixture, sagemaker_session, sfn_client, sfn_role_arn, | ||
sagemaker_role_arn): | ||
region = boto3.session.Session().region_name | ||
input_data = 's3://sagemaker-sample-data-{}/processing/census/census-income.csv'.format(region) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why not use f strings
here too instead of format
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that using fstring
is more readable and efficient. format
was used for all other tests so i kept it for consistency.
Will change it for this added test and perhaps we can make the change for the rest of the file in a separate PR
tests/integ/test_sagemaker_steps.py
Outdated
key_prefix='integ-test-data/sklearn_processing/code' | ||
) | ||
|
||
output_s3 = 's3://' + sagemaker_session.default_bucket() + '/integ-test-data/sklearn_processing' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why not use f strings here instead of concatenation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed - using fstring
would be more readable and efficient.
Same comment: format
was used for all other tests so i kept it for consistency.
Will change it for this added test and perhaps we can make the change for the rest of the file in a separate PR
tests/integ/test_sagemaker_steps.py
Outdated
|
||
# Cleanup | ||
state_machine_delete_wait(sfn_client, workflow.state_machine_arn) | ||
# End of Cleanup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you forget to remove this?
@@ -45,3 +46,28 @@ def get_aws_partition(): | |||
return cur_partition | |||
|
|||
return cur_partition | |||
|
|||
|
|||
def merge_dicts(target, source): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: is it typical to modify a dict in place rather than return a merged dict that doesn't manipulate inputs?
i'm not sure if it's idiomatic, or my Java tendencies to declare inputs as final
is kicking in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was implemented having dict.update() function in mind, where it is possible to update a dict with another. In our case, we are merging nested dicts as well.
Mutable objects are all passed by reference in Python and the description explains the function behaviour, so I think it makes sense to leave it as is - what do you think? :)
tests/integ/test_sagemaker_steps.py
Outdated
workflow_graph = Chain([processing_step]) | ||
|
||
with timeout(minutes=DEFAULT_TIMEOUT_MINUTES): | ||
# Create workflow and check definition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: unnecessary comment as the method name expresses this in snake case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed- will be removed with the next commit
You are right! will remove it from the linked issues! |
Received 2 ship-its - merging PR |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
Issue #, if available: #117, #139, #94
Description of changes:
Currently, it is not possible to use placeholders for Sagemaker Processor properties . The properties cannot be defined dynamically, as they need to be defined in the Processor which does not accept placeholders.
This change makes it possible to use placeholders for Processor properties by using the
parameters
field that are passed down from the ProcessingStep.Proposed changes
Use the
parameters
field that is compatible with placeholders to define ProcessingStep properties.Merge the sagemaker generated configs with the input
parameters
:parameters
will overwrite the sagemaker generated configs if the properties were defined in bothBy submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.