@dominikhei
Contributor

@dominikhei dominikhei commented Jun 13, 2025


closes: #51429

The following adjustments make sure that the container name is only polled if it is actually required (awslogs_group and awslogs_stream_prefix are set, so we want to see the logs). Moreover, I have used the describe_tasks method of boto3 to poll for it, retrying after 10s and 30s, to account for delays until the task has reached RUNNING. The container_name is also accessed safely to avoid throwing an error.

If the name is required and cannot be retrieved, an info message is logged.
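
A minimal sketch of the retry logic described above, not the merged implementation. The helper names `needs_container_name` and `poll_container_name` and the injectable `sleep` argument are assumptions made for illustration; only `describe_tasks` (with `cluster` and `tasks`) is an actual boto3 ECS client call.

```python
import time


def needs_container_name(awslogs_group, awslogs_stream_prefix, container_name):
    """The name is only needed when logs are wanted and no name was supplied."""
    return bool(awslogs_group and awslogs_stream_prefix and not container_name)


def poll_container_name(client, cluster, task_arn, delays=(10, 30), sleep=time.sleep):
    """Hypothetical helper: ask describe_tasks for the first container name.

    Waits `delays[i]` seconds before each attempt and returns None if the
    name is still missing after the last attempt.
    """
    for delay in delays:
        sleep(delay)
        response = client.describe_tasks(cluster=cluster, tasks=[task_arn])
        tasks = response.get("tasks") or []
        containers = (tasks[0].get("containers") or []) if tasks else []
        # .get() avoids a KeyError when a container entry has no name yet.
        name = containers[0].get("name") if containers else None
        if name:
            return name
    return None
```

Passing `sleep` as a parameter keeps the sketch testable without actually waiting 40 seconds.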

@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Jun 13, 2025
@dominikhei dominikhei marked this pull request as ready for review June 13, 2025 15:01
@eladkal eladkal requested review from ferruzzi and vincbeck June 13, 2025 16:56
@seanghaeli
Contributor

seanghaeli commented Jul 8, 2025

Reviewing the code. It would be good if you can provide an example of a DAG with the EcsRunTaskOperator so I can test it on the main branch, see it fail, and then try again on this branch.

The Airflow Ecs docs may be out of date because when I follow the example the API fails with an Attribute error, so something about my call is invalid even though I'm using the default example. Seeing an example would help.

@dominikhei
Contributor Author

I think it is tricky to create a sample DAG in this case, as we depend on whether ECS returns a container name from the run_task call. What you need is an EcsRunTaskOperator with no container_name specified and with awslogs_group and awslogs_stream_prefix set. It's not trivial to reproduce, as the container needs to be in e.g. PENDING at the time of the call. But let me know if I'm overlooking something.
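
For reference, the operator configuration described above could look roughly like this. The parameter names are EcsRunTaskOperator arguments; every value is a hypothetical placeholder, and the configuration alone does not guarantee the failure, since that still depends on ECS returning an empty containers list.

```python
# Hypothetical values throughout; only the keyword names come from EcsRunTaskOperator.
ecs_run_task_kwargs = {
    "task_id": "run_ecs_task",
    "cluster": "my-cluster",
    "task_definition": "my-task-definition",
    "launch_type": "EC2",
    "overrides": {"containerOverrides": []},
    # Both log settings are set, so the operator needs a container name ...
    "awslogs_group": "/ecs/my-task-definition",
    "awslogs_region": "eu-central-1",
    "awslogs_stream_prefix": "ecs",
    # ... but container_name is deliberately left out.
}

# Inside a DAG definition this would be used as:
#   from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
#   run_task = EcsRunTaskOperator(**ecs_run_task_kwargs)
```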

@seanghaeli
Contributor

I see. Okay, trying to reproduce. The main thing I'm trying is to beef up the resources like cpu and memory for the task to take longer to start. Any other ideas of what else I can change to make the container be in PENDING for longer?

Another question I have is: to what extent is your proposed change overlapping with the intended functionality of EcsTaskDefinitionStateSensor, which does the following: "Poll task definition until it reaches a terminal state."

Wouldn't a good, already viable, fix in this situation be for the user to have EcsTaskDefinitionStateSensor ahead of the EcsRunTaskOperator so that the Operator will not execute while the task is in PENDING?

@dominikhei
Contributor Author

dominikhei commented Jul 9, 2025

The EcsTaskDefinitionStateSensor waits for a task definition to be in ACTIVE state. The task itself can be in PENDING or any other intermediate state independently of the definition, and it needs to be executed first. Our problem right now (I hope 😄) is that when we invoke the boto3 method run_task (so we are talking about the task itself, not the definition), the returned response does not contain the container_name, which becomes a problem if we want to use logs from CloudWatch in Airflow. There is also an EcsTaskStateSensor, which waits for an ECS task to be in state x; however, as our problem occurs internally within the operator when we want to run this task, it won't help much.
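
To make the distinction concrete, here is a small sketch of the two separate lookups. The function names are hypothetical; `describe_task_definition` and `describe_tasks` are the boto3 ECS client calls that report the definition status and the task status respectively.

```python
def definition_status(client, task_definition):
    """Status of the task *definition*, e.g. ACTIVE or INACTIVE."""
    response = client.describe_task_definition(taskDefinition=task_definition)
    return response["taskDefinition"]["status"]


def task_last_status(client, cluster, task_arn):
    """Status of a running *task*, e.g. PROVISIONING, PENDING or RUNNING."""
    tasks = client.describe_tasks(cluster=cluster, tasks=[task_arn]).get("tasks") or []
    return tasks[0].get("lastStatus") if tasks else None
```

A definition can be ACTIVE while a task started from it is still PROVISIONING, which is why waiting on the definition does not address the missing container name.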

We could also think about making container_name a required param, but I am not a huge fan of that, or simply adding some more warnings, so that users understand more clearly why their task is failing.

@seanghaeli
Contributor

I see. So just to confirm: the core of the issue here is that, while EcsRegisterTaskDefinitionOperator registers a task definition, EcsRunTaskOperator actually spins up an instance based on that task definition, but it doesn't necessarily wait for its associated containers to be in a terminal state before moving on, which can cause an empty list of containers to be seen.

@dominikhei
Contributor Author

The EcsRunTaskOperator executes a task definition, yes. Our problem is that the operator does not require a container name to be specified. However, when you want to ship logs from CloudWatch to Airflow and set awslogs_group and awslogs_stream_prefix, a container name is required, as the prefix always contains a container name.

The current solution was to simply get the container name from the return value of ECS.Client.run_task(**kwargs). In most cases this works perfectly fine. But in some edge cases, as the ECS endpoints are eventually consistent and the task is in an intermediate state, the container name might not be part of the response. With this change, if awslogs_group and awslogs_stream_prefix are not set, getting the container name is skipped, as it is not required. If they are set and no container name is specified, we use ECS.Client.describe_tasks(**kwargs) to get the container name, with some delay to account for the intermediate states.

The issue is related to the change of the CloudWatch log stream prefixes introduced a while ago.
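
As an illustration of why the container name matters: the awslogs log driver on ECS names streams `prefix-name/container-name/ecs-task-id`. The sketch below builds such a name. The helper names are hypothetical, and whether the operator assembles the stream name in exactly this way is an assumption.

```python
def ecs_task_id(task_arn):
    """The task ID is the last path segment of the task ARN."""
    return task_arn.rsplit("/", 1)[-1]


def awslogs_stream_name(stream_prefix, container_name, task_arn):
    """Build a stream name in the awslogs format prefix/container/task-id."""
    return f"{stream_prefix}/{container_name}/{ecs_task_id(task_arn)}"
```

Without a container name the middle segment cannot be filled in, so the log stream cannot be located.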

@seanghaeli
Contributor

Ah okay. It's quite an edge case, so I can't find a way to reproduce it. I think your fix and tests look good, but I have one more thing I'd like to clarify:

The original PR is concerned with situations where the API response looks like the following:

{
    "tasks": [
        {
            "attachments": [],
            "capacityProviderName": "Infra-ECS-Cluster-***-EC2CapacityProvider-***",
            "clusterArn": "arn:aws:ecs:***:***:cluster/***",
            "containers": [],
            "cpu": "1024",
            "createdAt": datetime.datetime(
                2025, 6, 5, 8, 55, 51, 381000, tzinfo=tzlocal()
            ),
            "desiredStatus": "RUNNING",
            "enableExecuteCommand": False,
            "group": "family:***",
            "lastStatus": "PROVISIONING",
            "launchType": "EC2",
            "memory": "4096",
            "overrides": {
                "containerOverrides": [{ ... }],
                "inferenceAcceleratorOverrides": [],
            },
            "startedBy": "airflow",
            "tags": [],
            "taskArn": "arn:aws:ecs:***:***:task/***/***",
            "taskDefinitionArn": "arn:aws:ecs:***:***:task-definition/***",
            "version": 1,
        }
    ],
    "failures": [],
    "ResponseMetadata": {
        "RequestId": "***",
        "HTTPStatusCode": 200,
        "HTTPHeaders": {
            "x-amzn-requestid": "***",
            "content-type": "application/x-amz-json-1.1",
            "content-length": "1360",
            "date": "Thu, 05 Jun 2025 08:55:51 GMT",
        },
        "RetryAttempts": 0,
    },
}

where the response has an empty array in the 'containers' field.

But in your test case, there is a populated containers field, it just doesn't have a name. Has the scope of the issue changed, or should you also be testing the case where the whole 'containers' field is empty?

@dominikhei
Contributor Author

Good catch! Will adjust the test case, thanks
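
A test for the empty-containers case could be sketched along these lines. This is not the test from the PR: `first_container_name` is a hypothetical helper, the ARN is made up, and the mocked responses are trimmed to the fields that matter.

```python
from unittest import mock


def first_container_name(response):
    """Hypothetical helper: safely read the first container name, or None."""
    tasks = response.get("tasks") or []
    containers = (tasks[0].get("containers") or []) if tasks else []
    return containers[0].get("name") if containers else None


def test_empty_containers_falls_back_to_describe_tasks():
    task_arn = "arn:aws:ecs:eu-central-1:123456789012:task/my-cluster/abc123"
    client = mock.MagicMock()
    # run_task answers while the task is still PROVISIONING: no containers yet.
    client.run_task.return_value = {
        "tasks": [{"taskArn": task_arn, "containers": [], "lastStatus": "PROVISIONING"}],
        "failures": [],
    }
    # A later describe_tasks call sees the container.
    client.describe_tasks.return_value = {
        "tasks": [{"taskArn": task_arn, "containers": [{"name": "app"}]}],
        "failures": [],
    }

    run_response = client.run_task(cluster="my-cluster", taskDefinition="my-task-definition")
    assert first_container_name(run_response) is None

    described = client.describe_tasks(cluster="my-cluster", tasks=[task_arn])
    assert first_container_name(described) == "app"
    client.describe_tasks.assert_called_once_with(cluster="my-cluster", tasks=[task_arn])
```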

@dominikhei dominikhei force-pushed the ecs-describe-tasks branch from 1c8222b to 8467c32 Compare July 11, 2025 22:37
Contributor

@seanghaeli seanghaeli left a comment

Difficult to reproduce in a DAG, but I'm happy with the proposed change and unit test coverage.

Contributor

@ferruzzi ferruzzi left a comment

LGTM. I can't tell if this will fix the issue or not, but it looks like the changes are scoped tightly enough behind the if blocks that it shouldn't break anything else.

@dominikhei
Contributor Author

Thanks for having a look! I agree with both of you, it's tricky to test. That said, based on my experience, 40 seconds (the 10s and 30s retries combined) is usually sufficient to exit transient states in ECS, even when dealing with large images and similar overhead. But of course, it's not an exact science...

@github-actions

github-actions bot commented Sep 6, 2025

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Sep 6, 2025
@seanghaeli
Contributor

@ferruzzi I think we should merge this before it is deleted.

@ferruzzi ferruzzi merged commit 230da3e into apache:main Sep 6, 2025
75 checks passed
mangal-vairalkar pushed a commit to mangal-vairalkar/airflow that referenced this pull request Sep 7, 2025
…nse (apache#51692)

* Added logic to safely access the container name only if its required and poll multiple times in case the task is not yet active

* Adujusted the test case to better reflect actual ecs response
RoyLee1224 pushed a commit to RoyLee1224/airflow that referenced this pull request Sep 8, 2025
…nse (apache#51692)

* Added logic to safely access the container name only if its required and poll multiple times in case the task is not yet active

* Adujusted the test case to better reflect actual ecs response
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Sep 30, 2025
…nse (apache#51692)

* Added logic to safely access the container name only if its required and poll multiple times in case the task is not yet active

* Adujusted the test case to better reflect actual ecs response
Labels

area:providers provider:amazon AWS/Amazon - related issues stale Stale PRs per the .github/workflows/stale.yml policy file

Projects

None yet

Development

Successfully merging this pull request may close these issues.

EcsRunTaskOperator fails when no containers are provided in the response

3 participants