Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose deletion API for projects/features #852

Merged
merged 13 commits into from
Dec 1, 2022
2 changes: 1 addition & 1 deletion FeathrRegistry.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ RUN npm install && npm run build
FROM python:3.9

## Install dependencies
RUN apt-get update -y && apt-get install -y nginx
RUN apt-get update -y && apt-get install -y nginx freetds-dev
COPY ./registry /usr/src/registry
WORKDIR /usr/src/registry/sql-registry
RUN pip install -r requirements.txt
Expand Down
12 changes: 12 additions & 0 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,18 @@ def list_registered_features(self, project_name: str = None) -> List[str]:
`project_name` must not be None or empty string because it violates the RBAC policy
"""
return self.registry.list_registered_features(project_name)

def list_dependent_entities(self, qualified_name: str):
    """Return the entities that are downstream of (depend on) an entity.

    Args:
        qualified_name: Qualified name of the entity to inspect.

    Returns:
        The result of the configured registry's ``list_dependent_entities``.
    """
    backend = self.registry
    dependents = backend.list_dependent_entities(qualified_name)
    return dependents

def delete_entity(self, qualified_name: str):
    """Ask the registry to delete one entity.

    The docstring contract of the registry call is that deletion only
    happens when the entity has no downstream/dependent entities.

    Args:
        qualified_name: Qualified name of the entity to delete.

    Returns:
        The result of the configured registry's ``delete_entity``.
    """
    backend = self.registry
    outcome = backend.delete_entity(qualified_name)
    return outcome

def _get_registry_client(self):
"""
Expand Down
21 changes: 21 additions & 0 deletions feathr_project/feathr/registry/_feathr_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,23 @@ def list_registered_features(self, project_name: str) -> List[str]:
"id": r["guid"],
"qualifiedName": r["attributes"]["qualifiedName"],
} for r in resp]

def list_dependent_entities(self, qualified_name: str):
    """Fetch the dependents of an entity from ``/dependent/{qualified_name}``.

    Args:
        qualified_name: Qualified name of the entity to inspect.

    Returns:
        A list of dicts with the keys ``name``, ``id`` and ``qualifiedName``,
        one per item in the registry response.
    """
    summaries = []
    for item in self._get(f"/dependent/{qualified_name}"):
        attributes = item["attributes"]
        summaries.append({
            "name": attributes["name"],
            "id": item["guid"],
            "qualifiedName": attributes["qualifiedName"],
        })
    return summaries

def delete_entity(self, qualified_name: str):
    """Send a DELETE for ``/entity/{qualified_name}`` to the registry.

    The response of the underlying ``_delete`` call is discarded, so this
    method returns ``None``.

    Args:
        qualified_name: Qualified name of the entity to delete.
    """
    target_path = f"/entity/{qualified_name}"
    self._delete(target_path)

def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnchor], List[DerivedFeature]]:
"""
Expand Down Expand Up @@ -187,6 +204,10 @@ def _create_derived_feature(self, s: DerivedFeature) -> UUID:
def _get(self, path: str) -> dict:
    """Issue an authenticated GET to the registry and return the decoded JSON.

    Args:
        path: Path appended verbatim to ``self.endpoint``.

    Returns:
        The JSON-decoded response body.
    """
    # The message needs a %s placeholder: logging formats lazily with
    # ``msg % args``, and "PATH: " % (path,) fails when DEBUG is enabled.
    logging.debug("PATH: %s", path)
    # `check` is defined elsewhere in this module; presumably it validates the
    # response before handing it back -- confirm against its definition.
    return check(requests.get(f"{self.endpoint}{path}", headers=self._get_auth_header())).json()

def _delete(self, path: str) -> dict:
    """Issue an authenticated DELETE to the registry and return the decoded JSON.

    Args:
        path: Path appended verbatim to ``self.endpoint``.

    Returns:
        The JSON-decoded response body.
    """
    # The message needs a %s placeholder: logging formats lazily with
    # ``msg % args``, and "PATH: " % (path,) fails when DEBUG is enabled.
    logging.debug("PATH: %s", path)
    # `check` is defined elsewhere in this module; presumably it validates the
    # response before handing it back -- confirm against its definition.
    return check(requests.delete(f"{self.endpoint}{path}", headers=self._get_auth_header())).json()

def _post(self, path: str, body: dict) -> dict:
logging.debug("PATH: ", path)
Expand Down
12 changes: 12 additions & 0 deletions feathr_project/feathr/registry/_feature_registry_purview.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,18 @@ def list_registered_features(self, project_name: str, limit=1000, starting_offse
feature_list.append({"name":entity["name"],'id':entity['id'],"qualifiedName":entity['qualifiedName']})

return feature_list

def list_dependent_entities(self, qualified_name: str):
    """Not available on this registry client.

    Args:
        qualified_name: Qualified name of the entity (unused).

    Raises:
        NotImplementedError: Always.
    """
    reason = "Delete functionality supported through API"
    raise NotImplementedError(reason)

def delete_entity(self, qualified_name: str):
    """Not available on this registry client.

    Args:
        qualified_name: Qualified name of the entity (unused).

    Raises:
        NotImplementedError: Always.
    """
    reason = "Delete functionality supported through API"
    raise NotImplementedError(reason)

def get_feature_by_fqdn_type(self, qualifiedName, typeName):
"""
Expand Down
14 changes: 14 additions & 0 deletions feathr_project/feathr/registry/feature_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ def list_registered_features(self, project_name: str) -> List[str]:
"""
pass

@abstractmethod
def list_dependent_entities(self, qualified_name: str):
    """List the entities that depend on the given entity.

    Args:
        qualified_name: Qualified name of the entity to inspect.
    """

@abstractmethod
def delete_entity(self, qualified_name: str):
    """Delete the given entity, provided nothing depends on it.

    Args:
        qualified_name: Qualified name of the entity to delete.
    """

@abstractmethod
def get_features_from_registry(self, project_name: str) -> Tuple[List[FeatureAnchor], List[DerivedFeature]]:
"""[Sync Features from registry to local workspace, given a project_name, will write project's features from registry to to user's local workspace]
Expand Down
9 changes: 9 additions & 0 deletions registry/access_control/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ async def get_project(project: str, access: UserAccess = Depends(project_read_a
headers=get_api_header(access.user_name)).content.decode('utf-8')
return json.loads(response)

@router.get("/dependent/{entity}", name="Get downstream/dependent entitites for a given entity [Read Access Required]")
def get_dependent_entities(entity: str, access: UserAccess = Depends(project_read_access)):
    """Proxy a dependents lookup to the backing registry.

    Forwards ``GET {registry_url}/dependent/{entity}`` with the API header
    built for the caller's user name, then returns the upstream body parsed
    as JSON.
    """
    upstream = requests.get(
        url=f"{registry_url}/dependent/{entity}",
        headers=get_api_header(access.user_name),
    )
    body_text = upstream.content.decode('utf-8')
    return json.loads(body_text)

@router.get("/projects/{project}/datasources", name="Get data sources of my project [Read Access Required]")
def get_project_datasources(project: str, access: UserAccess = Depends(project_read_access)) -> list:
Expand Down Expand Up @@ -58,6 +63,10 @@ def get_feature(feature: str, requestor: User = Depends(get_user)) -> dict:
feature_qualifiedName, requestor, AccessType.READ)
return ret

@router.delete("/entity/{entity}", name="Deletes a single entity by qualified name [Write Access Required]")
def delete_entity(entity: str, access: UserAccess = Depends(project_write_access)) -> str:
    """Proxy an entity deletion to the backing registry.

    Forwards ``DELETE {registry_url}/entity/{entity}`` with the API header
    built for the caller's user name.

    Args:
        entity: Entity identifier taken from the URL path.
        access: Access record resolved by the ``project_write_access`` dependency.

    Returns:
        The upstream response body decoded as UTF-8 text.
    """
    # The path parameter is `entity`. The previous code interpolated an
    # undefined name `feature`, so every call raised NameError before any
    # request was sent.
    upstream = requests.delete(
        url=f"{registry_url}/entity/{entity}",
        headers=get_api_header(access.user_name),
    )
    # Return the decoded body so the declared `-> str` is honoured; it was
    # previously computed and then dropped.
    return upstream.content.decode('utf-8')

@router.get("/features/{feature}/lineage", name="Get Feature Lineage [Read Access Required]")
def get_feature_lineage(feature: str, requestor: User = Depends(get_user)) -> dict:
Expand Down
6 changes: 6 additions & 0 deletions registry/purview-registry/api-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ Get everything defined in the project

Response Type: [`EntitiesAndRelationships`](#entitiesandrelationships)

### `GET /dependent/{entity}`
Gets downstream/dependent entities for given entity

### `GET /projects/{project}/datasources`
Get all sources defined in the project.

Expand Down Expand Up @@ -320,6 +323,9 @@ Response Type: Object
| entity | [`Entity`](#entity) | |
| referredEntities| `map<string, object>` | For compatibility, not used |

### `DELETE /entity/{entity}`
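Deletes the given entity. The request is rejected with HTTP 412 if the entity still has downstream/dependent entities.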

### `POST /projects`
Create new project

Expand Down
17 changes: 16 additions & 1 deletion registry/purview-registry/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ def get_projects_ids() -> dict:
def get_projects(project: str) -> dict:
return to_camel(registry.get_project(project).to_dict())

@router.get("/dependent/{entity}")
def get_dependent_entities(entity: str) -> list:
    """Return the dict form of every entity downstream of ``entity``.

    The path value is first resolved to an entity id through the registry,
    then each dependent entity is serialised with ``to_dict()``.
    """
    resolved_id = registry.get_entity_id(entity)
    return [dependent.to_dict() for dependent in registry.get_dependent_entities(resolved_id)]

@router.delete("/entity/{entity}")
def delete_entity(entity: str):
    """Delete an entity unless other entities still depend on it.

    Resolves ``entity`` to an id, looks up its downstream/dependent entities
    and refuses the deletion with HTTP 412 when any exist; the error detail
    lists their qualified names. Otherwise the entity is deleted through the
    registry. Nothing is returned explicitly.
    """
    entity_id = registry.get_entity_id(entity)
    downstream_entities = registry.get_dependent_entities(entity_id)
    # Guard: deleting an entity that others depend on would orphan them.
    if len(downstream_entities) > 0:
        raise HTTPException(
            status_code=412, detail=f"""Entity cannot be deleted as it has downstream/dependent entities.
Entities: {list([e.qualified_name for e in downstream_entities])}"""
        )
    registry.delete_entity(entity_id)

@router.get("/projects/{project}/datasources",tags=["Project"])
def get_project_datasources(project: str) -> list:
Expand Down Expand Up @@ -90,7 +106,6 @@ def get_feature(feature: str) -> dict:
status_code=404, detail=f"Feature {feature} not found")
return to_camel(e.to_dict())


@router.get("/features/{feature}/lineage",tags=["Feature"])
def get_feature_lineage(feature: str) -> dict:
lineage = registry.get_lineage(feature)
Expand Down
14 changes: 14 additions & 0 deletions registry/purview-registry/registry/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,17 @@ def create_project_anchor_feature(self, project_id: UUID, anchor_id: UUID, defin
@abstractmethod
def create_project_derived_feature(self, project_id: UUID, definition: DerivedFeatureDef) -> UUID:
    """Create a new derived feature under the project."""

@abstractmethod
def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]:
    """Return every entity that is downstream of / dependent on ``entity_id``."""

@abstractmethod
def delete_entity(self, entity_id: Union[str, UUID]):
    """Delete the entity identified by ``entity_id``."""
31 changes: 30 additions & 1 deletion registry/purview-registry/registry/purview_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,35 @@ def get_lineage(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations:
return EntitiesAndRelations(
upstream_entities + downstream_entities,
upstream_edges + downstream_edges)

def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]:
    """Collect the entities downstream of the given entity.

    The relationship followed depends on the entity type: projects and
    anchors are walked along ``Contains`` edges; sources, anchor features and
    derived features along ``Produces`` edges. Any other type has no
    dependents. The starting entity is excluded from the result.
    """
    entity_id = self.get_entity_id(entity_id)
    kind = self.get_entity(entity_id).entity_type
    if kind in (EntityType.Project, EntityType.Anchor):
        edge_type = RelationshipType.Contains
    elif kind in (EntityType.Source, EntityType.AnchorFeature, EntityType.DerivedFeature):
        edge_type = RelationshipType.Produces
    else:
        return []
    reachable, _ = self._bfs(entity_id, edge_type)
    own_id = str(entity_id)
    return [found for found in reachable if str(found.id) != own_id]

def delete_entity(self, entity_id: Union[str, UUID]):
    """Remove an entity and the edges attached to it from Purview.

    The ids of everything returned by ``get_all_neighbours`` are deleted
    first in one call, then the entity itself is deleted.
    """
    entity_id = self.get_entity_id(entity_id)
    attached_edge_ids = []
    for edge in self.get_all_neighbours(entity_id):
        attached_edge_ids.append(str(edge.id))
    client = self.purview_client
    # Edges go first so no relationship is left pointing at a missing entity.
    # NOTE(review): the list is sent even when empty -- confirm the client
    # accepts an empty guid list.
    client.delete_entity(attached_edge_ids)
    client.delete_entity(str(entity_id))

def _get_edges(self, ids: list[UUID]) -> list[Edge]:
all_edges = set()
Expand All @@ -201,7 +230,7 @@ def _get_edges(self, ids: list[UUID]) -> list[Edge]:
and neighbour.to_id in ids:
all_edges.add(neighbour)
return list(all_edges)

def _create_edge_from_process(self, name:str, guid: str) -> Edge:
    """Build an ``Edge`` from a delimiter-joined process name.

    After splitting ``name`` on ``self.registry_delimiter``, field 0 is the
    relationship type name, and fields 1 and 2 are passed as the edge's
    second and third constructor arguments.
    """
    parts = name.split(self.registry_delimiter)
    second_arg = parts[1]
    third_arg = parts[2]
    relation = RelationshipType.new(parts[0])
    return Edge(guid, second_arg, third_arg, relation)
Expand Down
21 changes: 21 additions & 0 deletions registry/purview-registry/test/test_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,24 @@
name="df1", feature_type=ft1, transformation=t1, key=[k], input_anchor_features=[feature1], input_derived_features=[]))

print(proj_id,source_id,anchor1_id,feature1,derived)

# --- Dependency lookup and deletion checks (order matters: each step
# --- relies on the registry state left by the previous one).

# The derived feature has nothing built on top of it.
derived_downstream_entities = registry.get_dependent_entities(derived)
assert len(derived_downstream_entities) == 0

# feature1 is an input of the derived feature, so it has exactly one dependent.
feature1_downstream_entities = registry.get_dependent_entities(feature1)
assert len(feature1_downstream_entities) == 1

registry.delete_entity(derived)

# Looking up the deleted derived feature is expected to raise KeyError.
derived_exists = 1
try:
    df1 = registry.get_entity(derived)
except KeyError:
    derived_exists = 0
assert derived_exists == 0

# With the derived feature gone, feature1 no longer has dependents.
feature1_downstream_entities = registry.get_dependent_entities(feature1)
assert len(feature1_downstream_entities) == 0

# cleanup()
6 changes: 6 additions & 0 deletions registry/sql-registry/api-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ Response Type: `dict`
### `GET /projects/{project}`
Get everything defined in the project

### `GET /dependent/{entity}`
Gets downstream/dependent entities for given entity

Response Type: [`EntitiesAndRelationships`](#entitiesandrelationships)

### `GET /projects/{project}/datasources`
Expand Down Expand Up @@ -320,6 +323,9 @@ Response Type: Object
| entity | [`Entity`](#entity) | |
| referredEntities| `map<string, object>` | For compatibility, not used |

### `DELETE /entity/{entity}`
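Deletes the given entity. The request is rejected with HTTP 412 if the entity still has downstream/dependent entities.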

### `POST /projects`
Create new project

Expand Down
18 changes: 16 additions & 2 deletions registry/sql-registry/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ def get_projects_ids() -> dict:
def get_projects(project: str) -> dict:
return registry.get_project(project).to_dict()

@router.get("/dependent/{entity}")
def get_dependent_entities(entity: str) -> list:
    """List, as dicts, the entities that are downstream of ``entity``.

    The path value is resolved to an entity id by the registry before the
    dependents are looked up; each one is serialised with ``to_dict()``.
    """
    resolved_id = registry.get_entity_id(entity)
    serialised = []
    for dependent in registry.get_dependent_entities(resolved_id):
        serialised.append(dependent.to_dict())
    return serialised

@router.delete("/entity/{entity}")
def delete_entity(entity: str):
    """Delete an entity unless other entities still depend on it.

    Resolves ``entity`` to an id, looks up its downstream/dependent entities
    and refuses the deletion with HTTP 412 when any exist; the error detail
    lists their qualified names. Otherwise the entity is deleted through the
    registry. Nothing is returned explicitly.
    """
    entity_id = registry.get_entity_id(entity)
    downstream_entities = registry.get_dependent_entities(entity_id)
    # Guard: deleting an entity that others depend on would orphan them.
    if len(downstream_entities) > 0:
        raise HTTPException(
            status_code=412, detail=f"""Entity cannot be deleted as it has downstream/dependent entities.
Entities: {list([e.qualified_name for e in downstream_entities])}"""
        )
    registry.delete_entity(entity_id)

@router.get("/projects/{project}/datasources")
def get_project_datasources(project: str) -> list:
Expand Down Expand Up @@ -135,13 +151,11 @@ def get_feature(feature: str) -> dict:
status_code=404, detail=f"Feature {feature} not found")
return e.to_dict()


@router.get("/features/{feature}/lineage")
def get_feature_lineage(feature: str) -> dict:
    """Return the lineage of ``feature`` from the registry, as a dict."""
    return registry.get_lineage(feature).to_dict()


@router.post("/projects")
def new_project(definition: dict) -> dict:
id = registry.create_project(ProjectDef(**to_snake(definition)))
Expand Down
40 changes: 40 additions & 0 deletions registry/sql-registry/registry/db_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,32 @@ def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations:
df.attributes.input_features = features
all_edges = self._get_edges(ids)
return EntitiesAndRelations([project] + children, list(edges.union(all_edges)))

def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]:
    """Collect the entities downstream of the given entity.

    Projects and anchors are traversed along ``Contains`` relationships;
    sources, anchor features and derived features along ``Produces``
    relationships. Other entity types yield an empty list. The entity
    itself is never part of the result.
    """
    entity_id = self.get_entity_id(entity_id)
    kind = self.get_entity(entity_id).entity_type
    if kind in (EntityType.Project, EntityType.Anchor):
        relation = RelationshipType.Contains
    elif kind in (EntityType.Source, EntityType.AnchorFeature, EntityType.DerivedFeature):
        relation = RelationshipType.Produces
    else:
        return []
    visited, _ = self._bfs(entity_id, relation)
    start_id = str(entity_id)
    return [node for node in visited if str(node.id) != start_id]

def delete_entity(self, entity_id: Union[str, UUID]):
    """Delete an entity and all of its edges inside one transaction.

    Edges that reference the entity are removed before the entity row.
    """
    resolved_id = self.get_entity_id(entity_id)
    with self.conn.transaction() as cursor:
        self._delete_all_entity_edges(cursor, resolved_id)
        self._delete_entity(cursor, resolved_id)

def search_entity(self,
keyword: str,
Expand Down Expand Up @@ -386,6 +412,20 @@ def _create_edge(self, cursor, from_id: UUID, to_id: UUID, type: RelationshipTyp
"to_id": str(to_id),
"type": type.name
})

def _delete_all_entity_edges(self, cursor, entity_id: UUID):
    """Remove every edge row that starts or ends at ``entity_id``.

    Args:
        cursor: Cursor used to execute the statement.
        entity_id: Id matched against both ``from_id`` and ``to_id``.
    """
    id_text = str(entity_id)
    statement = "DELETE FROM edges WHERE from_id = %s OR to_id = %s"
    cursor.execute(statement, (id_text, str(entity_id)))

def _delete_entity(self, cursor, entity_id: UUID):
    """Delete the row for ``entity_id`` from the ``entities`` table.

    Args:
        cursor: Cursor used to execute the statement.
        entity_id: Id of the entity row to remove.
    """
    sql = 'DELETE FROM entities WHERE entity_id = %s'
    # Bind the parameter as a one-element tuple, matching the edge-deletion
    # query. A bare string is itself a sequence of characters, so drivers that
    # follow the DB-API sequence convention would treat it as many parameters.
    cursor.execute(sql, (str(entity_id),))

def _fill_entity(self, e: Entity) -> Entity:
"""
Expand Down
14 changes: 14 additions & 0 deletions registry/sql-registry/registry/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,17 @@ def create_project_derived_feature(self, project_id: UUID, definition: DerivedFe
Create a new derived feature under the project
"""
pass

@abstractmethod
def get_dependent_entities(self, entity_id: Union[str, UUID]) -> list[Entity]:
    """Return every entity that is downstream of / dependent on ``entity_id``."""

@abstractmethod
def delete_entity(self, entity_id: Union[str, UUID]):
    """Delete the entity identified by ``entity_id``."""
Loading