34 changes: 33 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1763,12 +1763,41 @@ def _should_update_dag_next_dagruns(
return False
return True

def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dict[int, Backfill]:
"""
Lock Backfill rows to prevent race conditions when multiple schedulers run concurrently.

:param dag_runs: Collection of Dag runs to process
:param session: DB session
:return: Dict mapping backfill_id to locked Backfill objects
"""
if not (backfill_ids := {dr.backfill_id for dr in dag_runs if dr.backfill_id is not None}):
return {}

locked_backfills = {
b.id: b
for b in session.scalars(
select(Backfill).where(Backfill.id.in_(backfill_ids)).with_for_update(skip_locked=True)
)
}

if skipped_backfills := backfill_ids - locked_backfills.keys():
self.log.debug(
"Skipping backfill runs for backfill_ids=%s - locked by another scheduler",
skipped_backfills,
)

return locked_backfills

@add_debug_span
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running state."""
# Materialize with .all() so the query is executed only once.
dag_runs: Collection[DagRun] = DagRun.get_queued_dag_runs_to_set_running(session).all()

# Lock backfills to prevent race conditions with concurrent schedulers
locked_backfills = self._lock_backfills(dag_runs, session)

query = (
select(
DagRun.dag_id,
@@ -1832,13 +1861,16 @@ def _update_state(dag: SerializedDAG, dag_run: DagRun):
dag_id = dag_run.dag_id
run_id = dag_run.run_id
backfill_id = dag_run.backfill_id
backfill = dag_run.backfill
dag = dag_run.dag = cached_get_dag(dag_run)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
active_runs = active_runs_of_dags[(dag_id, backfill_id)]
if backfill_id is not None:
if backfill_id not in locked_backfills:
# Another scheduler holds this backfill's lock; skip this run
continue
backfill = dag_run.backfill
if active_runs >= backfill.max_active_runs:
# todo: delete all "candidate dag runs" from list for this dag right now
self.log.info(
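The new `_lock_backfills` helper leans on the database's `SELECT ... FOR UPDATE SKIP LOCKED` semantics, exposed in SQLAlchemy as `with_for_update(skip_locked=True)`: rows already locked by another open transaction are silently omitted from the result instead of blocking the query. A minimal standalone sketch of that behavior outside Airflow (the model, connection URL, and ids here are illustrative assumptions, not Airflow's):

```python
# Minimal sketch of the FOR UPDATE SKIP LOCKED pattern (illustrative model
# and connection URL, not Airflow's). Requires a backend that supports
# SKIP LOCKED, e.g. PostgreSQL; SQLite does not.
from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Backfill(Base):
    __tablename__ = "backfill"
    id: Mapped[int] = mapped_column(primary_key=True)


engine = create_engine("postgresql+psycopg2://user:pass@localhost/airflow")  # assumed URL

with Session(engine) as s1, Session(engine) as s2:
    ids = [1, 2]  # pretend two backfills have queued runs

    # "Scheduler 1" locks both rows; the locks are held until s1's
    # transaction commits or rolls back.
    held_by_s1 = s1.scalars(
        select(Backfill).where(Backfill.id.in_(ids)).with_for_update(skip_locked=True)
    ).all()

    # "Scheduler 2" issues the same query but, thanks to SKIP LOCKED, simply
    # gets no rows back rather than waiting on s1 -- it moves on, exactly as
    # _lock_backfills does when it logs and skips the contested backfill ids.
    held_by_s2 = s2.scalars(
        select(Backfill).where(Backfill.id.in_(ids)).with_for_update(skip_locked=True)
    ).all()
    assert held_by_s2 == []
```

Because the lock lives only as long as the acquiring transaction, the map returned by `_lock_backfills` is meaningful only within the same session that acquired it, which is why `_start_queued_dagruns` takes the locks and consumes them in the same pass.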
83 changes: 83 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -5699,6 +5699,89 @@ def _running_counts():
assert session.scalar(select(func.count()).select_from(DagRun)) == 6
assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6

def test_backfill_runs_skipped_when_lock_held_by_another_scheduler(self, dag_maker, session):
"""Test that a scheduler skips backfill runs when another scheduler holds the lock."""
dag_id = "test_dag1"
backfill_max_active_runs = 3
dag_max_active_runs = 1

with dag_maker(
dag_id=dag_id,
start_date=DEFAULT_DATE,
schedule=timedelta(days=1),
max_active_runs=dag_max_active_runs,
catchup=True,
):
EmptyOperator(task_id="mytask")

from_date = pendulum.parse("2021-01-01")
to_date = pendulum.parse("2021-01-05")
_create_backfill(
dag_id=dag_id,
from_date=from_date,
to_date=to_date,
max_active_runs=backfill_max_active_runs,
reverse=False,
triggering_user_name="test_user",
dag_run_conf={},
)

queued_count = (
session.query(func.count(DagRun.id))
.filter(
DagRun.dag_id == dag_id,
DagRun.state == State.QUEUED,
DagRun.run_type == DagRunType.BACKFILL_JOB,
)
.scalar()
)
assert queued_count == 5

scheduler_job = Job(executor=MockExecutor(do_update=False))
job_runner = SchedulerJobRunner(job=scheduler_job)

# Simulate another scheduler holding the lock by having _lock_backfills return an empty dict
with patch.object(job_runner, "_lock_backfills", return_value={}):
job_runner._start_queued_dagruns(session)
session.flush()

# No runs should be started because we couldn't acquire the lock
running_count = (
session.query(func.count(DagRun.id))
.filter(
DagRun.dag_id == dag_id,
DagRun.state == State.RUNNING,
DagRun.run_type == DagRunType.BACKFILL_JOB,
)
.scalar()
)
assert running_count == 0, f"Expected 0 running runs when the lock is not acquired, got {running_count}"
# With the patch removed, the lock can be acquired and runs should start:
job_runner._start_queued_dagruns(session)
session.flush()

running_count = (
session.query(func.count(DagRun.id))
.filter(
DagRun.dag_id == dag_id,
DagRun.state == State.RUNNING,
DagRun.run_type == DagRunType.BACKFILL_JOB,
)
.scalar()
)
assert running_count == backfill_max_active_runs
queued_count = (
session.query(func.count(DagRun.id))
.filter(
DagRun.dag_id == dag_id,
DagRun.state == State.QUEUED,
DagRun.run_type == DagRunType.BACKFILL_JOB,
)
.scalar()
)
# 5 queued - 3 started (backfill_max_active_runs) = 2 runs still queued
assert queued_count == 2

def test_start_queued_dagruns_do_follow_logical_date_order(self, dag_maker):
session = settings.Session()
with dag_maker("test_dag1", max_active_runs=1):
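The test above simulates contention by patching `_lock_backfills` to return an empty dict rather than by opening a competing transaction. To exercise the real row lock instead, a sketch along these lines could work — hedged heavily: `session_factory` is a hypothetical stand-in for however the suite builds additional sessions, and the backend must support SKIP LOCKED (so not SQLite):

```python
# Sketch only -- exercises the real lock with two concurrent sessions.
# `session_factory` is a hypothetical stand-in for the suite's session setup;
# the database backend must support SKIP LOCKED (e.g. PostgreSQL).
with session_factory() as s1, session_factory() as s2:
    runner_a = SchedulerJobRunner(job=Job(executor=MockExecutor(do_update=False)))
    runner_b = SchedulerJobRunner(job=Job(executor=MockExecutor(do_update=False)))

    queued = DagRun.get_queued_dag_runs_to_set_running(s1).all()

    # Runner A acquires the backfill row locks and keeps its transaction open.
    assert runner_a._lock_backfills(queued, s1)

    # While A's transaction is open, runner B cannot lock any of the same
    # backfills -- SKIP LOCKED returns nothing, so B gets an empty map and
    # skips the runs, just like the patched test asserts.
    assert runner_b._lock_backfills(queued, s2) == {}

    # Once A commits (releasing its locks), B can acquire them on a later pass.
    s1.commit()
```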