Skip to content

Commit

Permalink
Mock _executor_names
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed May 16, 2024
1 parent 5b94349 commit 30fd161
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
2 changes: 2 additions & 0 deletions python-sdk/src/astro/sql/operators/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ def _get_executor_from_job_id(job_id: int) -> str | None:

with create_session() as session:
job = session.get(Job, job_id)
if job.executor_class is None and job.executor:
return type(job.executor).__name__
return job.executor_class if job else None

def get_all_task_outputs(self, context: Context) -> list[BaseTable]:
Expand Down
6 changes: 4 additions & 2 deletions python-sdk/tests/sql/operators/test_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ def test_error_raised_with_blocking_op_executors(
(None, "SequentialExecutor", True),
],
)
def test_single_worker_mode_backfill(executor_in_job, executor_in_cfg, expected_val):
def test_single_worker_mode_backfill(monkeypatch, executor_in_job, executor_in_cfg, expected_val):
"""Test that if we run Backfill Job it should be marked as single worker node"""
from airflow.jobs.backfill_job_runner import BackfillJobRunner
from airflow.jobs.job import Job

monkeypatch.setattr("airflow.executors.executor_loader._executor_names", [])
dag = DAG("test_single_worker_mode_backfill", start_date=datetime(2022, 1, 1))
dr = DagRun(dag_id=dag.dag_id)

Expand Down Expand Up @@ -181,11 +182,12 @@ def test_single_worker_mode_backfill_airflow_2_5(executor_in_job, executor_in_cf
(None, "SequentialExecutor", True),
],
)
def test_single_worker_mode_scheduler_job(executor_in_job, executor_in_cfg, expected_val):
def test_single_worker_mode_scheduler_job(monkeypatch, executor_in_job, executor_in_cfg, expected_val):
"""Test that if we run Scheduler Job it should be marked as single worker node"""
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner

monkeypatch.setattr("airflow.executors.executor_loader._executor_names", [])
dag = DAG("test_single_worker_mode_scheduler_job", start_date=datetime(2022, 1, 1))
dr = DagRun(dag_id=dag.dag_id)

Expand Down

0 comments on commit 30fd161

Please sign in to comment.