From 3b17ce85672e9ec4b6a510d21f5ec2359b1713ce Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Thu, 10 Oct 2024 14:40:10 -0700
Subject: [PATCH] Add test for behavior for paused backfill (#42837)

-----

Co-authored-by: Kaxil Naik
---
 tests/jobs/test_scheduler_job.py | 90 +++++++++++++++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)

diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 639a3528fad8c..78c3acdce0de6 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -53,7 +53,7 @@
 from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
 from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
-from airflow.models.backfill import _create_backfill
+from airflow.models.backfill import Backfill, _create_backfill
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun
@@ -4960,6 +4960,94 @@ def _running_counts():
         assert session.scalar(select(func.count()).select_from(DagRun)) == 46
         assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36
 
+    @pytest.mark.parametrize(
+        "pause_it, expected_running",
+        [
+            (True, 0),
+            (False, 3),
+        ],
+    )
+    def test_backfill_runs_not_started_when_backfill_paused(
+        self, pause_it, expected_running, dag_maker, session
+    ):
+        """
+        A paused backfill's dag runs must not be running after ``_start_queued_dagruns``; unpaused, 3 are.
+        """
+        dag1_dag_id = "test_dag1"
+        with dag_maker(
+            dag_id=dag1_dag_id,
+            start_date=DEFAULT_DATE,
+            schedule=timedelta(days=1),
+            max_active_runs=1,  # DAG-level value; the backfill below is created with its own value of 3
+        ):
+            EmptyOperator(task_id="mytask")
+
+        def _running_counts():
+            dag1_non_b_running = (
+                session.query(func.count(DagRun.id))
+                .filter(
+                    DagRun.dag_id == dag1_dag_id,
+                    DagRun.state == State.RUNNING,
+                    DagRun.run_type != DagRunType.BACKFILL_JOB,
+                )
+                .scalar()
+            )
+            dag1_b_running = (
+                session.query(func.count(DagRun.id))
+                .filter(
+                    DagRun.dag_id == dag1_dag_id,
+                    DagRun.state == State.RUNNING,
+                    DagRun.run_type == DagRunType.BACKFILL_JOB,
+                )
+                .scalar()
+            )
+            total_running_count = (
+                session.query(func.count(DagRun.id)).filter(DagRun.state == State.RUNNING).scalar()
+            )
+            return dag1_non_b_running, dag1_b_running, total_running_count
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+        scheduler_job.executor = MockExecutor(do_update=False)
+        self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        from_date = pendulum.parse("2021-01-01")
+        to_date = pendulum.parse("2021-01-06")
+        b = _create_backfill(
+            dag_id=dag1_dag_id,
+            from_date=from_date,
+            to_date=to_date,
+            max_active_runs=3,  # equals expected_running in the unpaused case
+            reverse=False,
+            dag_run_conf={},
+        )
+        dag1_non_b_running, dag1_b_running, total_running = _running_counts()
+
+        # initial state -- no dag run is in the running state yet; 6 dag runs exist in total
+        assert dag1_non_b_running == 0
+        assert dag1_b_running == 0
+        assert total_running == 0
+        assert session.query(func.count(DagRun.id)).scalar() == 6
+        assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
+
+        if pause_it:
+            b = session.get(Backfill, b.id)
+            b.is_paused = True
+
+        session.commit()
+
+        # call the scheduler's _start_queued_dagruns once, then re-count running dag runs
+        self.job_runner._start_queued_dagruns(session)
+        session.flush()
+
+        assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20  # NOTE(review): needed here? only 6 runs exist
+        dag1_non_b_running, dag1_b_running, total_running = _running_counts()
+        assert dag1_non_b_running == 0
+        assert dag1_b_running == expected_running
+        assert total_running == expected_running
+        assert session.scalar(select(func.count()).select_from(DagRun)) == 6
+        assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6
+
     def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
         session = settings.Session()
         with dag_maker("test_dag1", max_active_runs=1):