diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index b55098f1a1098..11d4b92d0cdf8 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -237,7 +237,8 @@ def register_signals(self) -> ExitStack: def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None: """Clean up processor_agent to avoid leaving orphan processes.""" - self._end_active_spans() + if self._is_tracing_enabled(): + self._end_active_spans() if not _is_parent_process(): # Only the parent process should perform the cleanup. @@ -722,6 +723,20 @@ def _process_task_event_logs(log_records: deque[Log], session: Session): objects = (log_records.popleft() for _ in range(len(log_records))) session.bulk_save_objects(objects=objects, preserve_order=False) + @staticmethod + def _is_metrics_enabled(): + return any( + [ + conf.getboolean("metrics", "statsd_datadog_enabled", fallback=False), + conf.getboolean("metrics", "statsd_on", fallback=False), + conf.getboolean("metrics", "otel_on", fallback=False), + ] + ) + + @staticmethod + def _is_tracing_enabled(): + return conf.getboolean("traces", "otel_on") + def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int: return SchedulerJobRunner.process_executor_events( executor=executor, @@ -1185,15 +1200,17 @@ def _run_scheduler_loop(self) -> None: self._mark_backfills_complete, ) - timers.call_regular_interval( - conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0), - self._emit_pool_metrics, - ) + if self._is_metrics_enabled() or self._is_tracing_enabled(): + timers.call_regular_interval( + conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0), + self._emit_pool_metrics, + ) - timers.call_regular_interval( - conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0), - self._emit_running_ti_metrics, - ) + if self._is_metrics_enabled(): + timers.call_regular_interval( + conf.getfloat("scheduler", "running_metrics_interval", fallback=30.0), + self._emit_running_ti_metrics, + ) timers.call_regular_interval( conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0), @@ -1237,7 +1254,8 @@ def _run_scheduler_loop(self) -> None: ) with create_session() as session: - self._end_spans_of_externally_ended_ops(session) + if self._is_tracing_enabled(): + self._end_spans_of_externally_ended_ops(session) # This will schedule for as many executors as possible. num_queued_tis = self._do_scheduling(session) @@ -1849,7 +1867,8 @@ def _schedule_dag_run( return callback if ( - dag_run.scheduled_by_job_id is not None + self._is_tracing_enabled() + and dag_run.scheduled_by_job_id is not None and dag_run.scheduled_by_job_id != self.job.id and self.active_spans.get("dr:" + str(dag_run.id)) is None ):