Skip to content

Conversation

@kaxil
Copy link
Member

@kaxil kaxil commented May 28, 2025

Resolves #50300 (comment)

When using dag.test() with deferred tasks, tasks that complete their trigger execution were incorrectly being set to SUCCESS state instead of SCHEDULED state. This prevented task resumption!

Dag used to test:

from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
@kaxil kaxil requested a review from vincbeck May 28, 2025 22:58
@kaxil kaxil requested review from amoghrajesh and ashb as code owners May 28, 2025 22:58
@kaxil kaxil added backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch and removed area:CLI area:task-sdk labels May 28, 2025
@kaxil kaxil added this to the Airflow 3.0.2 milestone May 28, 2025
@kaxil kaxil mentioned this pull request May 28, 2025
3 tasks
@kaxil kaxil merged commit 1b83f71 into apache:main May 29, 2025
77 checks passed
@kaxil kaxil deleted the fix-dag-test-bug branch May 29, 2025 11:18
github-actions bot pushed a commit that referenced this pull request May 29, 2025
When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
(cherry picked from commit 1b83f71)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
@github-actions
Copy link

Backport successfully created: v3-0-test

Status Branch Result
v3-0-test PR Link

github-actions bot pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request May 29, 2025
)

When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
(cherry picked from commit 1b83f71)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
kaxil added a commit that referenced this pull request May 29, 2025
…51199)

When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
(cherry picked from commit 1b83f71)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
@vincbeck
Copy link
Contributor

Thank you!

kaxil added a commit that referenced this pull request Jun 3, 2025
…51199)

When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
(cherry picked from commit 1b83f71)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
sanederchik pushed a commit to sanederchik/airflow that referenced this pull request Jun 7, 2025
When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
jose-lehmkuhl pushed a commit to jose-lehmkuhl/airflow that referenced this pull request Jul 11, 2025
When using `dag.test()` with deferred tasks, tasks that complete their trigger execution were incorrectly being set to `SUCCESS` state instead of `SCHEDULED` state. This prevented task resumption!

Dag used to test:

```python
from datetime import datetime, timedelta, timezone
from typing import Any

import pendulum

from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.sdk import Context, task, BaseOperator, DAG

class DummyOperator(BaseOperator):

    def execute(self, context: Context):
        self.defer(
            trigger=DateTimeTrigger(
                moment=datetime.now(timezone.utc) + timedelta(seconds=2),
            ),
            method_name="execute_complet",
        )

    def execute_complet(self, context: Context, event: Any = None):
        assert event is not None
        return "test"

@task
def dummy_task(param):
    print("DEBUG")
    assert param == "test", "Parameter should be 'test'"

with DAG(
    dag_id="example_debug",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = DummyOperator(task_id="task1")
    task2 = dummy_task(task1.output)
    task1 >> task2

if __name__ == "__main__":
    dag.test()
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants