Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator

from deprecated import deprecated
from sqlalchemy import and_, delete, desc, func, not_, or_, select, text, update
from sqlalchemy import and_, delete, desc, func, inspect, not_, or_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression
Expand Down Expand Up @@ -1801,10 +1801,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
try:
for ti in stuck_tis:
executor.revoke_task(ti=ti)
self._maybe_requeue_stuck_ti(
ti=ti,
session=session,
)
self._maybe_requeue_stuck_ti(ti=ti, session=session, executor=executor)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do this in main. Do you think main should be updated too, or is this specific to 2.*? If the latter, why?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is implemented in a separate PR, #53435, @kaxil.

except NotImplementedError:
# this block only gets entered if the executor has not implemented `revoke_task`.
# in which case, we try the fallback logic
Expand All @@ -1824,7 +1821,7 @@ def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
)
)

def _maybe_requeue_stuck_ti(self, *, ti, session):
def _maybe_requeue_stuck_ti(self, *, ti, session, executor):
"""
Requeue task if it has not been attempted too many times.

Expand All @@ -1849,14 +1846,37 @@ def _maybe_requeue_stuck_ti(self, *, ti, session):
"Task requeue attempts exceeded max; marking failed. task_instance=%s",
ti,
)
msg = f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed."
session.add(
Log(
event="stuck in queued tries exceeded",
task_instance=ti.key,
extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
extra=msg,
)
)
ti.set_state(TaskInstanceState.FAILED, session=session)
try:
dag = self.dagbag.get_dag(ti.dag_id)
task = dag.get_task(ti.task_id)
except Exception:
self.log.warning(
"The DAG or task could not be found. If a failure callback exists, it will not be run.",
exc_info=True,
)
else:
ti.task = task
if task.on_failure_callback:
if inspect(ti).detached:
ti = session.merge(ti)
request = TaskCallbackRequest(
full_filepath=ti.dag_model.fileloc,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=msg,
processor_subdir=ti.dag_model.processor_subdir,
)
executor.send_callback(request)
finally:
ti.set_state(TaskInstanceState.FAILED, session=session)
executor.fail(ti.key)

@deprecated(
reason="This is backcompat layer for older executor interface. Should be removed in 3.0",
Expand Down
7 changes: 2 additions & 5 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2532,11 +2532,8 @@ export interface components {
| "dummy"
| "all_skipped"
| "always";
/**
* @description Weight rule.
* @enum {string}
*/
WeightRule: "downstream" | "upstream" | "absolute";
/** @description Weight rule. One of 'downstream', 'upstream', 'absolute', or the path of the custom priority weight strategy class. */
WeightRule: string;
/**
* @description Health status
* @enum {string|null}
Expand Down
2 changes: 1 addition & 1 deletion dev/breeze/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,6 @@ PLEASE DO NOT MODIFY THE HASH BELOW! IT IS AUTOMATICALLY UPDATED BY PRE-COMMIT.

---------------------------------------------------------------------------------------------------------

Package config hash: 004dd496a1cbe6c9b13a49cbe3cb8b6a72359aa6225785ef186f24236466c11ed8e797274bbdc440067a5cb5bb024cce0c8a7833b8ce436db096cefded100e56
Package config hash: ba5ee49cbc2dc4428ab70a2ac991a9cd41118a9ef21afdc8db6d92d08f6c411606795eb8728e9f9d5d773cfbdaf582a90980cd23e9cb4c2cfadbbf60a7050e12

---------------------------------------------------------------------------------------------------------
2 changes: 1 addition & 1 deletion dev/breeze/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ dependencies = [
"pytest>=8.2,<9",
"pyyaml>=6.0.1",
"requests>=2.31.0",
"rich-click>=1.7.1",
"rich-click>=1.7.1,<1.9.0",
"rich>=13.6.0",
"semver>=3.0.2",
"tabulate>=0.9.0",
Expand Down
18 changes: 14 additions & 4 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2243,11 +2243,15 @@ def test_handle_stuck_queued_tasks_backcompat(self, dag_maker, session, mock_exe
mock_exec_1.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
mock_exec_2.cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])

@staticmethod
def mock_failure_callback(context):
pass

@conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors):
"""Verify that tasks stuck in queued will be rescheduled up to N times."""
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
EmptyOperator(task_id="op1")
EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
EmptyOperator(task_id="op2", executor="default_exec")

def _queue_tasks(tis):
Expand Down Expand Up @@ -2310,16 +2314,19 @@ def _queue_tasks(tis):
"stuck in queued tries exceeded",
]

mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method
mock_executors[
0
].send_callback.assert_called_once() # this should only be called for the task that has a callback
states = [x.state for x in dr.get_task_instances(session=session)]
assert states == ["failed", "failed"]
mock_executors[0].fail.assert_called()

@conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors):
"""Reschedule sensors go in and out of running repeatedly using the same try_number
Make sure that they get three attempts per reschedule, not 3 attempts per try_number"""
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
EmptyOperator(task_id="op1")
EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
EmptyOperator(task_id="op2", executor="default_exec")

def _queue_tasks(tis):
Expand Down Expand Up @@ -2409,9 +2416,12 @@ def _add_running_event(tis):
"stuck in queued tries exceeded",
]

mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method
mock_executors[
0
].send_callback.assert_called_once() # this should only be called for the task that has a callback
states = [x.state for x in dr.get_task_instances(session=session)]
assert states == ["failed", "failed"]
mock_executors[0].fail.assert_called()

def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
"""Test that if executor no implement revoke_task then we don't blow up."""
Expand Down
Loading