diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py index de6eac73a9b65..489b7127e2d60 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py @@ -38,13 +38,8 @@ datetime_range_filter_factory, ) from airflow.api_fastapi.common.router import AirflowRouter -from airflow.api_fastapi.core_api.datamodels.ui.common import ( - GridNodeResponse, - GridRunsResponse, -) -from airflow.api_fastapi.core_api.datamodels.ui.grid import ( - GridTISummaries, -) +from airflow.api_fastapi.core_api.datamodels.ui.common import GridNodeResponse, GridRunsResponse +from airflow.api_fastapi.core_api.datamodels.ui.grid import GridTISummaries from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import requires_access_dag from airflow.api_fastapi.core_api.services.ui.grid import ( @@ -68,9 +63,7 @@ def _get_latest_serdag(dag_id, session): serdag = session.scalar( select(SerializedDagModel) - .where( - SerializedDagModel.dag_id == dag_id, - ) + .where(SerializedDagModel.dag_id == dag_id) .order_by(SerializedDagModel.id.desc()) .limit(1) ) @@ -92,9 +85,7 @@ def _get_serdag(dag_id, dag_version_id, session) -> SerializedDagModel | None: if not version: version = session.scalar( select(DagVersion) - .where( - DagVersion.dag_id == dag_id, - ) + .where(DagVersion.dag_id == dag_id) .options(joinedload(DagVersion.serialized_dag)) .order_by(DagVersion.id) # ascending cus this is mostly for pre-3.0 upgrade .limit(1) @@ -166,11 +157,7 @@ def get_dag_structure( SerializedDagModel.dag_id == dag_id, SerializedDagModel.id != latest_serdag.id, SerializedDagModel.dag_version_id.in_( - select(TaskInstance.dag_version_id) - .join(TaskInstance.dag_run) - .where( - DagRun.id.in_(run_ids), - ) + select(TaskInstance.dag_version_id).join(TaskInstance.dag_run).where(DagRun.id.in_(run_ids)) ), ) ) @@ -346,31 +333,44 @@ def get_grid_ti_summaries( TaskInstance.dag_version_id, TaskInstance.start_date, TaskInstance.end_date, + TaskInstance.duration, ) .where(TaskInstance.dag_id == dag_id) - .where( - TaskInstance.run_id == run_id, - ) + .where(TaskInstance.run_id == run_id) ), filters=[], order_by=SortParam(allowed_attrs=["task_id", "run_id"], model=TaskInstance).set_value(["task_id"]), limit=None, return_total_entries=False, ) - task_instances = list(session.execute(tis_of_dag_runs)) + + task_instances = session.scalars(tis_of_dag_runs).all() if not task_instances: raise HTTPException( status.HTTP_404_NOT_FOUND, f"No task instances for dag_id={dag_id} run_id={run_id}" ) - ti_details = collections.defaultdict(list) + + ti_details: dict[str, list[dict[str, object | None]]] = collections.defaultdict(list) for ti in task_instances: ti_details[ti.task_id].append( { "state": ti.state, "start_date": ti.start_date, "end_date": ti.end_date, + "duration": ti.duration, } ) + + # Pre-compute min start / max end per leaf task (for tooltip) and pick a representative duration. + task_min_max: dict[str, tuple[object | None, object | None]] = {} + task_first_duration: dict[str, float | None] = {} + for task_id, items in ti_details.items(): + starts = [i["start_date"] for i in items if i.get("start_date") is not None] + ends = [i["end_date"] for i in items if i.get("end_date") is not None] + task_min_max[task_id] = (min(starts) if starts else None, max(ends) if ends else None) + dur = next((i["duration"] for i in items if i.get("duration") is not None), None) + task_first_duration[task_id] = dur # may be None + serdag = _get_serdag( dag_id=dag_id, dag_version_id=task_instances[0].dag_version_id, @@ -379,10 +379,10 @@ def get_grid_ti_summaries( if TYPE_CHECKING: assert serdag - def get_node_sumaries(): + def gen_nodes(): yielded_task_ids: set[str] = set() - # Yield all nodes discoverable from the serialized DAG structure + # Emit nodes discovered from the serialized DAG structure for node in _find_aggregates( node=serdag.dag.task_group, parent_node=None, @@ -391,33 +391,34 @@ def get_node_sumaries(): if node["type"] in {"task", "mapped_task"}: yielded_task_ids.add(node["task_id"]) if node["type"] == "task": + # Attach min/max and DB-backed duration for leaf tasks. + min_start, max_end = task_min_max.get(node["task_id"], (None, None)) + node["min_start_date"] = min_start + node["max_end_date"] = max_end + node["duration"] = task_first_duration.get(node["task_id"]) node["child_states"] = None - node["min_start_date"] = None - node["max_end_date"] = None yield node - # For good history: add synthetic leaf nodes for task_ids that have TIs in this run - # but are not present in the current DAG structure (e.g. removed tasks) - missing_task_ids = set(ti_details.keys()) - yielded_task_ids - for task_id in sorted(missing_task_ids): - detail = ti_details[task_id] - # Create a leaf task node with aggregated state from its TIs - agg = _get_aggs_for_node(detail) + # Add synthetic leaves for historical/removed tasks present only in TI table + missing = set(ti_details.keys()) - yielded_task_ids + for task_id in sorted(missing): + agg = _get_aggs_for_node(ti_details[task_id]) + min_start, max_end = task_min_max.get(task_id, (None, None)) yield { "task_id": task_id, "type": "task", "parent_id": None, **agg, - # Align with leaf behavior "child_states": None, - "min_start_date": None, - "max_end_date": None, + "min_start_date": min_start, + "max_end_date": max_end, + "duration": task_first_duration.get(task_id), } - task_instances = list(get_node_sumaries()) + task_nodes = list(gen_nodes()) # If a group id and a task id collide, prefer the group record - group_ids = {n.get("task_id") for n in task_instances if n.get("type") == "group"} - filtered = [n for n in task_instances if not (n.get("type") == "task" and n.get("task_id") in group_ids)] + group_ids = {n.get("task_id") for n in task_nodes if n.get("type") == "group"} + filtered = [n for n in task_nodes if not (n.get("type") == "task" and n.get("task_id") in group_ids)] return { # type: ignore[return-value] "run_id": run_id, diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/GridTI.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/GridTI.tsx index 645728236dfee..69d36d519f7bb 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/GridTI.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/GridTI.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Badge, Flex } from "@chakra-ui/react"; +import { Badge, Flex, Box, Text } from "@chakra-ui/react"; import type { MouseEvent } from "react"; import React, { useCallback } from "react"; import { useTranslation } from "react-i18next"; @@ -27,26 +27,25 @@ import { StateIcon } from "src/components/StateIcon"; import Time from "src/components/Time"; import { Tooltip } from "src/components/ui"; import { type HoverContextType, useHover } from "src/context/hover"; +import { getDuration, renderDuration } from "src/utils"; import { buildTaskInstanceUrl } from "src/utils/links"; const handleMouseEnter = (setHoveredTaskId: HoverContextType["setHoveredTaskId"]) => (event: MouseEvent) => { const tasks = document.querySelectorAll(`#${event.currentTarget.id}`); - tasks.forEach((task) => { - task.style.backgroundColor = "var(--chakra-colors-info-subtle)"; + tasks.forEach((taskEl) => { + taskEl.style.backgroundColor = "var(--chakra-colors-info-subtle)"; }); - setHoveredTaskId(event.currentTarget.id.replaceAll("-", ".")); }; const handleMouseLeave = (taskId: string, setHoveredTaskId: HoverContextType["setHoveredTaskId"]) => () => { const tasks = document.querySelectorAll(`#${taskId.replaceAll(".", "-")}`); - tasks.forEach((task) => { - task.style.backgroundColor = ""; + tasks.forEach((taskEl) => { + taskEl.style.backgroundColor = ""; }); - setHoveredTaskId(undefined); }; @@ -65,8 +64,8 @@ type Props = { const Instance = ({ dagId, instance, isGroup, isMapped, onClick, runId, search, taskId }: Props) => { const { setHoveredTaskId } = useHover(); const { groupId: selectedGroupId, taskId: selectedTaskId } = useParams(); - const { t: translate } = useTranslation(); const location = useLocation(); + const { t: translate } = useTranslation("common"); const onMouseEnter = handleMouseEnter(setHoveredTaskId); const onMouseLeave = handleMouseLeave(taskId, setHoveredTaskId); @@ -84,10 +83,25 @@ const Instance = ({ dagId, instance, isGroup, isMapped, onClick, runId, search, [dagId, isGroup, isMapped, location.pathname, runId, taskId], ); + const start: string | undefined = instance?.min_start_date + const end: string | undefined = instance?.max_end_date + const hasStart = start !== undefined; + const hasEnd = end !== undefined; + + const serverDurationUnknown = (instance as unknown as { duration?: unknown }).duration; + const serverDurationSeconds = typeof serverDurationUnknown === "number" ? serverDurationUnknown : undefined; + + const durationText = + serverDurationSeconds === undefined ? getDuration(start, end) : renderDuration(serverDurationSeconds); + + const isSelected = + ((selectedTaskId ?? undefined) !== undefined && selectedTaskId === taskId) || + ((selectedGroupId ?? undefined) !== undefined && selectedGroupId === taskId); + return ( - {translate("taskId")}: {taskId} -
- {translate("state")}: {instance.state} - {instance.min_start_date !== null && ( - <> -
- {translate("startDate")}: