diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index eb8e7b22770af..9f85ac7cc112c 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2450,8 +2450,8 @@ def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NE @provide_session def _emit_ti_metrics(self, session: Session = NEW_SESSION) -> None: metric_states = {State.SCHEDULED, State.QUEUED, State.RUNNING, State.DEFERRED} - all_states_metric = ( - session.query( + stmt = ( + select( TaskInstance.state, TaskInstance.dag_id, TaskInstance.task_id, @@ -2460,22 +2460,24 @@ def _emit_ti_metrics(self, session: Session = NEW_SESSION) -> None: ) .filter(TaskInstance.state.in_(metric_states)) .group_by(TaskInstance.state, TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.queue) - .all() ) + all_states_metric = session.execute(stmt).all() for state in metric_states: if state not in self.previous_ti_metrics: self.previous_ti_metrics[state] = {} ti_metrics = { - (row.dag_id, row.task_id, row.queue): row.count - for row in all_states_metric - if row.state == state + (dag_id, task_id, queue): count + for row_state, dag_id, task_id, queue, count in all_states_metric + if row_state == state } for (dag_id, task_id, queue), count in ti_metrics.items(): - Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", count) - Stats.gauge(f"ti.{state}", count, tags={"queue": queue, "dag_id": dag_id, "task_id": task_id}) + Stats.gauge(f"ti.{state}.{queue}.{dag_id}.{task_id}", float(count)) + Stats.gauge( + f"ti.{state}", float(count), tags={"queue": queue, "dag_id": dag_id, "task_id": task_id} + ) for prev_key in self.previous_ti_metrics[state]: # Reset previously exported stats that are no longer present in current metrics to zero