Skip to content

Commit

Permalink
#11626 & #14131 - Lineage with other Entities & attr-based xlets (#14191
Browse files Browse the repository at this point in the history
)

* Add OMEntity model

* Test OMEntity

* Update repr

* Fix __str__

* Add entity ref map

* Test serializer for backend

* Fix tests

* Fix serializer

* Test runner

* Add runner tests

* Update docs

* Format
  • Loading branch information
pmbrull authored Dec 1, 2023
1 parent f57e429 commit 7fcdf08
Show file tree
Hide file tree
Showing 14 changed files with 928 additions and 132 deletions.
7 changes: 7 additions & 0 deletions docker/development/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,13 @@ services:
DB_SCHEME: ${AIRFLOW_DB_SCHEME:-mysql+pymysql}
DB_USER: ${AIRFLOW_DB_USER:-airflow_user}
DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass}

# To test the lineage backend
# AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
# AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflow
# AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://openmetadata-server:8585/api
# AIRFLOW__LINEAGE__JWT_TOKEN: ...

entrypoint: /bin/bash
command:
- "/opt/airflow/ingestion_dependency.sh"
Expand Down
21 changes: 20 additions & 1 deletion ingestion/examples/airflow/dags/airflow_lineage_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity

default_args = {
"owner": "openmetadata_airflow_example",
"depends_on_past": False,
Expand All @@ -45,15 +49,30 @@ def openmetadata_airflow_lineage_example():
inlets={
"tables": [
"sample_data.ecommerce_db.shopify.raw_order",
"sample_data.ecommerce_db.shopify.raw_customer",
],
},
outlets={"tables": ["sample_data.ecommerce_db.shopify.fact_order"]},
)
def generate_data():
pass

@task(
inlets=[
OMEntity(entity=Container, fqn="s3_storage_sample.transactions", key="test")
],
outlets=[
OMEntity(
entity=Table,
fqn="sample_data.ecommerce_db.shopify.raw_order",
key="test",
)
],
)
def generate_data2():
pass

generate_data()
generate_data2()


openmetadata_airflow_lineage_example_dag = openmetadata_airflow_lineage_example()
7 changes: 5 additions & 2 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,17 @@
"sqlalchemy>=1.4.0,<2",
"collate-sqllineage>=1.0.4",
"tabulate==0.9.0",
"typing_extensions<=4.5.0", # We need to have this fixed due to a yanked release 4.6.0
"typing_extensions>=4.8.0",
"typing-inspect",
"wheel~=0.38.4",
}


