From fa50004b7dff12bf8431ac1833ea949b43351ad0 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Mon, 13 Jun 2022 09:04:07 +0200 Subject: [PATCH] Fix flaky order of returned dag runs (#24405) There was no ordering on the query returning dag runs for the grid view. This caused flaky tests, and it would also have caused random reordering of the reported dag runs in the UI (it seems). This change adds stable ordering on returned Dag Runs: * by run_id (ascending) No need to additionally order by map_index, as at most one TI will be returned from each dag run (cherry picked from commit 2edab57d4e8ccbd5b8f66c3951615c169fb0543e) --- airflow/www/utils.py | 34 +++++++++++++++++++--------------- airflow/www/views.py | 2 -- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 3c63584e88ad1..feeedd0cefce3 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -130,21 +130,25 @@ def get_mapped_summary(parent_instance, task_instances): def get_task_summaries(task, dag_runs: List[DagRun], session: Session) -> List[Dict[str, Any]]: - tis = session.query( - TaskInstance.dag_id, - TaskInstance.task_id, - TaskInstance.run_id, - TaskInstance.map_index, - TaskInstance.state, - TaskInstance.start_date, - TaskInstance.end_date, - TaskInstance._try_number, - ).filter( - TaskInstance.dag_id == task.dag_id, - TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]), - TaskInstance.task_id == task.task_id, - # Only get normal task instances or the first mapped task - TaskInstance.map_index <= 0, + tis = ( + session.query( + TaskInstance.dag_id, + TaskInstance.task_id, + TaskInstance.run_id, + TaskInstance.map_index, + TaskInstance.state, + TaskInstance.start_date, + TaskInstance.end_date, + TaskInstance._try_number, + ) + .filter( + TaskInstance.dag_id == task.dag_id, + TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]), + TaskInstance.task_id == task.task_id, + # Only get normal 
task instances or the first mapped task + TaskInstance.map_index <= 0, + ) + .order_by(TaskInstance.run_id.asc()) ) def _get_summary(task_instance): diff --git a/airflow/www/views.py b/airflow/www/views.py index 0cea403fb9316..fe4217c4525a6 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3543,7 +3543,6 @@ def grid_data(self): 'groups': task_group_to_grid(dag.task_group, dag, dag_runs, session), 'dag_runs': encoded_runs, } - # avoid spaces to reduce payload size return htmlsafe_json_dumps(data, separators=(',', ':')) @@ -3581,7 +3580,6 @@ def audit_log(self, session=None): query = query.filter(Log.event.notin_(excluded_events)) dag_audit_logs = query.all() - content = self.render_template( 'airflow/dag_audit_log.html', dag=dag,