-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Do not fail requeued TIs #23846
Do not fail requeued TIs #23846
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
But need others confirmation too |
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
Anyone? |
Taking a look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way try_number
is handled is complex (bad. Internally to TaskInstance class) so could you also test that behaviour is right too?
LGTM once that is added.
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
# ti is queued with another try number - do not fail it | ||
ti1.state = State.QUEUED | ||
ti1.queued_by_job_id = 1 | ||
ti1.try_number = 2 | ||
session.merge(ti1) | ||
session.commit() | ||
|
||
executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None | ||
|
||
self.scheduler_job._process_executor_events(session=session) | ||
ti1.refresh_from_db(session=session) | ||
assert ti1.state == State.QUEUED | ||
self.scheduler_job.executor.callback_sink.send.assert_not_called() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rebased a this should check try number handling
(cherry picked from commit 66ffe39)
By the time time we process executor events we might have requeued the TI - we should not mark the TI failed in this case.
One way this can happen is deferable operator with quick trigger.
closes: #23824
related: #21316
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragement file, named
{pr_number}.significant.rst
, in newsfragments.