Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler to handle incrementing of try_number #39336

Merged
merged 53 commits on
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
cade71e
Refactor tests re sensor reschedule mode and try_number
dstandish May 1, 2024
c04e0f5
fix
dstandish May 1, 2024
31e2c21
fix
dstandish May 1, 2024
ad2ccd0
fix
dstandish May 1, 2024
47ae9a6
Remove try_number shenanigans
dstandish Apr 30, 2024
32bb153
increment try number when running dag test
dstandish May 1, 2024
c4cb129
fix tests
dstandish May 1, 2024
e02aa1b
restore try_number increment when defer immed
dstandish May 1, 2024
566f9cd
nothing
dstandish May 1, 2024
8bad786
reduce diff
dstandish May 1, 2024
7491f3d
correct logic
dstandish May 1, 2024
d467cef
remove todos
dstandish May 1, 2024
a0dcb55
fix tests
dstandish May 1, 2024
4f94368
fix tests
dstandish May 1, 2024
867a8ba
fix tests
dstandish May 2, 2024
8a40c53
batch of test fixes
dstandish May 2, 2024
c5184b5
fix backoff logic
dstandish May 2, 2024
1e1b94b
comment
dstandish May 2, 2024
4938f40
next batch of fixes
dstandish May 2, 2024
f740a14
fix sentry
dstandish May 3, 2024
da4ba4d
--wip-- [skip ci]
dstandish May 3, 2024
4f6e2d0
handle try_number in backfill
dstandish May 6, 2024
c7805f9
fix mock executor changing try_number
dstandish May 6, 2024
e23e251
misc test fixes
dstandish May 6, 2024
c59c4bb
fix try number incrementing in backfill for non-scheduled tasks
dstandish May 6, 2024
fc83374
openlineage take actual try_number
dstandish May 6, 2024
f154941
backfill test fixes
dstandish May 6, 2024
eb8f4d5
test fixes
dstandish May 6, 2024
d4d69d2
fix tests
dstandish May 7, 2024
3efc668
revert some backfill changes
dstandish May 7, 2024
8f8ceb0
revert logging changes
dstandish May 7, 2024
c98f0c8
fix tests
dstandish May 7, 2024
fdcb6e5
revert backfill heartbeat change
dstandish May 7, 2024
d440d41
formatting
dstandish May 7, 2024
c359514
fix test
dstandish May 7, 2024
ca169c6
static check fix
dstandish May 7, 2024
42f08e8
fix tests
dstandish May 7, 2024
76c56ad
fix tests
dstandish May 7, 2024
7559b88
fix tests
dstandish May 7, 2024
017a2c4
fix test
dstandish May 7, 2024
f9205af
deal with some backcompat stuff
dstandish May 7, 2024
f570d9f
remove log adsd
dstandish May 7, 2024
d046774
fix spelling
dstandish May 7, 2024
839d15a
fix tests
dstandish May 8, 2024
635e6b4
add todo
dstandish May 8, 2024
2e2069d
news fragment
dstandish May 8, 2024
41107f6
todo
dstandish May 8, 2024
54e5daf
Apply suggestions from code review
dstandish May 8, 2024
6252900
Apply suggestions from code review
dstandish May 8, 2024
cdf549f
imports
dstandish May 8, 2024
3428b89
use try_number=1 for k8s synth ti
dstandish May 8, 2024
2277e22
fix test
dstandish May 8, 2024
da82a1c
tweaks
dstandish May 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ def set_state(
qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
tis_altered += session.scalars(qry_sub_dag.with_for_update()).all()
for task_instance in tis_altered:
# The try_number was decremented when setting to up_for_reschedule and deferred.
# Increment it back when changing the state again
if task_instance.state in (TaskInstanceState.DEFERRED, TaskInstanceState.UP_FOR_RESCHEDULE):
task_instance._try_number += 1
task_instance.set_state(state, session=session)
session.flush()
else:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Meta:
end_date = auto_field()
duration = auto_field()
state = TaskInstanceStateField()
_try_number = auto_field(data_key="try_number")
try_number = auto_field()
max_tries = auto_field()
task_display_name = fields.String(attribute="task_display_name", dump_only=True)
hostname = auto_field()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DecreasingPriorityStrategy(PriorityWeightStrategy):
"""A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""

def get_weight(self, ti: TaskInstance):
return max(3 - ti._try_number + 1, 1)
return max(3 - ti.try_number + 1, 1)


class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
Expand Down
30 changes: 26 additions & 4 deletions airflow/jobs/backfill_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import attr
import pendulum
from sqlalchemy import select, tuple_, update
from sqlalchemy import case, or_, select, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate
Expand Down Expand Up @@ -245,7 +245,16 @@ def _update_counters(self, ti_status: _DagRunTaskStatus, session: Session) -> No
session.execute(
update(TI)
.where(filter_for_tis)
.values(state=TaskInstanceState.SCHEDULED)
.values(
state=TaskInstanceState.SCHEDULED,
try_number=case(
(
or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
# [review thread on the line above: marked as resolved by dstandish]
TI.try_number + 1,
),
else_=TI.try_number,
),
)
.execution_options(synchronize_session=False)
)
session.flush()
Expand Down Expand Up @@ -425,6 +434,8 @@ def _task_instances_for_dag_run(
try:
for ti in dag_run.get_task_instances(session=session):
if ti in schedulable_tis:
if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
ti.try_number += 1
ti.set_state(TaskInstanceState.SCHEDULED)
if ti.state != TaskInstanceState.REMOVED:
tasks_to_run[ti.key] = ti
Expand Down Expand Up @@ -515,6 +526,7 @@ def _per_task_process(key, ti: TaskInstance, session):
if key in ti_status.running:
ti_status.running.pop(key)
# Reset the failed task in backfill to scheduled state
ti.try_number += 1
ti.set_state(TaskInstanceState.SCHEDULED, session=session)
if ti.dag_run not in ti_status.active_runs:
ti_status.active_runs.add(ti.dag_run)
Expand Down Expand Up @@ -552,6 +564,14 @@ def _per_task_process(key, ti: TaskInstance, session):
else:
self.log.debug("Sending %s to executor", ti)
# Skip scheduled state, we are executing immediately
if ti.state in (TaskInstanceState.UP_FOR_RETRY, None):
# It is unclear why this increment is necessary here; it appears to be
# a quirk of the backfill runner and should probably be handled elsewhere.
# It seems the leaf tasks are set to SCHEDULED but the others are not.
# This is not investigated further because the current backfill
# approach needs to be replaced anyway.
ti.try_number += 1
ti.state = TaskInstanceState.QUEUED
ti.queued_by_job_id = self.job.id
ti.queued_dttm = timezone.utcnow()
Expand Down Expand Up @@ -695,7 +715,9 @@ def _per_task_process(key, ti: TaskInstance, session):
self.log.debug(e)

perform_heartbeat(
job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
job=self.job,
heartbeat_callback=self.heartbeat_callback,
only_if_necessary=True,
)
# execute the tasks in the queue
executor.heartbeat()
Expand Down Expand Up @@ -725,6 +747,7 @@ def to_keep(key: TaskInstanceKey) -> bool:
ti_status.to_run.update({ti.key: ti for ti in new_mapped_tis})

for new_ti in new_mapped_tis:
new_ti.try_number += 1
new_ti.set_state(TaskInstanceState.SCHEDULED, session=session)

# Set state to failed for running TIs that are set up for retry if disable-retry flag is set
Expand Down Expand Up @@ -930,7 +953,6 @@ def _execute(self, session: Session = NEW_SESSION) -> None:
"combination. Please adjust backfill dates or wait for this DagRun to finish.",
)
return
# picklin'
pickle_id = None

executor_class, _ = ExecutorLoader.import_default_executor_cls()
Expand Down
2 changes: 2 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2948,6 +2948,8 @@ def add_logger_if_needed(ti: TaskInstance):
session.expire_all()
schedulable_tis, _ = dr.update_state(session=session)
for s in schedulable_tis:
if s.state != TaskInstanceState.UP_FOR_RESCHEDULE:
s.try_number += 1
s.state = TaskInstanceState.SCHEDULED
session.commit()
# triggerer may mark tasks scheduled so we read from DB
Expand Down
16 changes: 13 additions & 3 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates
from sqlalchemy.sql.expression import false, select, true
from sqlalchemy.sql.expression import case, false, select, true

from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
Expand Down Expand Up @@ -1545,7 +1545,8 @@ def schedule_tis(
and not ti.task.on_success_callback
and not ti.task.outlets
):
ti._try_number += 1
if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
ti.try_number += 1
ti.defer_task(
defer=TaskDeferred(trigger=ti.task.start_trigger, method_name=ti.task.next_method),
session=session,
Expand All @@ -1567,7 +1568,16 @@ def schedule_tis(
TI.run_id == self.run_id,
tuple_in_condition((TI.task_id, TI.map_index), schedulable_ti_ids_chunk),
)
.values(state=TaskInstanceState.SCHEDULED)
.values(
state=TaskInstanceState.SCHEDULED,
try_number=case(
(
or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
# [review thread on the line above: marked as resolved by dstandish]
TI.try_number + 1,
),
else_=TI.try_number,
),
)
.execution_options(synchronize_session=False)
).rowcount

Expand Down
Loading