Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from itertools import groupby
from typing import TYPE_CHECKING, Any

from sqlalchemy import and_, delete, desc, exists, func, or_, select, text, tuple_, update
from sqlalchemy import and_, delete, desc, exists, func, inspect, or_, select, text, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression
Expand Down Expand Up @@ -1946,6 +1946,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
self._maybe_requeue_stuck_ti(
ti=ti,
session=session,
executor=executor,
)
session.commit()
except NotImplementedError:
Expand All @@ -1961,7 +1962,7 @@ def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
)
)

def _maybe_requeue_stuck_ti(self, *, ti, session):
def _maybe_requeue_stuck_ti(self, *, ti, session, executor):
"""
Requeue task if it has not been attempted too many times.

Expand All @@ -1986,14 +1987,45 @@ def _maybe_requeue_stuck_ti(self, *, ti, session):
"Task requeue attempts exceeded max; marking failed. task_instance=%s",
ti,
)
msg = f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed."
session.add(
Log(
event="stuck in queued tries exceeded",
task_instance=ti.key,
extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
extra=msg,
)
)
ti.set_state(TaskInstanceState.FAILED, session=session)

try:
dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
task = dag.get_task(ti.task_id)
except Exception:
self.log.warning(
"The DAG or task could not be found. If a failure callback exists, it will not be run.",
exc_info=True,
)
else:
if task.on_failure_callback:
if inspect(ti).detached:
ti = session.merge(ti)
request = TaskCallbackRequest(
filepath=ti.dag_model.relative_fileloc,
bundle_name=ti.dag_version.bundle_name,
bundle_version=ti.dag_version.bundle_version,
ti=ti,
msg=msg,
context_from_server=TIRunContext(
dag_run=ti.dag_run,
max_tries=ti.max_tries,
variables=[],
connections=[],
xcom_keys_to_clear=[],
),
)
executor.send_callback(request)
finally:
ti.set_state(TaskInstanceState.FAILED, session=session)
executor.fail(ti.key)

def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
session.execute(
Expand Down
18 changes: 14 additions & 4 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2020,11 +2020,15 @@ def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_
# Second executor called for ti3
mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])

@staticmethod
def mock_failure_callback(context):
pass

@conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
def test_handle_stuck_queued_tasks_multiple_attempts(self, dag_maker, session, mock_executors):
"""Verify that tasks stuck in queued will be rescheduled up to N times."""
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
EmptyOperator(task_id="op1")
EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
EmptyOperator(task_id="op2", executor="default_exec")

def _queue_tasks(tis):
Expand Down Expand Up @@ -2090,16 +2094,19 @@ def _queue_tasks(tis):
"stuck in queued tries exceeded",
]

mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method
mock_executors[
0
].send_callback.assert_called_once() # this should only be called for the task that has a callback
states = [x.state for x in dr.get_task_instances(session=session)]
assert states == ["failed", "failed"]
mock_executors[0].fail.assert_called()

@conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors):
"""Reschedule sensors go in and out of running repeatedly using the same try_number.
Make sure that they get three attempts per reschedule, not three attempts per try_number."""
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
EmptyOperator(task_id="op1")
EmptyOperator(task_id="op1", on_failure_callback=TestSchedulerJob.mock_failure_callback)
EmptyOperator(task_id="op2", executor="default_exec")

def _queue_tasks(tis):
Expand Down Expand Up @@ -2189,9 +2196,12 @@ def _add_running_event(tis):
"stuck in queued tries exceeded",
]

mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method
mock_executors[
0
].send_callback.assert_called_once() # this should only be called for the task that has a callback
states = [x.state for x in dr.get_task_instances(session=session)]
assert states == ["failed", "failed"]
mock_executors[0].fail.assert_called()

def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
"""Test that if the executor does not implement revoke_task then we don't blow up."""
Expand Down
Loading