diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index 7cfca8c9f870..a7f097efc33e 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -22,13 +22,14 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Uuid from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT from metadata.ingestion.models.patch_request import build_patch from metadata.ingestion.ometa.client import REST, APIError -from metadata.ingestion.ometa.utils import get_entity_type, quote +from metadata.ingestion.ometa.utils import get_entity_type, model_str, quote from metadata.utils.logger import ometa_logger from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache @@ -248,7 +249,7 @@ def patch_lineage_edge( def get_lineage_by_id( self, entity: Union[Type[T], str], - entity_id: str, + entity_id: Union[str, Uuid], up_depth: int = 1, down_depth: int = 1, ) -> Optional[Dict[str, Any]]: @@ -260,13 +261,16 @@ def get_lineage_by_id( :param down_depth: Downstream depth of lineage (default=1, min=0, max=3) """ return self._get_lineage( - entity=entity, path=entity_id, up_depth=up_depth, down_depth=down_depth + entity=entity, + path=model_str(entity_id), + up_depth=up_depth, + down_depth=down_depth, ) def get_lineage_by_name( self, entity: Union[Type[T], str], - fqn: str, + fqn: Union[str, FullyQualifiedEntityName], up_depth: int = 1, down_depth: int = 1, ) -> Optional[Dict[str, Any]]: @@ -279,7 +283,7 @@ def get_lineage_by_name( """ return self._get_lineage( entity=entity, - path=f"name/{quote(fqn)}", + path=f"name/{quote(model_str(fqn))}", up_depth=up_depth, down_depth=down_depth, ) diff --git a/ingestion/tests/integration/ometa/test_ometa_lineage_api.py b/ingestion/tests/integration/ometa/test_ometa_lineage_api.py index b661d6e2bad3..f083a50f6cc5 100644 --- a/ingestion/tests/integration/ometa/test_ometa_lineage_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_lineage_api.py @@ -288,6 +288,30 @@ def test_create(self): len(res["downstreamEdges"][0]["lineageDetails"]["columnsLineage"]), 2 ) + # We can get lineage by ID + lineage_id = self.metadata.get_lineage_by_id( + entity=Table, entity_id=self.table2_entity.id.root + ) + assert lineage_id["entity"]["id"] == str(self.table2_entity.id.root) + + # Same thing works if we pass directly the Uuid + lineage_uuid = self.metadata.get_lineage_by_id( + entity=Table, entity_id=self.table2_entity.id + ) + assert lineage_uuid["entity"]["id"] == str(self.table2_entity.id.root) + + # We can also get lineage by name + lineage_str = self.metadata.get_lineage_by_name( + entity=Table, fqn=self.table2_entity.fullyQualifiedName.root + ) + assert lineage_str["entity"]["id"] == str(self.table2_entity.id.root) + + # Or passing the FQN + lineage_fqn = self.metadata.get_lineage_by_name( + entity=Table, fqn=self.table2_entity.fullyQualifiedName + ) + assert lineage_fqn["entity"]["id"] == str(self.table2_entity.id.root) + def test_delete_by_source(self): """ Test case for deleting lineage by source.