From 81923b1ddbf886a1ad04862c31c5ee7778f87dc9 Mon Sep 17 00:00:00 2001 From: Pierluigi Dell'Arciprete Date: Tue, 9 Dec 2025 11:39:36 +0100 Subject: [PATCH 1/4] explicitly exclude tasks that are already in the `SCHEDULED` state in the `WHERE` clause --- airflow-core/src/airflow/models/dagrun.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 89999bdbf29f0..210c9aea51954 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -2081,6 +2081,7 @@ def schedule_tis( result = session.execute( update(TI) .where(TI.id.in_(id_chunk)) + .where(TI.state.in_(SCHEDULEABLE_STATES)) .values( state=TaskInstanceState.SCHEDULED, scheduled_dttm=timezone.utcnow(), From 929518e4cea183d1ad9e342f4cbf850bcbdfb3f5 Mon Sep 17 00:00:00 2001 From: Pierluigi Dell'Arciprete Date: Sat, 13 Dec 2025 23:09:25 +0100 Subject: [PATCH 2/4] change where condition --- airflow-core/src/airflow/models/dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 210c9aea51954..a13c1e85ec0c0 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -2081,7 +2081,7 @@ def schedule_tis( result = session.execute( update(TI) .where(TI.id.in_(id_chunk)) - .where(TI.state.in_(SCHEDULEABLE_STATES)) + .where(TI.state != TaskInstanceState.SCHEDULED) .values( state=TaskInstanceState.SCHEDULED, scheduled_dttm=timezone.utcnow(), From 87f6c00eb7343778a83901654dd8dca8720465cf Mon Sep 17 00:00:00 2001 From: Pierluigi Dell'Arciprete Date: Sun, 14 Dec 2025 20:05:02 +0100 Subject: [PATCH 3/4] fix where condition to include null cases --- airflow-core/src/airflow/models/dagrun.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index a13c1e85ec0c0..fd9f214e83be2 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -119,6 +119,8 @@ log = structlog.get_logger(__name__) +# Create a set without None for the IN clause +NON_NULL_SCHEDULABLE_STATES = SCHEDULEABLE_STATES - {None} class TISchedulingDecision(NamedTuple): """Type of return for DagRun.task_instance_scheduling_decisions.""" @@ -2081,7 +2083,12 @@ def schedule_tis( result = session.execute( update(TI) .where(TI.id.in_(id_chunk)) - .where(TI.state != TaskInstanceState.SCHEDULED) + .where( + or_( + TI.state.is_(None), + TI.state.in_(NON_NULL_SCHEDULABLE_STATES) + ) + ) .values( state=TaskInstanceState.SCHEDULED, scheduled_dttm=timezone.utcnow(), From 0256413ff7bd9bc77066b18dec4a266be26cb74b Mon Sep 17 00:00:00 2001 From: Pierluigi Dell'Arciprete Date: Mon, 15 Dec 2025 09:39:58 +0100 Subject: [PATCH 4/4] fix pre-commit check --- airflow-core/src/airflow/models/dagrun.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index fd9f214e83be2..5da5d9c856525 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -122,6 +122,7 @@ # Create a set without None for the IN clause NON_NULL_SCHEDULABLE_STATES = SCHEDULEABLE_STATES - {None} + class TISchedulingDecision(NamedTuple): """Type of return for DagRun.task_instance_scheduling_decisions.""" @@ -2083,12 +2084,7 @@ def schedule_tis( result = session.execute( update(TI) .where(TI.id.in_(id_chunk)) - .where( - or_( - TI.state.is_(None), - TI.state.in_(NON_NULL_SCHEDULABLE_STATES) - ) - ) + .where(or_(TI.state.is_(None), TI.state.in_(NON_NULL_SCHEDULABLE_STATES))) .values( state=TaskInstanceState.SCHEDULED, scheduled_dttm=timezone.utcnow(),