Skip to content

Commit

Permalink
Expose deletion API for projects/features (#852)
Browse files Browse the repository at this point in the history
* registry-changes

* update purview

* remove delete functionality for now

* update tests

* remove unused import

* update endpoints

* fix locking issue

* Update _feature_registry_purview.py

* remove cascading delete

* Update feature_registry.py

* update access control

* update status code to 412
  • Loading branch information
aabbasi-hbo authored Dec 1, 2022
1 parent cae6a99 commit 858f88f
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 5 deletions.
2 changes: 1 addition & 1 deletion FeathrRegistry.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ RUN npm install && npm run build
FROM python:3.9

## Install dependencies
RUN apt-get update -y && apt-get install -y nginx
RUN apt-get update -y && apt-get install -y nginx freetds-dev
COPY ./registry /usr/src/registry
WORKDIR /usr/src/registry/sql-registry
RUN pip install -r requirements.txt
Expand Down
12 changes: 12 additions & 0 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,18 @@ def list_registered_features(self, project_name: str = None) -> List[str]:
`project_name` must not be None or empty string because it violates the RBAC policy
"""
return self.registry.list_registered_features(project_name)

def list_dependent_entities(self, qualified_name: str):
"""
Lists all dependent/downstream entities for a given entity
"""
return self.registry.list_dependent_entities(qualified_name)

def delete_entity(self, qualified_name: str):
"""
Deletes a single entity if it has no downstream/dependent entities
"""
return self.registry.delete_entity(qualified_name)

def _get_registry_client(self):
"""
Expand Down
21 changes: 21 additions & 0 deletions feathr_project/feathr/registry/_feathr_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ def list_registered_features(self, project_name: str) -> List[str]:
"id": r["guid"],
"qualifiedName": r["attributes"]["qualifiedName"],
} for r in resp]

def list_dependent_entities(self, qualified_name: str):
"""
Returns list of dependent entities for provided entity
"""
resp = self._get(f"/dependent/{qualified_name}")
return [{
"name": r["attributes"]["name"],
"id": r["guid"],
"qualifiedName": r["attributes"]["qualifiedName"],
} for r in resp]

def delete_entity(self, qualified_name: str):
"""
Deletes entity if it has no dependent entities
"""
self._delete(f"/entity/{qualified_name}")

def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnchor], List[DerivedFeature]]:
"""
Expand Down Expand Up @@ -187,6 +204,10 @@ def _create_derived_feature(self, s: DerivedFeature) -> UUID:
def _get(self, path: str) -> dict:
logging.debug("PATH: ", path)
return check(requests.get(f"{self.endpoint}{path}", headers=self._get_auth_header())).json()

def _delete(self, path: str) -> dict:
logging.debug("PATH: ", path)
return check(requests.delete(f"{self.endpoint}{path}", headers=self._get_auth_header())).json()

def _post(self, path: str, body: dict) -> dict:
logging.debug("PATH: ", path)
Expand Down
12 changes: 12 additions & 0 deletions feathr_project/feathr/registry/_feature_registry_purview.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,18 @@ def list_registered_features(self, project_name: str, limit=1000, starting_offse
feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']})

return feature_list

def list_dependent_entities(self, qualified_name: str):
"""
Returns list of dependent entities for provided entity
"""
raise NotImplementedError("Delete functionality supported through API")

def delete_entity(self, qualified_name: str):
"""
Deletes entity if it has no dependent entities
"""
raise NotImplementedError("Delete functionality supported through API")

def get_feature_by_fqdn_type(self, qualifiedName, typeName):
"""
Expand Down
14 changes: 14 additions & 0 deletions feathr_project/feathr/registry/feature_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ def list_registered_features(self, project_name: str) -> List[str]:
"""
pass

@abstractmethod
def list_dependent_entities(self, qualified_name: str):
"""
Returns list of dependent entities for provided entity
"""
pass

@abstractmethod
def delete_entity(self, qualified_name: str):
"""
Deletes entity if it has no dependent entities
"""
pass

@abstractmethod
def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnchor], List[DerivedFeature]]:
"""[Sync Features from registry to local workspace, given a project_name, will write project's features from registry to to user's local workspace]
Expand Down
9 changes: 9 additions & 0 deletions registry/access_control/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ async def get_project(project: str, response: Response, access: UserAccess = Dep
headers=get_api_header(access.user_name)))
return res

