Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2357,8 +2357,16 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
reset_tis_message = []
for ti in to_reset:
reset_tis_message.append(repr(ti))
# If we reset a TI, it will be eligible to be scheduled again.
# This can cause the scheduler to increase the try_number on the TI.
# Record the current try to TaskInstanceHistory first so users have an audit trail for
# the attempt that was abandoned.
ti.prepare_db_for_next_try(session=session)

ti.state = None
ti.queued_by_job_id = None
ti.external_executor_id = None
ti.clear_next_method_args()

for ti in set(tis_to_adopt_or_reset) - set(to_reset):
ti.queued_by_job_id = self.job.id
Expand Down
20 changes: 20 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3955,6 +3955,8 @@ def test_adopt_or_reset_orphaned_tasks_nothing(self):
list(sorted(State.adoptable_states)),
)
def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, session):
from airflow.models.taskinstancehistory import TaskInstanceHistory

dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name
with dag_maker(dag_id=dag_id, schedule="@daily"):
task_id = dag_id + "_task"
Expand All @@ -3969,13 +3971,31 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi
ti = dr1.get_task_instances(session=session)[0]
ti.state = adoptable_state
ti.queued_by_job_id = old_job.id
old_ti_id = ti.id
old_try_number = ti.try_number
session.merge(ti)
session.merge(dr1)
session.commit()

num_reset_tis = self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
assert num_reset_tis == 1

ti.refresh_from_db(session=session)
assert ti.id != old_ti_id
assert (
session.scalar(
select(TaskInstanceHistory).where(
TaskInstanceHistory.dag_id == ti.dag_id,
TaskInstanceHistory.task_id == ti.task_id,
TaskInstanceHistory.run_id == ti.run_id,
TaskInstanceHistory.map_index == ti.map_index,
TaskInstanceHistory.try_number == old_try_number,
TaskInstanceHistory.task_instance_id == old_ti_id,
)
)
is not None
)

def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, dag_maker, session):
dag_id = "test_reset_orphaned_tasks_external_triggered_dag"
with dag_maker(dag_id=dag_id, schedule="@daily"):
Expand Down
Loading