diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 8468e7ab2d9c1..cc2968c8d8213 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1763,12 +1763,41 @@ def _should_update_dag_next_dagruns(
             return False
         return True
 
+    def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
+        """
+        Lock Backfill rows to prevent race conditions when multiple schedulers run concurrently.
+
+        :param dag_runs: Collection of Dag runs to process
+        :param session: DB session
+        :return: Dict mapping backfill_id to locked Backfill objects
+        """
+        if not (backfill_ids := {dr.backfill_id for dr in dag_runs if dr.backfill_id is not None}):
+            return {}
+
+        locked_backfills = {
+            b.id: b
+            for b in session.scalars(
+                select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
+            )
+        }
+
+        if skipped_backfills := backfill_ids - locked_backfills.keys():
+            self.log.debug(
+                "Skipping backfill runs for backfill_ids=%s - locked by another scheduler",
+                skipped_backfills,
+            )
+
+        return locked_backfills
+
     @add_debug_span
     def _start_queued_dagruns(self, session: Session) -> None:
         """Find DagRuns in queued state and decide moving them to running state."""
         # added all() to save runtime, otherwise query is executed more than once
         dag_runs: Collection[DagRun] = DagRun.get_queued_dag_runs_to_set_running(session).all()
 
+        # Lock backfills to prevent race conditions with concurrent schedulers
+        locked_backfills = self._lock_backfills(dag_runs, session)
+
         query = (
             select(
                 DagRun.dag_id,
@@ -1832,13 +1861,16 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
             dag_id = dag_run.dag_id
             run_id = dag_run.run_id
             backfill_id = dag_run.backfill_id
-            backfill = dag_run.backfill
             dag = dag_run.dag = cached_get_dag(dag_run)
             if not dag:
                 self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                 continue
             active_runs = active_runs_of_dags[(dag_id, backfill_id)]
             if backfill_id is not None:
+                if backfill_id not in locked_backfills:
+                    # Another scheduler has this backfill locked, skip this run
+                    continue
+                backfill = dag_run.backfill
                 if active_runs >= backfill.max_active_runs:
                     # todo: delete all "candidate dag runs" from list for this dag right now
                     self.log.info(
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 4234fb27641f8..1ad6ffb9984c8 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5699,6 +5699,89 @@ def _running_counts():
         assert session.scalar(select(func.count()).select_from(DagRun)) == 6
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
 
+    def test_backfill_runs_skipped_when_lock_held_by_another_scheduler(self, dag_maker, session):
+        """Test that a scheduler skips backfill runs when another scheduler holds the lock."""
+        dag_id = "test_dag1"
+        backfill_max_active_runs = 3
+        dag_max_active_runs = 1
+
+        with dag_maker(
+            dag_id=dag_id,
+            start_date=DEFAULT_DATE,
+            schedule=timedelta(days=1),
+            max_active_runs=dag_max_active_runs,
+            catchup=True,
+        ):
+            EmptyOperator(task_id="mytask")
+
+        from_date = pendulum.parse("2021-01-01")
+        to_date = pendulum.parse("2021-01-05")
+        _create_backfill(
+            dag_id=dag_id,
+            from_date=from_date,
+            to_date=to_date,
+            max_active_runs=backfill_max_active_runs,
+            reverse=False,
+            triggering_user_name="test_user",
+            dag_run_conf={},
+        )
+
+        queued_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.QUEUED,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert queued_count == 5
+
+        scheduler_job = Job(executor=MockExecutor(do_update=False))
+        job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        # Simulate another scheduler holding the lock by returning empty from _lock_backfills
+        with patch.object(job_runner, "_lock_backfills", return_value={}):
+            job_runner._start_queued_dagruns(session)
+            session.flush()
+
+        # No runs should be started because we couldn't acquire the lock
+        running_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.RUNNING,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert running_count == 0, f"Expected 0 running runs when the lock is not acquired, but got {running_count}"
+        # The lock is no longer held, so a second scheduling pass should start the runs
+        job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        running_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.RUNNING,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        assert running_count == backfill_max_active_runs
+        queued_count = (
+            session.query(func.count(DagRun.id))
+            .filter(
+                DagRun.dag_id == dag_id,
+                DagRun.state == State.QUEUED,
+                DagRun.run_type == DagRunType.BACKFILL_JOB,
+            )
+            .scalar()
+        )
+        # 5 queued runs minus 3 now running leaves 2 still queued
+        assert queued_count == 2
+
     def test_start_queued_dagruns_do_follow_logical_date_order(self, dag_maker):
         session = settings.Session()
         with dag_maker("test_dag1", max_active_runs=1):