-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Order triggers by - TI priority_weight when assign unassigned triggers #32318
Conversation
Signed-off-by: Hussein Awala <hussein@awala.fr>
airflow/models/trigger.py
Outdated
.filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids))) | ||
.order_by(cls.created_date) | ||
.order_by(-TaskInstance.priority_weight, cls.created_date) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.order_by(-TaskInstance.priority_weight, cls.created_date) | |
.order_by(TaskInstance.priority_weight.desc(), cls.created_date) |
Preferred style.
Can this be NULL and how do we want to handle them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For NULL, I can use TaskInstance.priority_weight.desc().nulls_last()
, but it seems like the lock (with_for_update) doesn't work with the outer join, I will try to find a solution for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use CASE
instead? We don’t really need to put them last, just probably not at the very beginning.
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Do we still want/need to deal with NULL values in the ORDER BY clause? I wonder if |
@@ -244,8 +245,9 @@ def assign_unassigned( | |||
def get_sorted_triggers(cls, capacity, alive_triggerer_ids, session): | |||
query = with_row_locks( | |||
select(cls.id) | |||
.join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there is no Triggers to process without being associated to a TI (if there is no TI with the trigger id, it means that the trigger is already executed and has returned an event) , it should be fine to use inner join here.
apache#32318) * Order triggers by - TI priority_weight when assign unassigned triggers Signed-off-by: Hussein Awala <hussein@awala.fr> * Update airflow/models/trigger.py Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com> * Replace outer join by inner join and use coalesce to handle None values * fix unit tests --------- Signed-off-by: Hussein Awala <hussein@awala.fr> Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com> Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com>
This PR updates the
get_sorted_triggers
method used to retrieve the top X triggers that are unassigned or assigned to inactive triggerers. Currently, the triggers are ordered based on theircreated_date
, with the oldest X triggers being returned. However, this PR introduces an update that prioritizes triggers associated with TIs of higher priority. In cases where multiple triggers share the same priority, the older triggers will be included.^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.