diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index 1d15b9b87781b..703cecf509cb2 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1215,6 +1215,16 @@ def _handle_current_task_failed( ) -> tuple[RetryTask, TaskInstanceState] | tuple[TaskState, TaskInstanceState]: end_date = datetime.now(tz=timezone.utc) ti.end_date = end_date + + # Record operator and task instance failed metrics + operator = ti.task.__class__.__name__ + stats_tags = {"dag_id": ti.dag_id, "task_id": ti.task_id} + + Stats.incr(f"operator_failures_{operator}", tags=stats_tags) + # Same metric with tagging + Stats.incr("operator_failures", tags={**stats_tags, "operator": operator}) + Stats.incr("ti_failures", tags=stats_tags) + if ti._ti_context_from_server and ti._ti_context_from_server.should_retry: return RetryTask(end_date=end_date), TaskInstanceState.UP_FOR_RETRY return (