plugins: Dict[str, Set[str]] = {
"airflow": {VERSIONS["airflow"]}, # Same as ingestion container. For development.
"airflow": {
VERSIONS["airflow"],
"attrs",
}, # Same as ingestion container. For development.
"amundsen": {VERSIONS["neo4j"]},
"athena": {"pyathena==3.0.8"},
"atlas": {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,17 @@ def send_lineage(
"""

try:
dag = context["dag"]
dag.log.info("Executing OpenMetadata Lineage Backend...")

config: AirflowLineageConfig = get_lineage_config()
xlet_list: List[XLets] = get_xlets_from_dag(dag)
metadata = OpenMetadata(config.metadata_config)
xlet_list: List[XLets] = get_xlets_from_dag(context["dag"])

runner = AirflowLineageRunner(
metadata=metadata,
service_name=config.airflow_service_name,
dag=context["dag"],
dag=dag,
xlets=xlet_list,
only_keep_dag_lineage=config.only_keep_dag_lineage,
max_status=config.max_status,
Expand Down
61 changes: 42 additions & 19 deletions ingestion/src/airflow_provider_openmetadata/lineage/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.airflow.lineage_parser import XLets
from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
from metadata.utils.helpers import clean_uri, datetime_to_ts


Expand Down Expand Up @@ -251,37 +252,47 @@ def add_lineage(self, pipeline: Pipeline, xlets: XLets) -> None:
"""

lineage_details = LineageDetails(
pipeline=EntityReference(id=pipeline.id, type="pipeline")
pipeline=EntityReference(
id=pipeline.id, type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__]
)
)

for from_fqn in xlets.inlets or []:
for from_xlet in xlets.inlets or []:
from_entity: Optional[Table] = self.metadata.get_by_name(
entity=Table, fqn=from_fqn
entity=from_xlet.entity, fqn=from_xlet.fqn
)
if from_entity:
for to_fqn in xlets.outlets or []:
for to_xlet in xlets.outlets or []:
to_entity: Optional[Table] = self.metadata.get_by_name(
entity=Table, fqn=to_fqn
entity=to_xlet.entity, fqn=to_xlet.fqn
)
if to_entity:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id, type="table"
id=from_entity.id,
type=ENTITY_REFERENCE_TYPE_MAP[
from_xlet.entity.__name__
],
),
toEntity=EntityReference(
id=to_entity.id,
type=ENTITY_REFERENCE_TYPE_MAP[
to_xlet.entity.__name__
],
),
toEntity=EntityReference(id=to_entity.id, type="table"),
lineageDetails=lineage_details,
)
)
self.metadata.add_lineage(lineage)
else:
self.dag.log.warning(
f"Could not find Table [{to_fqn}] from "
f"Could not find [{to_xlet.entity.__name__}] [{to_xlet.fqn}] from "
f"[{pipeline.fullyQualifiedName.__root__}] outlets"
)
else:
self.dag.log.warning(
f"Could not find Table [{from_fqn}] from "
f"Could not find [{from_xlet.entity.__name__}] [{from_xlet.fqn}] from "
f"[{pipeline.fullyQualifiedName.__root__}] inlets"
)

Expand All @@ -305,7 +316,8 @@ def clean_lineage(self, pipeline: Pipeline, xlets: XLets):
for node in lineage_data.get("nodes") or []
if node["id"] == upstream_edge["fromEntity"]
and node["type"] == "table"
)
),
None,
)
for upstream_edge in lineage_data.get("upstreamEdges") or []
]
Expand All @@ -316,26 +328,37 @@ def clean_lineage(self, pipeline: Pipeline, xlets: XLets):
for node in lineage_data.get("nodes") or []
if node["id"] == downstream_edge["toEntity"]
and node["type"] == "table"
)
),
None,
)
for downstream_edge in lineage_data.get("downstreamEdges") or []
]

for edge in upstream_edges:
if edge.fqn not in xlets.inlets:
for edge in upstream_edges or []:
if edge.fqn not in (inlet.fqn for inlet in xlets.inlets):
self.dag.log.info(f"Removing upstream edge with {edge.fqn}")
edge_to_remove = EntitiesEdge(
fromEntity=EntityReference(id=edge.id, type="table"),
toEntity=EntityReference(id=pipeline.id, type="pipeline"),
fromEntity=EntityReference(
id=edge.id, type=ENTITY_REFERENCE_TYPE_MAP[Table.__name__]
),
toEntity=EntityReference(
id=pipeline.id,
type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__],
),
)
self.metadata.delete_lineage_edge(edge=edge_to_remove)

for edge in downstream_edges:
if edge.fqn not in xlets.outlets:
for edge in downstream_edges or []:
if edge.fqn not in (outlet.fqn for outlet in xlets.outlets):
self.dag.log.info(f"Removing downstream edge with {edge.fqn}")
edge_to_remove = EntitiesEdge(
fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
toEntity=EntityReference(id=edge.id, type="table"),
fromEntity=EntityReference(
id=pipeline.id,
type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__],
),
toEntity=EntityReference(
id=edge.id, type=ENTITY_REFERENCE_TYPE_MAP[Table.__name__]
),
)
self.metadata.delete_lineage_edge(edge=edge_to_remove)

Expand Down
Loading

0 comments on commit 7fcdf08

Please sign in to comment.