Skip to content

Commit

Permalink
feat: Change ECS Task to use IntegrationPattern for input
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaei committed Dec 10, 2021
1 parent fbab338 commit b09ee26
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 34 deletions.
58 changes: 29 additions & 29 deletions src/stepfunctions/steps/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,25 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs):


class EcsRunTaskStep(Task):

"""
Creates a Task State to run Amazon ECS or Fargate Tasks. See `Manage Amazon ECS or Fargate Tasks with Step Functions <https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html>`_ for more details.
"""

def __init__(self, state_id, wait_for_completion=True, wait_for_callback=False, **kwargs):
supported_integration_patterns = [
IntegrationPattern.WaitForCompletion,
IntegrationPattern.WaitForTaskToken,
IntegrationPattern.CallAndContinue
]

def __init__(self, state_id, wait_for_completion=True, integration_pattern=None, **kwargs):
"""
Args:
state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine.
wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the ecs job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the ecs job and proceed to the next step. (default: True)
wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False)
integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. This is mutually exclusive from wait_for_completion Supported integration patterns (default: None):
* WaitForCompletion: Wait for the state machine execution to complete before going to the next state. (See `Run A Job <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync>`_ for more details.)
* WaitForTaskToken: Wait for the state machine execution to return a task token before progressing to the next state (See `Wait for a Callback with the Task Token <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token>`_ for more details.)
* CallAndContinue: Call StartExecution and progress to the next state (See `Request Response <https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default>`_ for more details.)
timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60)
timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer.
heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name.
Expand All @@ -182,31 +190,23 @@ def __init__(self, state_id, wait_for_completion=True, wait_for_callback=False,
result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$')
output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$')
"""
if wait_for_completion and wait_for_callback:
raise ValueError("Only one of wait_for_completion and wait_for_callback can be true")

if wait_for_callback:
"""
Example resource arn: arn:aws:states:::ecs:runTask.waitForTaskToken
"""

kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME,
EcsApi.RunTask,
IntegrationPattern.WaitForTaskToken)
elif wait_for_completion:
"""
Example resource arn: arn:aws:states:::ecs:runTask.sync
"""

kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME,
EcsApi.RunTask,
IntegrationPattern.WaitForCompletion)
else:
"""
Example resource arn: arn:aws:states:::ecs:runTask
"""

kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME,
EcsApi.RunTask)
if wait_for_completion and integration_pattern:
raise ValueError(
"Only one of wait_for_completion and integration_pattern set. "
"Set wait_for_completion to False if you wish to use integration_pattern."
)

# The old implementation type still has to be supported until a new
# major is realeased.
if wait_for_completion:
integration_pattern = IntegrationPattern.WaitForCompletion
if not wait_for_completion and not integration_pattern:
integration_pattern = IntegrationPattern.CallAndContinue

is_integration_pattern_valid(integration_pattern,
self.supported_integration_patterns)
kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME,
EcsApi.RunTask,
integration_pattern)

super(EcsRunTaskStep, self).__init__(state_id, **kwargs)
8 changes: 3 additions & 5 deletions tests/unit/test_compute_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from unittest.mock import patch
from stepfunctions.steps.compute import LambdaStep, GlueStartJobRunStep, BatchSubmitJobStep, EcsRunTaskStep
from stepfunctions.steps.integration_resources import IntegrationPattern


@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
Expand Down Expand Up @@ -102,17 +103,15 @@ def test_batch_submit_job_step_creation():
@patch.object(boto3.session.Session, 'region_name', 'us-east-1')
def test_ecs_run_task_step_creation():
step = EcsRunTaskStep('Ecs Job', wait_for_completion=False)

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::ecs:runTask',
'End': True
}

step = EcsRunTaskStep('Ecs Job',
wait_for_callback=True,
integration_pattern=IntegrationPattern.WaitForTaskToken,
wait_for_completion=False)

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::ecs:runTask.waitForTaskToken',
Expand All @@ -122,7 +121,6 @@ def test_ecs_run_task_step_creation():
step = EcsRunTaskStep('Ecs Job', parameters={
'TaskDefinition': 'Task'
})

assert step.to_dict() == {
'Type': 'Task',
'Resource': 'arn:aws:states:::ecs:runTask.sync',
Expand All @@ -135,4 +133,4 @@ def test_ecs_run_task_step_creation():
with pytest.raises(ValueError):
step = EcsRunTaskStep('Ecs Job',
wait_for_completion=True,
wait_for_callback=True)
integration_pattern=IntegrationPattern.WaitForTaskToken)

0 comments on commit b09ee26

Please sign in to comment.