Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Update AWS Glue Databrew StartJobRun step to use integration pattern input #176

Merged
merged 8 commits into from
Nov 15, 2021
31 changes: 15 additions & 16 deletions src/stepfunctions/steps/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,14 @@ class GlueDataBrewStartJobRunStep(Task):
Creates a Task state that starts a DataBrew job. See `Manage AWS Glue DataBrew Jobs with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-databrew.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_completion=True, **kwargs):
def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompletion, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. (default: WaitForCompletion)
Supported integration patterns:
WaitForCompletion: Wait for the Databrew job to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync`_ for more details.)
CallAndContinue: Call StartJobRun and progress to the next state (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default`_ for more details.)
comment (str, optional): Human-readable comment or description. (default: None)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
Expand All @@ -557,23 +561,18 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):
parameters (dict, optional): The value of this field becomes the effective input for the state.
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True)
"""
if wait_for_completion:
"""
Example resource arn: arn:aws:states:::databrew:startJobRun.sync
"""
supported_integ_patterns = [IntegrationPattern.WaitForCompletion, IntegrationPattern.CallAndContinue]

kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun,
IntegrationPattern.WaitForCompletion)
else:
"""
Example resource arn: arn:aws:states:::databrew:startJobRun
"""

kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun)
is_integration_pattern_valid(integration_pattern, supported_integ_patterns)
kwargs[Field.Resource.value] = get_service_integration_arn(GLUE_DATABREW_SERVICE_NAME,
GlueDataBrewApi.StartJobRun,
integration_pattern)
"""

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit (non-blocking): would be good to have the comment precede the Field.Resource assignment.

Example resource arns:
- CallAndContinue: arn: arn:aws:states:::databrew:startJobRun
- WaitForCompletion: arn: arn:aws:states:::databrew:startJobRun.sync
"""

super(GlueDataBrewStartJobRunStep, self).__init__(state_id, **kwargs)

Expand Down
32 changes: 26 additions & 6 deletions tests/unit/test_service_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,8 @@ def test_emr_modify_instance_group_by_name_step_creation():


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_sync():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Sync', parameters={
def test_databrew_start_job_run_step_creation_default():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run - Default', parameters={
ca-nguyen marked this conversation as resolved.
Show resolved Hide resolved
"Name": "MyWorkflowJobRun"
})

Expand All @@ -692,10 +692,30 @@ def test_databrew_start_job_run_step_creation_sync():


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation():
step = GlueDataBrewStartJobRunStep('Start Glue DataBrew Job Run', wait_for_completion=False, parameters={
"Name": "MyWorkflowJobRun"
})
def test_databrew_start_job_run_step_creation_wait_for_completion():
step = GlueDataBrewStartJobRunStep(
'Start Glue DataBrew Job Run - WaitForCompletion', integration_pattern=IntegrationPattern.WaitForCompletion,
parameters={
"Name": "MyWorkflowJobRun"
})
ca-nguyen marked this conversation as resolved.
Show resolved Hide resolved

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::databrew:startJobRun.sync',
'Parameters': {
'Name': 'MyWorkflowJobRun'
},
'End': True
}


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_databrew_start_job_run_step_creation_call_and_continue():
step = GlueDataBrewStartJobRunStep(
'Start Glue DataBrew Job Run - CallAndContinue',
integration_pattern=IntegrationPattern.CallAndContinue, parameters={
"Name": "MyWorkflowJobRun"
})

assert step.to_dict() == {
'Type': 'Task',
Expand Down