From 5f293d05715e7fcdd4b4cc58cb399f34573cc381 Mon Sep 17 00:00:00 2001 From: Enya-Yx Date: Tue, 9 Aug 2022 20:55:03 +0800 Subject: [PATCH 1/3] optimize purview search logic --- registry/purview-registry/registry/models.py | 20 ++++-- .../registry/purview_registry.py | 67 +++++++++++++++++-- 2 files changed, 77 insertions(+), 10 deletions(-) diff --git a/registry/purview-registry/registry/models.py b/registry/purview-registry/registry/models.py index d1e174e0d..667e7bddd 100644 --- a/registry/purview-registry/registry/models.py +++ b/registry/purview-registry/registry/models.py @@ -113,9 +113,18 @@ def __str__(self): class RelationshipType(Enum): Contains = 1 BelongsTo = 2 - Consumes = 3 - Produces = 4 + Consumes = 4 + Produces = 8 + @staticmethod + def new(r): + return { + "CONTAINS": RelationshipType.Contains, + "CONTAIN": RelationshipType.Contains, + "BELONGSTO": RelationshipType.BelongsTo, + "CONSUMES": RelationshipType.Consumes, + "PRODUCES": RelationshipType.Produces, + }[r] class ToDict(ABC): """ @@ -522,8 +531,10 @@ def to_dict(self) -> dict: "features": list([e.get_ref().to_dict() for e in self.features]), "tags": self.tags, } - if self.source is not None: - ret["source"] = self.source.get_ref().to_dict() + if self.source is not None and isinstance(self.source, Attributes): + source_ref = self.source.get_ref() + if source_ref is not None: + ret["source"] = source_ref.to_dict() return ret @@ -655,6 +666,7 @@ def to_dict(self) -> dict: "fromEntityId": str(self.from_id), "toEntityId": str(self.to_id), "relationshipType": self.conn_type.name, + "relationshipTypeValue": self.conn_type.value, } diff --git a/registry/purview-registry/registry/purview_registry.py b/registry/purview-registry/registry/purview_registry.py index d1922bc94..7b2decb4b 100644 --- a/registry/purview-registry/registry/purview_registry.py +++ b/registry/purview-registry/registry/purview_registry.py @@ -1,4 +1,4 @@ - +import copy from http.client import CONFLICT, HTTPException import itertools from typing import Any, Optional, Tuple, Union @@ -15,7 +15,7 @@ from pyhocon import ConfigFactory from registry.interface import Registry -from registry.models import AnchorDef, AnchorFeatureDef, DerivedFeatureDef, Edge, EntitiesAndRelations, Entity, EntityRef, EntityType, ProjectDef, RelationshipType, SourceDef, _to_uuid +from registry.models import AnchorDef, AnchorFeatureDef, DerivedFeatureDef, Edge, EntitiesAndRelations, Entity, EntityRef, EntityType, ProjectDef, RelationshipType, SourceDef, Attributes, _to_uuid Label_Contains = "CONTAINS" Label_BelongsTo = "BELONGSTO" Label_Consumes = "CONSUMES" @@ -38,7 +38,7 @@ def __init__(self,azure_purview_name: str, registry_delimiter: str = "__", crede self.guid = GuidTracker(starting=-1000) if register_types: self._register_feathr_feature_types() - + def get_projects(self) -> list[str]: """ Returns the names of all projects @@ -47,7 +47,7 @@ def get_projects(self) -> list[str]: result = self.purview_client.discovery.query(filter=searchTerm) result_entities = result['value'] return [x['qualifiedName'] for x in result_entities] - + def get_entity(self, id_or_name: Union[str, UUID],recursive = False) -> Entity: id = self.get_entity_id(id_or_name) if not id: @@ -137,6 +137,7 @@ def get_all_neighbours(self,id_or_name: Union[str, UUID]) -> list[Edge]: relation_lookup[x['attributes']['qualifiedName'].split(self.registry_delimiter)[0]]) for x in out_edges]) return result_edges + def get_neighbors(self, id_or_name: Union[str, UUID], relationship: RelationshipType) -> list[Edge]: """ Get list of edges with specified type that connect to this entity. @@ -184,7 +185,63 @@ def _get_edges(self, ids: list[UUID]) -> list[Edge]: all_edges.add(neighbour) return list(all_edges) + def _create_edge_from_process(self, name:str, guid: str) -> Edge: + names = name.split(self.registry_delimiter) + return Edge(guid, names[1], names[2], RelationshipType.new(names[0])) + def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: + project_id = self.get_entity_id(id_or_name) + if not project_id: + return None + lineage = self.purview_client.get_entity_lineage(project_id) + guidAtlasEntityMap = lineage['guidEntityMap'] + + guidEntityMap = {} + finalGuidEntityMap = {} + edges = [] + targetsRelationships = { + EntityType.Project: RelationshipType.Contains.value, + EntityType.Anchor: RelationshipType.Produces.value | RelationshipType.Consumes.value, + EntityType.DerivedFeature: RelationshipType.Consumes.value + } + + # Build edges and entities from guidEntityMap + for id,entity in guidAtlasEntityMap.items(): + if entity['typeName'] == 'Process': + name = entity['attributes']['qualifiedName'] + if not name.startswith('BELONGSTO'): + edges.append(self._create_edge_from_process(name, id)) + else: + guidEntityMap[id] = self._atlasEntity_to_entity(entity) + finalGuidEntityMap = copy.deepcopy(guidEntityMap) + + # Add relationships among each entity + for edge in edges: + edge_dict = edge.to_dict() + relationship = edge_dict['relationshipTypeValue'] + + fromId = edge_dict['fromEntityId'] + fromEntity = guidEntityMap[fromId] + fromEntityType = EntityType.new(fromEntity.to_dict()['typeName']) + if fromEntityType in targetsRelationships and targetsRelationships[fromEntityType] & relationship != 0: + toId = edge_dict['toEntityId'] + toEntity = guidEntityMap[toId] + toEntitytype = EntityType.new(toEntity.to_dict()['typeName']) + if fromEntityType == EntityType.Project: + finalGuidEntityMap[fromId].attributes.children.append(toEntity) + elif fromEntityType == EntityType.Anchor: + if toEntitytype == EntityType.Source: + finalGuidEntityMap[fromId].attributes.source = toEntity + else: + finalGuidEntityMap[fromId].attributes.features.append(toEntity) + else: + curr_input_features = finalGuidEntityMap[fromId].attributes.input_features + curr_input_features.append(toEntity) + finalGuidEntityMap[fromId].attributes.input_features = curr_input_features + + return EntitiesAndRelations(list(finalGuidEntityMap.values()), list(edges)) + + def get_project_bac(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: """ Get a project and everything inside of it, both entities and edges """ @@ -231,8 +288,6 @@ def search_entity(self, continue result.append(EntityRef(UUID(entity_id),entity_type,qualified_name)) return result - - def create_project(self, definition: ProjectDef) -> UUID: attrs = definition.to_attr().to_dict() From fe113b5027596753b38daa6871ed15de1e20c59e Mon Sep 17 00:00:00 2001 From: Enya-Yx Date: Tue, 9 Aug 2022 20:59:39 +0800 Subject: [PATCH 2/3] remove extra get_project method --- .../registry/purview_registry.py | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/registry/purview-registry/registry/purview_registry.py b/registry/purview-registry/registry/purview_registry.py index 7b2decb4b..b059bdf34 100644 --- a/registry/purview-registry/registry/purview_registry.py +++ b/registry/purview-registry/registry/purview_registry.py @@ -241,34 +241,6 @@ def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: return EntitiesAndRelations(list(finalGuidEntityMap.values()), list(edges)) - def get_project_bac(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: - """ - Get a project and everything inside of it, both entities and edges - """ - project = self.get_entity(id_or_name) - edges = set(self.get_neighbors(id_or_name, RelationshipType.Contains)) - ids = list([e.to_id for e in edges]) - all_edges = self._get_edges(ids) - children = self.get_entities(ids) - child_map = dict([(e.id, e) for e in children]) - project.attributes.children = children - for anchor in project.attributes.anchors: - conn = self.get_neighbors(anchor.id, RelationshipType.Contains) - feature_ids = [e.to_id for e in conn] - edges = edges.union(conn) - features = list([child_map[id] for id in feature_ids]) - anchor.attributes.features = features - source_id = self.get_neighbors( - anchor.id, RelationshipType.Consumes)[0].to_id - anchor.attributes.source = child_map[source_id] - for df in project.attributes.derived_features: - conn = self.get_neighbors(anchor.id, RelationshipType.Consumes) - input_ids = [e.to_id for e in conn] - edges = edges.union(conn) - features = list([child_map[id] for id in input_ids]) - df.attributes.input_features = features - return EntitiesAndRelations([project] + children, list(edges.union(all_edges))) - def search_entity(self, keyword: str, type: list[EntityType], From b72b93bd183f88a2419ad8e38d4a8388be027e13 Mon Sep 17 00:00:00 2001 From: Enya-Yx Date: Fri, 12 Aug 2022 15:16:39 +0800 Subject: [PATCH 3/3] minor issues fix --- registry/purview-registry/registry/models.py | 2 +- registry/purview-registry/registry/purview_registry.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/registry/purview-registry/registry/models.py b/registry/purview-registry/registry/models.py index 667e7bddd..e54e33c82 100644 --- a/registry/purview-registry/registry/models.py +++ b/registry/purview-registry/registry/models.py @@ -531,7 +531,7 @@ def to_dict(self) -> dict: "features": list([e.get_ref().to_dict() for e in self.features]), "tags": self.tags, } - if self.source is not None and isinstance(self.source, Attributes): + if self.source is not None and isinstance(self.source, EntityRef): source_ref = self.source.get_ref() if source_ref is not None: ret["source"] = source_ref.to_dict() diff --git a/registry/purview-registry/registry/purview_registry.py b/registry/purview-registry/registry/purview_registry.py index b059bdf34..3e78ece80 100644 --- a/registry/purview-registry/registry/purview_registry.py +++ b/registry/purview-registry/registry/purview_registry.py @@ -201,7 +201,7 @@ def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: edges = [] targetsRelationships = { EntityType.Project: RelationshipType.Contains.value, - EntityType.Anchor: RelationshipType.Produces.value | RelationshipType.Consumes.value, + EntityType.Anchor: RelationshipType.Contains.value | RelationshipType.Consumes.value, EntityType.DerivedFeature: RelationshipType.Consumes.value } @@ -209,7 +209,7 @@ def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: for id,entity in guidAtlasEntityMap.items(): if entity['typeName'] == 'Process': name = entity['attributes']['qualifiedName'] - if not name.startswith('BELONGSTO'): + if not (name.startswith('BELONGSTO') and name.endswith(str(project_id))): edges.append(self._create_edge_from_process(name, id)) else: guidEntityMap[id] = self._atlasEntity_to_entity(entity)