From 589df29a348a194b6ff28a5553ac202cf00a0a1c Mon Sep 17 00:00:00 2001
From: Ephraim Anierobi
Date: Tue, 2 Dec 2025 12:11:00 +0100
Subject: [PATCH] Fix backfill max_active_runs race condition with concurrent
 schedulers (#58807)

* Fix backfill max_active_runs race condition with concurrent schedulers

When two schedulers run concurrently, both could start more backfill dag
runs than max_active_runs allows. This happened because each scheduler
read the count of running dag runs before either committed, causing both
to see stale counts and start runs simultaneously.

The fix adds row-level locking on the Backfill table. When a scheduler
processes backfill dag runs, it first locks the relevant Backfill rows.
If another scheduler already holds the lock, the current scheduler skips
those backfills rather than potentially violating the max_active_runs
constraint.

This ensures that only one scheduler can process a given backfill's dag
runs at a time, preventing the race condition while remaining
non-blocking (schedulers don't wait on each other).

(cherry picked from commit 22af27ea5a750ed92b203bd47846e5197cfda7fa)
---
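Note (editorial illustration, not part of the commit): the fix hinges on
SELECT ... FOR UPDATE SKIP LOCKED, which SQLAlchemy exposes as
with_for_update(skip_locked=True). The first transaction to select a row
holds a row lock on it, and a concurrent transaction skips that row
instead of blocking on it. Below is a minimal, self-contained sketch of
just that pattern; the toy BackfillDemo model and the PostgreSQL DSN are
assumptions for the demo, not Airflow's real schema or configuration.

# skip_locked_demo.py -- minimal sketch of the FOR UPDATE SKIP LOCKED
# pattern; the toy model and DSN are assumptions, not Airflow code.
from sqlalchemy import Column, Integer, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class BackfillDemo(Base):
    __tablename__ = "backfill_demo"
    id = Column(Integer, primary_key=True)
    max_active_runs = Column(Integer, nullable=False)


# SQLite parses but ignores FOR UPDATE; use PostgreSQL to observe locking.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/demo")
Base.metadata.create_all(engine)

with Session(engine) as setup:
    setup.add_all([BackfillDemo(id=1, max_active_runs=3), BackfillDemo(id=2, max_active_runs=3)])
    setup.commit()

with Session(engine) as s1, Session(engine) as s2:
    # "Scheduler 1": lock every row it can get, without waiting.
    locked_by_s1 = {b.id for b in s1.scalars(select(BackfillDemo).with_for_update(skip_locked=True))}

    # "Scheduler 2", concurrently: rows held by s1 are skipped rather than
    # waited on, so it sees nothing and moves on instead of double-starting.
    locked_by_s2 = {b.id for b in s2.scalars(select(BackfillDemo).with_for_update(skip_locked=True))}

    assert locked_by_s1 == {1, 2}
    assert locked_by_s2 == set()
    s1.commit()  # commit (or rollback) releases the row locks

Because the losing transaction skips instead of waiting, the scheduler
loop stays non-blocking: backfills a scheduler could not lock are handled
by the scheduler that holds them and reconsidered on a later pass.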
 .../src/airflow/jobs/scheduler_job_runner.py  | 34 +++++++-
 .../tests/unit/jobs/test_scheduler_job.py     | 83 +++++++++++++++++++
 2 files changed, 116 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 8468e7ab2d9c1..cc2968c8d8213 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1763,12 +1763,41 @@ def _should_update_dag_next_dagruns(
             return False
         return True
 
+    def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
+        """
+        Lock Backfill rows to prevent race conditions when multiple schedulers run concurrently.
+
+        :param dag_runs: Collection of Dag runs to process
+        :param session: DB session
+        :return: Dict mapping backfill_id to locked Backfill objects
+        """
+        if not (backfill_ids := {dr.backfill_id for dr in dag_runs if dr.backfill_id is not None}):
+            return {}
+
+        locked_backfills = {
+            b.id: b
+            for b in session.scalars(
+                select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+            )
+        }
+
+        if skipped_backfills := backfill_ids - locked_backfills.keys():
+            self.log.debug(
+                "Skipping backfill runs for backfill_ids=%s - locked by another scheduler",
+                skipped_backfills,
+            )
+
+        return locked_backfills
+
     @add_debug_span
     def _start_queued_dagruns(self, session: Session) -> None:
         """Find DagRuns in queued state and decide moving them to running state."""
         # added all() to save runtime, otherwise query is executed more than once
         dag_runs: Collection[DagRun] = DagRun.get_queued_dag_runs_to_set_running(session).all()
 
+        # Lock backfills to prevent race conditions with concurrent schedulers
+        locked_backfills = self._lock_backfills(dag_runs, session)
+
         query = (
             select(
                 DagRun.dag_id,
@@ -1832,13 +1861,16 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
             dag_id = dag_run.dag_id
             run_id = dag_run.run_id
             backfill_id = dag_run.backfill_id
-            backfill = dag_run.backfill
             dag = dag_run.dag = cached_get_dag(dag_run)
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                 continue
             active_runs = active_runs_of_dags[(dag_id, backfill_id)]
             if backfill_id is not None:
+                if backfill_id not in locked_backfills:
+                    # Another scheduler has this backfill locked, skip this run
+                    continue
+                backfill = dag_run.backfill
                 if active_runs >= backfill.max_active_runs:
                     # todo: delete all "candidate dag runs" from list for this dag right now
                     self.log.info(
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 4234fb27641f8..1ad6ffb9984c8 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5699,6 +5699,89 @@ def _running_counts():
         assert session.scalar(select(func.count()).select_from(DagRun)) == 6
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
 
+    def test_backfill_runs_skipped_when_lock_held_by_another_scheduler(self, dag_maker, session):
+        """Test that a scheduler skips backfill runs when another scheduler holds the lock."""
+        dag_id = "test_dag1"
+        backfill_max_active_runs = 3
+        dag_max_active_runs = 1
+
+        with dag_maker(
+            dag_id=dag_id,
+            start_date=DEFAULT_DATE,
+            schedule=timedelta(days=1),
+            max_active_runs=dag_max_active_runs,
+            catchup=True,
+        ):
+            EmptyOperator(task_id="mytask")
+
+        from_date = pendulum.parse("2021-01-01")
+        to_date = pendulum.parse("2021-01-05")
+        _create_backfill(
+            dag_id=dag_id,
+            from_date=from_date,
+            to_date=to_date,
+            max_active_runs=backfill_max_active_runs,
+            reverse=False,
+            triggering_user_name="test_user",
+            dag_run_conf={},
+        )
+
+        queued_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.QUEUED,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert queued_count == 5
+
+        scheduler_job = Job(executor=MockExecutor(do_update=False))
+        job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        # Simulate another scheduler holding the lock by returning empty from _lock_backfills
+        with patch.object(job_runner, "_lock_backfills", return_value={}):
+            job_runner._start_queued_dagruns(session)
+            session.flush()
+
+        # No runs should be started because we couldn't acquire the lock
+        running_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.RUNNING,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert running_count == 0, f"Expected 0 running when lock not acquired, but got {running_count}."
+        # no locks now:
+        job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        running_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.RUNNING,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert running_count == backfill_max_active_runs
+        queued_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.QUEUED,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        # 2 runs are still queued
+        assert queued_count == 2
+
     def test_start_queued_dagruns_do_follow_logical_date_order(self, dag_maker):
         session = settings.Session()
         with dag_maker("test_dag1", max_active_runs=1):