Skip to content

Commit

Permalink
AIP-84 add external dependencies asset condition (#44877)
Browse files Browse the repository at this point in the history
* Structure endpoint add asset-condition

* Fix CI
  • Loading branch information
pierrejeambrun authored Dec 16, 2024
1 parent dbff6e3 commit 13a18c3
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 28 deletions.
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class EdgeResponse(BaseModel):
label: str | None = None
source_id: str
target_id: str
is_source_asset: bool | None = None


class NodeResponse(BaseModel):
Expand All @@ -41,6 +42,7 @@ class NodeResponse(BaseModel):
setup_teardown_type: Literal["setup", "teardown"] | None = None
type: Literal["join", "task", "asset-condition", "asset", "asset-alias", "dag", "sensor", "trigger"]
operator: str | None = None
asset_condition_type: Literal["or-gate", "and-gate"] | None = None


class StructureDataResponse(BaseModel):
Expand Down
13 changes: 13 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7734,6 +7734,11 @@ components:
target_id:
type: string
title: Target Id
is_source_asset:
anyOf:
- type: boolean
- type: 'null'
title: Is Source Asset
type: object
required:
- source_id
Expand Down Expand Up @@ -8079,6 +8084,14 @@ components:
- type: string
- type: 'null'
title: Operator
asset_condition_type:
anyOf:
- type: string
enum:
- or-gate
- and-gate
- type: 'null'
title: Asset Condition Type
type: object
required:
- id
Expand Down
32 changes: 23 additions & 9 deletions airflow/api_fastapi/core_api/routes/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.services.ui.structure import get_upstream_assets
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.dag_edges import dag_edges
from airflow.utils.task_group import task_group_to_dict
Expand Down Expand Up @@ -71,17 +72,16 @@ def structure_data(

for dependency_dag_id, dependencies in SerializedDagModel.get_dag_dependencies().items():
for dependency in dependencies:
# Dependencies not related to `dag_id` are ignored
if dependency_dag_id != dag_id and dependency.target != dag_id:
continue

# Add nodes
nodes.append(
{
"id": dependency.node_id,
"label": dependency.dependency_id,
"type": dependency.dependency_type,
}
)
# upstream assets are handled by the `get_upstream_assets` function.
if dependency.target != dependency.dependency_type and dependency.dependency_type in [
"asset-alias",
"asset",
]:
continue

# Add edges
# start dependency
Expand All @@ -96,6 +96,20 @@ def structure_data(
) and exit_node_ref:
end_edges.append({"source_id": exit_node_ref["id"], "target_id": dependency.node_id})

data["edges"] = start_edges + edges + end_edges
# Add nodes
nodes.append(
{
"id": dependency.node_id,
"label": dependency.dependency_id,
"type": dependency.dependency_type,
}
)

upstream_asset_nodes, upstream_asset_edges = get_upstream_assets(
dag.timetable.asset_condition, entry_node_ref["id"]
)

data["nodes"] += upstream_asset_nodes
data["edges"] = upstream_asset_edges + start_edges + edges + end_edges

return StructureDataResponse(**data)
16 changes: 16 additions & 0 deletions airflow/api_fastapi/core_api/services/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/api_fastapi/core_api/services/ui/__init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
94 changes: 94 additions & 0 deletions airflow/api_fastapi/core_api/services/ui/structure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Private service for dag structure.
:meta private:
"""

from __future__ import annotations

from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny, BaseAsset


def get_upstream_assets(
asset_condition: BaseAsset, entry_node_ref: str, level=0
) -> tuple[list[dict], list[dict]]:
edges: list[dict] = []
nodes: list[dict] = []
asset_condition_type: str | None = None

assets: list[Asset | AssetAlias] = []

nested_expression: AssetAll | AssetAny | None = None

if isinstance(asset_condition, AssetAny):
asset_condition_type = "or-gate"

elif isinstance(asset_condition, AssetAll):
asset_condition_type = "and-gate"

if hasattr(asset_condition, "objects"):
for obj in asset_condition.objects:
if isinstance(obj, (AssetAll, AssetAny)):
nested_expression = obj
elif isinstance(obj, (Asset, AssetAlias)):
assets.append(obj)
else:
raise TypeError(f"Unsupported type: {type(obj)}")

if asset_condition_type and assets:
asset_condition_id = f"{asset_condition_type}-{level}"
edges.append(
{
"source_id": asset_condition_id,
"target_id": entry_node_ref,
"is_source_asset": level == 0,
}
)
nodes.append(
{
"id": asset_condition_id,
"label": asset_condition_id,
"type": "asset-condition",
"asset_condition_type": asset_condition_type,
}
)

for asset in assets:
edges.append(
{
"source_id": asset.name,
"target_id": asset_condition_id,
}
)
nodes.append(
{
"id": asset.name,
"label": asset.name,
"type": "asset-alias" if isinstance(asset, AssetAlias) else "asset",
}
)

if nested_expression is not None:
n, e = get_upstream_assets(nested_expression, asset_condition_id, level=level + 1)

nodes = nodes + n
edges = edges + e

return nodes, edges
23 changes: 23 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2664,6 +2664,17 @@ export const $EdgeResponse = {
type: "string",
title: "Target Id",
},
is_source_asset: {
anyOf: [
{
type: "boolean",
},
{
type: "null",
},
],
title: "Is Source Asset",
},
},
type: "object",
required: ["source_id", "target_id"],
Expand Down Expand Up @@ -3208,6 +3219,18 @@ export const $NodeResponse = {
],
title: "Operator",
},
asset_condition_type: {
anyOf: [
{
type: "string",
enum: ["or-gate", "and-gate"],
},
{
type: "null",
},
],
title: "Asset Condition Type",
},
},
type: "object",
required: ["id", "label", "type"],
Expand Down
2 changes: 2 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ export type EdgeResponse = {
label?: string | null;
source_id: string;
target_id: string;
is_source_asset?: boolean | null;
};

/**
Expand Down Expand Up @@ -783,6 +784,7 @@ export type NodeResponse = {
| "sensor"
| "trigger";
operator?: string | null;
asset_condition_type?: "or-gate" | "and-gate" | null;
};

export type type =
Expand Down
Loading

0 comments on commit 13a18c3

Please sign in to comment.