@router.get("/dependent/{entity}", name="Get downstream/dependent entitites for a given entity [Read Access Required]")
def get_dependent_entities(entity: str, access: UserAccess = Depends(project_read_access)):
response = requests.get(url=f"{registry_url}/dependent/{entity}",
headers=get_api_header(access.user_name)).content.decode('utf-8')
return json.loads(response)

@router.get("/projects/{project}/datasources", name="Get data sources of my project [Read Access Required]")
def get_project_datasources(project: str, response: Response, access: UserAccess = Depends(project_read_access)) -> list:
Expand Down Expand Up @@ -57,6 +62,10 @@ def get_feature(feature: str, response: Response, requestor: User = Depends(get_
feature_qualifiedName, requestor, AccessType.READ)
return res

@router.delete("/entity/{entity}", name="Deletes a single entity by qualified name [Write Access Required]")
def delete_entity(entity: str, access: UserAccess = Depends(project_write_access)) -> str:
requests.delete(url=f"{registry_url}/entity/{feature}",
headers=get_api_header(access.user_name)).content.decode('utf-8')

@router.get("/features/{feature}/lineage", name="Get Feature Lineage [Read Access Required]")
def get_feature_lineage(feature: str, response: Response, requestor: User = Depends(get_user)) -> dict:
Expand Down
6 changes: 6 additions & 0 deletions registry/purview-registry/api-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ Get everything defined in the project

Response Type: [`EntitiesAndRelationships`](#entitiesandrelationships)

### `GET /dependent/{entity}`
Gets downstream/dependent entities for given entity

### `GET /projects/{project}/datasources`
Get all sources defined in the project.

Expand Down Expand Up @@ -320,6 +323,9 @@ Response Type: Object
| entity | [`Entity`](#entity) | |
| referredEntities| `map<string, object>` | For compatibility, not used |

### `DELETE /entity/{entity}`
Deletes entity

### `POST /projects`
Create new project

Expand Down
17 changes: 16 additions & 1 deletion registry/purview-registry/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ def get_projects_ids() -> dict:
def get_projects(project: str) -> dict:
return to_camel(registry.get_project(project).to_dict())

@router.get("/dependent/{entity}")
def get_dependent_entities(entity: str) -> list:
entity_id = registry.get_entity_id(entity)
downstream_entities = registry.get_dependent_entities(entity_id)
return list([e.to_dict() for e in downstream_entities])

@router.delete("/entity/{entity}")
def delete_entity(entity: str):
entity_id = registry.get_entity_id(entity)
downstream_entities = registry.get_dependent_entities(entity_id)
if len(downstream_entities) > 0:
raise HTTPException(
status_code=412, detail=f"""Entity cannot be deleted as it has downstream/dependent entities.
Entities: {list([e.qualified_name for e in downstream_entities])}"""
)
registry.delete_entity(entity_id)

@router.get("/projects/{project}/datasources",tags=["Project"])
def get_project_datasources(project: str) -> list:
Expand Down Expand Up @@ -142,7 +158,6 @@ def get_feature(feature: str) -> dict:
status_code=404, detail=f"Feature {feature} not found")
return to_camel(e.to_dict())


@router.get("/features/{feature}/lineage",tags=["Feature"])
def get_feature_lineage(feature: str) -> dict:
lineage = registry.get_lineage(feature)
Expand Down
14 changes: 14 additions & 0 deletions registry/purview-registry/registry/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,17 @@ def create_project_anchor_feature(self, project_id: UUID, anchor_id: UUID, defin
@abstractmethod
def create_project_derived_feature(self, project_id: UUID, definition: DerivedFeatureDef) -> UUID:
pass

@abstractmethod
def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]:
"""
Given entity id, returns list of all entities that are downstream/dependent on given entity
"""
pass

@abstractmethod
def delete_entity(self, entity_id: Union[str, UUID]):
"""
Deletes given entity
"""
pass
31 changes: 30 additions & 1 deletion registry/purview-registry/registry/purview_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,35 @@ def get_lineage(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations:
return EntitiesAndRelations(
upstream_entities + downstream_entities,
upstream_edges + downstream_edges)

def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]:
"""
Given entity id, returns list of all entities that are downstream/dependent on given entity
"""
entity_id = self.get_entity_id(entity_id)
entity = self.get_entity(entity_id)
downstream_entities = []
if entity.entity_type == EntityType.Project:
downstream_entities, _ = self._bfs(entity_id, RelationshipType.Contains)
if entity.entity_type == EntityType.Source:
downstream_entities, _ = self._bfs(entity_id, RelationshipType.Produces)
if entity.entity_type == EntityType.Anchor:
downstream_entities, _ = self._bfs(entity_id, RelationshipType.Contains)
if entity.entity_type in (EntityType.AnchorFeature, EntityType.DerivedFeature):
downstream_entities, _ = self._bfs(entity_id, RelationshipType.Produces)
return [e for e in downstream_entities if str(e.id) != str(entity_id)]

def delete_entity(self, entity_id: Union[str, UUID]):
"""
Deletes given entity
"""
entity_id = self.get_entity_id(entity_id)
neighbors = self.get_all_neighbours(entity_id)
edge_guids = [str(x.id) for x in neighbors]
# Delete all edges associated with entity
self.purview_client.delete_entity(edge_guids)
#Delete entity
self.purview_client.delete_entity(str(entity_id))

def _get_edges(self, ids: list[UUID]) -> list[Edge]:
all_edges = set()
Expand All @@ -208,7 +237,7 @@ def _get_edges(self, ids: list[UUID]) -> list[Edge]:
and neighbour.to_id in ids:
all_edges.add(neighbour)
return list(all_edges)

def _create_edge_from_process(self, name:str, guid: str) -> Edge:
names = name.split(self.registry_delimiter)
return Edge(guid, names[1], names[2], RelationshipType.new(names[0]))
Expand Down
21 changes: 21 additions & 0 deletions registry/purview-registry/test/test_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,24 @@
name="df1", feature_type=ft1, transformation=t1, key=[k], input_anchor_features=[feature1], input_derived_features=[]))

