From cdc9511b1c1323aae6cfbcc0bff0c413d7ef8b6d Mon Sep 17 00:00:00 2001 From: Sean Muth Date: Wed, 31 Dec 2025 06:11:24 -0600 Subject: [PATCH] [v3-1-test] Record missing `ti_failure` metrics for tasks in Airflow 3 (#59731) * emit ti failures stats * fix whitespace, comment (cherry picked from commit 98d59763242fac3fe79561967f226bfb3cfacfdf) Co-authored-by: Sean Muth --- task-sdk/src/airflow/sdk/execution_time/task_runner.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 79df1beb605ad..611228ec20148 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1119,6 +1119,16 @@ def _handle_current_task_failed( ) -> tuple[RetryTask, TaskInstanceState] | tuple[TaskState, TaskInstanceState]: end_date = datetime.now(tz=timezone.utc) ti.end_date = end_date + + # Record operator and task instance failed metrics + operator = ti.task.__class__.__name__ + stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id} + + Stats.incr(f"operator_failures_{operator}", tags=stats_tags) + # Same metric with tagging + Stats.incr("operator_failures", tags={**stats_tags, "operator": operator}) + Stats.incr("ti_failures", tags=stats_tags) + if ti._ti_context_from_server and ti._ti_context_from_server.should_retry: return RetryTask(end_date=end_date), TaskInstanceState.UP_FOR_RETRY return (