diff --git a/airflow/api/common/airflow_health.py b/airflow/api/common/airflow_health.py index 5d37de540a498..fc270f4433fc9 100644 --- a/airflow/api/common/airflow_health.py +++ b/airflow/api/common/airflow_health.py @@ -18,6 +18,7 @@ from typing import Any +from airflow.configuration import conf from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner from airflow.jobs.scheduler_job_runner import SchedulerJobRunner from airflow.jobs.triggerer_job_runner import TriggererJobRunner @@ -61,7 +62,7 @@ def get_airflow_health() -> dict[str, Any]: try: latest_dag_processor_job = DagProcessorJobRunner.most_recent_job() - if latest_dag_processor_job: + if conf.getboolean("scheduler", "standalone_dag_processor") and latest_dag_processor_job: latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat() if latest_dag_processor_job.is_alive(): dag_processor_status = HEALTHY diff --git a/tests/api/common/test_airflow_health.py b/tests/api/common/test_airflow_health.py index ebdc086c69277..0a060af1b0fd9 100644 --- a/tests/api/common/test_airflow_health.py +++ b/tests/api/common/test_airflow_health.py @@ -47,11 +47,12 @@ def test_get_airflow_health_only_metadatabase_healthy( assert health_status == expected_status +@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=True) @patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=Exception) @patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=Exception) @patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=Exception) def test_get_airflow_health_metadatabase_unhealthy( - latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock + get_boolean_mock, latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock ): health_status = get_airflow_health() @@ -65,6 +66,11 @@ def test_get_airflow_health_metadatabase_unhealthy( assert health_status == expected_status +def test_get_airflow_health_no_dag_processor(): + health_status = get_airflow_health() + assert health_status["dag_processor"] == {"status": None, "latest_dag_processor_heartbeat": None} + + LATEST_SCHEDULER_JOB_MOCK = MagicMock() LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat = datetime.now() LATEST_SCHEDULER_JOB_MOCK.is_alive = MagicMock(return_value=True) @@ -103,6 +109,7 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer( LATEST_DAG_PROCESSOR_JOB_MOCK.is_alive = MagicMock(return_value=True) +@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=True) @patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None) @patch( "airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", @@ -113,7 +120,7 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer( return_value=LATEST_DAG_PROCESSOR_JOB_MOCK, ) def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record( - latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock + get_boolean_mock, latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock ): health_status = get_airflow_health()