diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py index cc4d7913b2244..0f315326194e5 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py @@ -17,9 +17,14 @@ from __future__ import annotations +from datetime import datetime from typing import Generic, Literal, TypeVar +from pydantic import computed_field + from airflow.api_fastapi.core_api.base import BaseModel +from airflow.utils.state import TaskInstanceState +from airflow.utils.types import DagRunType class BaseEdgeResponse(BaseModel): @@ -52,8 +57,46 @@ class BaseNodeResponse(BaseModel): N = TypeVar("N", bound=BaseNodeResponse) +class GridNodeResponse(BaseModel): + """Base Node serializer for responses.""" + + id: str + label: str + children: list[GridNodeResponse] | None = None + is_mapped: bool | None + setup_teardown_type: Literal["setup", "teardown"] | None = None + + +class GridRunsResponse(BaseModel): + """Base Node serializer for responses.""" + + dag_id: str + run_id: str + queued_at: datetime | None + start_date: datetime | None + end_date: datetime | None + run_after: datetime + state: TaskInstanceState | None + run_type: DagRunType + + @computed_field + def duration(self) -> int | None: + if self.start_date and self.end_date: + return (self.end_date - self.start_date).seconds + return None + + class BaseGraphResponse(BaseModel, Generic[E, N]): """Base Graph serializer for responses.""" edges: list[E] nodes: list[N] + + +class LatestRunResponse(BaseModel): + """Base Node serializer for responses.""" + + id: int + dag_id: str + run_id: str + run_after: datetime diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py index 822eb6f3e1a89..48ea0ece79495 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/grid.py @@ -21,7 +21,6 @@ from pydantic import BaseModel, Field -from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunType @@ -40,6 +39,13 @@ class GridTaskInstanceSummary(BaseModel): note: str | None +class LightGridTaskInstanceSummary(BaseModel): + """Task Instance Summary model for the Grid UI.""" + + task_id: str + state: TaskInstanceState | None + + class GridDAGRunwithTIs(BaseModel): """DAG Run model for the Grid UI.""" @@ -57,8 +63,15 @@ class GridDAGRunwithTIs(BaseModel): task_instances: list[GridTaskInstanceSummary] +class GridTISummaries(BaseModel): + """DAG Run model for the Grid UI.""" + + run_id: str + dag_id: str + task_instances: list[LightGridTaskInstanceSummary] + + class GridResponse(BaseModel): """Response model for the Grid UI.""" dag_runs: list[GridDAGRunwithTIs] - structure: StructureDataResponse diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 660580cbe8400..ffb90dcf6e03f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -604,6 +604,293 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /ui/grid/structure/{dag_id}: + get: + tags: + - Grid + summary: Get Dag Structure + description: Return dag structure for grid view. + operationId: get_dag_structure + security: + - OAuth2PasswordBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 50 + title: Limit + - name: order_by + in: query + required: false + schema: + type: string + default: id + title: Order By + - name: run_after_gte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Run After Gte + - name: run_after_lte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Run After Lte + responses: + '200': + description: Successful Response + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/GridNodeResponse' + title: Response Get Dag Structure + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /ui/grid/runs/{dag_id}: + get: + tags: + - Grid + summary: Get Grid Runs + description: Get info about a run for the grid. + operationId: get_grid_runs + security: + - OAuth2PasswordBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 50 + title: Limit + - name: order_by + in: query + required: false + schema: + type: string + default: id + title: Order By + - name: run_after_gte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Run After Gte + - name: run_after_lte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Run After Lte + responses: + '200': + description: Successful Response + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/GridRunsResponse' + title: Response Get Grid Runs + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /ui/grid/ti_summaries/{dag_id}/{run_id}: + get: + tags: + - Grid + summary: Get Grid Ti Summaries + description: 'Get states for TIs / "groups" of TIs. + + + Essentially this is to know what color to put in the squares in the grid. + + + The tricky part here is that we aggregate the state for groups and mapped + tasks. + + + We don''t add all the TIs for mapped TIs -- we only add one entry for the + mapped task and + + its state is an aggregate of its TI states. + + + And for task groups, we add a "task" for that which is not really a task but + is just + + an entry that represents the group (so that we can show a filled in box when + the group + + is not expanded) and its state is an agg of those within it.' + operationId: get_grid_ti_summaries + security: + - OAuth2PasswordBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: run_id + in: path + required: true + schema: + type: string + title: Run Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/GridTISummaries' + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + /ui/grid/latest_run/{dag_id}: + get: + tags: + - Grid + summary: Get Latest Run + description: 'Get information about the latest dag run by run_after. + + + This is used by the UI to figure out if it needs to rerun queries and resume + auto refresh.' + operationId: get_latest_run + security: + - OAuth2PasswordBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + responses: + '200': + description: Successful Response + content: + application/json: + schema: + anyOf: + - $ref: '#/components/schemas/LatestRunResponse' + - type: 'null' + title: Response Get Latest Run + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' components: schemas: BackfillCollectionResponse: @@ -1469,6 +1756,41 @@ components: - task_instances title: GridDAGRunwithTIs description: DAG Run model for the Grid UI. + GridNodeResponse: + properties: + id: + type: string + title: Id + label: + type: string + title: Label + children: + anyOf: + - items: + $ref: '#/components/schemas/GridNodeResponse' + type: array + - type: 'null' + title: Children + is_mapped: + anyOf: + - type: boolean + - type: 'null' + title: Is Mapped + setup_teardown_type: + anyOf: + - type: string + enum: + - setup + - teardown + - type: 'null' + title: Setup Teardown Type + type: object + required: + - id + - label + - is_mapped + title: GridNodeResponse + description: Base Node serializer for responses. GridResponse: properties: dag_runs: @@ -1476,14 +1798,86 @@ components: $ref: '#/components/schemas/GridDAGRunwithTIs' type: array title: Dag Runs - structure: - $ref: '#/components/schemas/StructureDataResponse' type: object required: - dag_runs - - structure title: GridResponse description: Response model for the Grid UI. + GridRunsResponse: + properties: + dag_id: + type: string + title: Dag Id + run_id: + type: string + title: Run Id + queued_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Queued At + start_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Start Date + end_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: End Date + run_after: + type: string + format: date-time + title: Run After + state: + anyOf: + - $ref: '#/components/schemas/TaskInstanceState' + - type: 'null' + run_type: + $ref: '#/components/schemas/DagRunType' + duration: + anyOf: + - type: integer + - type: 'null' + title: Duration + readOnly: true + type: object + required: + - dag_id + - run_id + - queued_at + - start_date + - end_date + - run_after + - state + - run_type + - duration + title: GridRunsResponse + description: Base Node serializer for responses. + GridTISummaries: + properties: + run_id: + type: string + title: Run Id + dag_id: + type: string + title: Dag Id + task_instances: + items: + $ref: '#/components/schemas/LightGridTaskInstanceSummary' + type: array + title: Task Instances + type: object + required: + - run_id + - dag_id + - task_instances + title: GridTISummaries + description: DAG Run model for the Grid UI. GridTaskInstanceSummary: properties: task_id: @@ -1579,6 +1973,44 @@ components: - task_instance_states title: HistoricalMetricDataResponse description: Historical Metric Data serializer for responses. + LatestRunResponse: + properties: + id: + type: integer + title: Id + dag_id: + type: string + title: Dag Id + run_id: + type: string + title: Run Id + run_after: + type: string + format: date-time + title: Run After + type: object + required: + - id + - dag_id + - run_id + - run_after + title: LatestRunResponse + description: Base Node serializer for responses. + LightGridTaskInstanceSummary: + properties: + task_id: + type: string + title: Task Id + state: + anyOf: + - $ref: '#/components/schemas/TaskInstanceState' + - type: 'null' + type: object + required: + - task_id + - state + title: LightGridTaskInstanceSummary + description: Task Instance Summary model for the Grid UI. MenuItem: type: string enum: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py index 09409335c2c09..ce1a582a511f7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py @@ -42,23 +42,35 @@ datetime_range_filter_factory, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.ui.common import ( + GridNodeResponse, + GridRunsResponse, + LatestRunResponse, +) from airflow.api_fastapi.core_api.datamodels.ui.grid import ( GridDAGRunwithTIs, GridResponse, + GridTISummaries, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import requires_access_dag from airflow.api_fastapi.core_api.services.ui.grid import ( + _find_aggregates, + _merge_node_dicts, fill_task_instance_summaries, get_child_task_map, - get_combined_structure, - get_structure_from_dag, get_task_group_map, ) -from airflow.models import DagRun, TaskInstance from airflow.models.dag_version import DagVersion +from airflow.models.dagrun import DagRun +from airflow.models.serialized_dag import SerializedDagModel +from airflow.models.taskinstance import TaskInstance from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.utils.state import TaskInstanceState +from airflow.utils.task_group import ( + get_task_group_children_getter, + task_group_to_dict_grid, +) log = structlog.get_logger(logger_name=__name__) grid_router = AirflowRouter(prefix="/grid", tags=["Grid"]) @@ -71,6 +83,7 @@ Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE)), Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN)), ], + response_model_exclude_none=True, ) def grid_data( dag_id: str, @@ -124,11 +137,9 @@ def grid_data( ) dag_runs = list(session.scalars(dag_runs_select_filter).unique()) - # Check if there are any DAG Runs with given criteria to eliminate unnecessary queries/errors if not dag_runs: - structure = get_structure_from_dag(dag=dag) - return GridResponse(dag_runs=[], structure=structure) + return GridResponse(dag_runs=[]) # Retrieve, sort and encode the Task Instances tis_of_dag_runs, _ = paginated_select( @@ -257,8 +268,324 @@ def grid_data( ) for dag_run in dag_runs ] + return GridResponse(dag_runs=grid_dag_runs) + + +def _get_latest_serdag(dag_id, session): + serdag = session.scalar( + select(SerializedDagModel) + .where( + SerializedDagModel.dag_id == dag_id, + ) + .order_by(SerializedDagModel.id.desc()) + .limit(1) + ) + if not serdag: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Dag with id {dag_id} was not found", + ) + return serdag + + +def _get_serdag(dag_id, dag_version_id, session) -> SerializedDagModel | None: + # this is a simplification - we account for structure based on the first task + version = session.scalar(select(DagVersion).where(DagVersion.id == dag_version_id)) + if not version: + version = session.scalar( + select(DagVersion) + .where( + DagVersion.dag_id == dag_id, + ) + .order_by(DagVersion.id) # ascending cus this is mostly for pre-3.0 upgrade + .limit(1) + ) + if not (serdag := version.serialized_dag): + log.error( + "No serialized dag found", + dag_id=dag_id, + version_id=version.id, + version_number=version.version_number, + ) + return serdag + + +@grid_router.get( + "/structure/{dag_id}", + responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), + dependencies=[ + Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE)), + Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN)), + ], + response_model_exclude_none=True, +) +def get_dag_structure( + dag_id: str, + session: SessionDep, + offset: QueryOffset, + limit: QueryLimit, + order_by: Annotated[ + SortParam, + Depends(SortParam(["run_after", "logical_date", "start_date", "end_date"], DagRun).dynamic_depends()), + ], + run_after: Annotated[RangeFilter, Depends(datetime_range_filter_factory("run_after", DagRun))], +) -> list[GridNodeResponse]: + """Return dag structure for grid view.""" + latest_serdag = _get_latest_serdag(dag_id, session) + latest_dag = latest_serdag.dag - flat_tis = itertools.chain.from_iterable(tis_by_run_id.values()) - structure = get_combined_structure(task_instances=flat_tis, session=session) + # Retrieve, sort the previous DAG Runs + base_query = select(DagRun.id).where(DagRun.dag_id == dag_id) + # This comparison is to fall back to DAG timetable when no order_by is provided + if order_by.value == order_by.get_primary_key_string(): + ordering = list(latest_dag.timetable.run_ordering) + order_by = SortParam( + allowed_attrs=ordering, + model=DagRun, + ).set_value(ordering[0]) + dag_runs_select_filter, _ = paginated_select( + statement=base_query, + order_by=order_by, + offset=offset, + filters=[run_after], + limit=limit, + ) + run_ids = list(session.scalars(dag_runs_select_filter)) + + task_group_sort = get_task_group_children_getter() + if not run_ids: + nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)] + return nodes - return GridResponse(dag_runs=grid_dag_runs, structure=structure) + serdags = session.scalars( + select(SerializedDagModel).where( + SerializedDagModel.dag_version_id.in_( + select(TaskInstance.dag_version_id) + .join(TaskInstance.dag_run) + .where( + DagRun.id.in_(run_ids), + SerializedDagModel.id != latest_serdag.id, + ) + ) + ) + ) + merged_nodes: list[GridNodeResponse] = [] + dags = [latest_dag] + for serdag in serdags: + if serdag: + dags.append(serdag.dag) + for dag in dags: + nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)] + _merge_node_dicts(merged_nodes, nodes) + + return merged_nodes + + +@grid_router.get( + "/runs/{dag_id}", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + ] + ), + dependencies=[ + Depends( + requires_access_dag( + method="GET", + access_entity=DagAccessEntity.TASK_INSTANCE, + ) + ), + Depends( + requires_access_dag( + method="GET", + access_entity=DagAccessEntity.RUN, + ) + ), + ], + response_model_exclude_none=True, +) +def get_grid_runs( + dag_id: str, + session: SessionDep, + offset: QueryOffset, + limit: QueryLimit, + order_by: Annotated[ + SortParam, + Depends( + SortParam( + [ + "run_after", + "logical_date", + "start_date", + "end_date", + ], + DagRun, + ).dynamic_depends() + ), + ], + run_after: Annotated[RangeFilter, Depends(datetime_range_filter_factory("run_after", DagRun))], +) -> list[GridRunsResponse]: + """Get info about a run for the grid.""" + # Retrieve, sort the previous DAG Runs + base_query = select( + DagRun.dag_id, + DagRun.run_id, + DagRun.queued_at, + DagRun.start_date, + DagRun.end_date, + DagRun.run_after, + DagRun.state, + DagRun.run_type, + ).where(DagRun.dag_id == dag_id) + + # This comparison is to fall back to DAG timetable when no order_by is provided + if order_by.value == order_by.get_primary_key_string(): + latest_serdag = _get_latest_serdag(dag_id, session) + latest_dag = latest_serdag.dag + ordering = list(latest_dag.timetable.run_ordering) + order_by = SortParam( + allowed_attrs=ordering, + model=DagRun, + ).set_value(ordering[0]) + dag_runs_select_filter, _ = paginated_select( + statement=base_query, + order_by=order_by, + offset=offset, + filters=[run_after], + limit=limit, + ) + return session.execute(dag_runs_select_filter) + + +@grid_router.get( + "/ti_summaries/{dag_id}/{run_id}", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + ] + ), + dependencies=[ + Depends( + requires_access_dag( + method="GET", + access_entity=DagAccessEntity.TASK_INSTANCE, + ) + ), + Depends( + requires_access_dag( + method="GET", + access_entity=DagAccessEntity.RUN, + ) + ), + ], + response_model_exclude_none=True, +) +def get_grid_ti_summaries( + dag_id: str, + run_id: str, + session: SessionDep, +) -> GridTISummaries: + """ + Get states for TIs / "groups" of TIs. + + Essentially this is to know what color to put in the squares in the grid. + + The tricky part here is that we aggregate the state for groups and mapped tasks. + + We don't add all the TIs for mapped TIs -- we only add one entry for the mapped task and + its state is an aggregate of its TI states. + + And for task groups, we add a "task" for that which is not really a task but is just + an entry that represents the group (so that we can show a filled in box when the group + is not expanded) and its state is an agg of those within it. + """ + tis_of_dag_runs, _ = paginated_select( + statement=( + select( + TaskInstance.task_id, + TaskInstance.state, + TaskInstance.dag_version_id, + ) + .where(TaskInstance.dag_id == dag_id) + .where( + TaskInstance.run_id == run_id, + ) + ), + filters=[], + order_by=SortParam(allowed_attrs=["task_id", "run_id"], model=TaskInstance).set_value("task_id"), + limit=None, + return_total_entries=False, + ) + task_instances = list(session.execute(tis_of_dag_runs)) + task_id_states = collections.defaultdict(list) + for ti in task_instances: + task_id_states[ti.task_id].append(ti.state) + + serdag = _get_serdag( + dag_id=dag_id, + dag_version_id=task_instances[0].dag_version_id, + session=session, + ) + if not serdag: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found") + tis = list( + _find_aggregates( + node=serdag.dag.task_group, + parent_node=None, + ti_states=task_id_states, + ) + ) + + return { # type: ignore[return-value] + "run_id": run_id, + "dag_id": dag_id, + "task_instances": list(tis), + } + + +@grid_router.get( + "/latest_run/{dag_id}", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + ] + ), + dependencies=[ + Depends( + requires_access_dag( + method="GET", + access_entity=DagAccessEntity.TASK_INSTANCE, + ) + ), + Depends( + requires_access_dag( + method="GET", + access_entity=DagAccessEntity.RUN, + ) + ), + ], + response_model_exclude_none=True, +) +def get_latest_run( + dag_id: str, + session: SessionDep, +) -> LatestRunResponse | None: + """ + Get information about the latest dag run by run_after. + + This is used by the UI to figure out if it needs to rerun queries and resume auto refresh. + """ + return session.execute( + select( + DagRun.id, + DagRun.dag_id, + DagRun.run_id, + DagRun.run_after, + ) + .where(DagRun.dag_id == dag_id) + .order_by(DagRun.run_after.desc()) + .limit(1) + ).one_or_none() diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py index 346676e14cd48..a69cafb7bbbda 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py @@ -18,6 +18,8 @@ from __future__ import annotations import contextlib +from collections import Counter +from collections.abc import Iterable from uuid import UUID import structlog @@ -309,3 +311,61 @@ def _get_node_by_id(nodes, node_id): if node["id"] == node_id: return node return {} + + +def _is_task_node_mapped_task_group(task_node: BaseOperator | MappedTaskGroup | TaskMap | None) -> bool: + """Check if the Task Node is a Mapped Task Group.""" + return type(task_node) is MappedTaskGroup + + +def agg_state(states): + states = Counter(states) + for state in state_priority: + if state in states: + return state + return "no_status" + + +def _find_aggregates( + node: TaskGroup | BaseOperator | MappedTaskGroup | TaskMap, + parent_node: TaskGroup | BaseOperator | MappedTaskGroup | TaskMap | None, + ti_states: dict[str, list[str]], +) -> Iterable[dict]: + """Recursively fill the Task Group Map.""" + node_id = node.node_id + parent_id = parent_node.node_id if parent_node else None + + if node is None: + return + + if isinstance(node, MappedOperator): + yield { + "task_id": node_id, + "type": "mapped_task", + "parent_id": parent_id, + "state": agg_state(ti_states[node_id]), + } + + return + if isinstance(node, TaskGroup): + states = [] + for child in get_task_group_children_getter()(node): + for child_node in _find_aggregates(node=child, parent_node=node, ti_states=ti_states): + states.append(child_node["state"]) + yield child_node + if node_id: + yield { + "task_id": node_id, + "type": "group", + "parent_id": parent_id, + "state": agg_state(states), + } + return + if isinstance(node, BaseOperator): + yield { + "task_id": node_id, + "type": "task", + "parent_id": parent_id, + "state": agg_state(ti_states[node_id]), + } + return diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 4c187c5575a21..d538e3e617596 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -669,6 +669,41 @@ export const UseGridServiceGridDataKeyFn = ({ dagId, includeDownstream, includeU runType?: string[]; state?: string[]; }, queryKey?: Array) => [useGridServiceGridDataKey, ...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }])]; +export type GridServiceGetDagStructureDefaultResponse = Awaited>; +export type GridServiceGetDagStructureQueryResult = UseQueryResult; +export const useGridServiceGetDagStructureKey = "GridServiceGetDagStructure"; +export const UseGridServiceGetDagStructureKeyFn = ({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}, queryKey?: Array) => [useGridServiceGetDagStructureKey, ...(queryKey ?? [{ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }])]; +export type GridServiceGetGridRunsDefaultResponse = Awaited>; +export type GridServiceGetGridRunsQueryResult = UseQueryResult; +export const useGridServiceGetGridRunsKey = "GridServiceGetGridRuns"; +export const UseGridServiceGetGridRunsKeyFn = ({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}, queryKey?: Array) => [useGridServiceGetGridRunsKey, ...(queryKey ?? [{ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }])]; +export type GridServiceGetGridTiSummariesDefaultResponse = Awaited>; +export type GridServiceGetGridTiSummariesQueryResult = UseQueryResult; +export const useGridServiceGetGridTiSummariesKey = "GridServiceGetGridTiSummaries"; +export const UseGridServiceGetGridTiSummariesKeyFn = ({ dagId, runId }: { + dagId: string; + runId: string; +}, queryKey?: Array) => [useGridServiceGetGridTiSummariesKey, ...(queryKey ?? [{ dagId, runId }])]; +export type GridServiceGetLatestRunDefaultResponse = Awaited>; +export type GridServiceGetLatestRunQueryResult = UseQueryResult; +export const useGridServiceGetLatestRunKey = "GridServiceGetLatestRun"; +export const UseGridServiceGetLatestRunKeyFn = ({ dagId }: { + dagId: string; +}, queryKey?: Array) => [useGridServiceGetLatestRunKey, ...(queryKey ?? [{ dagId }])]; export type AssetServiceCreateAssetEventMutationResult = Awaited>; export type AssetServiceMaterializeAssetMutationResult = Awaited>; export type BackfillServiceCreateBackfillMutationResult = Awaited>; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index a82b28a403ac2..5ba5f5f3620c7 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1259,3 +1259,82 @@ export const ensureUseGridServiceGridDataData = (queryClient: QueryClient, { dag runType?: string[]; state?: string[]; }) => queryClient.ensureQueryData({ queryKey: Common.UseGridServiceGridDataKeyFn({ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }), queryFn: () => GridService.gridData({ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }) }); +/** +* Get Dag Structure +* Return dag structure for grid view. +* @param data The data for the request. +* @param data.dagId +* @param data.offset +* @param data.limit +* @param data.orderBy +* @param data.runAfterGte +* @param data.runAfterLte +* @returns GridNodeResponse Successful Response +* @throws ApiError +*/ +export const ensureUseGridServiceGetDagStructureData = (queryClient: QueryClient, { dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }), queryFn: () => GridService.getDagStructure({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }) }); +/** +* Get Grid Runs +* Get info about a run for the grid. +* @param data The data for the request. +* @param data.dagId +* @param data.offset +* @param data.limit +* @param data.orderBy +* @param data.runAfterGte +* @param data.runAfterLte +* @returns GridRunsResponse Successful Response +* @throws ApiError +*/ +export const ensureUseGridServiceGetGridRunsData = (queryClient: QueryClient, { dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }), queryFn: () => GridService.getGridRuns({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }) }); +/** +* Get Grid Ti Summaries +* Get states for TIs / "groups" of TIs. +* +* Essentially this is to know what color to put in the squares in the grid. +* +* The tricky part here is that we aggregate the state for groups and mapped tasks. +* +* We don't add all the TIs for mapped TIs -- we only add one entry for the mapped task and +* its state is an aggregate of its TI states. +* +* And for task groups, we add a "task" for that which is not really a task but is just +* an entry that represents the group (so that we can show a filled in box when the group +* is not expanded) and its state is an agg of those within it. +* @param data The data for the request. +* @param data.dagId +* @param data.runId +* @returns GridTISummaries Successful Response +* @throws ApiError +*/ +export const ensureUseGridServiceGetGridTiSummariesData = (queryClient: QueryClient, { dagId, runId }: { + dagId: string; + runId: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }), queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) }); +/** +* Get Latest Run +* Get information about the latest dag run by run_after. +* +* This is used by the UI to figure out if it needs to rerun queries and resume auto refresh. +* @param data The data for the request. +* @param data.dagId +* @returns unknown Successful Response +* @throws ApiError +*/ +export const ensureUseGridServiceGetLatestRunData = (queryClient: QueryClient, { dagId }: { + dagId: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseGridServiceGetLatestRunKeyFn({ dagId }), queryFn: () => GridService.getLatestRun({ dagId }) }); diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 041d8282ecf64..f878624e22eb4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1259,3 +1259,82 @@ export const prefetchUseGridServiceGridData = (queryClient: QueryClient, { dagId runType?: string[]; state?: string[]; }) => queryClient.prefetchQuery({ queryKey: Common.UseGridServiceGridDataKeyFn({ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }), queryFn: () => GridService.gridData({ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }) }); +/** +* Get Dag Structure +* Return dag structure for grid view. +* @param data The data for the request. +* @param data.dagId +* @param data.offset +* @param data.limit +* @param data.orderBy +* @param data.runAfterGte +* @param data.runAfterLte +* @returns GridNodeResponse Successful Response +* @throws ApiError +*/ +export const prefetchUseGridServiceGetDagStructure = (queryClient: QueryClient, { dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }), queryFn: () => GridService.getDagStructure({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }) }); +/** +* Get Grid Runs +* Get info about a run for the grid. +* @param data The data for the request. +* @param data.dagId +* @param data.offset +* @param data.limit +* @param data.orderBy +* @param data.runAfterGte +* @param data.runAfterLte +* @returns GridRunsResponse Successful Response +* @throws ApiError +*/ +export const prefetchUseGridServiceGetGridRuns = (queryClient: QueryClient, { dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }), queryFn: () => GridService.getGridRuns({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }) }); +/** +* Get Grid Ti Summaries +* Get states for TIs / "groups" of TIs. +* +* Essentially this is to know what color to put in the squares in the grid. +* +* The tricky part here is that we aggregate the state for groups and mapped tasks. +* +* We don't add all the TIs for mapped TIs -- we only add one entry for the mapped task and +* its state is an aggregate of its TI states. +* +* And for task groups, we add a "task" for that which is not really a task but is just +* an entry that represents the group (so that we can show a filled in box when the group +* is not expanded) and its state is an agg of those within it. +* @param data The data for the request. +* @param data.dagId +* @param data.runId +* @returns GridTISummaries Successful Response +* @throws ApiError +*/ +export const prefetchUseGridServiceGetGridTiSummaries = (queryClient: QueryClient, { dagId, runId }: { + dagId: string; + runId: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }), queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) }); +/** +* Get Latest Run +* Get information about the latest dag run by run_after. +* +* This is used by the UI to figure out if it needs to rerun queries and resume auto refresh. +* @param data The data for the request. +* @param data.dagId +* @returns unknown Successful Response +* @throws ApiError +*/ +export const prefetchUseGridServiceGetLatestRun = (queryClient: QueryClient, { dagId }: { + dagId: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseGridServiceGetLatestRunKeyFn({ dagId }), queryFn: () => GridService.getLatestRun({ dagId }) }); diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index ca8a496c167e7..3e3ba37467f1c 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1260,6 +1260,85 @@ export const useGridServiceGridData = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseGridServiceGridDataKeyFn({ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }, queryKey), queryFn: () => GridService.gridData({ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }) as TData, ...options }); /** +* Get Dag Structure +* Return dag structure for grid view. +* @param data The data for the request. +* @param data.dagId +* @param data.offset +* @param data.limit +* @param data.orderBy +* @param data.runAfterGte +* @param data.runAfterLte +* @returns GridNodeResponse Successful Response +* @throws ApiError +*/ +export const useGridServiceGetDagStructure = = unknown[]>({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }, queryKey), queryFn: () => GridService.getDagStructure({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }) as TData, ...options }); +/** +* Get Grid Runs +* Get info about a run for the grid. +* @param data The data for the request. +* @param data.dagId +* @param data.offset +* @param data.limit +* @param data.orderBy +* @param data.runAfterGte +* @param data.runAfterLte +* @returns GridRunsResponse Successful Response +* @throws ApiError +*/ +export const useGridServiceGetGridRuns = = unknown[]>({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }, queryKey), queryFn: () => GridService.getGridRuns({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }) as TData, ...options }); +/** +* Get Grid Ti Summaries +* Get states for TIs / "groups" of TIs. +* +* Essentially this is to know what color to put in the squares in the grid. +* +* The tricky part here is that we aggregate the state for groups and mapped tasks. +* +* We don't add all the TIs for mapped TIs -- we only add one entry for the mapped task and +* its state is an aggregate of its TI states. +* +* And for task groups, we add a "task" for that which is not really a task but is just +* an entry that represents the group (so that we can show a filled in box when the group +* is not expanded) and its state is an agg of those within it. +* @param data The data for the request. +* @param data.dagId +* @param data.runId +* @returns GridTISummaries Successful Response +* @throws ApiError +*/ +export const useGridServiceGetGridTiSummaries = = unknown[]>({ dagId, runId }: { + dagId: string; + runId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }, queryKey), queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData, ...options }); +/** +* Get Latest Run +* Get information about the latest dag run by run_after. +* +* This is used by the UI to figure out if it needs to rerun queries and resume auto refresh. +* @param data The data for the request. +* @param data.dagId +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useGridServiceGetLatestRun = = unknown[]>({ dagId }: { + dagId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseGridServiceGetLatestRunKeyFn({ dagId }, queryKey), queryFn: () => GridService.getLatestRun({ dagId }) as TData, ...options }); +/** * Create Asset Event * Create asset events. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index 17e46fa8bb18f..2ebe843edb37e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1259,3 +1259,82 @@ export const useGridServiceGridDataSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseGridServiceGridDataKeyFn({ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }, queryKey), queryFn: () => GridService.gridData({ dagId, includeDownstream, includeUpstream, limit, logicalDateGte, logicalDateLte, offset, orderBy, root, runAfterGte, runAfterLte, runType, state }) as TData, ...options }); +/** +* Get Dag Structure +* Return dag structure for grid view. +* @param data The data for the request. +* @param data.dagId +* @param data.offset +* @param data.limit +* @param data.orderBy +* @param data.runAfterGte +* @param data.runAfterLte +* @returns GridNodeResponse Successful Response +* @throws ApiError +*/ +export const useGridServiceGetDagStructureSuspense = = unknown[]>({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }, queryKey), queryFn: () => GridService.getDagStructure({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }) as TData, ...options }); +/** +* Get Grid Runs +* Get info about a run for the grid. +* @param data The data for the request. +* @param data.dagId +* @param data.offset +* @param data.limit +* @param data.orderBy +* @param data.runAfterGte +* @param data.runAfterLte +* @returns GridRunsResponse Successful Response +* @throws ApiError +*/ +export const useGridServiceGetGridRunsSuspense = = unknown[]>({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }: { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string; + runAfterLte?: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseGridServiceGetGridRunsKeyFn({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }, queryKey), queryFn: () => GridService.getGridRuns({ dagId, limit, offset, orderBy, runAfterGte, runAfterLte }) as TData, ...options }); +/** +* Get Grid Ti Summaries +* Get states for TIs / "groups" of TIs. +* +* Essentially this is to know what color to put in the squares in the grid. +* +* The tricky part here is that we aggregate the state for groups and mapped tasks. +* +* We don't add all the TIs for mapped TIs -- we only add one entry for the mapped task and +* its state is an aggregate of its TI states. +* +* And for task groups, we add a "task" for that which is not really a task but is just +* an entry that represents the group (so that we can show a filled in box when the group +* is not expanded) and its state is an agg of those within it. +* @param data The data for the request. +* @param data.dagId +* @param data.runId +* @returns GridTISummaries Successful Response +* @throws ApiError +*/ +export const useGridServiceGetGridTiSummariesSuspense = = unknown[]>({ dagId, runId }: { + dagId: string; + runId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseGridServiceGetGridTiSummariesKeyFn({ dagId, runId }, queryKey), queryFn: () => GridService.getGridTiSummaries({ dagId, runId }) as TData, ...options }); +/** +* Get Latest Run +* Get information about the latest dag run by run_after. +* +* This is used by the UI to figure out if it needs to rerun queries and resume auto refresh. +* @param data The data for the request. +* @param data.dagId +* @returns unknown Successful Response +* @throws ApiError +*/ +export const useGridServiceGetLatestRunSuspense = = unknown[]>({ dagId }: { + dagId: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseGridServiceGetLatestRunKeyFn({ dagId }, queryKey), queryFn: () => GridService.getLatestRun({ dagId }) as TData, ...options }); diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 6f5f33136e54f..fceda3e7af1e2 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -6732,6 +6732,60 @@ export const $GridDAGRunwithTIs = { description: 'DAG Run model for the Grid UI.' } as const; +export const $GridNodeResponse = { + properties: { + id: { + type: 'string', + title: 'Id' + }, + label: { + type: 'string', + title: 'Label' + }, + children: { + anyOf: [ + { + items: { + '$ref': '#/components/schemas/GridNodeResponse' + }, + type: 'array' + }, + { + type: 'null' + } + ], + title: 'Children' + }, + is_mapped: { + anyOf: [ + { + type: 'boolean' + }, + { + type: 'null' + } + ], + title: 'Is Mapped' + }, + setup_teardown_type: { + anyOf: [ + { + type: 'string', + enum: ['setup', 'teardown'] + }, + { + type: 'null' + } + ], + title: 'Setup Teardown Type' + } + }, + type: 'object', + required: ['id', 'label', 'is_mapped'], + title: 'GridNodeResponse', + description: 'Base Node serializer for responses.' +} as const; + export const $GridResponse = { properties: { dag_runs: { @@ -6740,17 +6794,121 @@ export const $GridResponse = { }, type: 'array', title: 'Dag Runs' - }, - structure: { - '$ref': '#/components/schemas/StructureDataResponse' } }, type: 'object', - required: ['dag_runs', 'structure'], + required: ['dag_runs'], title: 'GridResponse', description: 'Response model for the Grid UI.' } as const; +export const $GridRunsResponse = { + properties: { + dag_id: { + type: 'string', + title: 'Dag Id' + }, + run_id: { + type: 'string', + title: 'Run Id' + }, + queued_at: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Queued At' + }, + start_date: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Start Date' + }, + end_date: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'End Date' + }, + run_after: { + type: 'string', + format: 'date-time', + title: 'Run After' + }, + state: { + anyOf: [ + { + '$ref': '#/components/schemas/TaskInstanceState' + }, + { + type: 'null' + } + ] + }, + run_type: { + '$ref': '#/components/schemas/DagRunType' + }, + duration: { + anyOf: [ + { + type: 'integer' + }, + { + type: 'null' + } + ], + title: 'Duration', + readOnly: true + } + }, + type: 'object', + required: ['dag_id', 'run_id', 'queued_at', 'start_date', 'end_date', 'run_after', 'state', 'run_type', 'duration'], + title: 'GridRunsResponse', + description: 'Base Node serializer for responses.' +} as const; + +export const $GridTISummaries = { + properties: { + run_id: { + type: 'string', + title: 'Run Id' + }, + dag_id: { + type: 'string', + title: 'Dag Id' + }, + task_instances: { + items: { + '$ref': '#/components/schemas/LightGridTaskInstanceSummary' + }, + type: 'array', + title: 'Task Instances' + } + }, + type: 'object', + required: ['run_id', 'dag_id', 'task_instances'], + title: 'GridTISummaries', + description: 'DAG Run model for the Grid UI.' +} as const; + export const $GridTaskInstanceSummary = { properties: { task_id: { @@ -6861,6 +7019,55 @@ export const $HistoricalMetricDataResponse = { description: 'Historical Metric Data serializer for responses.' } as const; +export const $LatestRunResponse = { + properties: { + id: { + type: 'integer', + title: 'Id' + }, + dag_id: { + type: 'string', + title: 'Dag Id' + }, + run_id: { + type: 'string', + title: 'Run Id' + }, + run_after: { + type: 'string', + format: 'date-time', + title: 'Run After' + } + }, + type: 'object', + required: ['id', 'dag_id', 'run_id', 'run_after'], + title: 'LatestRunResponse', + description: 'Base Node serializer for responses.' +} as const; + +export const $LightGridTaskInstanceSummary = { + properties: { + task_id: { + type: 'string', + title: 'Task Id' + }, + state: { + anyOf: [ + { + '$ref': '#/components/schemas/TaskInstanceState' + }, + { + type: 'null' + } + ] + } + }, + type: 'object', + required: ['task_id', 'state'], + title: 'LightGridTaskInstanceSummary', + description: 'Task Instance Summary model for the Grid UI.' +} as const; + export const $MenuItem = { type: 'string', enum: ['Assets', 'Audit Log', 'Config', 'Connections', 'Dags', 'Docs', 'Plugins', 'Pools', 'Providers', 'Variables', 'XComs'], diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 08ae1f9bb0052..02acbbca34361 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GridDataData, GridDataResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetDagReportsData, GetDagReportsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutData, LogoutResponse, GetAuthMenusResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GridDataData, GridDataResponse, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetLatestRunData, GetLatestRunResponse } from './types.gen'; export class AssetService { /** @@ -3481,4 +3481,135 @@ export class GridService { }); } + /** + * Get Dag Structure + * Return dag structure for grid view. + * @param data The data for the request. + * @param data.dagId + * @param data.offset + * @param data.limit + * @param data.orderBy + * @param data.runAfterGte + * @param data.runAfterLte + * @returns GridNodeResponse Successful Response + * @throws ApiError + */ + public static getDagStructure(data: GetDagStructureData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/ui/grid/structure/{dag_id}', + path: { + dag_id: data.dagId + }, + query: { + offset: data.offset, + limit: data.limit, + order_by: data.orderBy, + run_after_gte: data.runAfterGte, + run_after_lte: data.runAfterLte + }, + errors: { + 400: 'Bad Request', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Get Grid Runs + * Get info about a run for the grid. + * @param data The data for the request. + * @param data.dagId + * @param data.offset + * @param data.limit + * @param data.orderBy + * @param data.runAfterGte + * @param data.runAfterLte + * @returns GridRunsResponse Successful Response + * @throws ApiError + */ + public static getGridRuns(data: GetGridRunsData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/ui/grid/runs/{dag_id}', + path: { + dag_id: data.dagId + }, + query: { + offset: data.offset, + limit: data.limit, + order_by: data.orderBy, + run_after_gte: data.runAfterGte, + run_after_lte: data.runAfterLte + }, + errors: { + 400: 'Bad Request', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Get Grid Ti Summaries + * Get states for TIs / "groups" of TIs. + * + * Essentially this is to know what color to put in the squares in the grid. + * + * The tricky part here is that we aggregate the state for groups and mapped tasks. + * + * We don't add all the TIs for mapped TIs -- we only add one entry for the mapped task and + * its state is an aggregate of its TI states. + * + * And for task groups, we add a "task" for that which is not really a task but is just + * an entry that represents the group (so that we can show a filled in box when the group + * is not expanded) and its state is an agg of those within it. + * @param data The data for the request. + * @param data.dagId + * @param data.runId + * @returns GridTISummaries Successful Response + * @throws ApiError + */ + public static getGridTiSummaries(data: GetGridTiSummariesData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/ui/grid/ti_summaries/{dag_id}/{run_id}', + path: { + dag_id: data.dagId, + run_id: data.runId + }, + errors: { + 400: 'Bad Request', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + + /** + * Get Latest Run + * Get information about the latest dag run by run_after. + * + * This is used by the UI to figure out if it needs to rerun queries and resume auto refresh. + * @param data The data for the request. + * @param data.dagId + * @returns unknown Successful Response + * @throws ApiError + */ + public static getLatestRun(data: GetLatestRunData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/ui/grid/latest_run/{dag_id}', + path: { + dag_id: data.dagId + }, + errors: { + 400: 'Bad Request', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + } \ No newline at end of file diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 692a290abccfe..18c4d48a09f94 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1713,12 +1713,46 @@ export type GridDAGRunwithTIs = { task_instances: Array; }; +/** + * Base Node serializer for responses. + */ +export type GridNodeResponse = { + id: string; + label: string; + children?: Array | null; + is_mapped: boolean | null; + setup_teardown_type?: 'setup' | 'teardown' | null; +}; + /** * Response model for the Grid UI. */ export type GridResponse = { dag_runs: Array; - structure: StructureDataResponse; +}; + +/** + * Base Node serializer for responses. + */ +export type GridRunsResponse = { + dag_id: string; + run_id: string; + queued_at: string | null; + start_date: string | null; + end_date: string | null; + run_after: string; + state: TaskInstanceState | null; + run_type: DagRunType; + readonly duration: number | null; +}; + +/** + * DAG Run model for the Grid UI. + */ +export type GridTISummaries = { + run_id: string; + dag_id: string; + task_instances: Array; }; /** @@ -1747,6 +1781,24 @@ export type HistoricalMetricDataResponse = { task_instance_states: TaskInstanceStateCount; }; +/** + * Base Node serializer for responses. + */ +export type LatestRunResponse = { + id: number; + dag_id: string; + run_id: string; + run_after: string; +}; + +/** + * Task Instance Summary model for the Grid UI. + */ +export type LightGridTaskInstanceSummary = { + task_id: string; + state: TaskInstanceState | null; +}; + /** * Define all menu items defined in the menu. */ @@ -2826,6 +2878,41 @@ export type GridDataData = { export type GridDataResponse = GridResponse; +export type GetDagStructureData = { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string | null; + runAfterLte?: string | null; +}; + +export type GetDagStructureResponse = Array; + +export type GetGridRunsData = { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; + runAfterGte?: string | null; + runAfterLte?: string | null; +}; + +export type GetGridRunsResponse = Array; + +export type GetGridTiSummariesData = { + dagId: string; + runId: string; +}; + +export type GetGridTiSummariesResponse = GridTISummaries; + +export type GetLatestRunData = { + dagId: string; +}; + +export type GetLatestRunResponse = LatestRunResponse | null; + export type $OpenApiTs = { '/api/v2/assets': { get: { @@ -5757,4 +5844,96 @@ export type $OpenApiTs = { }; }; }; + '/ui/grid/structure/{dag_id}': { + get: { + req: GetDagStructureData; + res: { + /** + * Successful Response + */ + 200: Array; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + '/ui/grid/runs/{dag_id}': { + get: { + req: GetGridRunsData; + res: { + /** + * Successful Response + */ + 200: Array; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + '/ui/grid/ti_summaries/{dag_id}/{run_id}': { + get: { + req: GetGridTiSummariesData; + res: { + /** + * Successful Response + */ + 200: GridTISummaries; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + '/ui/grid/latest_run/{dag_id}': { + get: { + req: GetLatestRunData; + res: { + /** + * Successful Response + */ + 200: LatestRunResponse | null; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; }; \ No newline at end of file diff --git a/airflow-core/src/airflow/ui/src/components/DurationChart.tsx b/airflow-core/src/airflow/ui/src/components/DurationChart.tsx index a6159a1f91d36..190ee70d496e6 100644 --- a/airflow-core/src/airflow/ui/src/components/DurationChart.tsx +++ b/airflow-core/src/airflow/ui/src/components/DurationChart.tsx @@ -34,7 +34,7 @@ import { Bar } from "react-chartjs-2"; import { useTranslation } from "react-i18next"; import { useNavigate } from "react-router-dom"; -import type { TaskInstanceResponse, DAGRunResponse } from "openapi/requests/types.gen"; +import type { TaskInstanceResponse, GridRunsResponse } from "openapi/requests/types.gen"; import { system } from "src/theme"; ChartJS.register( @@ -54,7 +54,7 @@ const average = (ctx: PartialEventContext, index: number) => { return values === undefined ? 0 : values.reduce((initial, next) => initial + next, 0) / values.length; }; -type RunResponse = DAGRunResponse | TaskInstanceResponse; +type RunResponse = GridRunsResponse | TaskInstanceResponse; const getDuration = (start: string, end: string | null) => dayjs.duration(dayjs(end).diff(start)).asSeconds(); @@ -115,7 +115,7 @@ export const DurationChart = ({ data: entries.map((entry: RunResponse) => { switch (kind) { case "Dag Run": { - const run = entry as DAGRunResponse; + const run = entry as GridRunsResponse; return run.queued_at !== null && run.start_date !== null && run.queued_at < run.start_date ? Number(getDuration(run.queued_at, run.start_date)) @@ -158,18 +158,19 @@ export const DurationChart = ({ return; } - const entry = entries[element.index]; - const baseUrl = `/dags/${entry?.dag_id}/runs/${entry?.dag_run_id}`; - switch (kind) { case "Dag Run": { + const entry = entries[element.index] as GridRunsResponse | undefined; + const baseUrl = `/dags/${entry?.dag_id}/runs/${entry?.run_id}`; + navigate(baseUrl); break; } case "Task Instance": { - const taskInstance = entry as TaskInstanceResponse; + const entry = entries[element.index] as TaskInstanceResponse | undefined; + const baseUrl = `/dags/${entry?.dag_id}/runs/${entry?.dag_run_id}`; - navigate(`${baseUrl}/tasks/${taskInstance.task_id}`); + navigate(`${baseUrl}/tasks/${entry?.task_id}`); break; } default: diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/DagRunSelect.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/DagRunSelect.tsx index f73d9c3d49a2e..459a281ae70d2 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/DagRunSelect.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/DagRunSelect.tsx @@ -21,13 +21,13 @@ import { forwardRef, useMemo } from "react"; import { useTranslation } from "react-i18next"; import { useNavigate, useParams } from "react-router-dom"; -import type { GridDAGRunwithTIs } from "openapi/requests/types.gen"; +import type { GridRunsResponse } from "openapi/requests/types.gen"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; -import { useGrid } from "src/queries/useGrid"; +import { useGridRuns } from "src/queries/useGridRuns.ts"; type DagRunSelected = { - run: GridDAGRunwithTIs; + run: GridRunsResponse; value: string; }; @@ -40,34 +40,33 @@ export const DagRunSelect = forwardRef(({ lim const { t: translate } = useTranslation(["dag", "common"]); const navigate = useNavigate(); - const { data, isLoading } = useGrid(limit); - + const { data: gridRuns, isLoading } = useGridRuns({ limit }); const runOptions = useMemo( () => createListCollection({ - items: (data?.dag_runs ?? []).map((dr: GridDAGRunwithTIs) => ({ + items: (gridRuns ?? []).map((dr: GridRunsResponse) => ({ run: dr, - value: dr.dag_run_id, + value: dr.run_id, })), }), - [data], + [gridRuns], ); const selectDagRun = ({ items }: SelectValueChangeDetails) => { - const run = items.length > 0 ? `/runs/${items[0]?.run.dag_run_id}` : ""; + const runPartialPath = items.length > 0 ? `/runs/${items[0]?.run.run_id}` : ""; navigate({ - pathname: `/dags/${dagId}${run}/${taskId === undefined ? "" : `tasks/${taskId}`}`, + pathname: `/dags/${dagId}${runPartialPath}/${taskId === undefined ? "" : `tasks/${taskId}`}`, }); }; - const selectedRun = (data?.dag_runs ?? []).find((dr) => dr.dag_run_id === runId); + const selectedRun = (gridRuns ?? []).find((dr) => dr.run_id === runId); return ( ; - readonly run: RunWithDuration; + readonly run: GridRunsResponse; }; export const Bar = ({ max, nodes, run }: Props) => { const { dagId = "", runId } = useParams(); const [searchParams] = useSearchParams(); - const isSelected = runId === run.dag_run_id; + const isSelected = runId === run.run_id; const search = searchParams.toString(); + const { data: gridTISummaries } = useGridTiSummaries(run); return ( { color="white" dagId={dagId} flexDir="column" - height={`${(run.duration / max) * BAR_HEIGHT}px`} + height={`${((run.duration ?? 0) / max) * BAR_HEIGHT}px`} justifyContent="flex-end" label={run.run_after} minHeight="14px" - runId={run.dag_run_id} + runId={run.run_id} searchParams={search} state={run.state} zIndex={1} @@ -74,7 +77,11 @@ export const Bar = ({ max, nodes, run }: Props) => { {run.run_type !== "scheduled" && } - + ); }; diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx index 5a407c355e4b3..59b8d250c6407 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/Grid.tsx @@ -19,19 +19,22 @@ import { Box, Flex, IconButton } from "@chakra-ui/react"; import dayjs from "dayjs"; import dayjsDuration from "dayjs/plugin/duration"; -import { useMemo } from "react"; +import { useEffect, useMemo, useState } from "react"; import { useTranslation } from "react-i18next"; import { FiChevronsRight } from "react-icons/fi"; import { Link, useParams } from "react-router-dom"; +import type { GridRunsResponse } from "openapi/requests"; import { useOpenGroups } from "src/context/openGroups"; -import { useGrid } from "src/queries/useGrid"; +import { useGridRuns } from "src/queries/useGridRuns.ts"; +import { useGridStructure } from "src/queries/useGridStructure.ts"; +import { isStatePending } from "src/utils"; import { Bar } from "./Bar"; import { DurationAxis } from "./DurationAxis"; import { DurationTick } from "./DurationTick"; import { TaskNames } from "./TaskNames"; -import { flattenNodes, type RunWithDuration } from "./utils"; +import { flattenNodes } from "./utils"; dayjs.extend(dayjsDuration); @@ -41,36 +44,47 @@ type Props = { export const Grid = ({ limit }: Props) => { const { t: translate } = useTranslation("dag"); + + const [selectedIsVisible, setSelectedIsVisible] = useState(); + const [hasActiveRun, setHasActiveRun] = useState(); const { openGroupIds } = useOpenGroups(); - const { dagId = "" } = useParams(); + const { dagId = "", runId = "" } = useParams(); - const { data: gridData, isLoading, runAfter } = useGrid(limit); + const { data: gridRuns, isLoading } = useGridRuns({ limit }); - const runs: Array = useMemo( - () => - (gridData?.dag_runs ?? []).map((run) => { - const duration = dayjs - .duration(dayjs(run.end_date ?? undefined).diff(run.start_date ?? undefined)) - .asSeconds(); + // Check if the selected dag run is inside of the grid response, if not, we'll update the grid filters + // Eventually we should redo the api endpoint to make this work better + useEffect(() => { + if (gridRuns && runId) { + const run = gridRuns.find((dr: GridRunsResponse) => dr.run_id === runId); - return { - ...run, - duration, - }; - }), - [gridData?.dag_runs], - ); + if (!run) { + setSelectedIsVisible(false); + } + } + }, [runId, gridRuns, selectedIsVisible, setSelectedIsVisible]); + + useEffect(() => { + if (gridRuns) { + const run = gridRuns.some((dr: GridRunsResponse) => isStatePending(dr.state)); + if (!run) { + setHasActiveRun(false); + } + } + }, [gridRuns, setHasActiveRun]); + + const { data: dagStructure } = useGridStructure({ hasActiveRun, limit }); // calculate dag run bar heights relative to max const max = Math.max.apply( undefined, - runs.map((dr) => dr.duration), - ); - - const { flatNodes } = useMemo( - () => flattenNodes(gridData === undefined ? [] : gridData.structure.nodes, openGroupIds), - [gridData, openGroupIds], + gridRuns === undefined + ? [] + : gridRuns + .map((dr: GridRunsResponse) => dr.duration) + .filter((duration: number | null): duration is number => duration !== null), ); + const { flatNodes } = useMemo(() => flattenNodes(dagStructure, openGroupIds), [dagStructure, openGroupIds]); return ( @@ -83,7 +97,7 @@ export const Grid = ({ limit }: Props) => { - {Boolean(runs.length) && ( + {Boolean(gridRuns?.length) && ( <> @@ -92,11 +106,11 @@ export const Grid = ({ limit }: Props) => { )} - {runs.map((dr) => ( - + {gridRuns?.map((dr: GridRunsResponse) => ( + ))} - {runAfter === undefined ? undefined : ( + {selectedIsVisible === undefined || !selectedIsVisible ? undefined : ( ; readonly runId: string; - readonly taskInstances: Array; + readonly taskInstances: Array; }; export const TaskInstancesColumn = ({ nodes, runId, taskInstances }: Props) => { @@ -37,6 +37,7 @@ export const TaskInstancesColumn = ({ nodes, runId, taskInstances }: Props) => { const search = searchParams.toString(); return nodes.map((node) => { + // todo: how does this work with mapped? same task id for multiple tis const taskInstance = taskInstances.find((ti) => ti.task_id === node.id); if (!taskInstance) { diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts index 4b17098faff63..d278597267a14 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Grid/utils.ts @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import type { GridDAGRunwithTIs, NodeResponse } from "openapi/requests/types.gen"; +import type { GridDAGRunwithTIs, GridNodeResponse } from "openapi/requests/types.gen"; export type RunWithDuration = { duration: number; @@ -26,31 +26,33 @@ export type GridTask = { depth: number; isGroup?: boolean; isOpen?: boolean; -} & NodeResponse; +} & GridNodeResponse; -export const flattenNodes = (nodes: Array, openGroupIds: Array, depth: number = 0) => { +export const flattenNodes = ( + nodes: Array | undefined, + openGroupIds: Array, + depth: number = 0, +) => { let flatNodes: Array = []; let allGroupIds: Array = []; - nodes.forEach((node) => { - if (node.type === "task") { - if (node.children) { - const { children, ...rest } = node; - - flatNodes.push({ ...rest, depth, isGroup: true, isOpen: openGroupIds.includes(node.id) }); - allGroupIds.push(node.id); - - const { allGroupIds: childGroupIds, flatNodes: childNodes } = flattenNodes( - children, - openGroupIds, - depth + 1, - ); - - flatNodes = [...flatNodes, ...(openGroupIds.includes(node.id) ? childNodes : [])]; - allGroupIds = [...allGroupIds, ...childGroupIds]; - } else { - flatNodes.push({ ...node, depth }); - } + nodes?.forEach((node) => { + if (node.children) { + const { children, ...rest } = node; + + flatNodes.push({ ...rest, depth, isGroup: true, isOpen: openGroupIds.includes(node.id) }); + allGroupIds.push(node.id); + + const { allGroupIds: childGroupIds, flatNodes: childNodes } = flattenNodes( + children, + openGroupIds, + depth + 1, + ); + + flatNodes = [...flatNodes, ...(openGroupIds.includes(node.id) ? childNodes : [])]; + allGroupIds = [...allGroupIds, ...childGroupIds]; + } else { + flatNodes.push({ ...node, depth }); } }); diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/ToggleGroups.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/ToggleGroups.tsx index a412a99af2e95..69ffc13866a2f 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/ToggleGroups.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/ToggleGroups.tsx @@ -21,23 +21,22 @@ import { useMemo } from "react"; import { useTranslation } from "react-i18next"; import { MdExpand, MdCompress } from "react-icons/md"; import { useParams } from "react-router-dom"; +import { useLocalStorage } from "usehooks-ts"; -import { useStructureServiceStructureData } from "openapi/queries"; import { useOpenGroups } from "src/context/openGroups"; +import { useGridStructure } from "src/queries/useGridStructure.ts"; import { flattenNodes } from "./Grid/utils"; export const ToggleGroups = (props: ButtonGroupProps) => { const { t: translate } = useTranslation(); - const { dagId = "" } = useParams(); - const { data: structure } = useStructureServiceStructureData({ - dagId, - }); const { openGroupIds, setOpenGroupIds } = useOpenGroups(); - + const { dagId = "" } = useParams(); + const [limit] = useLocalStorage(`dag_runs_limit-${dagId}`, 10); + const { data: dagStructure } = useGridStructure({ limit }); const { allGroupIds } = useMemo( - () => flattenNodes(structure?.nodes ?? [], openGroupIds), - [structure?.nodes, openGroupIds], + () => flattenNodes(dagStructure, openGroupIds), + [dagStructure, openGroupIds], ); // Don't show button if the DAG has no task groups diff --git a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx index 20e6126de6a6e..665b47b5da492 100644 --- a/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Dag/Overview/Overview.tsx @@ -21,6 +21,7 @@ import dayjs from "dayjs"; import { lazy, useState, Suspense } from "react"; import { useTranslation } from "react-i18next"; import { useParams } from "react-router-dom"; +import { useLocalStorage } from "usehooks-ts"; import { useAssetServiceGetAssetEvents, @@ -31,7 +32,7 @@ import { AssetEvents } from "src/components/Assets/AssetEvents"; import { DurationChart } from "src/components/DurationChart"; import TimeRangeSelector from "src/components/TimeRangeSelector"; import { TrendCountButton } from "src/components/TrendCountButton"; -import { isStatePending, useAutoRefresh } from "src/utils"; +import { useGridRuns } from "src/queries/useGridRuns.ts"; const FailedLogs = lazy(() => import("./FailedLogs")); @@ -46,8 +47,6 @@ export const Overview = () => { const [endDate, setEndDate] = useState(now.toISOString()); const [assetSortBy, setAssetSortBy] = useState("-timestamp"); - const refetchInterval = useAutoRefresh({}); - const { data: failedTasks, isLoading } = useTaskInstanceServiceGetTaskInstances({ dagId: dagId ?? "", dagRunId: "~", @@ -57,28 +56,17 @@ export const Overview = () => { state: ["failed"], }); + const [limit] = useLocalStorage(`dag_runs_limit-${dagId}`, 10); const { data: failedRuns, isLoading: isLoadingFailedRuns } = useDagRunServiceGetDagRuns({ dagId: dagId ?? "", + limit, runAfterGte: startDate, runAfterLte: endDate, state: ["failed"], }); - - const { data: runs, isLoading: isLoadingRuns } = useDagRunServiceGetDagRuns( - { - dagId: dagId ?? "", - limit: 14, - orderBy: "-run_after", - }, - undefined, - { - refetchInterval: (query) => - query.state.data?.dag_runs.some((run) => isStatePending(run.state)) ? refetchInterval : false, - }, - ); - + const { data: gridRuns, isLoading: isLoadingRuns } = useGridRuns({ limit }); const { data: assetEventsData, isLoading: isLoadingAssetEvents } = useAssetServiceGetAssetEvents({ - limit: 6, + limit, orderBy: assetSortBy, sourceDagId: dagId, timestampGte: startDate, @@ -133,7 +121,7 @@ export const Overview = () => { {isLoadingRuns ? ( ) : ( - + )} {assetEventsData && assetEventsData.total_entries > 0 ? ( diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index 5bb722ecbd114..3e46a185be366 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -24,6 +24,7 @@ import type { TFunction } from "i18next"; import { useCallback } from "react"; import { useTranslation } from "react-i18next"; import { Link as RouterLink, useParams, useSearchParams } from "react-router-dom"; +import { useLocalStorage } from "usehooks-ts"; import { useDagRunServiceGetDagRuns } from "openapi/queries"; import type { DAGRunResponse, DagRunState, DagRunType } from "openapi/requests/types.gen"; @@ -162,12 +163,13 @@ export const DagRuns = () => { const endDate = searchParams.get(END_DATE_PARAM); const refetchInterval = useAutoRefresh({}); + const [limit] = useLocalStorage(`dag_runs_limit-${dagId}`, 10); const { data, error, isLoading } = useDagRunServiceGetDagRuns( { dagId: dagId ?? "~", endDateLte: endDate ?? undefined, - limit: pagination.pageSize, + limit, offset: pagination.pageIndex * pagination.pageSize, orderBy, runType: filteredType === null ? undefined : [filteredType], diff --git a/airflow-core/src/airflow/ui/src/pages/Task/GroupTaskHeader.tsx b/airflow-core/src/airflow/ui/src/pages/Task/GroupTaskHeader.tsx index fc0a2f29233f5..34d55dc345846 100644 --- a/airflow-core/src/airflow/ui/src/pages/Task/GroupTaskHeader.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Task/GroupTaskHeader.tsx @@ -18,9 +18,8 @@ */ import { AiOutlineGroup } from "react-icons/ai"; -import type { NodeResponse } from "openapi/requests/types.gen"; import { HeaderCard } from "src/components/HeaderCard"; -export const GroupTaskHeader = ({ groupTask }: { readonly groupTask: NodeResponse }) => ( - } stats={[]} title={groupTask.label} /> +export const GroupTaskHeader = ({ title }: { readonly title: string }) => ( + } stats={[]} title={title} /> ); diff --git a/airflow-core/src/airflow/ui/src/pages/Task/Task.tsx b/airflow-core/src/airflow/ui/src/pages/Task/Task.tsx index 1095642b3d2c5..e082c8dfe8ead 100644 --- a/airflow-core/src/airflow/ui/src/pages/Task/Task.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Task/Task.tsx @@ -22,8 +22,9 @@ import { LuChartColumn } from "react-icons/lu"; import { MdOutlineEventNote, MdOutlineTask } from "react-icons/md"; import { useParams } from "react-router-dom"; -import { useDagServiceGetDagDetails, useGridServiceGridData, useTaskServiceGetTask } from "openapi/queries"; +import { useDagServiceGetDagDetails, useTaskServiceGetTask } from "openapi/queries"; import { DetailsLayout } from "src/layouts/Details/DetailsLayout"; +import { useGridStructure } from "src/queries/useGridStructure.ts"; import { getGroupTask } from "src/utils/groupTask"; import { GroupTaskHeader } from "./GroupTaskHeader"; @@ -49,18 +50,9 @@ export const Task = () => { enabled: groupId === undefined, }); - const { data: gridData } = useGridServiceGridData( - { - dagId, - includeDownstream: true, - includeUpstream: true, - }, - undefined, - { enabled: groupId !== undefined }, - ); + const { data: dagStructure } = useGridStructure({ limit: 1 }); - const groupTask = - groupId === undefined ? undefined : getGroupTask(gridData?.structure.nodes ?? [], groupId); + const groupTask = getGroupTask(dagStructure, groupId); const { data: dag, @@ -79,7 +71,7 @@ export const Task = () => { tabs={displayTabs} > {task === undefined ? undefined :
} - {groupTask ? : undefined} + {groupTask ? : undefined} ); diff --git a/airflow-core/src/airflow/ui/src/queries/useGrid.ts b/airflow-core/src/airflow/ui/src/queries/useGrid.ts deleted file mode 100644 index 9ebd1a02d6e47..0000000000000 --- a/airflow-core/src/airflow/ui/src/queries/useGrid.ts +++ /dev/null @@ -1,72 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import { keepPreviousData } from "@tanstack/react-query"; -import { useEffect, useState } from "react"; -import { useParams } from "react-router-dom"; - -import { useDagRunServiceGetDagRun, useGridServiceGridData } from "openapi/queries"; -import type { GridResponse } from "openapi/requests/types.gen"; -import { isStatePending, useAutoRefresh } from "src/utils"; - -export const useGrid = (limit: number) => { - const { dagId = "", runId = "" } = useParams(); - const [runAfter, setRunAfter] = useState(); - - const { data: dagRun } = useDagRunServiceGetDagRun( - { - dagId, - dagRunId: runId, - }, - undefined, - { enabled: runId !== "" }, - ); - - const refetchInterval = useAutoRefresh({ dagId }); - - // This is necessary for keepPreviousData - // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-arguments - const { data: gridData, ...rest } = useGridServiceGridData( - { - dagId, - limit, - orderBy: "-run_after", - runAfterLte: runAfter, - }, - undefined, - { - placeholderData: keepPreviousData, - refetchInterval: (query) => - query.state.data?.dag_runs.some((dr) => isStatePending(dr.state)) && refetchInterval, - }, - ); - - // Check if the selected dag run is inside of the grid response, if not, we'll update the grid filters - // Eventually we should redo the api endpoint to make this work better - useEffect(() => { - if (gridData?.dag_runs && dagRun) { - const hasRun = gridData.dag_runs.find((dr) => dr.dag_run_id === dagRun.dag_run_id); - - if (!hasRun) { - setRunAfter(dagRun.run_after); - } - } - }, [dagRun, gridData?.dag_runs, runAfter]); - - return { data: gridData, runAfter, ...rest }; -}; diff --git a/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts b/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts new file mode 100644 index 0000000000000..096630852d435 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useGridRuns.ts @@ -0,0 +1,44 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useParams } from "react-router-dom"; + +import { useGridServiceGetGridRuns } from "openapi/queries"; +import { isStatePending, useAutoRefresh } from "src/utils"; + +export const useGridRuns = ({ limit }: { limit: number }) => { + const { dagId = "" } = useParams(); + + const defaultRefetchInterval = useAutoRefresh({ dagId }); + + const { data: GridRuns, ...rest } = useGridServiceGetGridRuns( + { + dagId, + limit, + orderBy: "-run_after", + }, + undefined, + { + placeholderData: (prev) => prev, + refetchInterval: (query) => + query.state.data?.some((run) => isStatePending(run.state)) && defaultRefetchInterval, + }, + ); + + return { data: GridRuns, ...rest }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts b/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts new file mode 100644 index 0000000000000..eaeaede6050e6 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useGridStructure.ts @@ -0,0 +1,49 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useParams } from "react-router-dom"; + +import { useGridServiceGetDagStructure } from "openapi/queries"; +import { useAutoRefresh } from "src/utils"; + +export const useGridStructure = ({ + hasActiveRun = undefined, + limit, +}: { + hasActiveRun?: boolean; + limit?: number; +}) => { + const { dagId = "" } = useParams(); + const refetchInterval = useAutoRefresh({ dagId }); + + // This is necessary for keepPreviousData + const { data: dagStructure, ...rest } = useGridServiceGetDagStructure( + { + dagId, + limit, + orderBy: "-run_after", + }, + undefined, + { + placeholderData: (prev) => prev, + refetchInterval: hasActiveRun ? refetchInterval : false, + }, + ); + + return { data: dagStructure, ...rest }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts new file mode 100644 index 0000000000000..5bd3d1b85f014 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/queries/useGridTISummaries.ts @@ -0,0 +1,46 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useParams } from "react-router-dom"; + +import { useGridServiceGetGridTiSummaries } from "openapi/queries"; +import type { GridRunsResponse } from "openapi/requests"; +import { isStatePending, useAutoRefresh } from "src/utils"; + +export const useGridTiSummaries = (run: GridRunsResponse) => { + const { dagId = "" } = useParams(); + + const refetchInterval = useAutoRefresh({ dagId }); + + const { data: gridTiSummaries, ...rest } = useGridServiceGetGridTiSummaries( + { + dagId, + runId: run.run_id, + }, + undefined, + { + placeholderData: (prev) => prev, + refetchInterval: (query) => + (isStatePending(run.state) || + query.state.data?.task_instances.some((ti) => isStatePending(ti.state))) && + refetchInterval, + }, + ); + + return { data: gridTiSummaries, ...rest }; +}; diff --git a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts index 0df7402082e42..9f67c249d03c3 100644 --- a/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts +++ b/airflow-core/src/airflow/ui/src/queries/useRefreshOnNewDagRuns.ts @@ -20,13 +20,15 @@ import { useQueryClient } from "@tanstack/react-query"; import { useEffect, useRef } from "react"; import { - useDagRunServiceGetDagRuns, useDagServiceGetDagDetailsKey, UseDagRunServiceGetDagRunsKeyFn, UseDagServiceGetDagDetailsKeyFn, useDagServiceGetDagsUi, UseGridServiceGridDataKeyFn, UseTaskInstanceServiceGetTaskInstancesKeyFn, + useGridServiceGetLatestRun, + UseGridServiceGetDagStructureKeyFn, + UseGridServiceGetGridRunsKeyFn, } from "openapi/queries"; import { useConfig } from "./useConfig"; @@ -36,15 +38,15 @@ export const useRefreshOnNewDagRuns = (dagId: string, hasPendingRuns: boolean | const previousDagRunIdRef = useRef(); const autoRefreshInterval = useConfig("auto_refresh_interval") as number; - const { data } = useDagRunServiceGetDagRuns({ dagId, limit: 1, orderBy: "-run_after" }, undefined, { + const { data } = useGridServiceGetLatestRun({ dagId }, undefined, { enabled: Boolean(dagId) && !hasPendingRuns, refetchInterval: Boolean(autoRefreshInterval) ? autoRefreshInterval * 1000 : 5000, }); useEffect(() => { - const latestDagRun = data?.dag_runs[0]; + const latestDagRun = data; - const latestDagRunId = latestDagRun?.dag_run_id; + const latestDagRunId = latestDagRun?.run_id; if ((latestDagRunId ?? "") && previousDagRunIdRef.current !== latestDagRunId) { previousDagRunIdRef.current = latestDagRunId; @@ -56,6 +58,8 @@ export const useRefreshOnNewDagRuns = (dagId: string, hasPendingRuns: boolean | UseDagRunServiceGetDagRunsKeyFn({ dagId }, [{ dagId }]), UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId: "~" }, [{ dagId, dagRunId: "~" }]), UseGridServiceGridDataKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetDagStructureKeyFn({ dagId }, [{ dagId }]), + UseGridServiceGetGridRunsKeyFn({ dagId }, [{ dagId }]), ]; queryKeys.forEach((key) => { diff --git a/airflow-core/src/airflow/ui/src/utils/groupTask.ts b/airflow-core/src/airflow/ui/src/utils/groupTask.ts index 8b736dc7c8a9f..3438da3b77bed 100644 --- a/airflow-core/src/airflow/ui/src/utils/groupTask.ts +++ b/airflow-core/src/airflow/ui/src/utils/groupTask.ts @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import type { NodeResponse } from "openapi/requests/types.gen"; +import type { GridNodeResponse } from "openapi/requests/types.gen"; /** * Finds a task node by its ID in a tree of nodes @@ -24,12 +24,15 @@ import type { NodeResponse } from "openapi/requests/types.gen"; * @param targetId - ID of the node to find * @returns The found node or undefined if not found */ -export const getGroupTask = (nodes: Array, targetId: string): NodeResponse | undefined => { - if (!nodes.length || !targetId) { +export const getGroupTask = ( + nodes: Array | undefined, + targetId: string | undefined, +): GridNodeResponse | undefined => { + if (nodes === undefined || targetId === undefined || !nodes.length || !targetId) { return undefined; } - const queue: Array = [...nodes]; + const queue: Array = [...nodes]; const [root] = targetId.split("."); while (queue.length > 0) { diff --git a/airflow-core/src/airflow/utils/task_group.py b/airflow-core/src/airflow/utils/task_group.py index fa7396b85e800..bcae2baa88d6d 100644 --- a/airflow-core/src/airflow/utils/task_group.py +++ b/airflow-core/src/airflow/utils/task_group.py @@ -94,3 +94,41 @@ def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False): "children": children, "type": "task", } + + +def task_group_to_dict_grid(task_item_or_group, parent_group_is_mapped=False): + """Create a nested dict representation of this TaskGroup and its children used to construct the Graph.""" + from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator + from airflow.sdk.definitions.mappedoperator import MappedOperator + + if isinstance(task := task_item_or_group, AbstractOperator): + is_mapped = None + if isinstance(task, MappedOperator) or parent_group_is_mapped: + is_mapped = True + setup_teardown_type = None + if task.is_setup is True: + setup_teardown_type = "setup" + elif task.is_teardown is True: + setup_teardown_type = "teardown" + return { + "id": task.task_id, + "label": task.label, + "is_mapped": is_mapped, + "children": None, + "setup_teardown_type": setup_teardown_type, + } + + task_group = task_item_or_group + task_group_sort = get_task_group_children_getter() + is_mapped_group = isinstance(task_group, MappedTaskGroup) + children = [ + task_group_to_dict_grid(x, parent_group_is_mapped=parent_group_is_mapped or is_mapped_group) + for x in task_group_sort(task_group) + ] + + return { + "id": task_group.group_id, + "label": task_group.label, + "is_mapped": is_mapped_group or None, + "children": children or None, + } diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py index 181d64b3a1716..908ebb24bebf4 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py @@ -42,6 +42,7 @@ DAG_ID = "test_dag" DAG_ID_2 = "test_dag_2" DAG_ID_3 = "test_dag_3" +DAG_ID_4 = "test_dag_4" TASK_ID = "task" TASK_ID_2 = "task2" TASK_ID_3 = "task3" @@ -59,11 +60,9 @@ "data_interval_start": "2024-11-29T00:00:00Z", "end_date": "2024-12-31T00:00:00Z", "logical_date": "2024-11-30T00:00:00Z", - "note": None, - "queued_at": None, + "run_after": "2024-11-30T00:00:00Z", "run_type": "scheduled", "start_date": "2016-01-01T00:00:00Z", - "run_after": "2024-11-30T00:00:00Z", "state": "success", "task_instances": [ { @@ -82,10 +81,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 3, "task_id": "mapped_task_group", @@ -107,10 +102,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 2, "task_id": "task_group.inner_task_group", @@ -132,10 +123,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 5, "task_id": "task_group", @@ -157,10 +144,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 1, "task_id": "mapped_task_2", @@ -182,10 +165,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 3, "task_id": "mapped_task_group.subtask", @@ -207,10 +186,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 1, "task_id": "task", @@ -232,10 +207,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 2, "task_id": "task_group.inner_task_group.inner_task_group_sub_task", @@ -257,10 +228,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 4, "task_id": "task_group.mapped_task", @@ -275,8 +242,6 @@ "data_interval_start": "2024-11-29T00:00:00Z", "end_date": "2024-12-31T00:00:00Z", "logical_date": "2024-12-01T00:00:00Z", - "note": None, - "queued_at": None, "run_after": "2024-11-30T00:00:00Z", "run_type": "manual", "start_date": "2016-01-01T00:00:00Z", @@ -299,8 +264,6 @@ "upstream_failed": 0, }, "end_date": "2024-12-30T01:02:03Z", - "note": None, - "queued_dttm": None, "start_date": "2024-12-30T01:00:00Z", "state": "running", "task_count": 3, @@ -323,11 +286,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, - "state": None, "task_count": 2, "task_id": "task_group.inner_task_group", "try_number": 0, @@ -348,11 +306,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, - "state": None, "task_count": 5, "task_id": "task_group", "try_number": 0, @@ -373,11 +326,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, - "state": None, "task_count": 1, "task_id": "mapped_task_2", "try_number": 0, @@ -399,8 +347,6 @@ "upstream_failed": 0, }, "end_date": "2024-12-30T01:02:03Z", - "note": None, - "queued_dttm": None, "start_date": "2024-12-30T01:00:00Z", "state": "running", "task_count": 3, @@ -423,10 +369,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 1, "task_id": "task", @@ -448,11 +390,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, - "state": None, "task_count": 2, "task_id": "task_group.inner_task_group.inner_task_group_sub_task", "try_number": 0, @@ -473,11 +410,6 @@ "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, - "state": None, "task_count": 4, "task_id": "task_group.mapped_task", "try_number": 0, @@ -485,103 +417,6 @@ ], } -STRUCTURE = { - "edges": [], - "nodes": [ - { - "asset_condition_type": None, - "children": [ - { - "asset_condition_type": None, - "children": None, - "id": "mapped_task_group.subtask", - "is_mapped": True, - "label": "subtask", - "operator": "MockOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - ], - "id": "mapped_task_group", - "is_mapped": True, - "label": "mapped_task_group", - "operator": None, - "setup_teardown_type": None, - "tooltip": "", - "type": "task", - }, - { - "asset_condition_type": None, - "children": None, - "id": "task", - "is_mapped": None, - "label": "task", - "operator": "EmptyOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - { - "asset_condition_type": None, - "children": [ - { - "asset_condition_type": None, - "children": [ - { - "asset_condition_type": None, - "children": None, - "id": "task_group.inner_task_group.inner_task_group_sub_task", - "is_mapped": True, - "label": "inner_task_group_sub_task", - "operator": "MockOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - ], - "id": "task_group.inner_task_group", - "is_mapped": False, - "label": "inner_task_group", - "operator": None, - "setup_teardown_type": None, - "tooltip": "", - "type": "task", - }, - { - "asset_condition_type": None, - "children": None, - "id": "task_group.mapped_task", - "is_mapped": True, - "label": "mapped_task", - "operator": "MockOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - ], - "id": "task_group", - "is_mapped": False, - "label": "task_group", - "operator": None, - "setup_teardown_type": None, - "tooltip": "", - "type": "task", - }, - { - "asset_condition_type": None, - "children": None, - "id": "mapped_task_2", - "is_mapped": True, - "label": "mapped_task_2", - "operator": "MockOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - ], -} - @pytest.fixture(autouse=True, scope="module") def examples_dag_bag(): @@ -692,6 +527,35 @@ def mapped_task_group(arg1): ti.state = TaskInstanceState.SUCCESS ti.end_date = None + # DAG 4 for testing removed task + with dag_maker(dag_id=DAG_ID_4, serialized=True, session=session) as dag_4: + t1 = EmptyOperator(task_id="t1") + t2 = EmptyOperator(task_id="t2") + with TaskGroup(group_id=f"{TASK_GROUP_ID}-1") as tg1: + with TaskGroup(group_id=f"{TASK_GROUP_ID}-2") as tg2: + EmptyOperator(task_id="t3") + EmptyOperator(task_id="t4") + EmptyOperator(task_id="t5") + t6 = EmptyOperator(task_id="t6") + tg2 >> t6 + t7 = EmptyOperator(task_id="t7") + t1 >> t2 >> tg1 >> t7 + + logical_date = timezone.datetime(2024, 11, 30) + data_interval = dag_4.timetable.infer_manual_data_interval(run_after=logical_date) + run_4 = dag_maker.create_dagrun( + run_id="run_4-1", + state=DagRunState.SUCCESS, + run_type=DagRunType.SCHEDULED, + start_date=logical_date, + logical_date=logical_date, + data_interval=data_interval, + **triggered_by_kwargs, + ) + for ti in run_4.task_instances: + ti.state = "success" + session.commit() + @pytest.fixture(autouse=True) def _clean(): @@ -714,7 +578,6 @@ def test_should_response_200(self, test_client): response = test_client.get(f"/grid/{DAG_ID}") assert response.status_code == 200 assert response.json() == { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_1, GRID_RUN_2], } @@ -724,7 +587,6 @@ def test_should_response_200(self, test_client): ( "logical_date", { - "structure": STRUCTURE, "dag_runs": [ GRID_RUN_1, GRID_RUN_2, @@ -734,7 +596,6 @@ def test_should_response_200(self, test_client): ( "-logical_date", { - "structure": STRUCTURE, "dag_runs": [ GRID_RUN_2, GRID_RUN_1, @@ -744,7 +605,6 @@ def test_should_response_200(self, test_client): ( "run_after", { - "structure": STRUCTURE, "dag_runs": [ GRID_RUN_1, GRID_RUN_2, @@ -754,7 +614,6 @@ def test_should_response_200(self, test_client): ( "-run_after", { - "structure": STRUCTURE, "dag_runs": [ GRID_RUN_2, GRID_RUN_1, @@ -775,7 +634,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "true", "false", { - "structure": STRUCTURE, "dag_runs": [ { **GRID_RUN_1, @@ -796,10 +654,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 3, "task_id": "mapped_task_group", @@ -821,10 +675,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 3, "task_id": "mapped_task_group.subtask", @@ -852,8 +702,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "upstream_failed": 0, }, "end_date": "2024-12-30T01:02:03Z", - "note": None, - "queued_dttm": None, "start_date": "2024-12-30T01:00:00Z", "state": "running", "task_count": 3, @@ -877,8 +725,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "upstream_failed": 0, }, "end_date": "2024-12-30T01:02:03Z", - "note": None, - "queued_dttm": None, "start_date": "2024-12-30T01:00:00Z", "state": "running", "task_count": 3, @@ -894,7 +740,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "false", "true", { - "structure": STRUCTURE, "dag_runs": [ { **GRID_RUN_1, @@ -915,10 +760,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 3, "task_id": "mapped_task_group", @@ -940,10 +781,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 3, "task_id": "mapped_task_group.subtask", @@ -971,8 +808,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "upstream_failed": 0, }, "end_date": "2024-12-30T01:02:03Z", - "note": None, - "queued_dttm": None, "start_date": "2024-12-30T01:00:00Z", "state": "running", "task_count": 3, @@ -996,8 +831,6 @@ def test_should_response_200_order_by(self, test_client, order_by, expected): "upstream_failed": 0, }, "end_date": "2024-12-30T01:02:03Z", - "note": None, - "queued_dttm": None, "start_date": "2024-12-30T01:00:00Z", "state": "running", "task_count": 3, @@ -1031,14 +864,12 @@ def test_should_response_200_include_upstream_downstream( ( 1, { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_1], }, ), ( 2, { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_1, GRID_RUN_2], }, ), @@ -1058,7 +889,6 @@ def test_should_response_200_limit(self, test_client, limit, expected): "logical_date_lte": timezone.datetime(2024, 11, 30), }, { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_1], }, ), @@ -1067,7 +897,7 @@ def test_should_response_200_limit(self, test_client, limit, expected): "logical_date_gte": timezone.datetime(2024, 10, 30), "logical_date_lte": timezone.datetime(2024, 10, 30), }, - {"dag_runs": [], "structure": STRUCTURE}, + {"dag_runs": []}, ), ( { @@ -1075,7 +905,6 @@ def test_should_response_200_limit(self, test_client, limit, expected): "run_after_lte": timezone.datetime(2024, 11, 30), }, { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_1, GRID_RUN_2], }, ), @@ -1084,7 +913,7 @@ def test_should_response_200_limit(self, test_client, limit, expected): "run_after_gte": timezone.datetime(2024, 10, 30), "run_after_lte": timezone.datetime(2024, 10, 30), }, - {"dag_runs": [], "structure": STRUCTURE}, + {"dag_runs": []}, ), ], ) @@ -1102,14 +931,12 @@ def test_should_response_200_date_filters(self, test_client, params, expected): ( ["manual"], { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_2], }, ), ( ["scheduled"], { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_1], }, ), @@ -1140,20 +967,18 @@ def test_should_response_200_run_types_invalid(self, test_client, run_type, expe ( ["success"], { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_1], }, ), ( ["failed"], { - "structure": STRUCTURE, "dag_runs": [GRID_RUN_2], }, ), ( ["running"], - {"dag_runs": [], "structure": STRUCTURE}, + {"dag_runs": []}, ), ], ) @@ -1194,22 +1019,6 @@ def test_should_response_200_without_dag_run(self, test_client): assert response.status_code == 200 assert response.json() == { "dag_runs": [], - "structure": { - "nodes": [ - { - "asset_condition_type": None, - "children": None, - "id": "task2", - "is_mapped": None, - "label": "task2", - "operator": "EmptyOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - ], - "edges": [], - }, } def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test_client): @@ -1225,68 +1034,15 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test response = test_client.get(f"/grid/{DAG_ID_3}") assert response.status_code == 200 assert response.json() == { - "structure": { - "edges": [], - "nodes": [ - { - "asset_condition_type": None, - "children": None, - "id": "task3", - "is_mapped": None, - "label": "task3", - "operator": "EmptyOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - { - "asset_condition_type": None, - "children": None, - "id": "task4", - "is_mapped": None, - "label": "task4", - "operator": "EmptyOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - { - "asset_condition_type": None, - "children": [ - { - "asset_condition_type": None, - "children": None, - "id": "task_group.inner_task", - "is_mapped": None, - "label": "inner_task", - "operator": "EmptyOperator", - "setup_teardown_type": None, - "tooltip": None, - "type": "task", - }, - ], - "id": "task_group", - "is_mapped": False, - "label": "task_group", - "operator": None, - "setup_teardown_type": None, - "tooltip": "", - "type": "task", - }, - ], - }, "dag_runs": [ { "dag_run_id": "run_3", "data_interval_end": "2024-11-30T00:00:00Z", "data_interval_start": "2024-11-29T00:00:00Z", - "end_date": None, "logical_date": "2024-11-30T00:00:00Z", - "note": None, "queued_at": "2024-12-31T00:00:00Z", "run_after": "2024-11-30T00:00:00Z", "run_type": "scheduled", - "start_date": None, "state": "queued", "task_instances": [ { @@ -1305,10 +1061,6 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 1, "task_id": "task_group", @@ -1330,10 +1082,6 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 1, "task_id": "task3", @@ -1355,10 +1103,6 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 1, "task_id": "task_group.inner_task", @@ -1372,8 +1116,6 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test "data_interval_start": "2024-11-29T00:00:00Z", "end_date": "2024-12-31T00:00:00Z", "logical_date": "2024-12-01T00:00:00Z", - "note": None, - "queued_at": None, "run_after": "2024-11-30T00:00:00Z", "run_type": "manual", "start_date": "2024-11-30T00:00:00Z", @@ -1395,10 +1137,6 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test "up_for_retry": 0, "upstream_failed": 0, }, - "end_date": None, - "note": None, - "queued_dttm": None, - "start_date": None, "state": "success", "task_count": 1, "task_id": "task3", @@ -1408,3 +1146,119 @@ def test_should_response_200_with_deleted_task_and_taskgroup(self, session, test }, ], } + + def test_get_dag_structure(self, session, test_client): + session.commit() + response = test_client.get(f"/grid/structure/{DAG_ID}?limit=5") + assert response.status_code == 200 + assert response.json() == [ + { + "children": [{"id": "mapped_task_group.subtask", "is_mapped": True, "label": "subtask"}], + "id": "mapped_task_group", + "is_mapped": True, + "label": "mapped_task_group", + }, + {"id": "task", "label": "task"}, + { + "children": [ + { + "children": [ + { + "id": "task_group.inner_task_group.inner_task_group_sub_task", + "is_mapped": True, + "label": "inner_task_group_sub_task", + } + ], + "id": "task_group.inner_task_group", + "label": "inner_task_group", + }, + {"id": "task_group.mapped_task", "is_mapped": True, "label": "mapped_task"}, + ], + "id": "task_group", + "label": "task_group", + }, + {"id": "mapped_task_2", "is_mapped": True, "label": "mapped_task_2"}, + ] + + def test_get_grid_runs(self, session, test_client): + session.commit() + response = test_client.get(f"/grid/runs/{DAG_ID}?limit=5") + assert response.status_code == 200 + assert response.json() == [ + { + "dag_id": "test_dag", + "duration": 0, + "end_date": "2024-12-31T00:00:00Z", + "run_after": "2024-11-30T00:00:00Z", + "run_id": "run_1", + "run_type": "scheduled", + "start_date": "2016-01-01T00:00:00Z", + "state": "success", + }, + { + "dag_id": "test_dag", + "duration": 0, + "end_date": "2024-12-31T00:00:00Z", + "run_after": "2024-11-30T00:00:00Z", + "run_id": "run_2", + "run_type": "manual", + "start_date": "2016-01-01T00:00:00Z", + "state": "failed", + }, + ] + + def test_grid_ti_summaries_group(self, session, test_client): + run_id = "run_4-1" + session.commit() + response = test_client.get(f"/grid/ti_summaries/{DAG_ID_4}/{run_id}") + assert response.status_code == 200 + actual = response.json() + expected = { + "dag_id": "test_dag_4", + "run_id": "run_4-1", + "task_instances": [ + {"state": "success", "task_id": "t1"}, + {"state": "success", "task_id": "t2"}, + {"state": "success", "task_id": "t7"}, + {"state": "success", "task_id": "task_group-1"}, + {"state": "success", "task_id": "task_group-1.t6"}, + {"state": "success", "task_id": "task_group-1.task_group-2"}, + {"state": "success", "task_id": "task_group-1.task_group-2.t3"}, + {"state": "success", "task_id": "task_group-1.task_group-2.t4"}, + {"state": "success", "task_id": "task_group-1.task_group-2.t5"}, + ], + } + for obj in actual, expected: + tis = obj["task_instances"] + tis[:] = sorted(tis, key=lambda x: x["task_id"]) + assert actual == expected + + def test_grid_ti_summaries_mapped(self, session, test_client): + run_id = "run_2" + session.commit() + response = test_client.get(f"/grid/ti_summaries/{DAG_ID}/{run_id}") + assert response.status_code == 200 + data = response.json() + actual = data["task_instances"] + + def sort_dict(in_dict): + in_dict = sorted(in_dict, key=lambda x: x["task_id"]) + out = [] + for d in in_dict: + n = {k: d[k] for k in sorted(d, reverse=True)} + out.append(n) + return out + + expected = [ + {"task_id": "mapped_task_group", "state": "running"}, + {"task_id": "task_group.inner_task_group"}, + {"task_id": "task_group"}, + {"task_id": "mapped_task_2"}, + {"task_id": "mapped_task_group.subtask", "state": "running"}, + {"task_id": "task", "state": "success"}, + {"task_id": "task_group.inner_task_group.inner_task_group_sub_task"}, + {"task_id": "task_group.mapped_task"}, + ] + expected = sort_dict(expected) + actual = sort_dict(actual) + assert actual == expected