From e7438845ac22e4e0cda3bc7100abd63849d55d90 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Mon, 7 Jul 2025 18:50:02 -0400 Subject: [PATCH 01/10] Call executor fail method for stuck in queued tasks --- airflow/jobs/scheduler_job_runner.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index c9afd40f719ed..23ffd8af7c049 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1801,10 +1801,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: try: for ti in stuck_tis: executor.revoke_task(ti=ti) - self._maybe_requeue_stuck_ti( - ti=ti, - session=session, - ) + self._maybe_requeue_stuck_ti(ti=ti, session=session, executor=executor) except NotImplementedError: # this block only gets entered if the executor has not implemented `revoke_task`. # in which case, we try the fallback logic @@ -1824,7 +1821,7 @@ def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]: ) ) - def _maybe_requeue_stuck_ti(self, *, ti, session): + def _maybe_requeue_stuck_ti(self, *, ti, session, executor): """ Requeue task if it has not been attempted too many times. @@ -1856,7 +1853,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session): extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.", ) ) - ti.set_state(TaskInstanceState.FAILED, session=session) + executor.fail(ti.key) @deprecated( reason="This is backcompat layer for older executor interface. Should be removed in 3.0", From 101f432dd5293ff42e21c1eaa4163923fb05b426 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Tue, 8 Jul 2025 17:12:40 -0400 Subject: [PATCH 02/10] Modify test to assert executor fail --- tests/jobs/test_scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index f8a879145df49..80548253287fa 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2310,7 +2310,7 @@ def _queue_tasks(tis): "stuck in queued tries exceeded", ] - mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method + mock_executors[0].fail.assert_called() states = [x.state for x in dr.get_task_instances(session=session)] assert states == ["failed", "failed"] From 79392135aa7eeda4f0c6005919d4643fb00468ca Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Fri, 11 Jul 2025 13:32:28 -0400 Subject: [PATCH 03/10] Remove ti failed state assertion from test_handle_stuck_queued_tasks_multiple_attempts --- tests/jobs/test_scheduler_job.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 80548253287fa..40e9ac07103b5 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2311,8 +2311,6 @@ def _queue_tasks(tis): ] mock_executors[0].fail.assert_called() - states = [x.state for x in dr.get_task_instances(session=session)] - assert states == ["failed", "failed"] @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"}) def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors): From 49517eb56bbeb7a53c06af52afd4860c399ce2b7 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Fri, 11 Jul 2025 16:01:44 -0400 Subject: [PATCH 04/10] Modify assertions in test_handle_stuck_queued_tasks_reschedule_sensors --- tests/jobs/test_scheduler_job.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 40e9ac07103b5..e70440229aed3 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2407,9 +2407,7 @@ def _add_running_event(tis): "stuck in queued tries exceeded", ] - mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method - states = [x.state for x in dr.get_task_instances(session=session)] - assert states == ["failed", "failed"] + mock_executors[0].fail.assert_called() def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog): """Test that if executor no implement revoke_task then we don't blow up.""" From c025164c4e446cd8473e37f059e2c957d98bbca9 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Wed, 15 Oct 2025 21:36:14 -0400 Subject: [PATCH 05/10] Directly trigger callback --- airflow/jobs/scheduler_job_runner.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index d0530308f5c30..8057aaae5afa0 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1846,14 +1846,35 @@ def _maybe_requeue_stuck_ti(self, *, ti, session, executor): "Task requeue attempts exceeded max; marking failed. task_instance=%s", ti, ) + msg = f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed." session.add( Log( event="stuck in queued tries exceeded", task_instance=ti.key, - extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.", + extra=msg, ) ) - executor.fail(ti.key) + try: + dag = self.dagbag.get_dag(ti.dag_id) + task = dag.get_task(ti.task_id) + except Exception: + self.log.warning( + "The DAG or task could not be found. If a failure callback exists, it will not be run.", + exc_info=True, + ) + else: + ti.task = task + if task.on_failure_callback: + request = TaskCallbackRequest( + full_filepath=ti.dag_model.fileloc, + simple_task_instance=SimpleTaskInstance.from_ti(ti), + msg=msg, + processor_subdir=ti.dag_model.processor_subdir, + ) + executor.send_callback(request) + finally: + ti.set_state(TaskInstanceState.FAILED, session=session) + executor.fail(ti.key) @deprecated( reason="This is backcompat layer for older executor interface. Should be removed in 3.0", From 1e62cbc20c095b1c37a74ebb672dedfafa46063a Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Fri, 17 Oct 2025 19:17:19 -0400 Subject: [PATCH 06/10] Correct indentation --- airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 8057aaae5afa0..3f63ddd32495c 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1871,7 +1871,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session, executor): msg=msg, processor_subdir=ti.dag_model.processor_subdir, ) - executor.send_callback(request) + executor.send_callback(request) finally: ti.set_state(TaskInstanceState.FAILED, session=session) executor.fail(ti.key) From 0f9f852b83c58ffcc94fb4e3374eedafc77b110a Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Mon, 20 Oct 2025 21:06:29 -0400 Subject: [PATCH 07/10] Prevent detached ti objects --- airflow/jobs/scheduler_job_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 3f63ddd32495c..c49465227ac7c 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -33,7 +33,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator from deprecated import deprecated -from sqlalchemy import and_, delete, desc, func, not_, or_, select, text, update +from sqlalchemy import and_, delete, desc, func, inspect, not_, or_, select, text, update from sqlalchemy.exc import OperationalError from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload from sqlalchemy.sql import expression @@ -1865,6 +1865,8 @@ def _maybe_requeue_stuck_ti(self, *, ti, session, executor): else: ti.task = task if task.on_failure_callback: + if inspect(ti).detached: + ti = session.merge(ti) request = TaskCallbackRequest( full_filepath=ti.dag_model.fileloc, simple_task_instance=SimpleTaskInstance.from_ti(ti), From 9d9d84a412d74b74a3b25e79ecddae8539b87a58 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Wed, 22 Oct 2025 15:39:13 -0400 Subject: [PATCH 08/10] Update unit tests --- tests/jobs/test_scheduler_job.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index e70440229aed3..abf394743ee86 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2243,11 +2243,15 @@ def test_handle_stuck_queued_tasks_backcompat(self, dag_maker, session, mock_exe mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1]) mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3]) + @staticmethod + def mock_failure_callback(context): + pass + @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"}) def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors): """Verify that tasks stuck in queued will be rescheduled up to N times.""" with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): - EmptyOperator(task_id="op1") + EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback) EmptyOperator(task_id="op2", executor="default_exec") def _queue_tasks(tis): @@ -2310,6 +2314,11 @@ def _queue_tasks(tis): "stuck in queued tries exceeded", ] + mock_executors[ + 0 + ].send_callback.assert_called_once() # this should only be called for the task that has a callback + states = [x.state for x in dr.get_task_instances(session=session)] + assert states == ["failed", "failed"] mock_executors[0].fail.assert_called() @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"}) @@ -2317,7 +2326,7 @@ def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, """Reschedule sensors go in and out of running repeatedly using the same try_number Make sure that they get three attempts per reschedule, not 3 attempts per try_number""" with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"): - EmptyOperator(task_id="op1") + EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback) EmptyOperator(task_id="op2", executor="default_exec") def _queue_tasks(tis): @@ -2407,6 +2416,11 @@ def _add_running_event(tis): "stuck in queued tries exceeded", ] + mock_executors[ + 0 + ].send_callback.assert_called_once() # this should only be called for the task that has a callback + states = [x.state for x in dr.get_task_instances(session=session)] + assert states == ["failed", "failed"] mock_executors[0].fail.assert_called() def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog): From 83f83041e7f747ff61f601ea23a5e3cdf6c0ba7f Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Mon, 27 Oct 2025 15:34:39 -0400 Subject: [PATCH 09/10] Update rich-click package version to fix failing CI image test --- dev/breeze/README.md | 2 +- dev/breeze/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/breeze/README.md b/dev/breeze/README.md index 1c5f19929b939..1241ffc1aefe2 100644 --- a/dev/breeze/README.md +++ b/dev/breeze/README.md @@ -128,6 +128,6 @@ PLEASE DO NOT MODIFY THE HASH BELOW! IT IS AUTOMATICALLY UPDATED BY PRE-COMMIT. --------------------------------------------------------------------------------------------------------- -Package config hash: 004dd496a1cbe6c9b13a49cbe3cb8b6a72359aa6225785ef186f24236466c11ed8e797274bbdc440067a5cb5bb024cce0c8a7833b8ce436db096cefded100e56 +Package config hash: ba5ee49cbc2dc4428ab70a2ac991a9cd41118a9ef21afdc8db6d92d08f6c411606795eb8728e9f9d5d773cfbdaf582a90980cd23e9cb4c2cfadbbf60a7050e12 --------------------------------------------------------------------------------------------------------- diff --git a/dev/breeze/pyproject.toml b/dev/breeze/pyproject.toml index 3b7fb9271ab2c..a5084f4c6547c 100644 --- a/dev/breeze/pyproject.toml +++ b/dev/breeze/pyproject.toml @@ -73,7 +73,7 @@ dependencies = [ "pytest>=8.2,<9", "pyyaml>=6.0.1", "requests>=2.31.0", - "rich-click>=1.7.1", + "rich-click>=1.7.1,<1.9.0", "rich>=13.6.0", "semver>=3.0.2", "tabulate>=0.9.0", From 1ffb9c06474a03a3520c93cf58d4602e5bf49b16 Mon Sep 17 00:00:00 2001 From: Karen Braganza Date: Mon, 27 Oct 2025 16:21:34 -0400 Subject: [PATCH 10/10] Fix failing test Compile / format / lint WWW --- airflow/www/static/js/types/api-generated.ts | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 8cc92e140038f..f2c052800e482 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -2532,11 +2532,8 @@ export interface components { | "dummy" | "all_skipped" | "always"; - /** - * @description Weight rule. - * @enum {string} - */ - WeightRule: "downstream" | "upstream" | "absolute"; + /** @description Weight rule. One of 'downstream', 'upstream', 'absolute', or the path of the custom priority weight strategy class. */ + WeightRule: string; /** * @description Health status * @enum {string|null}