Skip to content

Commit

Permalink
Add test for behavior for paused backfill (apache#42837)
Browse files Browse the repository at this point in the history
-----
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
  • Loading branch information
dstandish authored and PaulKobow7536 committed Oct 24, 2024
1 parent a58763b commit 3b17ce8
Showing 1 changed file with 89 additions and 1 deletion.
90 changes: 89 additions & 1 deletion tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel
from airflow.models.backfill import _create_backfill
from airflow.models.backfill import Backfill, _create_backfill
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -4960,6 +4960,94 @@ def _running_counts():
assert session.scalar(select(func.count()).select_from(DagRun)) == 46
assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36

@pytest.mark.parametrize(
"pause_it, expected_running",
[
(True, 0),
(False, 3),
],
)
def test_backfill_runs_not_started_when_backfill_paused(
self, pause_it, expected_running, dag_maker, session
):
"""
When backfill is paused, will not start.
"""
dag1_dag_id = "test_dag1"
with dag_maker(
dag_id=dag1_dag_id,
start_date=DEFAULT_DATE,
schedule=timedelta(days=1),
max_active_runs=1,
):
EmptyOperator(task_id="mytask")

def _running_counts():
dag1_non_b_running = (
session.query(func.count(DagRun.id))
.filter(
DagRun.dag_id == dag1_dag_id,
DagRun.state == State.RUNNING,
DagRun.run_type != DagRunType.BACKFILL_JOB,
)
.scalar()
)
dag1_b_running = (
session.query(func.count(DagRun.id))
.filter(
DagRun.dag_id == dag1_dag_id,
DagRun.state == State.RUNNING,
DagRun.run_type == DagRunType.BACKFILL_JOB,
)
.scalar()
)
total_running_count = (
session.query(func.count(DagRun.id)).filter(DagRun.state == State.RUNNING).scalar()
)
return dag1_non_b_running, dag1_b_running, total_running_count

scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

from_date = pendulum.parse("2021-01-01")
to_date = pendulum.parse("2021-01-06")
b = _create_backfill(
dag_id=dag1_dag_id,
from_date=from_date,
to_date=to_date,
max_active_runs=3,
reverse=False,
dag_run_conf={},
)
dag1_non_b_running, dag1_b_running, total_running = _running_counts()

# initial state -- nothing is running
assert dag1_non_b_running == 0
assert dag1_b_running == 0
assert total_running == 0
assert session.query(func.count(DagRun.id)).scalar() == 6
assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6

if pause_it:
b = session.get(Backfill, b.id)
b.is_paused = True

session.commit()

# now let's run scheduler once
self.job_runner._start_queued_dagruns(session)
session.flush()

assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20
dag1_non_b_running, dag1_b_running, total_running = _running_counts()
assert dag1_non_b_running == 0
assert dag1_b_running == expected_running
assert total_running == expected_running
assert session.scalar(select(func.count()).select_from(DagRun)) == 6
assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 6

def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
session = settings.Session()
with dag_maker("test_dag1", max_active_runs=1):
Expand Down

0 comments on commit 3b17ce8

Please sign in to comment.