print(proj_id,source_id,anchor1_id,feature1,derived)

derived_downstream_entities = registry.get_dependent_entities(derived)
assert len(derived_downstream_entities) == 0

feature1_downstream_entities = registry.get_dependent_entities(feature1)
assert len(feature1_downstream_entities) == 1

registry.delete_entity(derived)

# Try getting derived feature but KeyError exception should be thrown
derived_exists = 1
try:
df1 = registry.get_entity(derived)
except KeyError:
derived_exists = 0
assert derived_exists == 0

feature1_downstream_entities = registry.get_dependent_entities(feature1)
assert len(feature1_downstream_entities) == 0

# cleanup()
6 changes: 6 additions & 0 deletions registry/sql-registry/api-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ Response Type: `dict`
### `GET /projects/{project}`
Get everything defined in the project

### `GET /dependent/{entity}`
Gets downstream/dependent entities for given entity

Response Type: [`EntitiesAndRelationships`](#entitiesandrelationships)

### `GET /projects/{project}/datasources`
Expand Down Expand Up @@ -320,6 +323,9 @@ Response Type: Object
| entity | [`Entity`](#entity) | |
| referredEntities| `map<string, object>` | For compatibility, not used |

### `DELETE /entity/{entity}`
Deletes entity

### `POST /projects`
Create new project

Expand Down
18 changes: 16 additions & 2 deletions registry/sql-registry/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ def get_projects_ids() -> dict:
def get_projects(project: str) -> dict:
return registry.get_project(project).to_dict()

@router.get("/dependent/{entity}")
def get_dependent_entities(entity: str) -> list:
entity_id = registry.get_entity_id(entity)
downstream_entities = registry.get_dependent_entities(entity_id)
return list([e.to_dict() for e in downstream_entities])

@router.delete("/entity/{entity}")
def delete_entity(entity: str):
entity_id = registry.get_entity_id(entity)
downstream_entities = registry.get_dependent_entities(entity_id)
if len(downstream_entities) > 0:
raise HTTPException(
status_code=412, detail=f"""Entity cannot be deleted as it has downstream/dependent entities.
Entities: {list([e.qualified_name for e in downstream_entities])}"""
)
registry.delete_entity(entity_id)

@router.get("/projects/{project}/datasources")
def get_project_datasources(project: str) -> list:
Expand Down Expand Up @@ -135,13 +151,11 @@ def get_feature(feature: str) -> dict:
status_code=404, detail=f"Feature {feature} not found")
return e.to_dict()


@router.get("/features/{feature}/lineage")
def get_feature_lineage(feature: str) -> dict:
lineage = registry.get_lineage(feature)
return lineage.to_dict()


@router.post("/projects")
def new_project(definition: dict) -> dict:
id = registry.create_project(ProjectDef(**to_snake(definition)))
Expand Down
40 changes: 40 additions & 0 deletions registry/sql-registry/registry/db_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,32 @@ def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations:
df.attributes.input_features = features
all_edges = self._get_edges(ids)
return EntitiesAndRelations([project] + children, list(edges.union(all_edges)))

def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]:
"""
Given entity id, returns list of all entities that are downstream/dependant on the given entity
"""
entity_id = self.get_entity_id(entity_id)
entity = self.get_entity(entity_id)
downstream_entities = []
if entity.entity_type == EntityType.Project:
downstream_entities, _ = self._bfs(entity_id, RelationshipType.Contains)
if entity.entity_type == EntityType.Source:
downstream_entities, _ = self._bfs(entity_id, RelationshipType.Produces)
if entity.entity_type == EntityType.Anchor:
downstream_entities, _ = self._bfs(entity_id, RelationshipType.Contains)
if entity.entity_type in (EntityType.AnchorFeature, EntityType.DerivedFeature):
downstream_entities, _ = self._bfs(entity_id, RelationshipType.Produces)
return [e for e in downstream_entities if str(e.id) != str(entity_id)]

