Deferrable ExternalTaskSensor doesn't correctly wait on external_task_group_id #39616

@nathadfield

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

Using a deferrable ExternalTaskSensor to wait on a TaskGroup does not complete when the task group finishes. Instead, the sensor stays deferred until the entire run of the DAG specified by external_dag_id is complete.

What you think should happen instead?

Looking at the code, it appears that the call to WorkflowTrigger is missing the external_task_group_id argument.

https://github.com/apache/airflow/blob/main/airflow/sensors/external_task.py#L352-L358
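The suspected mechanism can be sketched without Airflow. The classes and functions below (StandInSensor, StandInTrigger, build_trigger_without_group, build_trigger_with_group) are hypothetical stand-ins, not the real ExternalTaskSensor or WorkflowTrigger. The sketch also assumes that a trigger given neither task IDs nor a task group ID falls back to waiting on the whole DAG run, which matches the behaviour described in this report but is not confirmed against the source.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StandInTrigger:
    """Hypothetical stand-in for WorkflowTrigger; not Airflow code."""

    external_dag_id: str
    external_task_ids: Optional[List[str]] = None
    external_task_group_id: Optional[str] = None

    def wait_target(self) -> str:
        # Assumption: with no task IDs and no group ID, the trigger
        # can only wait for the whole DAG run.
        if self.external_task_ids:
            return "tasks"
        if self.external_task_group_id:
            return "task_group"
        return "dag_run"


@dataclass
class StandInSensor:
    """Hypothetical stand-in for ExternalTaskSensor; not Airflow code."""

    external_dag_id: str
    external_task_ids: Optional[List[str]] = None
    external_task_group_id: Optional[str] = None


def build_trigger_without_group(sensor: StandInSensor) -> StandInTrigger:
    # Mirrors the suspected bug: the group ID is never forwarded.
    return StandInTrigger(
        external_dag_id=sensor.external_dag_id,
        external_task_ids=sensor.external_task_ids,
    )


def build_trigger_with_group(sensor: StandInSensor) -> StandInTrigger:
    # Mirrors the proposed fix: the group ID is forwarded to the trigger.
    return StandInTrigger(
        external_dag_id=sensor.external_dag_id,
        external_task_ids=sensor.external_task_ids,
        external_task_group_id=sensor.external_task_group_id,
    )


sensor = StandInSensor("task_group_dag", external_task_group_id="sleep_group")
print(build_trigger_without_group(sensor).wait_target())  # dag_run
print(build_trigger_with_group(sensor).wait_target())  # task_group
```

If the real trigger behaves like this stand-in, forwarding external_task_group_id in the sensor's deferral call would be enough to make the deferrable path match the non-deferrable one.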

How to reproduce

This issue has been identified against main using breeze.

breeze --python 3.10 --backend postgres start-airflow

The first DAG creates a TaskGroup that contains three simple Bash operators that wait for a short period, followed by another that waits for a longer period.

from datetime import datetime
from airflow import models

from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with models.DAG(
    dag_id='task_group_dag',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='0 0 * * *',
) as dag:

    with TaskGroup('sleep_group') as task_group:
        sleep1 = BashOperator(
            task_id='sleep1',
            bash_command='sleep 10'
        )

        sleep2 = BashOperator(
            task_id='sleep2',
            bash_command='sleep 15'
        )

        sleep3 = BashOperator(
            task_id='sleep3',
            bash_command='sleep 20'
        )

    sleep4 = BashOperator(
        task_id='sleep4',
        bash_command='sleep 120'
    )

    task_group >> sleep4

The second DAG contains a deferred ExternalTaskSensor that waits for completion of the sleep_group.

from datetime import datetime
from airflow import models

from airflow.sensors.external_task import ExternalTaskSensor

with models.DAG(
    dag_id='sensor_dag',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='0 0 * * *',
) as dag:

    wait = ExternalTaskSensor(
        task_id='wait',
        external_dag_id='task_group_dag',
        external_task_group_id='sleep_group',
        deferrable=True
    )

If both of these DAGs are started, you will observe that, although sleep_group finishes after about 20 seconds, the wait task in sensor_dag remains in a deferred state.

Only once all the tasks in task_group_dag have completed will the sensor report a success.

Removing deferrable=True results in the correct behaviour.

Operating System

n/a

Versions of Apache Airflow Providers

n/a

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
