diff --git a/src/stepfunctions/steps/compute.py b/src/stepfunctions/steps/compute.py index 203ed47..3b8e450 100644 --- a/src/stepfunctions/steps/compute.py +++ b/src/stepfunctions/steps/compute.py @@ -166,11 +166,12 @@ class EcsRunTaskStep(Task): Creates a Task State to run Amazon ECS or Fargate Tasks. See `Manage Amazon ECS or Fargate Tasks with Step Functions `_ for more details. """ - def __init__(self, state_id, wait_for_completion=True, **kwargs): + def __init__(self, state_id, wait_for_completion=True, wait_for_callback=False, **kwargs): """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. wait_for_completion(bool, optional): Boolean value set to `True` if the Task state should wait for the ecs job to complete before proceeding to the next step in the workflow. Set to `False` if the Task state should submit the ecs job and proceed to the next step. (default: True) + wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False) timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. @@ -181,7 +182,18 @@ def __init__(self, state_id, wait_for_completion=True, **kwargs): result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ - if wait_for_completion: + if wait_for_completion and wait_for_callback: + raise ValueError("Only one of wait_for_completion and wait_for_callback can be true") + + if wait_for_callback: + """ + Example resource arn: arn:aws:states:::ecs:runTask.waitForTaskToken + """ + + kwargs[Field.Resource.value] = get_service_integration_arn(ECS_SERVICE_NAME, + EcsApi.RunTask, + IntegrationPattern.WaitForTaskToken) + elif wait_for_completion: """ Example resource arn: arn:aws:states:::ecs:runTask.sync """ diff --git a/tests/unit/test_compute_steps.py b/tests/unit/test_compute_steps.py index 368010a..bf8fb0f 100644 --- a/tests/unit/test_compute_steps.py +++ b/tests/unit/test_compute_steps.py @@ -109,6 +109,14 @@ def test_ecs_run_task_step_creation(): 'End': True } + step = EcsRunTaskStep('Ecs Job', wait_for_callback=True) + + assert step.to_dict() == { + 'Type': 'Task', + 'Resource': 'arn:aws:states:::ecs:runTask.waitForTaskToken', + 'End': True + } + step = EcsRunTaskStep('Ecs Job', parameters={ 'TaskDefinition': 'Task' }) @@ -121,3 +129,8 @@ def test_ecs_run_task_step_creation(): }, 'End': True } + + with pytest.raises(ValueError): + step = EcsRunTaskStep('Ecs Job', + wait_for_completion=True, + wait_for_callback=True)