diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index e641af6063841..47d41ae34c32d 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -32,7 +32,7 @@ from itertools import groupby from typing import TYPE_CHECKING, Any -from sqlalchemy import and_, delete, desc, exists, func, or_, select, text, tuple_, update +from sqlalchemy import and_, delete, desc, exists, func, inspect, or_, select, text, tuple_, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, selectinload from sqlalchemy.sql import expression @@ -1946,6 +1946,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: self._maybe_requeue_stuck_ti( ti=ti, session=session, + executor=executor, ) session.commit() except NotImplementedError: @@ -1961,7 +1962,7 @@ def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]: ) ) - def _maybe_requeue_stuck_ti(self, *, ti, session): + def _maybe_requeue_stuck_ti(self, *, ti, session, executor): """ Requeue task if it has not been attempted too many times. @@ -1986,14 +1987,45 @@ def _maybe_requeue_stuck_ti(self, *, ti, session): "Task requeue attempts exceeded max; marking failed. task_instance=%s", ti, ) + msg = f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed." session.add( Log( event="stuck in queued tries exceeded", task_instance=ti.key, - extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.", + extra=msg, ) ) - ti.set_state(TaskInstanceState.FAILED, session=session) + + try: + dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session) + task = dag.get_task(ti.task_id) + except Exception: + self.log.warning( + "The DAG or task could not be found. If a failure callback exists, it will not be run.", + exc_info=True, + ) + else: + if task.on_failure_callback: + if inspect(ti).detached: + ti = session.merge(ti) + request = TaskCallbackRequest( + filepath=ti.dag_model.relative_fileloc, + bundle_name=ti.dag_version.bundle_name, + bundle_version=ti.dag_version.bundle_version, + ti=ti, + msg=msg, + context_from_server=TIRunContext( + dag_run=ti.dag_run, + max_tries=ti.max_tries, + variables=[], + connections=[], + xcom_keys_to_clear=[], + ), + ) + executor.send_callback(request) + finally: + ti.set_state(TaskInstanceState.FAILED, session=session) + executor.fail(ti.key) def _reschedule_stuck_task(self, ti: TaskInstance, session: Session): session.execute( diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 27c89a85981d9..f12018d87e2a6 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -2020,11 +2020,15 @@ def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_ # Second executor called for ti3 mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3]) + @staticmethod + def mock_failure_callback(context): + pass + @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"}) def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors): """Verify that tasks stuck in queued will be rescheduled up to N times.""" with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): - EmptyOperator(task_id="op1") + EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback) EmptyOperator(task_id="op2", executor="default_exec") def _queue_tasks(tis): @@ -2090,16 +2094,19 @@ def _queue_tasks(tis): "stuck in queued tries exceeded", ] - mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method + mock_executors[ + 0 + ].send_callback.assert_called_once() # this should only be called for the task that has a callback states = [x.state for x in dr.get_task_instances(session=session)] assert states == ["failed", "failed"] + mock_executors[0].fail.assert_called() @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"}) def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors): """Reschedule sensors go in and out of running repeatedly using the same try_number Make sure that they get three attempts per reschedule, not 3 attempts per try_number""" with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): - EmptyOperator(task_id="op1") + EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback) EmptyOperator(task_id="op2", executor="default_exec") def _queue_tasks(tis): @@ -2189,9 +2196,12 @@ def _add_running_event(tis): "stuck in queued tries exceeded", ] - mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method + mock_executors[ + 0 + ].send_callback.assert_called_once() # this should only be called for the task that has a callback states = [x.state for x in dr.get_task_instances(session=session)] assert states == ["failed", "failed"] + mock_executors[0].fail.assert_called() def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog): """Test that if executor no implement revoke_task then we don't blow up."""