def delete_entity(self, entity_id: Union[str, UUID]):
"""
Deletes given entity
"""
entity_id = self.get_entity_id(entity_id)
with self.conn.transaction() as c:
self._delete_all_entity_edges(c, entity_id)
self._delete_entity(c, entity_id)

def search_entity(self,
keyword: str,
Expand Down Expand Up @@ -386,6 +412,20 @@ def _create_edge(self, cursor, from_id: UUID, to_id: UUID, type: RelationshipTyp
"to_id": str(to_id),
"type": type.name
})

def _delete_all_entity_edges(self, cursor, entity_id: UUID):
"""
Deletes all edges associated with an entity
"""
sql = fr'''DELETE FROM edges WHERE from_id = %s OR to_id = %s'''
cursor.execute(sql, (str(entity_id), str(entity_id)))

def _delete_entity(self, cursor, entity_id: UUID):
"""
Deletes entity from entities table
"""
sql = fr'''DELETE FROM entities WHERE entity_id = %s'''
cursor.execute(sql, str(entity_id))

def _fill_entity(self, e: Entity) -> Entity:
"""
Expand Down
14 changes: 14 additions & 0 deletions registry/sql-registry/registry/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,17 @@ def create_project_derived_feature(self, project_id: UUID, definition: DerivedFe
Create a new derived feature under the project
"""
pass

@abstractmethod
def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]:
"""
Given entity id, returns list of all entities that are downstream/dependant on the given entity
"""
pass

@abstractmethod
def delete_entity(self, entity_id: Union[str, UUID]):
"""
Deletes given entity
"""
pass
Loading

0 comments on commit 858f88f

Please sign in to comment.