Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,16 @@ paths:
- type: string
- type: 'null'
title: Node Id
- name: dependency_type
in: query
required: false
schema:
enum:
- scheduling
- data
type: string
default: scheduling
title: Dependency Type
responses:
'200':
description: Successful Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from __future__ import annotations

from typing import Literal

from fastapi import Depends, status
from fastapi.exceptions import HTTPException

Expand All @@ -26,8 +28,11 @@
from airflow.api_fastapi.core_api.datamodels.ui.common import BaseGraphResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_dag
from airflow.api_fastapi.core_api.services.ui.dependencies import extract_single_connected_component
from airflow.models.serialized_dag import SerializedDagModel
from airflow.api_fastapi.core_api.services.ui.dependencies import (
extract_single_connected_component,
get_data_dependencies,
get_scheduling_dependencies,
)

dependencies_router = AirflowRouter(tags=["Dependencies"])

Expand All @@ -41,44 +46,27 @@
),
dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.DEPENDENCIES))],
)
def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGraphResponse:
def get_dependencies(
session: SessionDep,
node_id: str | None = None,
dependency_type: Literal["scheduling", "data"] = "scheduling",
) -> BaseGraphResponse:
"""Dependencies graph."""
nodes_dict: dict[str, dict] = {}
edge_tuples: set[tuple[str, str]] = set()

for dag, dependencies in sorted(SerializedDagModel.get_dag_dependencies().items()):
dag_node_id = f"dag:{dag}"
if dag_node_id not in nodes_dict:
for dep in dependencies:
# Add nodes
nodes_dict[dag_node_id] = {"id": dag_node_id, "label": dag, "type": "dag"}
if dep.node_id not in nodes_dict:
nodes_dict[dep.node_id] = {
"id": dep.node_id,
"label": dep.label,
"type": dep.dependency_type,
}
if dependency_type == "data":
if node_id is None or not node_id.startswith("asset:"):
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "Data dependencies require an asset node_id (e.g., 'asset:123')"
)

# Add edges
# not start dep
if dep.source != dep.dependency_type:
source = dep.source if ":" in dep.source else f"dag:{dep.source}"
target = dep.node_id
edge_tuples.add((source, target))

# not end dep
if dep.target != dep.dependency_type:
source = dep.node_id
target = dep.target if ":" in dep.target else f"dag:{dep.target}"
edge_tuples.add((source, target))
try:
asset_id = int(node_id.replace("asset:", ""))
except ValueError:
raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Invalid asset node_id: {node_id}")

nodes = list(nodes_dict.values())
edges = [{"source_id": source, "target_id": target} for source, target in sorted(edge_tuples)]
data = get_data_dependencies(asset_id, session)
return BaseGraphResponse(**data)

data = {
"nodes": nodes,
"edges": edges,
}
data = get_scheduling_dependencies()

if node_id is not None:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

from __future__ import annotations

from collections import defaultdict
from collections import defaultdict, deque
from typing import TYPE_CHECKING

from airflow.models.asset import AssetModel

if TYPE_CHECKING:
from sqlalchemy.orm import Session


