diff --git a/FeathrRegistry.Dockerfile b/FeathrRegistry.Dockerfile index f3c2d6792..c127b81c6 100644 --- a/FeathrRegistry.Dockerfile +++ b/FeathrRegistry.Dockerfile @@ -11,7 +11,7 @@ RUN npm install && npm run build FROM python:3.9 ## Install dependencies -RUN apt-get update -y && apt-get install -y nginx +RUN apt-get update -y && apt-get install -y nginx freetds-dev COPY ./registry /usr/src/registry WORKDIR /usr/src/registry/sql-registry RUN pip install -r requirements.txt diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index 63cd07c1e..673e488df 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -244,6 +244,18 @@ def list_registered_features(self, project_name: str = None) -> List[str]: `project_name` must not be None or empty string because it violates the RBAC policy """ return self.registry.list_registered_features(project_name) + + def list_dependent_entities(self, qualified_name: str): + """ + Lists all dependent/downstream entities for a given entity + """ + return self.registry.list_dependent_entities(qualified_name) + + def delete_entity(self, qualified_name: str): + """ + Deletes a single entity if it has no downstream/dependent entities + """ + return self.registry.delete_entity(qualified_name) def _get_registry_client(self): """ diff --git a/feathr_project/feathr/registry/_feathr_registry_client.py b/feathr_project/feathr/registry/_feathr_registry_client.py index 98397627a..6bcc61869 100644 --- a/feathr_project/feathr/registry/_feathr_registry_client.py +++ b/feathr_project/feathr/registry/_feathr_registry_client.py @@ -136,6 +136,23 @@ def list_registered_features(self, project_name: str) -> List[str]: "id": r["guid"], "qualifiedName": r["attributes"]["qualifiedName"], } for r in resp] + + def list_dependent_entities(self, qualified_name: str): + """ + Returns list of dependent entities for provided entity + """ + resp = self._get(f"/dependent/{qualified_name}") + return [{ + "name":
r["attributes"]["name"], + "id": r["guid"], + "qualifiedName": r["attributes"]["qualifiedName"], + } for r in resp] + + def delete_entity(self, qualified_name: str): + """ + Deletes entity if it has no dependent entities + """ + self._delete(f"/entity/{qualified_name}") def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnchor], List[DerivedFeature]]: """ @@ -187,6 +204,10 @@ def _create_derived_feature(self, s: DerivedFeature) -> UUID: def _get(self, path: str) -> dict: logging.debug("PATH: ", path) return check(requests.get(f"{self.endpoint}{path}", headers=self._get_auth_header())).json() + + def _delete(self, path: str) -> dict: + logging.debug("PATH: %s", path) + return check(requests.delete(f"{self.endpoint}{path}", headers=self._get_auth_header())).json() def _post(self, path: str, body: dict) -> dict: logging.debug("PATH: ", path) diff --git a/feathr_project/feathr/registry/_feature_registry_purview.py b/feathr_project/feathr/registry/_feature_registry_purview.py index 395c305b1..f6b1a4244 100644 --- a/feathr_project/feathr/registry/_feature_registry_purview.py +++ b/feathr_project/feathr/registry/_feature_registry_purview.py @@ -1083,6 +1083,18 @@ def list_registered_features(self, project_name: str, limit=1000, starting_offse feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']}) return feature_list + + def list_dependent_entities(self, qualified_name: str): + """ + Returns list of dependent entities for provided entity + """ + raise NotImplementedError("Delete functionality supported through API") + + def delete_entity(self, qualified_name: str): + """ + Deletes entity if it has no dependent entities + """ + raise NotImplementedError("Delete functionality supported through API") def get_feature_by_fqdn_type(self, qualifiedName, typeName): """ diff --git a/feathr_project/feathr/registry/feature_registry.py b/feathr_project/feathr/registry/feature_registry.py index 3f10fb3fb..a5433db48
100644 --- a/feathr_project/feathr/registry/feature_registry.py +++ b/feathr_project/feathr/registry/feature_registry.py @@ -28,6 +28,20 @@ def list_registered_features(self, project_name: str) -> List[str]: """ pass + @abstractmethod + def list_dependent_entities(self, qualified_name: str): + """ + Returns list of dependent entities for provided entity + """ + pass + + @abstractmethod + def delete_entity(self, qualified_name: str): + """ + Deletes entity if it has no dependent entities + """ + pass + @abstractmethod def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnchor], List[DerivedFeature]]: """[Sync Features from registry to local workspace, given a project_name, will write project's features from registry to to user's local workspace] diff --git a/registry/access_control/api.py b/registry/access_control/api.py index 8a95d28ad..208ea5b31 100644 --- a/registry/access_control/api.py +++ b/registry/access_control/api.py @@ -25,6 +25,11 @@ async def get_project(project: str, access: UserAccess = Depends(project_read_a headers=get_api_header(access.user_name)).content.decode('utf-8') return json.loads(response) +@router.get("/dependent/{entity}", name="Get downstream/dependent entitites for a given entity [Read Access Required]") +def get_dependent_entities(entity: str, access: UserAccess = Depends(project_read_access)): + response = requests.get(url=f"{registry_url}/dependent/{entity}", + headers=get_api_header(access.user_name)).content.decode('utf-8') + return json.loads(response) @router.get("/projects/{project}/datasources", name="Get data sources of my project [Read Access Required]") def get_project_datasources(project: str, access: UserAccess = Depends(project_read_access)) -> list: @@ -58,6 +63,10 @@ def get_feature(feature: str, requestor: User = Depends(get_user)) -> dict: feature_qualifiedName, requestor, AccessType.READ) return ret +@router.delete("/entity/{entity}", name="Deletes a single entity by qualified name [Write
Access Required]") +def delete_entity(entity: str, access: UserAccess = Depends(project_write_access)) -> str: + requests.delete(url=f"{registry_url}/entity/{entity}", + headers=get_api_header(access.user_name)).content.decode('utf-8') @router.get("/features/{feature}/lineage", name="Get Feature Lineage [Read Access Required]") def get_feature_lineage(feature: str, requestor: User = Depends(get_user)) -> dict: diff --git a/registry/purview-registry/api-spec.md b/registry/purview-registry/api-spec.md index d2e82a878..52172f6df 100644 --- a/registry/purview-registry/api-spec.md +++ b/registry/purview-registry/api-spec.md @@ -287,6 +287,9 @@ Get everything defined in the project Response Type: [`EntitiesAndRelationships`](#entitiesandrelationships) +### `GET /dependent/{entity}` +Gets downstream/dependent entities for given entity + ### `GET /projects/{project}/datasources` Get all sources defined in the project. @@ -320,6 +323,9 @@ Response Type: Object | entity | [`Entity`](#entity) | | | referredEntities| `map` | For compatibility, not used | +### `DELETE /entity/{entity}` +Deletes entity + ### `POST /projects` Create new project diff --git a/registry/purview-registry/main.py b/registry/purview-registry/main.py index 5d38adf74..3b3441142 100644 --- a/registry/purview-registry/main.py +++ b/registry/purview-registry/main.py @@ -56,6 +56,22 @@ def get_projects_ids() -> dict: def get_projects(project: str) -> dict: return to_camel(registry.get_project(project).to_dict()) +@router.get("/dependent/{entity}") +def get_dependent_entities(entity: str) -> list: + entity_id = registry.get_entity_id(entity) + downstream_entities = registry.get_dependent_entities(entity_id) + return list([e.to_dict() for e in downstream_entities]) + +@router.delete("/entity/{entity}") +def delete_entity(entity: str): + entity_id = registry.get_entity_id(entity) + downstream_entities = registry.get_dependent_entities(entity_id) + if len(downstream_entities) > 0: + raise HTTPException( +
status_code=412, detail=f"""Entity cannot be deleted as it has downstream/dependent entities. + Entities: {list([e.qualified_name for e in downstream_entities])}""" + ) + registry.delete_entity(entity_id) @router.get("/projects/{project}/datasources",tags=["Project"]) def get_project_datasources(project: str) -> list: @@ -90,7 +106,6 @@ def get_feature(feature: str) -> dict: status_code=404, detail=f"Feature {feature} not found") return to_camel(e.to_dict()) - @router.get("/features/{feature}/lineage",tags=["Feature"]) def get_feature_lineage(feature: str) -> dict: lineage = registry.get_lineage(feature) diff --git a/registry/purview-registry/registry/interface.py b/registry/purview-registry/registry/interface.py index 7559a3f27..2e60cc32d 100644 --- a/registry/purview-registry/registry/interface.py +++ b/registry/purview-registry/registry/interface.py @@ -92,3 +92,17 @@ def create_project_anchor_feature(self, project_id: UUID, anchor_id: UUID, defin @abstractmethod def create_project_derived_feature(self, project_id: UUID, definition: DerivedFeatureDef) -> UUID: pass + + @abstractmethod + def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]: + """ + Given entity id, returns list of all entities that are downstream/dependent on given entity + """ + pass + + @abstractmethod + def delete_entity(self, entity_id: Union[str, UUID]): + """ + Deletes given entity + """ + pass diff --git a/registry/purview-registry/registry/purview_registry.py b/registry/purview-registry/registry/purview_registry.py index 15a650167..2924cc93f 100644 --- a/registry/purview-registry/registry/purview_registry.py +++ b/registry/purview-registry/registry/purview_registry.py @@ -191,6 +191,35 @@ def get_lineage(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: return EntitiesAndRelations( upstream_entities + downstream_entities, upstream_edges + downstream_edges) + + def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]: + """ + Given
entity id, returns list of all entities that are downstream/dependent on given entity + """ + entity_id = self.get_entity_id(entity_id) + entity = self.get_entity(entity_id) + downstream_entities = [] + if entity.entity_type == EntityType.Project: + downstream_entities, _ = self._bfs(entity_id, RelationshipType.Contains) + if entity.entity_type == EntityType.Source: + downstream_entities, _ = self._bfs(entity_id, RelationshipType.Produces) + if entity.entity_type == EntityType.Anchor: + downstream_entities, _ = self._bfs(entity_id, RelationshipType.Contains) + if entity.entity_type in (EntityType.AnchorFeature, EntityType.DerivedFeature): + downstream_entities, _ = self._bfs(entity_id, RelationshipType.Produces) + return [e for e in downstream_entities if str(e.id) != str(entity_id)] + + def delete_entity(self, entity_id: Union[str, UUID]): + """ + Deletes given entity + """ + entity_id = self.get_entity_id(entity_id) + neighbors = self.get_all_neighbours(entity_id) + edge_guids = [str(x.id) for x in neighbors] + # Delete all edges associated with entity + self.purview_client.delete_entity(edge_guids) + #Delete entity + self.purview_client.delete_entity(str(entity_id)) def _get_edges(self, ids: list[UUID]) -> list[Edge]: all_edges = set() @@ -201,7 +230,7 @@ def _get_edges(self, ids: list[UUID]) -> list[Edge]: and neighbour.to_id in ids: all_edges.add(neighbour) return list(all_edges) - + def _create_edge_from_process(self, name:str, guid: str) -> Edge: names = name.split(self.registry_delimiter) return Edge(guid, names[1], names[2], RelationshipType.new(names[0])) diff --git a/registry/purview-registry/test/test_creation.py b/registry/purview-registry/test/test_creation.py index d99364cfc..71696fc9e 100644 --- a/registry/purview-registry/test/test_creation.py +++ b/registry/purview-registry/test/test_creation.py @@ -21,3 +21,24 @@ name="df1", feature_type=ft1, transformation=t1, key=[k], input_anchor_features=[feature1], input_derived_features=[]))
print(proj_id,source_id,anchor1_id,feature1,derived) + +derived_downstream_entities = registry.get_dependent_entities(derived) +assert len(derived_downstream_entities) == 0 + +feature1_downstream_entities = registry.get_dependent_entities(feature1) +assert len(feature1_downstream_entities) == 1 + +registry.delete_entity(derived) + +# Try getting derived feature but KeyError exception should be thrown +derived_exists = 1 +try: + df1 = registry.get_entity(derived) +except KeyError: + derived_exists = 0 +assert derived_exists == 0 + +feature1_downstream_entities = registry.get_dependent_entities(feature1) +assert len(feature1_downstream_entities) == 0 + +# cleanup() diff --git a/registry/sql-registry/api-spec.md b/registry/sql-registry/api-spec.md index d2e82a878..52172f6df 100644 --- a/registry/sql-registry/api-spec.md +++ b/registry/sql-registry/api-spec.md @@ -287,6 +287,9 @@ Get everything defined in the project Response Type: [`EntitiesAndRelationships`](#entitiesandrelationships) +### `GET /dependent/{entity}` +Gets downstream/dependent entities for given entity + ### `GET /projects/{project}/datasources` Get all sources defined in the project. @@ -320,6 +323,9 @@ Response Type: Object | entity | [`Entity`](#entity) | | | referredEntities| `map` | For compatibility, not used | +### `DELETE /entity/{entity}` +Deletes entity + ### `POST /projects` Create new project diff --git a/registry/sql-registry/main.py b/registry/sql-registry/main.py index 46cefbb34..dcb4d79cb 100644 --- a/registry/sql-registry/main.py +++ b/registry/sql-registry/main.py @@ -86,6 +86,22 @@ def get_projects_ids() -> dict: def get_projects(project: str) -> dict: return registry.get_project(project).to_dict() +@router.get("/dependent/{entity}") +def get_dependent_entities(entity: str) -> list: + entity_id = registry.get_entity_id(entity) + downstream_entities = registry.get_dependent_entities(entity_id) + return list([e.to_dict() for e in downstream_entities]) +
+@router.delete("/entity/{entity}") +def delete_entity(entity: str): + entity_id = registry.get_entity_id(entity) + downstream_entities = registry.get_dependent_entities(entity_id) + if len(downstream_entities) > 0: + raise HTTPException( + status_code=412, detail=f"""Entity cannot be deleted as it has downstream/dependent entities. + Entities: {list([e.qualified_name for e in downstream_entities])}""" + ) + registry.delete_entity(entity_id) @router.get("/projects/{project}/datasources") def get_project_datasources(project: str) -> list: @@ -135,13 +151,11 @@ def get_feature(feature: str) -> dict: status_code=404, detail=f"Feature {feature} not found") return e.to_dict() - @router.get("/features/{feature}/lineage") def get_feature_lineage(feature: str) -> dict: lineage = registry.get_lineage(feature) return lineage.to_dict() - @router.post("/projects") def new_project(definition: dict) -> dict: id = registry.create_project(ProjectDef(**to_snake(definition))) diff --git a/registry/sql-registry/registry/db_registry.py b/registry/sql-registry/registry/db_registry.py index 1553508d8..d0b4c75c5 100644 --- a/registry/sql-registry/registry/db_registry.py +++ b/registry/sql-registry/registry/db_registry.py @@ -105,6 +105,32 @@ def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: df.attributes.input_features = features all_edges = self._get_edges(ids) return EntitiesAndRelations([project] + children, list(edges.union(all_edges))) + + def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]: + """ + Given entity id, returns list of all entities that are downstream/dependant on the given entity + """ + entity_id = self.get_entity_id(entity_id) + entity = self.get_entity(entity_id) + downstream_entities = [] + if entity.entity_type == EntityType.Project: + downstream_entities, _ = self._bfs(entity_id, RelationshipType.Contains) + if entity.entity_type == EntityType.Source: + downstream_entities, _ = self._bfs(entity_id,
RelationshipType.Produces) + if entity.entity_type == EntityType.Anchor: + downstream_entities, _ = self._bfs(entity_id, RelationshipType.Contains) + if entity.entity_type in (EntityType.AnchorFeature, EntityType.DerivedFeature): + downstream_entities, _ = self._bfs(entity_id, RelationshipType.Produces) + return [e for e in downstream_entities if str(e.id) != str(entity_id)] + + def delete_entity(self, entity_id: Union[str, UUID]): + """ + Deletes given entity + """ + entity_id = self.get_entity_id(entity_id) + with self.conn.transaction() as c: + self._delete_all_entity_edges(c, entity_id) + self._delete_entity(c, entity_id) def search_entity(self, keyword: str, @@ -386,6 +412,20 @@ def _create_edge(self, cursor, from_id: UUID, to_id: UUID, type: RelationshipTyp "to_id": str(to_id), "type": type.name }) + + def _delete_all_entity_edges(self, cursor, entity_id: UUID): + """ + Deletes all edges associated with an entity + """ + sql = fr'''DELETE FROM edges WHERE from_id = %s OR to_id = %s''' + cursor.execute(sql, (str(entity_id), str(entity_id))) + + def _delete_entity(self, cursor, entity_id: UUID): + """ + Deletes entity from entities table + """ + sql = fr'''DELETE FROM entities WHERE entity_id = %s''' + cursor.execute(sql, (str(entity_id),)) def _fill_entity(self, e: Entity) -> Entity: """ diff --git a/registry/sql-registry/registry/interface.py b/registry/sql-registry/registry/interface.py index 7f1439079..62f6071cd 100644 --- a/registry/sql-registry/registry/interface.py +++ b/registry/sql-registry/registry/interface.py @@ -111,3 +111,17 @@ def create_project_derived_feature(self, project_id: UUID, definition: DerivedFe Create a new derived feature under the project """ pass + + @abstractmethod + def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]: + """ + Given entity id, returns list of all entities that are downstream/dependant on the given entity + """ + pass + + @abstractmethod + def delete_entity(self, entity_id: Union[str, UUID]):
+ """ + Deletes given entity + """ + pass \ No newline at end of file diff --git a/registry/sql-registry/test/test_create.py b/registry/sql-registry/test/test_create.py index d3077698b..fd6ba74df 100644 --- a/registry/sql-registry/test/test_create.py +++ b/registry/sql-registry/test/test_create.py @@ -55,4 +55,24 @@ def cleanup(): # df1 has only 1 input anchor feature "af1" assert df1.attributes.input_anchor_features[0].id == af1_id +df1_downstream_entities = r.get_dependent_entities(df1_id) +assert len(df1_downstream_entities) == 0 + +af1_downstream_entities = r.get_dependent_entities(af1_id) +assert len(af1_downstream_entities) == 1 + +#Delete derived feature +r.delete_entity(df1_id) + +# Try getting derived feature but KeyError exception should be thrown +derived_exists = 1 +try: + df1 = r.get_entity(df1_id) +except KeyError: + derived_exists = 0 +assert derived_exists == 0 + +af1_downstream_entities = r.get_dependent_entities(af1_id) +assert len(af1_downstream_entities) == 0 + # cleanup()