diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 2bdc2939bfd61..0a7de05abffb1 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -128,39 +128,38 @@ def get_mapped_summary(parent_instance, task_instances): } -def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]: - task_instance = ( - session.query(TaskInstance) - .filter( - TaskInstance.dag_id == task.dag_id, - TaskInstance.run_id == dag_run.run_id, - TaskInstance.task_id == task.task_id, - # Only get normal task instances or the first mapped task - TaskInstance.map_index <= 0, - ) - .first() +def get_task_summaries(task, dag_runs: List[DagRun], session: Session) -> List[Dict[str, Any]]: + tis = session.query(TaskInstance).filter( + TaskInstance.dag_id == task.dag_id, + TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]), + TaskInstance.task_id == task.task_id, + # Only get normal task instances or the first mapped task + TaskInstance.map_index <= 0, ) - if not task_instance: - return None + def _get_summary(task_instance): + if task_instance.map_index > -1: + return get_mapped_summary( + task_instance, task_instances=get_mapped_instances(task_instance, session) + ) - if task_instance.map_index > -1: - return get_mapped_summary(task_instance, task_instances=get_mapped_instances(task_instance, session)) + try_count = ( + task_instance.prev_attempted_tries + if task_instance.prev_attempted_tries != 0 + else task_instance.try_number + ) - try_count = ( - task_instance.prev_attempted_tries - if task_instance.prev_attempted_tries != 0 - else task_instance.try_number - ) - return { - 'task_id': task_instance.task_id, - 'run_id': task_instance.run_id, - 'map_index': task_instance.map_index, - 'state': task_instance.state, - 'start_date': datetime_to_string(task_instance.start_date), - 'end_date': datetime_to_string(task_instance.end_date), - 'try_number': try_count, - } + return { + 'task_id': task_instance.task_id, + 'run_id': task_instance.run_id, + 'map_index': task_instance.map_index, + 'state': task_instance.state, + 'start_date': datetime_to_string(task_instance.start_date), + 'end_date': datetime_to_string(task_instance.end_date), + 'try_number': try_count, + } + + return [_get_summary(ti) for ti in tis] def encode_dag_run(dag_run: Optional[models.DagRun]) -> Optional[Dict[str, Any]]: diff --git a/airflow/www/views.py b/airflow/www/views.py index 4c3d33a6b1e05..29e15bfe16bd6 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -258,7 +258,7 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, session): if isinstance(task_item_or_group, AbstractOperator): return { 'id': task_item_or_group.task_id, - 'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs], + 'instances': wwwutils.get_task_summaries(task_item_or_group, dag_runs, session), 'label': task_item_or_group.label, 'extra_links': task_item_or_group.extra_links, 'is_mapped': task_item_or_group.is_mapped,