From fe23fe08435f3670558056a0ccd239d300d561f1 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Fri, 1 Dec 2023 06:29:44 +0100 Subject: [PATCH] #11626 & #14131 - Lineage with other Entities & attr-based xlets (#14191) * Add OMEntity model * Test OMEntity * Update repr * Fix __str__ * Add entity ref map * Test serializer for backend * Fix tests * Fix serializer * Test runner * Add runner tests * Update docs * Format --- docker/development/docker-compose.yml | 7 + .../airflow/dags/airflow_lineage_example.py | 21 +- ingestion/setup.py | 7 +- .../lineage/backend.py | 7 +- .../lineage/runner.py | 61 +++-- .../source/pipeline/airflow/lineage_parser.py | 229 ++++++++++++++-- .../source/pipeline/airflow/metadata.py | 44 ++- ingestion/src/metadata/utils/constants.py | 46 ++++ ingestion/src/metadata/utils/logger.py | 4 +- .../tests/integration/airflow/__init__.py | 0 .../airflow/test_lineage_runner.py | 218 +++++++++++++++ .../tests/unit/airflow/test_lineage_parser.py | 252 ++++++++++++++++-- .../pipeline/airflow/configuring-lineage.md | 142 +++++++--- .../pipeline/airflow/lineage-backend.md | 22 +- 14 files changed, 928 insertions(+), 132 deletions(-) create mode 100644 ingestion/tests/integration/airflow/__init__.py create mode 100644 ingestion/tests/integration/airflow/test_lineage_runner.py diff --git a/docker/development/docker-compose.yml b/docker/development/docker-compose.yml index 7be9052bacde..5107c6ab9c8b 100644 --- a/docker/development/docker-compose.yml +++ b/docker/development/docker-compose.yml @@ -358,6 +358,13 @@ services: DB_SCHEME: ${AIRFLOW_DB_SCHEME:-mysql+pymysql} DB_USER: ${AIRFLOW_DB_USER:-airflow_user} DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass} + + # To test the lineage backend + # AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend + # AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflow + # AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://openmetadata-server:8585/api + # AIRFLOW__LINEAGE__JWT_TOKEN: ... + entrypoint: /bin/bash command: - "/opt/airflow/ingestion_dependency.sh" diff --git a/ingestion/examples/airflow/dags/airflow_lineage_example.py b/ingestion/examples/airflow/dags/airflow_lineage_example.py index eb7caf5c9dff..a6f9e78b285c 100644 --- a/ingestion/examples/airflow/dags/airflow_lineage_example.py +++ b/ingestion/examples/airflow/dags/airflow_lineage_example.py @@ -24,6 +24,10 @@ from airflow.decorators import dag, task from airflow.utils.dates import days_ago +from metadata.generated.schema.entity.data.container import Container +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity + default_args = { "owner": "openmetadata_airflow_example", "depends_on_past": False, @@ -45,7 +49,6 @@ def openmetadata_airflow_lineage_example(): inlets={ "tables": [ "sample_data.ecommerce_db.shopify.raw_order", - "sample_data.ecommerce_db.shopify.raw_customer", ], }, outlets={"tables": ["sample_data.ecommerce_db.shopify.fact_order"]}, @@ -53,7 +56,23 @@ def openmetadata_airflow_lineage_example(): def generate_data(): pass + @task( + inlets=[ + OMEntity(entity=Container, fqn="s3_storage_sample.transactions", key="test") + ], + outlets=[ + OMEntity( + entity=Table, + fqn="sample_data.ecommerce_db.shopify.raw_order", + key="test", + ) + ], + ) + def generate_data2(): + pass + generate_data() + generate_data2() openmetadata_airflow_lineage_example_dag = openmetadata_airflow_lineage_example() diff --git a/ingestion/setup.py b/ingestion/setup.py index c4318ea776e5..9bfe232fa280 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -114,14 +114,17 @@ "sqlalchemy>=1.4.0,<2", "collate-sqllineage>=1.0.4", "tabulate==0.9.0", - "typing_extensions<=4.5.0", # We need to have this fixed due to a yanked release 4.6.0 + "typing_extensions>=4.8.0", "typing-inspect", "wheel~=0.38.4", } plugins: Dict[str, Set[str]] = { - "airflow": {VERSIONS["airflow"]}, # Same as ingestion container. For development. + "airflow": { + VERSIONS["airflow"], + "attrs", + }, # Same as ingestion container. For development. "amundsen": {VERSIONS["neo4j"]}, "athena": {"pyathena==3.0.8"}, "atlas": {}, diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/backend.py b/ingestion/src/airflow_provider_openmetadata/lineage/backend.py index af770d753b58..2124af6b3fbb 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/backend.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/backend.py @@ -64,14 +64,17 @@ def send_lineage( """ try: + dag = context["dag"] + dag.log.info("Executing OpenMetadata Lineage Backend...") + config: AirflowLineageConfig = get_lineage_config() + xlet_list: List[XLets] = get_xlets_from_dag(dag) metadata = OpenMetadata(config.metadata_config) - xlet_list: List[XLets] = get_xlets_from_dag(context["dag"]) runner = AirflowLineageRunner( metadata=metadata, service_name=config.airflow_service_name, - dag=context["dag"], + dag=dag, xlets=xlet_list, only_keep_dag_lineage=config.only_keep_dag_lineage, max_status=config.max_status, diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/runner.py b/ingestion/src/airflow_provider_openmetadata/lineage/runner.py index 1515fab02c28..7fc2ab85817a 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/runner.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/runner.py @@ -47,6 +47,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.pipeline.airflow.lineage_parser import XLets +from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP from metadata.utils.helpers import clean_uri, datetime_to_ts @@ -251,37 +252,47 @@ def add_lineage(self, pipeline: Pipeline, xlets: XLets) -> None: """ lineage_details = LineageDetails( - pipeline=EntityReference(id=pipeline.id, type="pipeline") + pipeline=EntityReference( + id=pipeline.id, type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__] + ) ) - for from_fqn in xlets.inlets or []: + for from_xlet in xlets.inlets or []: from_entity: Optional[Table] = self.metadata.get_by_name( - entity=Table, fqn=from_fqn + entity=from_xlet.entity, fqn=from_xlet.fqn ) if from_entity: - for to_fqn in xlets.outlets or []: + for to_xlet in xlets.outlets or []: to_entity: Optional[Table] = self.metadata.get_by_name( - entity=Table, fqn=to_fqn + entity=to_xlet.entity, fqn=to_xlet.fqn ) if to_entity: lineage = AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference( - id=from_entity.id, type="table" + id=from_entity.id, + type=ENTITY_REFERENCE_TYPE_MAP[ + from_xlet.entity.__name__ + ], + ), + toEntity=EntityReference( + id=to_entity.id, + type=ENTITY_REFERENCE_TYPE_MAP[ + to_xlet.entity.__name__ + ], ), - toEntity=EntityReference(id=to_entity.id, type="table"), lineageDetails=lineage_details, ) ) self.metadata.add_lineage(lineage) else: self.dag.log.warning( - f"Could not find Table [{to_fqn}] from " + f"Could not find [{to_xlet.entity.__name__}] [{to_xlet.fqn}] from " f"[{pipeline.fullyQualifiedName.__root__}] outlets" ) else: self.dag.log.warning( - f"Could not find Table [{from_fqn}] from " + f"Could not find [{from_xlet.entity.__name__}] [{from_xlet.fqn}] from " f"[{pipeline.fullyQualifiedName.__root__}] inlets" ) @@ -305,7 +316,8 @@ def clean_lineage(self, pipeline: Pipeline, xlets: XLets): for node in lineage_data.get("nodes") or [] if node["id"] == upstream_edge["fromEntity"] and node["type"] == "table" - ) + ), + None, ) for upstream_edge in lineage_data.get("upstreamEdges") or [] ] @@ -316,26 +328,37 @@ def clean_lineage(self, pipeline: Pipeline, xlets: XLets): for node in lineage_data.get("nodes") or [] if node["id"] == downstream_edge["toEntity"] and node["type"] == "table" - ) + ), + None, ) for downstream_edge in lineage_data.get("downstreamEdges") or [] ] - for edge in upstream_edges: - if edge.fqn not in xlets.inlets: + for edge in upstream_edges or []: + if edge.fqn not in (inlet.fqn for inlet in xlets.inlets): self.dag.log.info(f"Removing upstream edge with {edge.fqn}") edge_to_remove = EntitiesEdge( - fromEntity=EntityReference(id=edge.id, type="table"), - toEntity=EntityReference(id=pipeline.id, type="pipeline"), + fromEntity=EntityReference( + id=edge.id, type=ENTITY_REFERENCE_TYPE_MAP[Table.__name__] + ), + toEntity=EntityReference( + id=pipeline.id, + type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__], + ), ) self.metadata.delete_lineage_edge(edge=edge_to_remove) - for edge in downstream_edges: - if edge.fqn not in xlets.outlets: + for edge in downstream_edges or []: + if edge.fqn not in (outlet.fqn for outlet in xlets.outlets): self.dag.log.info(f"Removing downstream edge with {edge.fqn}") edge_to_remove = EntitiesEdge( - fromEntity=EntityReference(id=pipeline.id, type="pipeline"), - toEntity=EntityReference(id=edge.id, type="table"), + fromEntity=EntityReference( + id=pipeline.id, + type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__], + ), + toEntity=EntityReference( + id=edge.id, type=ENTITY_REFERENCE_TYPE_MAP[Table.__name__] + ), ) self.metadata.delete_lineage_edge(edge=edge_to_remove) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py index 7f62bc24d3a6..8d34ffa38633 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/lineage_parser.py @@ -62,13 +62,23 @@ ] and we'll treat this as independent sets of lineage """ +import json import logging import traceback +from collections import defaultdict +from copy import deepcopy from enum import Enum -from typing import Dict, List, Optional, Set +from functools import singledispatch +from typing import Any, DefaultDict, Dict, List, Optional, Type +import attr from pydantic import BaseModel +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.ometa.models import T +from metadata.utils.deprecation import deprecated +from metadata.utils.importer import import_from_module + logger = logging.getLogger("airflow.task") @@ -85,41 +95,207 @@ class XLetsAttr(Enum): PRIVATE_OUTLETS = "_outlets" +@attr.s(auto_attribs=True, kw_only=True) +class OMEntity: + """ + Identifies one entity in OpenMetadata. + We use attr annotated object similar to https://github.com/apache/airflow/blob/main/airflow/lineage/entities.py + based on https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html + """ + + # Entity Type, such as Table, Container or Dashboard. + entity: Type[T] = attr.ib() + # Entity Fully Qualified Name, e.g., service.database.schema.table + fqn: str = attr.ib() + # We will use the key in case we need to group different lineages from the same DAG + key: str = "default" + + def __str__(self): + """Custom serialization""" + _dict = deepcopy(self.__dict__) + _dict["entity"] = f"{self.entity.__module__}.{self.entity.__name__}" + return json.dumps(_dict) + + def serialize(self) -> str: + """Custom serialization to be called in airflow internals""" + return str(self) + + class XLets(BaseModel): """ Group inlets and outlets from all tasks in a DAG """ - inlets: Set[str] - outlets: Set[str] + inlets: List[OMEntity] + outlets: List[OMEntity] + class Config: + arbitrary_types_allowed = True -def parse_xlets(xlet: List[dict]) -> Optional[Dict[str, List[str]]]: + +def concat_dict_values( + d1: DefaultDict[str, List[Any]], d2: Optional[Dict[str, List[Any]]] +) -> DefaultDict[str, List[Any]]: + """ + Update d1 based on d2 values concatenating their results. + """ + if d2: + for key, value in d2.items(): + d1[key] = d1[key] + value + + return d1 + + +def parse_xlets(xlet: List[Any]) -> Optional[Dict[str, List[OMEntity]]]: """ - Parse airflow xlets for V1 :param xlet: airflow v2 xlet dict :return: dictionary of xlet list or None - [{'__var': {'tables': ['sample_data.ecommerce_db.shopify.fact_order']}, - '__type': 'dict'}] - + If our operators are like + ``` + BashOperator( + task_id="print_date", + bash_command="date", + inlets={"tables": ["A"]}, + ) + ``` + the inlets/outlets will still be processed in airflow as a `List`. + + Note that when picking them up from Serialized DAGs, the shape is: + ``` + [{'__var': {'tables': ['sample_data.ecommerce_db.shopify.fact_order']}, '__type': 'dict'}] + ``` + + If using Datasets, we get something like: + ``` + [Dataset(uri='s3://dataset-bucket/input.csv', extra=None)] + ``` + We need to figure out how we want to handle information coming in this format. """ # This branch is for lineage parser op - if isinstance(xlet, list) and len(xlet) and isinstance(xlet[0], dict): - xlet_dict = xlet[0] - # This is how the Serialized DAG is giving us the info from _inlets & _outlets - if isinstance(xlet_dict, dict) and xlet_dict.get("__var"): - xlet_dict = xlet_dict["__var"] - return { - key: value for key, value in xlet_dict.items() if isinstance(value, list) - } + if isinstance(xlet, list) and len(xlet): + _parsed_xlets = defaultdict(list) + for element in xlet: + parsed_element = _parse_xlets(element) or {} + + # Update our xlet dict based on each parsed element + # Since we can get a list of elements, concatenate the results from multiple xlets + _parsed_xlets = concat_dict_values(_parsed_xlets, parsed_element) + + return _parsed_xlets return None +@singledispatch +def _parse_xlets(xlet: Any) -> None: + """ + Default behavior to handle lineage. + + We can use this function to register further inlets/outlets + representations, e.g., https://github.com/open-metadata/OpenMetadata/issues/11626 + """ + logger.warning(f"Inlet/Outlet type {type(xlet)} is not supported.") + + +@_parse_xlets.register +@deprecated( + message="Please update your inlets/outlets to follow ", + release="1.4.0", +) +def dictionary_lineage_annotation(xlet: dict) -> Dict[str, List[OMEntity]]: + """ + Handle OM specific inlet/outlet information. E.g., + + ``` + BashOperator( + task_id="print_date", + bash_command="date", + inlets={ + "tables": ["A", "A"], + "more_tables": ["X", "Y"], + "this is a bit random": "foo", + }, + ) + ``` + """ + xlet_dict = xlet + # This is how the Serialized DAG is giving us the info from _inlets & _outlets + if isinstance(xlet_dict, dict) and xlet_dict.get("__var"): + xlet_dict = xlet_dict["__var"] + + return { + key: [ + # We will convert the old dict lineage method into Tables + OMEntity(entity=Table, fqn=fqn) + for fqn in set(value) # Remove duplicates + ] + for key, value in xlet_dict.items() + if isinstance(value, list) + } + + +@_parse_xlets.register +def _(xlet: OMEntity) -> Optional[Dict[str, List[OMEntity]]]: + """ + Handle OM specific inlet/outlet information. E.g., + + ``` + BashOperator( + task_id="sleep", + bash_command=SLEEP, + outlets=[OMEntity(entity=Table, fqn="B")], + ) + ``` + """ + return {xlet.key: [xlet]} + + +@_parse_xlets.register +def _(xlet: str) -> Optional[Dict[str, List[OMEntity]]]: + """ + Handle OM specific inlet/outlet information. E.g., + + ``` + BashOperator( + task_id="sleep", + bash_command=SLEEP, + outlets=[OMEntity(entity=Table, fqn="B")], + ) + ``` + + Once a DAG is serialized, the xlet info will be stored as: + ``` + ['{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'] + ``` + based on our custom serialization logic. + + In this method, we need to revert this back to the actual instance of OMEntity. + Note that we need to properly validate that the string is following the constraints of: + - Being a JSON representation + - Following the structure of an OMEntity + + Otherwise, we could be having any other attr-based xlet native from Airflow. + """ + try: + body = json.loads(xlet) + om_entity = OMEntity( + entity=import_from_module(body.get("entity")), + fqn=body.get("fqn"), + key=body.get("key"), + ) + + return {om_entity.key: [om_entity]} + except Exception as exc: + logger.error( + f"We could not parse the inlet/outlet information from [{xlet}] due to [{exc}]" + ) + return None + + def get_xlets_from_operator( operator: "BaseOperator", xlet_mode: XLetsMode -) -> Optional[Dict[str, List[str]]]: +) -> Optional[Dict[str, List[OMEntity]]]: """ Given an Airflow DAG Task, obtain the tables set in inlets or outlets. @@ -166,25 +342,26 @@ def get_xlets_from_dag(dag: "DAG") -> List[XLets]: Fill the inlets and outlets of the Pipeline by iterating over all its tasks """ - _inlets = {} - _outlets = {} + _inlets = defaultdict(list) + _outlets = defaultdict(list) # First, grab all the inlets and outlets from all tasks grouped by keys for task in dag.tasks: try: - _inlets.update( + _inlets = concat_dict_values( + _inlets, get_xlets_from_operator( operator=task, xlet_mode=XLetsMode.INLETS, - ) - or [] + ), ) - _outlets.update( + + _outlets = concat_dict_values( + _outlets, get_xlets_from_operator( operator=task, xlet_mode=XLetsMode.OUTLETS, - ) - or [] + ), ) except Exception as exc: @@ -197,7 +374,7 @@ def get_xlets_from_dag(dag: "DAG") -> List[XLets]: # We expect to have the same keys in both inlets and outlets dicts # We will then iterate over the inlet keys to build the list of XLets return [ - XLets(inlets=set(value), outlets=set(_outlets[key])) + XLets(inlets=value, outlets=_outlets[key]) for key, value in _inlets.items() if value and _outlets.get(key) ] diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index ea6452f16079..f5a060e9cf1a 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -30,7 +30,6 @@ Task, TaskStatus, ) -from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import ( AirflowConnection, ) @@ -45,7 +44,10 @@ from metadata.ingestion.connections.session import create_and_bind_session from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.pipeline.airflow.lineage_parser import get_xlets_from_dag +from metadata.ingestion.source.pipeline.airflow.lineage_parser import ( + XLets, + get_xlets_from_dag, +) from metadata.ingestion.source.pipeline.airflow.models import ( AirflowDag, AirflowDagDetails, @@ -53,6 +55,7 @@ from metadata.ingestion.source.pipeline.airflow.utils import get_schedule_interval from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils import fqn +from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP from metadata.utils.helpers import clean_uri, datetime_to_ts from metadata.utils.logger import ingestion_logger @@ -264,7 +267,7 @@ def get_pipelines_list(self) -> Iterable[AirflowDagDetails]: SerializedDagModel.dag_id, json_data_column, SerializedDagModel.fileloc, - ).all(): + ).yield_per(100): try: data = serialized_dag[1]["dag"] dag = AirflowDagDetails( @@ -429,25 +432,40 @@ def yield_pipeline_lineage_details( return lineage_details = LineageDetails( - pipeline=EntityReference(id=pipeline_entity.id.__root__, type="pipeline"), + pipeline=EntityReference( + id=pipeline_entity.id.__root__, + type=ENTITY_REFERENCE_TYPE_MAP[Pipeline.__name__], + ), source=LineageSource.PipelineLineage, ) - xlets = get_xlets_from_dag(dag=pipeline_details) if pipeline_details else [] + xlets: List[XLets] = ( + get_xlets_from_dag(dag=pipeline_details) if pipeline_details else [] + ) for xlet in xlets: - for from_fqn in xlet.inlets or []: - from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn) + for from_xlet in xlet.inlets or []: + from_entity = self.metadata.get_by_name( + entity=from_xlet.entity, fqn=from_xlet.fqn + ) if from_entity: - for to_fqn in xlet.outlets or []: - to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn) + for to_xlet in xlet.outlets or []: + to_entity = self.metadata.get_by_name( + entity=to_xlet.entity, fqn=to_xlet.fqn + ) if to_entity: lineage = AddLineageRequest( edge=EntitiesEdge( fromEntity=EntityReference( - id=from_entity.id, type="table" + id=from_entity.id, + type=ENTITY_REFERENCE_TYPE_MAP[ + from_xlet.entity.__name__ + ], ), toEntity=EntityReference( - id=to_entity.id, type="table" + id=to_entity.id, + type=ENTITY_REFERENCE_TYPE_MAP[ + to_xlet.entity.__name__ + ], ), lineageDetails=lineage_details, ) @@ -455,12 +473,12 @@ def yield_pipeline_lineage_details( yield Either(right=lineage) else: logger.warning( - f"Could not find Table [{to_fqn}] from " + f"Could not find [{to_xlet.entity.__name__}] [{to_xlet.fqn}] from " f"[{pipeline_entity.fullyQualifiedName.__root__}] outlets" ) else: logger.warning( - f"Could not find Table [{from_fqn}] from " + f"Could not find [{from_xlet.entity.__name__}] [{from_xlet.fqn}] from " f"[{pipeline_entity.fullyQualifiedName.__root__}] inlets" ) diff --git a/ingestion/src/metadata/utils/constants.py b/ingestion/src/metadata/utils/constants.py index 33b9ac70376c..12011e34b1ee 100644 --- a/ingestion/src/metadata/utils/constants.py +++ b/ingestion/src/metadata/utils/constants.py @@ -12,6 +12,26 @@ """ Define constants useful for the metadata ingestion """ +from metadata.generated.schema.entity.data.chart import Chart +from metadata.generated.schema.entity.data.container import Container +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.mlmodel import MlModel +from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.data.searchIndex import SearchIndex +from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.topic import Topic +from metadata.generated.schema.entity.services.dashboardService import DashboardService +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.messagingService import MessagingService +from metadata.generated.schema.entity.services.metadataService import MetadataService +from metadata.generated.schema.entity.services.mlmodelService import MlModelService +from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.entity.services.searchService import SearchService +from metadata.generated.schema.entity.services.storageService import StorageService DOT = "_DOT_" TEN_MIN = 10 * 60 @@ -47,3 +67,29 @@ NO_ACCESS_TOKEN = "no_token" SAMPLE_DATA_DEFAULT_COUNT = 50 + +# Mainly used for lineage +ENTITY_REFERENCE_TYPE_MAP = { + # Service Entities + DatabaseService.__name__: "databaseService", + MessagingService.__name__: "messagingService", + DashboardService.__name__: "dashboardService", + PipelineService.__name__: "pipelineService", + StorageService.__name__: "storageService", + MlModelService.__name__: "mlmodelService", + MetadataService.__name__: "metadataService", + SearchService.__name__: "searchService", + # Data Asset Entities + Table.__name__: "table", + StoredProcedure.__name__: "storedProcedure", + Database.__name__: "database", + DatabaseSchema.__name__: "databaseSchema", + Dashboard.__name__: "dashboard", + DashboardDataModel.__name__: "dashboardDataModel", + Pipeline.__name__: "pipeline", + Chart.__name__: "chart", + Topic.__name__: "topic", + SearchIndex.__name__: "searchIndex", + MlModel.__name__: "mlmodel", + Container.__name__: "container", +} diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index d14832d0892b..281f846776c8 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -187,7 +187,9 @@ def log_ansi_encoded_string( @singledispatch def get_log_name(record: Entity) -> Optional[str]: try: - return f"{type(record).__name__} [{getattr(record, 'name', record.entity.name).__root__}]" + if hasattr(record, "name"): + return f"{type(record).__name__} [{getattr(record, 'name').__root__}]" + return f"{type(record).__name__} [{record.entity.name.__root__}]" except Exception: return str(record) diff --git a/ingestion/tests/integration/airflow/__init__.py b/ingestion/tests/integration/airflow/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ingestion/tests/integration/airflow/test_lineage_runner.py b/ingestion/tests/integration/airflow/test_lineage_runner.py new file mode 100644 index 000000000000..fbe1ecd547e9 --- /dev/null +++ b/ingestion/tests/integration/airflow/test_lineage_runner.py @@ -0,0 +1,218 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test lineage parser to get inlets and outlets information +""" +from datetime import datetime +from unittest import TestCase +from unittest.mock import patch + +from airflow import DAG +from airflow.operators.bash import BashOperator + +from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) +from metadata.generated.schema.entity.data.table import Column, DataType, Table +from metadata.generated.schema.entity.services.connections.database.common.basicAuth import ( + BasicAuth, +) +from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( + MysqlConnection, +) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.entity.services.pipelineService import PipelineService +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.pipeline.airflow.lineage_parser import ( + OMEntity, + get_xlets_from_dag, +) + +SLEEP = "sleep 1" +PIPELINE_SERVICE_NAME = "test-lineage-runner" +DB_SERVICE_NAME = "test-service-lineage-runner" +OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + + +class TestAirflowLineageRuner(TestCase): + """ + Validate AirflowLineageRunner + """ + + server_config = OpenMetadataConnection( + hostPort="http://localhost:8585/api", + authProvider="openmetadata", + securityConfig=OpenMetadataJWTClientConfig(jwtToken=OM_JWT), + ) + metadata = OpenMetadata(server_config) + + assert metadata.health_check() + + service = CreateDatabaseServiceRequest( + name=DB_SERVICE_NAME, + serviceType=DatabaseServiceType.Mysql, + connection=DatabaseConnection( + config=MysqlConnection( + username="username", + authType=BasicAuth(password="password"), + hostPort="http://localhost:1234", + ) + ), + ) + service_type = "databaseService" + + @classmethod + def setUpClass(cls) -> None: + """ + Prepare ingredients: Table Entity + DAG + """ + + service_entity = cls.metadata.create_or_update(data=cls.service) + + create_db = CreateDatabaseRequest( + name="test-db", + service=service_entity.fullyQualifiedName, + ) + + create_db_entity = cls.metadata.create_or_update(data=create_db) + + create_schema = CreateDatabaseSchemaRequest( + name="test-schema", + database=create_db_entity.fullyQualifiedName, + ) + + create_schema_entity = cls.metadata.create_or_update(data=create_schema) + + create_inlet = CreateTableRequest( + name="lineage-test-inlet", + databaseSchema=create_schema_entity.fullyQualifiedName, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + create_inlet_2 = CreateTableRequest( + name="lineage-test-inlet2", + databaseSchema=create_schema_entity.fullyQualifiedName, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + create_outlet = CreateTableRequest( + name="lineage-test-outlet", + databaseSchema=create_schema_entity.fullyQualifiedName, + columns=[Column(name="id", dataType=DataType.BIGINT)], + ) + + cls.table_inlet1: Table = cls.metadata.create_or_update(data=create_inlet) + cls.table_inlet2: Table = cls.metadata.create_or_update(data=create_inlet_2) + cls.table_outlet: Table = cls.metadata.create_or_update(data=create_outlet) + + @classmethod + def tearDownClass(cls) -> None: + """ + Clean up + """ + + service_id = str( + cls.metadata.get_by_name( + entity=DatabaseService, fqn=DB_SERVICE_NAME + ).id.__root__ + ) + + cls.metadata.delete( + entity=DatabaseService, + entity_id=service_id, + recursive=True, + hard_delete=True, + ) + + # Service ID created from the Airflow Lineage Operator in the + # example DAG + pipeline_service_id = str( + cls.metadata.get_by_name( + entity=PipelineService, fqn=PIPELINE_SERVICE_NAME + ).id.__root__ + ) + + cls.metadata.delete( + entity=PipelineService, + entity_id=pipeline_service_id, + recursive=True, + hard_delete=True, + ) + + def test_lineage_runner(self): + + with DAG("test_runner", start_date=datetime(2021, 1, 1)) as dag: + BashOperator( + task_id="print_date", + bash_command="date", + inlets=[ + OMEntity( + entity=Table, + fqn="test-service-lineage-runner.test-db.test-schema.lineage-test-inlet", + ), + OMEntity( + entity=Table, + fqn="test-service-lineage-runner.test-db.test-schema.lineage-test-inlet2", + ), + ], + ) + + BashOperator( + task_id="sleep", + bash_command=SLEEP, + outlets=[ + OMEntity( + entity=Table, + fqn="test-service-lineage-runner.test-db.test-schema.lineage-test-outlet", + ) + ], + ) + + # skip the statuses since they require getting data from airflow's db + with patch.object( + AirflowLineageRunner, "add_all_pipeline_status", return_value=None + ): + runner = AirflowLineageRunner( + metadata=self.metadata, + service_name=PIPELINE_SERVICE_NAME, + dag=dag, + xlets=get_xlets_from_dag(dag), + only_keep_dag_lineage=True, + ) + + runner.execute() + + lineage_data = self.metadata.get_lineage_by_name( + entity=Table, + fqn=self.table_outlet.fullyQualifiedName.__root__, + up_depth=1, + down_depth=1, + ) + + upstream_ids = [edge["fromEntity"] for edge in lineage_data["upstreamEdges"]] + self.assertIn(str(self.table_inlet1.id.__root__), upstream_ids) + self.assertIn(str(self.table_inlet2.id.__root__), upstream_ids) diff --git a/ingestion/tests/unit/airflow/test_lineage_parser.py b/ingestion/tests/unit/airflow/test_lineage_parser.py index 9ae383241705..75e578bf6418 100644 --- a/ingestion/tests/unit/airflow/test_lineage_parser.py +++ b/ingestion/tests/unit/airflow/test_lineage_parser.py @@ -12,25 +12,68 @@ Test lineage parser to get inlets and outlets information """ from datetime import datetime +from typing import List, Set from unittest import TestCase from airflow import DAG from airflow.operators.bash import BashOperator +from airflow.serialization.serde import serialize +from metadata.generated.schema.entity.data.container import Container +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.data.table import Table from metadata.ingestion.source.pipeline.airflow.lineage_parser import ( + OMEntity, XLets, XLetsMode, + _parse_xlets, get_xlets_from_dag, get_xlets_from_operator, parse_xlets, ) +SLEEP = "sleep 1" + + +def xlet_fqns(xlet: XLets, xlet_mode: XLetsMode) -> Set[str]: + """Helper method to get a set of FQNs out of the xlet""" + return set(elem.fqn for elem in getattr(xlet, xlet_mode.value)) + class TestAirflowLineageParser(TestCase): """ Handle airflow lineage parser validations """ + def assertXLetsEquals(self, first: List[XLets], second: List[XLets]): + """ + Check that both XLet lists are the same + + Even if they are lists, we don't care about the order. + + Note that we cannot use sets since `OMEntity` is not hashable. + + For this test, we will assume that by having the same FQN, the + entity type will also be the same. + """ + self.assertEquals(len(first), len(second)) + + for xlet1 in first: + match = False + + first_inlets = xlet_fqns(xlet1, XLetsMode.INLETS) + first_outlets = xlet_fqns(xlet1, XLetsMode.OUTLETS) + + for xlet2 in second: + second_inlets = xlet_fqns(xlet2, XLetsMode.INLETS) + second_outlets = xlet_fqns(xlet2, XLetsMode.OUTLETS) + + if first_inlets == second_inlets and first_outlets == second_outlets: + match = True + break + + self.assertTrue(match) + def test_parse_xlets(self): """ Handle the shape validation of inlets and outlets, e.g., @@ -40,13 +83,19 @@ def test_parse_xlets(self): }], """ raw_xlet = [{"tables": ["A"], "more_tables": ["X"]}] - self.assertEqual(parse_xlets(raw_xlet), {"tables": ["A"], "more_tables": ["X"]}) + self.assertEqual( + parse_xlets(raw_xlet), + { + "tables": [OMEntity(entity=Table, fqn="A")], + "more_tables": [OMEntity(entity=Table, fqn="X")], + }, + ) raw_xlet_without_list = [{"tables": ["A"], "more_tables": "random"}] self.assertEqual( parse_xlets(raw_xlet_without_list), { - "tables": ["A"], + "tables": [OMEntity(entity=Table, fqn="A")], }, ) @@ -67,7 +116,7 @@ def test_get_xlets_from_operator(self): # But the outlets are parsed correctly self.assertEqual( get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS), - {"tables": ["A"]}, + {"tables": [OMEntity(entity=Table, fqn="A")]}, ) operator = BashOperator( @@ -78,20 +127,21 @@ def test_get_xlets_from_operator(self): self.assertEqual( get_xlets_from_operator(operator, xlet_mode=XLetsMode.INLETS), - {"tables": ["A"], "more_tables": ["X"]}, + { + "tables": [OMEntity(entity=Table, fqn="A")], + "more_tables": [OMEntity(entity=Table, fqn="X")], + }, ) self.assertIsNone( get_xlets_from_operator(operator, xlet_mode=XLetsMode.OUTLETS) ) - def test_get_xlets_from_dag(self): + def test_get_string_xlets_from_dag(self): """ Check that we can properly join the xlet information from all operators in the DAG """ - sleep_1 = "sleep 1" - with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag: BashOperator( task_id="print_date", @@ -101,12 +151,18 @@ def test_get_xlets_from_dag(self): BashOperator( task_id="sleep", - bash_command=sleep_1, + bash_command=SLEEP, outlets={"tables": ["B"]}, ) - self.assertEqual( - get_xlets_from_dag(dag), [XLets(inlets={"A"}, outlets={"B"})] + self.assertXLetsEquals( + get_xlets_from_dag(dag), + [ + XLets( + inlets=[OMEntity(entity=Table, fqn="A")], + outlets=[OMEntity(entity=Table, fqn="B")], + ) + ], ) with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag: @@ -118,12 +174,18 @@ def test_get_xlets_from_dag(self): BashOperator( task_id="sleep", - bash_command=sleep_1, + bash_command=SLEEP, outlets={"tables": ["B"]}, ) - self.assertEqual( - get_xlets_from_dag(dag), [XLets(inlets={"A"}, outlets={"B"})] + self.assertXLetsEquals( + get_xlets_from_dag(dag), + [ + XLets( + inlets=[OMEntity(entity=Table, fqn="A")], + outlets=[OMEntity(entity=Table, fqn="B")], + ) + ], ) with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag: @@ -139,18 +201,27 @@ def test_get_xlets_from_dag(self): BashOperator( task_id="sleep", - bash_command=sleep_1, + bash_command=SLEEP, outlets={ "tables": ["B"], "more_tables": ["Z"], }, ) - self.assertEqual( + self.assertXLetsEquals( get_xlets_from_dag(dag), [ - XLets(inlets={"A"}, outlets={"B"}), - XLets(inlets={"X", "Y"}, outlets={"Z"}), + XLets( + inlets=[OMEntity(entity=Table, fqn="A")], + outlets=[OMEntity(entity=Table, fqn="B")], + ), + XLets( + inlets=[ + OMEntity(entity=Table, fqn="X"), + OMEntity(entity=Table, fqn="Y"), + ], + outlets=[OMEntity(entity=Table, fqn="Z")], + ), ], ) @@ -165,15 +236,156 @@ def test_get_xlets_from_dag(self): BashOperator( task_id="sleep", - bash_command=sleep_1, + bash_command=SLEEP, outlets={ "tables": ["B"], }, ) - self.assertEqual( + self.assertXLetsEquals( + get_xlets_from_dag(dag), + [ + XLets( + inlets=[ + OMEntity(entity=Table, fqn="A"), + OMEntity(entity=Table, fqn="B"), + ], + outlets=[OMEntity(entity=Table, fqn="B")], + ), + ], + ) + + def test_get_attrs_xlets_from_dag(self): + """ + Check that we can properly join the xlet information from + all operators in the DAG + """ + with DAG("test_dag", start_date=datetime(2021, 1, 1)) as dag: + BashOperator( + task_id="print_date", + bash_command="date", + inlets=[ + OMEntity(entity=Table, fqn="A"), + OMEntity(entity=Table, fqn="B"), + ], + ) + + BashOperator( + task_id="sleep", + bash_command=SLEEP, + outlets=[OMEntity(entity=Table, fqn="C")], + ) + + BashOperator( + task_id="sleep2", + bash_command=SLEEP, + outlets=[OMEntity(entity=Container, fqn="D")], + ) + + self.assertXLetsEquals( get_xlets_from_dag(dag), [ - XLets(inlets={"A", "B"}, outlets={"B"}), + XLets( + inlets=[ + OMEntity(entity=Table, fqn="A"), + OMEntity(entity=Table, fqn="B"), + ], + outlets=[ + OMEntity(entity=Table, fqn="C"), + OMEntity(entity=Container, fqn="D"), + ], + ) ], ) + + def test_om_entity_serializer(self): + """To ensure the serialized DAGs will have the right shape""" + om_entity = OMEntity( + entity=Table, + fqn="FQN", + key="test", + ) + self.assertEquals( + str(om_entity), + '{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}', + ) + + om_entity = OMEntity( + entity=Container, + fqn="FQN", + key="test", + ) + self.assertEquals( + str(om_entity), + '{"entity": "metadata.generated.schema.entity.data.container.Container", "fqn": "FQN", "key": "test"}', + ) + + def test_str_deserializer(self): + """ + Once a DAG is serialized, the xlet info will be stored as: + ``` + ['{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}'] + ``` + based on our custom serialization logic. + + Validate the deserialization process. + """ + self.assertIsNone(_parse_xlets("random")) + + self.assertEquals( + _parse_xlets( + '{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}' + ), + { + "test": [ + OMEntity( + entity=Table, + fqn="FQN", + key="test", + ) + ] + }, + ) + + self.assertEquals( + _parse_xlets( + '{"entity": "metadata.generated.schema.entity.data.container.Container", "fqn": "FQN", "key": "test"}' + ), + { + "test": [ + OMEntity( + entity=Container, + fqn="FQN", + key="test", + ) + ] + }, + ) + + self.assertEquals( + _parse_xlets( + '{"entity": "metadata.generated.schema.entity.data.dashboard.Dashboard", "fqn": "FQN", "key": "test"}' + ), + { + "test": [ + OMEntity( + entity=Dashboard, + fqn="FQN", + key="test", + ) + ] + }, + ) + + def test_airflow_serializer(self): + """It should be able to serialize our models""" + om_entity = OMEntity( + entity=Table, + fqn="FQN", + key="test", + ) + + self.assertEquals( + serialize(om_entity).get("__data__"), + '{"entity": "metadata.generated.schema.entity.data.table.Table", "fqn": "FQN", "key": "test"}', + ) diff --git a/openmetadata-docs/content/v1.2.x/connectors/pipeline/airflow/configuring-lineage.md b/openmetadata-docs/content/v1.2.x/connectors/pipeline/airflow/configuring-lineage.md index 4d8e4a68d756..b02001ee51ef 100644 --- a/openmetadata-docs/content/v1.2.x/connectors/pipeline/airflow/configuring-lineage.md +++ b/openmetadata-docs/content/v1.2.x/connectors/pipeline/airflow/configuring-lineage.md @@ -9,6 +9,24 @@ Regardless of the Airflow ingestion process you follow ([Workflow](/connectors/p [Lineage Backend](/connectors/pipeline/airflow/lineage-backend) or [Lineage Operator](/connectors/pipeline/airflow/lineage-operator)), OpenMetadata will try to extract the lineage information based on the tasks `inlets` and `outlets`. +What it's important to consider here is that when we are ingesting Airflow lineage, we are actually building a graph: + +``` +Table A (node) -> DAG (edge) -> Table B (node) +``` + +Where tables are nodes and DAGs (Pipelines) are considered edges. This means that the correct way of setting these +parameters is by making sure that we are informing both `inlets` and `outlets`, so that we have the nodes to build +the relationship. + +## Configuring Lineage + +{% note %} + +This lineage configuration method is available for OpenMetadata release 1.2.3 or higher. + +{% /note %} + Let's take a look at the following example: ```python @@ -18,6 +36,10 @@ from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.utils.dates import days_ago +from metadata.generated.schema.entity.data.container import Container +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.source.pipeline.airflow.lineage_parser import OMEntity + default_args = { 'owner': 'airflow', @@ -31,9 +53,9 @@ default_args = { with DAG( - "test-multiple-inlet-keys", + "test-lineage", default_args=default_args, - description="An example DAG which runs a a task group lineage test", + description="An example DAG which runs a lineage test", start_date=days_ago(1), is_paused_upon_creation=False, catchup=False, @@ -42,53 +64,39 @@ with DAG( t0 = DummyOperator( task_id='task0', - inlets={ - "tables": ["Table A"], - "more_tables": ["Table X"] - } + inlets=[ + OMEntity(entity=Container, fqn="Container A", key="group_A"), + OMEntity(entity=Table, fqn="Table X", key="group_B"), + ] ) t1 = DummyOperator( task_id='task10', - outlets={ - "tables": ["Table B"], - "more_tables": ["Table Y"] - } + outlets=[ + OMEntity(entity=Table, fqn="Table B", key="group_A"), + OMEntity(entity=Table, fqn="Table Y", key="group_B"), + ] ) t0 >> t1 ``` -Note how we have two tasks: -- `t0`: Informing the `inlets`, with keys `tables` and `more_tables`. -- `t1`: Informing the `outlets` with keys `tables` and `more_tables`. - -{% note %} - -Make sure to add the table Fully Qualified Name (FQN), which is the unique name of the table in OpenMetadata. - -This name is composed as `serviceName.databaseName.schemaName.tableName`. +We are passing inlets and outlets as a list of the `OMEntity` class, that lets us specify: +1. The type of the asset we are using: Table, Container,... following our SDK +2. The FQN of the asset, which is the unique name of each asset in OpenMetadata, e.g., `serviceName.databaseName.schemaName.tableName`. +3. The key to group the lineage if needed. -{% /note %} - -What it's important to consider here is that when we are ingesting Airflow lineage, we are actually building a graph: - -``` -Table A (node) -> DAG (edge) -> Table B (node) -``` - -Where tables are nodes and DAGs (Pipelines) are considered edges. This means that the correct way of setting this -parameters is by making sure that we are informing both `inlets` and `outlets`, so that we have the nodes to build -the relationship. +This `OMEntity` class is defined following the example of Airflow's internal lineage +[models](https://github.com/apache/airflow/blob/main/airflow/lineage/entities.py). ## Keys We can inform the lineage dependencies among different groups of tables. In the example above, we are not building the -lineage from all inlets to all outlets, but rather grouping the tables by the dictionary key (`tables` and `more_tables`). +lineage from all inlets to all outlets, but rather grouping the tables by key (`group_A` and `group_B`). This means that after this lineage is processed, the relationship will be: ``` -Table A (node) -> DAG (edge) -> Table B (node) +Container A (node) -> DAG (edge) -> Table B (node) ``` and @@ -99,3 +107,73 @@ Table X (node) -> DAG (edge) -> Table Y (node) It does not matter in which task of the DAG these inlet/outlet information is specified. During the ingestion process we group all these details at the DAG level. + + +## Configuring Lineage between Tables + +{% note %} + +Note that this method only allows lineage between Tables. + +We will deprecate it in OpenMetadata 1.4 + +{% /note %} + +Let's take a look at the following example: + +```python +from datetime import timedelta + +from airflow import DAG +from airflow.operators.dummy import DummyOperator +from airflow.utils.dates import days_ago + + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(seconds=1), +} + + +with DAG( + "test-multiple-inlet-keys", + default_args=default_args, + description="An example DAG which runs a lineage test", + start_date=days_ago(1), + is_paused_upon_creation=False, + catchup=False, +) as dag: + + + t0 = DummyOperator( + task_id='task0', + inlets={ + "group_A": ["Table A"], + "group_B": ["Table X"] + } + ) + + t1 = DummyOperator( + task_id='task10', + outlets={ + "group_A": ["Table B"], + "group_B": ["Table Y"] + } + ) + + t0 >> t1 +``` + + +{% note %} + +Make sure to add the table Fully Qualified Name (FQN), which is the unique name of the table in OpenMetadata. + +This name is composed as `serviceName.databaseName.schemaName.tableName`. + +{% /note %} diff --git a/openmetadata-docs/content/v1.2.x/connectors/pipeline/airflow/lineage-backend.md b/openmetadata-docs/content/v1.2.x/connectors/pipeline/airflow/lineage-backend.md index 9b4de654d5f4..5f3cf21d323f 100644 --- a/openmetadata-docs/content/v1.2.x/connectors/pipeline/airflow/lineage-backend.md +++ b/openmetadata-docs/content/v1.2.x/connectors/pipeline/airflow/lineage-backend.md @@ -32,20 +32,11 @@ distribution: pip3 install "openmetadata-ingestion==x.y.z" ``` -Where `x.y.z` is the version of your OpenMetadata server, e.g., 0.13.0. It is important that server and client -versions match. +Where `x.y.z` is the version of your OpenMetadata server, e.g., 1.2.2. **It is important that server and client +versions match**. ### Adding Lineage Config - - -If using OpenMetadata version 0.13.0 or lower, the import for the lineage backend is -`airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend`. - -For 0.13.1 or higher, the import has been renamed to `airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend`. - - - After the installation, we need to update the Airflow configuration. This can be done following this example on `airflow.cfg`: @@ -81,11 +72,10 @@ max_status = 10 ``` - `only_keep_dag_lineage` will remove any table lineage not present in the inlets or outlets. This will ensure -that any lineage in OpenMetadata comes from your code. +that any lineage in OpenMetadata comes only from your code. - `max_status` controls the number of status to ingest in each run. By default, we'll pick the last 10. - In the following sections, we'll show how to adapt our pipelines to help us build the lineage information. ## Lineage Backend @@ -139,13 +129,13 @@ and downstream for outlets) between the Pipeline and Table Entities. It is important to get the naming right, as we will fetch the Table Entity by its FQN. If no information is specified in terms of lineage, we will just ingest the Pipeline Entity without adding further information. - +{% note %} While we are showing here how to parse the lineage using the Lineage Backend, the setup of `inlets` and `outlets` is supported as well through external metadata ingestion from Airflow, be it via the UI, CLI or directly running an extraction DAG from Airflow itself. - +{% /note %} ## Example @@ -246,7 +236,7 @@ backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBacke airflow_service_name = local_airflow openmetadata_api_endpoint = http://localhost:8585/api auth_provider_type = openmetadata -jwt_token = eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg +jwt_token = ... ``` After running the DAG, you should be able to see the following information in the ingested Pipeline: