From 9c8b8ac009a2943753068074cc104ca4d0b726e5 Mon Sep 17 00:00:00 2001
From: Karen Braganza
Date: Tue, 12 Aug 2025 06:00:35 -0400
Subject: [PATCH 1/2] [v3-0-test] Allow failure callbacks for stuck in queued
 TIs that fail (#53435)

In issue #51301, it was reported that failure callbacks do not run for task
instances that get stuck in queued and fail in Airflow 2.10.5. This happens
because of the changes introduced in PR #43520. That PR added logic to requeue
tasks that get stuck in queued (up to two times by default) before failing
them. Previously, the executor's fail method was called when a task had to be
failed after the maximum number of requeue attempts; the PR replaced that call
with the task instance's set_state method:
ti.set_state(TaskInstanceState.FAILED, session=session). Without the
executor's fail method being called, failure callbacks are not executed for
such task instances. Therefore, I changed the code to call the executor's fail
method instead in Airflow 3.

(cherry picked from commit 6da77b1fdfc0b51762b47638489e752384911758)

Co-authored-by: Karen Braganza
---
 .../src/airflow/jobs/scheduler_job_runner.py | 38 +++++++++++++++++--
 .../tests/unit/jobs/test_scheduler_job.py    | 18 +++++++--
 2 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 0a47913ad5039..0bca396643cc2 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2014,6 +2014,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
                     self._maybe_requeue_stuck_ti(
                         ti=ti,
                         session=session,
+                        executor=executor,
                     )
                     session.commit()
             except NotImplementedError:
@@ -2029,7 +2030,7 @@ def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
             )
         )
 
-    def _maybe_requeue_stuck_ti(self, *, ti, session):
+    def _maybe_requeue_stuck_ti(self, *, ti, session, executor):
         """
         Requeue task if it has not been attempted too many times.
 
@@ -2054,14 +2055,45 @@ def _maybe_requeue_stuck_ti(self, *, ti, session):
                 "Task requeue attempts exceeded max; marking failed. task_instance=%s",
                 ti,
             )
+            msg = f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed."
             session.add(
                 Log(
                     event="stuck in queued tries exceeded",
                     task_instance=ti.key,
-                    extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
+                    extra=msg,
                 )
             )
-            ti.set_state(TaskInstanceState.FAILED, session=session)
+
+            try:
+                dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
+                task = dag.get_task(ti.task_id)
+            except Exception:
+                self.log.warning(
+                    "The DAG or task could not be found. If a failure callback exists, it will not be run.",
+                    exc_info=True,
+                )
+            else:
+                if task.on_failure_callback:
+                    if inspect(ti).detached:
+                        ti = session.merge(ti)
+                    request = TaskCallbackRequest(
+                        filepath=ti.dag_model.relative_fileloc,
+                        bundle_name=ti.dag_version.bundle_name,
+                        bundle_version=ti.dag_version.bundle_version,
+                        ti=ti,
+                        msg=msg,
+                        context_from_server=TIRunContext(
+                            dag_run=ti.dag_run,
+                            max_tries=ti.max_tries,
+                            variables=[],
+                            connections=[],
+                            xcom_keys_to_clear=[],
+                        ),
+                    )
+                    executor.send_callback(request)
+            finally:
+                ti.set_state(TaskInstanceState.FAILED, session=session)
+                executor.fail(ti.key)
 
     def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
         session.execute(
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index c451f9d0a9166..058504880af64 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2003,11 +2003,15 @@ def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_
         # Second executor called for ti3
         mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
 
+    @staticmethod
+    def mock_failure_callback(context):
+        pass
+
     @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
     def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors):
         """Verify that tasks stuck in queued will be rescheduled up to N times."""
         with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
-            EmptyOperator(task_id="op1")
+            EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
             EmptyOperator(task_id="op2", executor="default_exec")
 
         def _queue_tasks(tis):
@@ -2073,16 +2077,19 @@ def _queue_tasks(tis):
             "stuck in queued tries exceeded",
         ]
 
-        mock_executors[0].fail.assert_not_called()  # just demoing that we don't fail with executor method
+        mock_executors[
+            0
+        ].send_callback.assert_called_once()  # this should only be called for the task that has a callback
         states = [x.state for x in dr.get_task_instances(session=session)]
         assert states == ["failed", "failed"]
+        mock_executors[0].fail.assert_called()
 
     @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
     def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors):
         """Reschedule sensors go in and out of running repeatedly using the same try_number
         Make sure that they get three attempts per reschedule, not 3 attempts per try_number"""
         with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
-            EmptyOperator(task_id="op1")
+            EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
             EmptyOperator(task_id="op2", executor="default_exec")
 
         def _queue_tasks(tis):
@@ -2172,9 +2179,12 @@ def _add_running_event(tis):
             "stuck in queued tries exceeded",
         ]
 
-        mock_executors[0].fail.assert_not_called()  # just demoing that we don't fail with executor method
+        mock_executors[
+            0
+        ].send_callback.assert_called_once()  # this should only be called for the task that has a callback
         states = [x.state for x in dr.get_task_instances(session=session)]
         assert states == ["failed", "failed"]
+        mock_executors[0].fail.assert_called()
 
     def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
         """Test that if executor no implement revoke_task then we don't blow up."""

From c37e8d3f34db80ed02211f9d27a3bcd52c682438 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Wed, 13 Aug 2025 13:06:48 +0100
Subject: [PATCH 2/2] Apply suggestions from code review

---
 airflow-core/src/airflow/jobs/scheduler_job_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 0bca396643cc2..2ad76af29c85a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2065,7 +2065,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session, executor):
             )
 
             try:
-                dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
+                dag = self.scheduler_dag_bag.get_dag(dag_run=ti.dag_run, session=session)
                 task = dag.get_task(ti.task_id)
             except Exception:
                 self.log.warning(
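
A minimal, self-contained sketch of the behaviour change described in PATCH 1/2,
for readers who want the control flow without the full scheduler context.
ToyTaskInstance, ToyExecutor, and fail_stuck_ti are illustrative stand-ins, not
Airflow classes or API; the point is that the failure path now goes through the
executor (send_callback / fail), which is what lets an on_failure_callback be
dispatched, instead of only setting the task instance state.

from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class ToyTaskInstance:
    # Hypothetical stand-in for a TaskInstance row.
    key: str
    state: str = "queued"
    on_failure_callback: Optional[Callable] = None


@dataclass
class ToyExecutor:
    # Hypothetical stand-in for an executor; records callback requests it is asked to send.
    callbacks_sent: list = field(default_factory=list)

    def send_callback(self, request: dict) -> None:
        self.callbacks_sent.append(request)

    def fail(self, ti_key: str) -> None:
        # A real executor also does internal bookkeeping for the failed key here.
        print(f"executor.fail({ti_key!r})")


def fail_stuck_ti(ti: ToyTaskInstance, executor: ToyExecutor, msg: str) -> None:
    # Shape of the patched branch: send a callback request if the task defines one,
    # then mark the TI failed and tell the executor about the failure.
    if ti.on_failure_callback is not None:
        executor.send_callback({"ti": ti.key, "msg": msg})
    ti.state = "failed"     # analogous to ti.set_state(TaskInstanceState.FAILED, ...)
    executor.fail(ti.key)   # analogous to executor.fail(ti.key)


if __name__ == "__main__":
    executor = ToyExecutor()
    ti = ToyTaskInstance(key="dag_id.op1", on_failure_callback=lambda context: None)
    fail_stuck_ti(ti, executor, "Task was requeued too many times and will be failed.")
    assert ti.state == "failed" and len(executor.callbacks_sent) == 1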