def _dfs_connected_components(
Expand Down Expand Up @@ -76,3 +82,147 @@ def extract_single_connected_component(
]

return {"nodes": nodes, "edges": edges}


def get_scheduling_dependencies() -> dict[str, list[dict]]:
    """
    Build the DAG-level scheduling dependency graph.

    Reads the mapping returned by ``SerializedDagModel.get_dag_dependencies()`` and turns
    it into a node list and an edge list.

    :return: A dict with two keys: ``"nodes"`` (dicts carrying ``id``, ``label`` and
        ``type``) and ``"edges"`` (dicts carrying ``source_id`` and ``target_id``,
        ordered by sorting the ``(source, target)`` pairs).
    """
    from airflow.models.serialized_dag import SerializedDagModel

    def _as_node_id(name: str) -> str:
        # Names that already contain a ":" are used verbatim; anything else is
        # treated as a bare DAG id and gets the "dag:" prefix.
        return name if ":" in name else f"dag:{name}"

    graph_nodes: dict[str, dict] = {}
    links: set[tuple[str, str]] = set()

    dag_dependencies = SerializedDagModel.get_dag_dependencies()
    for dag_id, deps in sorted(dag_dependencies.items()):
        dag_key = f"dag:{dag_id}"
        if dag_key in graph_nodes:
            continue

        for dep in deps:
            # The DAG node is only registered once at least one dependency exists,
            # so DAGs with an empty dependency list never appear in the graph.
            graph_nodes[dag_key] = {"id": dag_key, "label": dag_id, "type": "dag"}

            dep_key = dep.node_id
            if dep_key not in graph_nodes:
                graph_nodes[dep_key] = {
                    "id": dep_key,
                    "label": dep.label,
                    "type": dep.dependency_type,
                }

            # A source equal to the dependency type marks the start of a chain:
            # there is nothing upstream to connect from.
            if dep.source != dep.dependency_type:
                links.add((_as_node_id(dep.source), dep_key))

            # A target equal to the dependency type marks the end of a chain:
            # there is nothing downstream to connect to.
            if dep.target != dep.dependency_type:
                links.add((dep_key, _as_node_id(dep.target)))

    edge_list = [{"source_id": src, "target_id": dst} for src, dst in sorted(links)]
    return {"nodes": list(graph_nodes.values()), "edges": edge_list}


def get_data_dependencies(asset_id: int, session: Session) -> dict[str, list[dict]]:
    """
    Build the asset/task dependency graph reachable from one asset.

    Assets are visited breadth-first starting at ``asset_id``. For every visited asset:

    * each producing task adds a ``task -> asset`` edge, and the assets that task
      consumes (its inlets) are queued so the walk continues upstream;
    * each consuming task adds an ``asset -> task`` edge, and the assets that task
      produces (its outlets) are queued so the walk continues downstream.

    Asset ids that do not resolve to a row are skipped silently.

    :param asset_id: Primary key of the asset the walk starts from.
    :param session: SQLAlchemy session used for every lookup.
    :return: A dict with two keys: ``"nodes"`` (dicts carrying ``id``, ``label`` and
        ``type``, where ``type`` is ``"asset"`` or ``"task"``) and ``"edges"`` (dicts
        carrying ``source_id`` and ``target_id``, ordered by sorting the
        ``(source, target)`` pairs so the output is stable between calls).
    """
    from sqlalchemy import select
    from sqlalchemy.orm import selectinload

    from airflow.models.asset import TaskInletAssetReference, TaskOutletAssetReference

    SEPARATOR = "__SEPARATOR__"

    nodes_dict: dict[str, dict] = {}
    edge_set: set[tuple[str, str]] = set()

    assets_to_process: deque[int] = deque([asset_id])
    processed_assets: set[int] = set()
    # Tracked per direction on purpose: a task reached first as a producer must still
    # have its outlets traced when it is later reached as a consumer (and vice versa).
    # With one shared set the resulting graph depended on which asset the walk started at.
    inlets_traced: set[tuple[str, str]] = set()  # (dag_id, task_id)
    outlets_traced: set[tuple[str, str]] = set()  # (dag_id, task_id)

    def _ensure_task_node(dag_id: str, task_id: str) -> str:
        """Register the task node if it is new and return its node id."""
        task_node_id = f"task:{dag_id}{SEPARATOR}{task_id}"
        if task_node_id not in nodes_dict:
            nodes_dict[task_node_id] = {
                "id": task_node_id,
                # dag_id.task_id label for disambiguation between DAGs
                "label": f"{dag_id}.{task_id}",
                "type": "task",
            }
        return task_node_id

    def _enqueue_task_assets(reference_model, dag_id: str, task_id: str) -> None:
        """Queue every not-yet-visited asset referenced by the task through ``reference_model``."""
        refs = session.scalars(
            select(reference_model).where(
                reference_model.dag_id == dag_id,
                reference_model.task_id == task_id,
            )
        ).all()
        for task_ref in refs:
            if task_ref.asset_id not in processed_assets:
                assets_to_process.append(task_ref.asset_id)

    while assets_to_process:
        current_asset_id = assets_to_process.popleft()
        if current_asset_id in processed_assets:
            continue
        processed_assets.add(current_asset_id)

        # Load producing_tasks and consuming_tasks together with the asset so that
        # iterating them below does not trigger additional lazy queries.
        asset = session.scalar(
            select(AssetModel)
            .where(AssetModel.id == current_asset_id)
            .options(
                selectinload(AssetModel.producing_tasks),
                selectinload(AssetModel.consuming_tasks),
            )
        )
        if not asset:
            continue

        asset_node_id = f"asset:{current_asset_id}"
        if asset_node_id not in nodes_dict:
            nodes_dict[asset_node_id] = {"id": asset_node_id, "label": asset.name, "type": "asset"}

        # Producing tasks (tasks that output this asset): edge task -> asset,
        # then follow the task's inlets to keep tracing upstream.
        for ref in asset.producing_tasks:
            task_key = (ref.dag_id, ref.task_id)
            edge_set.add((_ensure_task_node(ref.dag_id, ref.task_id), asset_node_id))
            if task_key not in inlets_traced:
                inlets_traced.add(task_key)
                _enqueue_task_assets(TaskInletAssetReference, ref.dag_id, ref.task_id)

        # Consuming tasks (tasks that input this asset): edge asset -> task,
        # then follow the task's outlets to keep tracing downstream.
        for ref in asset.consuming_tasks:
            task_key = (ref.dag_id, ref.task_id)
            edge_set.add((asset_node_id, _ensure_task_node(ref.dag_id, ref.task_id)))
            if task_key not in outlets_traced:
                outlets_traced.add(task_key)
                _enqueue_task_assets(TaskOutletAssetReference, ref.dag_id, ref.task_id)

    return {
        "nodes": list(nodes_dict.values()),
        # Sorted so the response does not depend on set iteration order.
        "edges": [{"source_id": source, "target_id": target} for source, target in sorted(edge_set)],
    }
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,9 +791,10 @@ export const UseAuthLinksServiceGetCurrentUserInfoKeyFn = (queryKey?: Array<unkn
export type DependenciesServiceGetDependenciesDefaultResponse = Awaited<ReturnType<typeof DependenciesService.getDependencies>>;
export type DependenciesServiceGetDependenciesQueryResult<TData = DependenciesServiceGetDependenciesDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDependenciesServiceGetDependenciesKey = "DependenciesServiceGetDependencies";
export const UseDependenciesServiceGetDependenciesKeyFn = ({ nodeId }: {
export const UseDependenciesServiceGetDependenciesKeyFn = ({ dependencyType, nodeId }: {
dependencyType?: "scheduling" | "data";
nodeId?: string;
} = {}, queryKey?: Array<unknown>) => [useDependenciesServiceGetDependenciesKey, ...(queryKey ?? [{ nodeId }])];
} = {}, queryKey?: Array<unknown>) => [useDependenciesServiceGetDependenciesKey, ...(queryKey ?? [{ dependencyType, nodeId }])];
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<ReturnType<typeof DashboardService.historicalMetrics>>;
export type DashboardServiceHistoricalMetricsQueryResult<TData = DashboardServiceHistoricalMetricsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDashboardServiceHistoricalMetricsKey = "DashboardServiceHistoricalMetrics";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1501,12 +1501,14 @@ export const ensureUseAuthLinksServiceGetCurrentUserInfoData = (queryClient: Que
* Dependencies graph.
* @param data The data for the request.
* @param data.nodeId
* @param data.dependencyType
* @returns BaseGraphResponse Successful Response
* @throws ApiError
*/
export const ensureUseDependenciesServiceGetDependenciesData = (queryClient: QueryClient, { nodeId }: {
export const ensureUseDependenciesServiceGetDependenciesData = (queryClient: QueryClient, { dependencyType, nodeId }: {
dependencyType?: "scheduling" | "data";
nodeId?: string;
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ nodeId }), queryFn: () => DependenciesService.getDependencies({ nodeId }) });
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ dependencyType, nodeId }), queryFn: () => DependenciesService.getDependencies({ dependencyType, nodeId }) });
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
6 changes: 4 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1501,12 +1501,14 @@ export const prefetchUseAuthLinksServiceGetCurrentUserInfo = (queryClient: Query
* Dependencies graph.
* @param data The data for the request.
* @param data.nodeId
* @param data.dependencyType
* @returns BaseGraphResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDependenciesServiceGetDependencies = (queryClient: QueryClient, { nodeId }: {
export const prefetchUseDependenciesServiceGetDependencies = (queryClient: QueryClient, { dependencyType, nodeId }: {
dependencyType?: "scheduling" | "data";
nodeId?: string;
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ nodeId }), queryFn: () => DependenciesService.getDependencies({ nodeId }) });
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ dependencyType, nodeId }), queryFn: () => DependenciesService.getDependencies({ dependencyType, nodeId }) });
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
6 changes: 4 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1501,12 +1501,14 @@ export const useAuthLinksServiceGetCurrentUserInfo = <TData = Common.AuthLinksSe
* Dependencies graph.
* @param data The data for the request.
* @param data.nodeId
* @param data.dependencyType
* @returns BaseGraphResponse Successful Response
* @throws ApiError
*/
export const useDependenciesServiceGetDependencies = <TData = Common.DependenciesServiceGetDependenciesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ nodeId }: {
export const useDependenciesServiceGetDependencies = <TData = Common.DependenciesServiceGetDependenciesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dependencyType, nodeId }: {
dependencyType?: "scheduling" | "data";
nodeId?: string;
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ nodeId }, queryKey), queryFn: () => DependenciesService.getDependencies({ nodeId }) as TData, ...options });
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ dependencyType, nodeId }, queryKey), queryFn: () => DependenciesService.getDependencies({ dependencyType, nodeId }) as TData, ...options });
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
6 changes: 4 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1501,12 +1501,14 @@ export const useAuthLinksServiceGetCurrentUserInfoSuspense = <TData = Common.Aut
* Dependencies graph.
* @param data The data for the request.
* @param data.nodeId
* @param data.dependencyType
* @returns BaseGraphResponse Successful Response
* @throws ApiError
*/
export const useDependenciesServiceGetDependenciesSuspense = <TData = Common.DependenciesServiceGetDependenciesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ nodeId }: {
export const useDependenciesServiceGetDependenciesSuspense = <TData = Common.DependenciesServiceGetDependenciesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dependencyType, nodeId }: {
dependencyType?: "scheduling" | "data";
nodeId?: string;
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ nodeId }, queryKey), queryFn: () => DependenciesService.getDependencies({ nodeId }) as TData, ...options });
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ dependencyType, nodeId }, queryKey), queryFn: () => DependenciesService.getDependencies({ dependencyType, nodeId }) as TData, ...options });
/**
* Historical Metrics
* Return cluster activity historical metrics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3852,6 +3852,7 @@ export class DependenciesService {
* Dependencies graph.
* @param data The data for the request.
* @param data.nodeId
* @param data.dependencyType
* @returns BaseGraphResponse Successful Response
* @throws ApiError
*/
Expand All @@ -3860,7 +3861,8 @@ export class DependenciesService {
method: 'GET',
url: '/ui/dependencies',
query: {
node_id: data.nodeId
node_id: data.nodeId,
dependency_type: data.dependencyType
},
errors: {
404: 'Not Found',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3425,6 +3425,7 @@ export type GetAuthMenusResponse = MenuItemCollectionResponse;
export type GetCurrentUserInfoResponse = AuthenticatedMeResponse;

export type GetDependenciesData = {
dependencyType?: 'scheduling' | 'data';
nodeId?: string | null;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@
"name": "Name",
"producingTasks": "Producing Tasks",
"scheduledDags": "Scheduled Dags",
"searchPlaceholder": "Search Assets"
"scheduling": "Scheduling",
"searchPlaceholder": "Search Assets",
"taskDependencies": "Task Dependencies"
}
Loading
Loading