Skip to content

Commit

Permalink
Fix class instance vs. class type in validate_database_executor_compa…
Browse files Browse the repository at this point in the history
…tibility() call (#40626)

* Fix class instance vs. class type in check call

* Fix class instance vs. class type in check call, adjust pytests
  • Loading branch information
jscheffl authored Jul 7, 2024
1 parent e827bfb commit e7dedbe
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
2 changes: 1 addition & 1 deletion airflow/cli/commands/scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def _run_scheduler_job(args) -> None:
job_runner = SchedulerJobRunner(
job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle
)
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor.__class__)
InternalApiConfig.force_database_direct_access()
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
Expand Down
48 changes: 44 additions & 4 deletions tests/cli/commands/test_scheduler_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,17 @@ def setup_class(cls):
("LocalKubernetesExecutor", True),
],
)
@mock.patch(
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
side_effect=None,
)
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
def test_serve_logs_on_scheduler(
self,
mock_process,
mock_scheduler_job,
mock_validate,
executor,
expect_serve_logs,
):
Expand All @@ -70,10 +75,14 @@ def test_serve_logs_on_scheduler(
with pytest.raises(AssertionError):
mock_process.assert_has_calls([mock.call(target=serve_logs)])

@mock.patch(
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
side_effect=None,
)
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
@pytest.mark.parametrize("executor", ["LocalExecutor", "SequentialExecutor"])
def test_skip_serve_logs(self, mock_process, mock_scheduler_job, executor):
def test_skip_serve_logs(self, mock_process, mock_scheduler_job, mock_validate, executor):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler", "--skip-serve-logs"])
with conf_vars({("core", "executor"): executor}):
Expand All @@ -82,34 +91,50 @@ def test_skip_serve_logs(self, mock_process, mock_scheduler_job, executor):
with pytest.raises(AssertionError):
mock_process.assert_has_calls([mock.call(target=serve_logs)])

@mock.patch(
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
side_effect=None,
)
@mock.patch("airflow.utils.db.check_and_run_migrations")
@mock.patch("airflow.utils.db.synchronize_log_template")
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
def test_check_migrations_is_false(self, mock_process, mock_scheduler_job, mock_log, mock_run_migration):
def test_check_migrations_is_false(
self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, mock_validate
):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
with conf_vars({("database", "check_migrations"): "False"}):
scheduler_command.scheduler(args)
mock_run_migration.assert_not_called()
mock_log.assert_called_once()

@mock.patch(
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
side_effect=None,
)
@mock.patch("airflow.utils.db.check_and_run_migrations")
@mock.patch("airflow.utils.db.synchronize_log_template")
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
def test_check_migrations_is_true(self, mock_process, mock_scheduler_job, mock_log, mock_run_migration):
def test_check_migrations_is_true(
self, mock_process, mock_scheduler_job, mock_log, mock_run_migration, mock_validate
):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
with conf_vars({("database", "check_migrations"): "True"}):
scheduler_command.scheduler(args)
mock_run_migration.assert_called_once()
mock_log.assert_called_once()

@mock.patch(
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
side_effect=None,
)
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
@pytest.mark.parametrize("executor", ["LocalExecutor", "SequentialExecutor"])
def test_graceful_shutdown(self, mock_process, mock_scheduler_job, executor):
def test_graceful_shutdown(self, mock_process, mock_scheduler_job, mock_validate, executor):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
with conf_vars({("core", "executor"): executor}):
Expand All @@ -120,25 +145,35 @@ def test_graceful_shutdown(self, mock_process, mock_scheduler_job, executor):
finally:
mock_process().terminate.assert_called()

@mock.patch(
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
side_effect=None,
)
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
def test_enable_scheduler_health(
self,
mock_process,
mock_scheduler_job,
mock_validate,
):
with conf_vars({("scheduler", "enable_health_check"): "True"}):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
scheduler_command.scheduler(args)
mock_process.assert_has_calls([mock.call(target=serve_health_check)])

@mock.patch(
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
side_effect=None,
)
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
def test_disable_scheduler_health(
self,
mock_process,
mock_scheduler_job,
mock_validate,
):
mock_scheduler_job.return_value.job_type = "SchedulerJob"
args = self.parser.parse_args(["scheduler"])
Expand All @@ -162,6 +197,10 @@ def test_scheduler_health_host(
serve_health_check()
assert http_server_mock.call_args.args[0] == (health_check_host, health_check_port)

@mock.patch(
"airflow.cli.commands.scheduler_command.ExecutorLoader.validate_database_executor_compatibility",
side_effect=None,
)
@mock.patch("airflow.cli.commands.scheduler_command.SchedulerJobRunner")
@mock.patch("airflow.cli.commands.scheduler_command.Process")
@mock.patch("airflow.cli.commands.scheduler_command.run_job", side_effect=Exception("run_job failed"))
Expand All @@ -170,6 +209,7 @@ def test_run_job_exception_handling(
mock_run_job,
mock_process,
mock_scheduler_job,
mock_validate,
):
args = self.parser.parse_args(["scheduler"])
with pytest.raises(Exception, match="run_job failed"):
Expand Down

0 comments on commit e7dedbe

Please sign in to comment.