From 07533236de78f333f7586f1d24f186b471072907 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Thu, 28 Oct 2021 18:17:16 -0700 Subject: [PATCH 1/6] Update Databrew StartJobRun step to use integration pattern input --- src/stepfunctions/steps/service.py | 29 +++++++++++++-------------- tests/unit/test_service_steps.py | 32 ++++++++++++++++++++++++------ 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 986217c..80b076f 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -544,10 +544,14 @@ class GlueDataBrewStartJobRunStep(Task): Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions `_ for more details. """ - def __init__(self, state_id, wait_for_completion=True, **kwargs): + def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompletion, **kwargs): """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. + integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. (default: WaitForCompletion) + Supported integration patterns: + WaitForCompletion: Wait for the Databrew job to complete before going to the next state. (See `Run A Job Date: Thu, 28 Oct 2021 19:06:45 -0700 Subject: [PATCH 2/6] Reformat code --- src/stepfunctions/steps/service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 80b076f..218a6d6 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -566,8 +566,8 @@ def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompl is_integration_pattern_valid(integration_pattern, supported_integ_patterns) kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME, - GlueDataBrewApi.StartJobRun, - integration_pattern) + GlueDataBrewApi.StartJobRun, + integration_pattern) """ Example resource arns: - CallAndContinue: arn: arn:aws:states:::databrew:startJobRun From d64b4a0368c167289378cb5aa1c965b94e2db886 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Tue, 2 Nov 2021 16:28:32 -0700 Subject: [PATCH 3/6] Added unit test to cover unsupported integration pattern --- tests/unit/test_service_steps.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 913ece6..ac5d269 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -727,6 +727,13 @@ def test_databrew_start_job_run_step_creation_call_and_continue(): } +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_databrew_start_job_run_step_creation_wait_for_task_token_raises_error(): + with pytest.raises(ValueError): + GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - WaitForTaskToken', + integration_pattern=IntegrationPattern.WaitForTaskToken) + + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_eks_create_cluster_step_creation(): step = EksCreateClusterStep("Create Eks cluster", wait_for_completion=False, parameters={ From 3325e2450bf20362f26b30f2d77b32cbea82b2f8 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <83104894+ca-nguyen@users.noreply.github.com> Date: Thu, 4 Nov 2021 12:10:02 -0700 Subject: [PATCH 4/6] Update tests/unit/test_service_steps.py Co-authored-by: Shiv Lakshminarayan --- tests/unit/test_service_steps.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index ac5d269..6935ce0 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -677,7 +677,7 @@ def test_emr_modify_instance_group_by_name_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_databrew_start_job_run_step_creation_default(): - step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Default', parameters={ + step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - default', parameters={ "Name": "MyWorkflowJobRun" }) From 06892625e8d520c428b889e30060045a631382b7 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Thu, 4 Nov 2021 12:21:47 -0700 Subject: [PATCH 5/6] Added assert on error message --- tests/unit/test_service_steps.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 6935ce0..642f7a6 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -14,6 +14,7 @@ import boto3 import pytest +import re from unittest.mock import patch from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep @@ -729,7 +730,10 @@ def test_databrew_start_job_run_step_creation_call_and_continue(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_databrew_start_job_run_step_creation_wait_for_task_token_raises_error(): - with pytest.raises(ValueError): + error_message = re.escape(f"Integration Pattern ({IntegrationPattern.WaitForTaskToken.name}) is not supported for this step - " + f"Please use one of the following: " + f"{[IntegrationPattern.WaitForCompletion.name, IntegrationPattern.CallAndContinue.name]}") + with pytest.raises(ValueError, match=error_message): GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - WaitForTaskToken', integration_pattern=IntegrationPattern.WaitForTaskToken) From fe1e28c644c29f567c9085b92413dcfcc1db958d Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Thu, 4 Nov 2021 20:36:34 -0700 Subject: [PATCH 6/6] Reformatted docstring --- src/stepfunctions/steps/service.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 218a6d6..35a12dd 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -548,10 +548,10 @@ def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompl """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. - integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. (default: WaitForCompletion) - Supported integration patterns: - WaitForCompletion: Wait for the Databrew job to complete before going to the next state. (See `Run A Job `_ for more details.) + * CallAndContinue: Call StartJobRun and progress to the next state (See `Request Response `_ for more details.) comment (str, optional): Human-readable comment or description. (default: None) timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.