Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,28 @@ paths:
schema:
type: string
title: Dag Id
- name: include_upstream
in: query
required: false
schema:
type: boolean
default: false
title: Include Upstream
- name: include_downstream
in: query
required: false
schema:
type: boolean
default: false
title: Include Downstream
- name: root
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Root
- name: offset
in: query
required: false
Expand Down
49 changes: 18 additions & 31 deletions airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
QueryDagRunRunTypesFilter,
QueryDagRunStateFilter,
QueryDagRunTriggeringUserSearch,
QueryIncludeDownstream,
QueryIncludeUpstream,
QueryLimit,
QueryOffset,
RangeFilter,
Expand Down Expand Up @@ -133,11 +135,20 @@ def get_dag_structure(
run_type: QueryDagRunRunTypesFilter,
state: QueryDagRunStateFilter,
triggering_user: QueryDagRunTriggeringUserSearch,
include_upstream: QueryIncludeUpstream = False,
include_downstream: QueryIncludeDownstream = False,
root: str | None = None,
) -> list[GridNodeResponse]:
"""Return dag structure for grid view."""
latest_serdag = _get_latest_serdag(dag_id, session)
latest_dag = latest_serdag.dag

# Apply filtering if root task is specified
if root:
latest_dag = latest_dag.partial_subset(
task_ids=root, include_upstream=include_upstream, include_downstream=include_downstream
)

# Retrieve, sort the previous DAG Runs
base_query = select(DagRun.id).where(DagRun.dag_id == dag_id)
# This comparison is to fall back to DAG timetable when no order_by is provided
Expand Down Expand Up @@ -181,41 +192,17 @@ def get_dag_structure(
dags = [latest_dag]
for serdag in serdags:
if serdag:
dags.append(serdag.dag)
filtered_dag = serdag.dag
# Apply the same filtering to historical DAG versions
if root:
filtered_dag = filtered_dag.partial_subset(
task_ids=root, include_upstream=include_upstream, include_downstream=include_downstream
)
dags.append(filtered_dag)
for dag in dags:
nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)]
_merge_node_dicts(merged_nodes, nodes)

# Ensure historical tasks (e.g. removed) that exist in TIs for the selected runs are represented
def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
ids: set[str] = set()
for n in nodes:
nid = n.get("id")
if nid:
ids.add(nid)
children = n.get("children")
if children:
ids |= _collect_ids(children) # recurse
return ids

existing_ids = _collect_ids(merged_nodes)
historical_tasks = session.execute(
select(TaskInstance.task_id, TaskInstance.task_display_name)
.join(TaskInstance.dag_run)
.where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
.distinct()
)
for task_id, task_display_name in historical_tasks:
if task_id not in existing_ids:
merged_nodes.append(
{
"id": task_id,
"label": task_display_name,
"is_mapped": None,
"children": None,
}
)

return [GridNodeResponse(**n) for n in merged_nodes]


