Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,35 @@ def fetch_handle_failure_context(
if not test_mode:
ti.refresh_from_db(session)

# Check for orphaned task before setting end_date
if (
ti.is_eligible_to_retry()
and ti.state is None
and ti.start_date is not None
and ti.end_date is None
):
# If the task instance state is None but has a start_date without end_date,
# it likely means the task was running but became orphaned and its state was reset.
# This can happen during scheduler restarts when executors fail to adopt running tasks
# (e.g., due to Kubernetes API 429 errors). We should still record the task instance
# history to maintain complete log history for troubleshooting.
from airflow.models.taskinstancehistory import TaskInstanceHistory

log.info(
"Recording task instance history for orphaned task %s that was previously running "
"(start_date: %s, state reset to None)",
ti.key,
ti.start_date,
)
# Temporarily set state to RUNNING to trigger proper history recording
original_state = ti.state
ti.state = TaskInstanceState.RUNNING
try:
TaskInstanceHistory.record_ti(ti, session=session)
finally:
# Restore the original state
ti.state = original_state

ti.end_date = timezone.utcnow()
ti.set_duration()

Expand Down
84 changes: 84 additions & 0 deletions airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2505,6 +2505,90 @@ def test_handle_failure_task_undefined(self, create_task_instance):
del ti.task
ti.handle_failure("test ti.task undefined")

@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
def test_fetch_handle_failure_context_orphaned_task_records_history(
    self, mock_record_ti, dag_maker, session
):
    """
    Failure handling must record history for a task instance that looks orphaned.

    The task instance built here has ``state=None``, a ``start_date`` and no ``end_date``.
    For that shape the test expects exactly one ``TaskInstanceHistory.record_ti`` call,
    made with this task instance, and the instance to end up ``UP_FOR_RETRY``.
    """
    with dag_maker(dag_id="test_orphaned_task"):
        operator = EmptyOperator(task_id="orphaned_task", retries=2)

    dag_run = dag_maker.create_dagrun()
    ti = dag_run.get_task_instance(operator.task_id, session=session)
    ti.task = operator

    # Shape the row like a task that had started and then lost its state:
    # a start_date five minutes in the past, no end_date, and state None.
    started_at = timezone.utcnow() - datetime.timedelta(minutes=5)
    ti.state = None
    ti.start_date = started_at
    ti.end_date = None
    ti.try_number = 1
    ti.max_tries = 3

    # Persist first: with test_mode=False the code under test refreshes ``ti`` from the DB.
    session.merge(ti)
    session.commit()

    returned = TaskInstance.fetch_handle_failure_context(
        ti=ti,
        error="Test orphaned task error",
        test_mode=False,
        session=session,
        fail_fast=False,
    )

    # History must have been recorded exactly once, with the task instance as first positional arg.
    mock_record_ti.assert_called_once()
    recorded_ti = mock_record_ti.call_args.args[0]

    assert (recorded_ti.task_id, recorded_ti.dag_id, recorded_ti.run_id) == (
        ti.task_id,
        ti.dag_id,
        ti.run_id,
    )
    assert recorded_ti.start_date == started_at
    assert ti.state == State.UP_FOR_RETRY
    assert returned == ti

@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history(
    self, mock_record_ti, dag_maker, session
):
    """
    A task instance with ``state=None`` and no ``start_date`` must not get history recorded.

    Counterpart of the orphaned-task test: without a ``start_date`` the test expects
    ``TaskInstanceHistory.record_ti`` not to be called at all, while the task instance
    still ends up ``UP_FOR_RETRY``.
    """
    with dag_maker(dag_id="test_not_orphaned_task"):
        operator = EmptyOperator(task_id="not_orphaned_task", retries=2)

    dag_run = dag_maker.create_dagrun()
    ti = dag_run.get_task_instance(operator.task_id, session=session)
    ti.task = operator

    # Shape the row like a task that never started: no state and no dates.
    ti.state = None
    ti.start_date = None
    ti.end_date = None
    ti.try_number = 1
    ti.max_tries = 3

    # Persist first: with test_mode=False the code under test refreshes ``ti`` from the DB.
    session.merge(ti)
    session.commit()

    returned = TaskInstance.fetch_handle_failure_context(
        ti=ti,
        error="Test non-orphaned task error",
        test_mode=False,
        session=session,
        fail_fast=False,
    )

    mock_record_ti.assert_not_called()
    assert ti.state == State.UP_FOR_RETRY
    assert returned == ti

def test_handle_failure_fail_fast(self, dag_maker, session):
start_date = timezone.datetime(2016, 6, 1)

Expand Down