Expand Down
7 changes: 5 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -808,19 +808,22 @@ export const UseStructureServiceStructureDataKeyFn = ({ dagId, externalDependenc
export type GridServiceGetDagStructureDefaultResponse = Awaited<ReturnType<typeof GridService.getDagStructure>>;
export type GridServiceGetDagStructureQueryResult<TData = GridServiceGetDagStructureDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useGridServiceGetDagStructureKey = "GridServiceGetDagStructure";
export const UseGridServiceGetDagStructureKeyFn = ({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
export const UseGridServiceGetDagStructureKeyFn = ({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
dagId: string;
includeDownstream?: boolean;
includeUpstream?: boolean;
limit?: number;
offset?: number;
orderBy?: string[];
root?: string;
runAfterGt?: string;
runAfterGte?: string;
runAfterLt?: string;
runAfterLte?: string;
runType?: string[];
state?: string[];
triggeringUser?: string;
}, queryKey?: Array<unknown>) => [useGridServiceGetDagStructureKey, ...(queryKey ?? [{ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }])];
}, queryKey?: Array<unknown>) => [useGridServiceGetDagStructureKey, ...(queryKey ?? [{ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }])];
export type GridServiceGetGridRunsDefaultResponse = Awaited<ReturnType<typeof GridService.getGridRuns>>;
export type GridServiceGetGridRunsQueryResult<TData = GridServiceGetGridRunsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useGridServiceGetGridRunsKey = "GridServiceGetGridRuns";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,9 @@ export const ensureUseStructureServiceStructureDataData = (queryClient: QueryCli
* Return dag structure for grid view.
* @param data The data for the request.
* @param data.dagId
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.root
* @param data.offset
* @param data.limit
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `run_after, logical_date, start_date, end_date`
Expand All @@ -1545,19 +1548,22 @@ export const ensureUseStructureServiceStructureDataData = (queryClient: QueryCli
* @returns GridNodeResponse Successful Response
* @throws ApiError
*/
export const ensureUseGridServiceGetDagStructureData = (queryClient: QueryClient, { dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
export const ensureUseGridServiceGetDagStructureData = (queryClient: QueryClient, { dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
dagId: string;
includeDownstream?: boolean;
includeUpstream?: boolean;
limit?: number;
offset?: number;
orderBy?: string[];
root?: string;
runAfterGt?: string;
runAfterGte?: string;
runAfterLt?: string;
runAfterLte?: string;
runType?: string[];
state?: string[];
triggeringUser?: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }), queryFn: () => GridService.getDagStructure({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }) });
}) => queryClient.ensureQueryData({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }), queryFn: () => GridService.getDagStructure({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }) });
/**
* Get Grid Runs
* Get info about a run for the grid.
Expand Down
10 changes: 8 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,9 @@ export const prefetchUseStructureServiceStructureData = (queryClient: QueryClien
* Return dag structure for grid view.
* @param data The data for the request.
* @param data.dagId
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.root
* @param data.offset
* @param data.limit
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `run_after, logical_date, start_date, end_date`
Expand All @@ -1545,19 +1548,22 @@ export const prefetchUseStructureServiceStructureData = (queryClient: QueryClien
* @returns GridNodeResponse Successful Response
* @throws ApiError
*/
export const prefetchUseGridServiceGetDagStructure = (queryClient: QueryClient, { dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
export const prefetchUseGridServiceGetDagStructure = (queryClient: QueryClient, { dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
dagId: string;
includeDownstream?: boolean;
includeUpstream?: boolean;
limit?: number;
offset?: number;
orderBy?: string[];
root?: string;
runAfterGt?: string;
runAfterGte?: string;
runAfterLt?: string;
runAfterLte?: string;
runType?: string[];
state?: string[];
triggeringUser?: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }), queryFn: () => GridService.getDagStructure({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }) });
}) => queryClient.prefetchQuery({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }), queryFn: () => GridService.getDagStructure({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }) });
/**
* Get Grid Runs
* Get info about a run for the grid.
Expand Down
10 changes: 8 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,9 @@ export const useStructureServiceStructureData = <TData = Common.StructureService
* Return dag structure for grid view.
* @param data The data for the request.
* @param data.dagId
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.root
* @param data.offset
* @param data.limit
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `run_after, logical_date, start_date, end_date`
Expand All @@ -1545,19 +1548,22 @@ export const useStructureServiceStructureData = <TData = Common.StructureService
* @returns GridNodeResponse Successful Response
* @throws ApiError
*/
export const useGridServiceGetDagStructure = <TData = Common.GridServiceGetDagStructureDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
export const useGridServiceGetDagStructure = <TData = Common.GridServiceGetDagStructureDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
dagId: string;
includeDownstream?: boolean;
includeUpstream?: boolean;
limit?: number;
offset?: number;
orderBy?: string[];
root?: string;
runAfterGt?: string;
runAfterGte?: string;
runAfterLt?: string;
runAfterLte?: string;
runType?: string[];
state?: string[];
triggeringUser?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }, queryKey), queryFn: () => GridService.getDagStructure({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }) as TData, ...options });
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }, queryKey), queryFn: () => GridService.getDagStructure({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }) as TData, ...options });
/**
* Get Grid Runs
* Get info about a run for the grid.
Expand Down
10 changes: 8 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,9 @@ export const useStructureServiceStructureDataSuspense = <TData = Common.Structur
* Return dag structure for grid view.
* @param data The data for the request.
* @param data.dagId
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.root
* @param data.offset
* @param data.limit
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `run_after, logical_date, start_date, end_date`
Expand All @@ -1545,19 +1548,22 @@ export const useStructureServiceStructureDataSuspense = <TData = Common.Structur
* @returns GridNodeResponse Successful Response
* @throws ApiError
*/
export const useGridServiceGetDagStructureSuspense = <TData = Common.GridServiceGetDagStructureDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
export const useGridServiceGetDagStructureSuspense = <TData = Common.GridServiceGetDagStructureDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }: {
dagId: string;
includeDownstream?: boolean;
includeUpstream?: boolean;
limit?: number;
offset?: number;
orderBy?: string[];
root?: string;
runAfterGt?: string;
runAfterGte?: string;
runAfterLt?: string;
runAfterLte?: string;
runType?: string[];
state?: string[];
triggeringUser?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }, queryKey), queryFn: () => GridService.getDagStructure({ dagId, limit, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }) as TData, ...options });
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseGridServiceGetDagStructureKeyFn({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }, queryKey), queryFn: () => GridService.getDagStructure({ dagId, includeDownstream, includeUpstream, limit, offset, orderBy, root, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runType, state, triggeringUser }) as TData, ...options });
/**
* Get Grid Runs
* Get info about a run for the grid.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3883,6 +3883,9 @@ export class GridService {
* Return dag structure for grid view.
* @param data The data for the request.
* @param data.dagId
* @param data.includeUpstream
* @param data.includeDownstream
* @param data.root
* @param data.offset
* @param data.limit
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `run_after, logical_date, start_date, end_date`
Expand All @@ -3904,6 +3907,9 @@ export class GridService {
dag_id: data.dagId
},
query: {
include_upstream: data.includeUpstream,
include_downstream: data.includeDownstream,
root: data.root,
offset: data.offset,
limit: data.limit,
order_by: data.orderBy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3398,12 +3398,15 @@ export type StructureDataResponse2 = StructureDataResponse;

export type GetDagStructureData = {
dagId: string;
includeDownstream?: boolean;
includeUpstream?: boolean;
limit?: number;
offset?: number;
/**
* Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `run_after, logical_date, start_date, end_date`
*/
orderBy?: Array<(string)>;
root?: string | null;
runAfterGt?: string | null;
runAfterGte?: string | null;
runAfterLt?: string | null;
Expand Down
Loading
Loading