Skip to content

Commit

Permalink
Merge branch 'assets-filters' of https://github.com/open-metadata/Ope…
Browse files Browse the repository at this point in the history
…nMetadata into assets-filters
  • Loading branch information
chirag-madlani committed Dec 5, 2023
2 parents 019d789 + 5a0e6a6 commit 6196a32
Show file tree
Hide file tree
Showing 85 changed files with 1,029 additions and 163 deletions.
122 changes: 117 additions & 5 deletions ingestion/src/metadata/ingestion/api/topology_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@
"""
import traceback
from functools import singledispatchmethod
from typing import Any, Generic, Iterable, List, TypeVar, Union
from typing import Any, Dict, Generic, Iterable, List, TypeVar, Union

from pydantic import BaseModel

from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.patch_request import PatchRequest
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
Expand All @@ -38,11 +41,17 @@
from metadata.ingestion.ometa.utils import model_str
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.source_hash_utils import (
SOURCE_HASH_EXCLUDE_FIELDS,
generate_source_hash,
)

# Module-level logger obtained from the ingestion logging helper
logger = ingestion_logger()

# Generic type variable restricted to pydantic models
C = TypeVar("C", bound=BaseModel)

# Key of the topology context under which the entities cache is kept.
# The cache maps each entity type to a {fqn: sourceHash} dict.
CACHED_ENTITIES = "cached_entities"


class MissingExpectedEntityAckException(Exception):
"""
Expand Down Expand Up @@ -97,6 +106,10 @@ def process_nodes(self, nodes: List[TopologyNode]) -> Iterable[Entity]:
f"Unexpected value error when processing stage: [{stage}]: {err}"
)

# init the cache dict
if stage.cache_entities:
self._init_cache_dict(stage=stage, child_nodes=child_nodes)

# processing for all stages completed now cleaning the cache if applicable
for stage in node.stages:
if stage.clear_cache:
Expand All @@ -116,6 +129,54 @@ def process_nodes(self, nodes: List[TopologyNode]) -> Iterable[Entity]:
f"Could not run Post Process `{process}` from Topology Runner -- {exc}"
)

def _init_cache_dict(self, stage: NodeStage, child_nodes: List[TopologyNode]):
    """
    Fill the entities cache for every child stage that relies on it

    Walks the stages of the given child nodes and, for each one flagged
    with `use_cache`, makes sure a cache slot exists for its entity type
    and asks the API for the entities found under the parent entity that
    the context currently holds for `stage`.

    :param stage: parent stage; its context value is used as entity name
    :param child_nodes: nodes below the parent node, may be None or empty
    """

    if not self.context.__dict__.get(CACHED_ENTITIES):
        self.context.__dict__[CACHED_ENTITIES] = {}

    for node in child_nodes or []:
        for candidate in node.stages or []:
            # Only stages that read from the cache need it to be loaded
            if not candidate.use_cache:
                continue

            parent_fqn = self.fqn_from_context(
                stage=stage,
                entity_name=self.context.__dict__[stage.context],
            )

            # Start from an empty slot when nothing is cached for the type
            if not self.context.__dict__[CACHED_ENTITIES].get(candidate.type_):
                self.context.__dict__[CACHED_ENTITIES][candidate.type_] = {}

            self.get_fqn_source_hash_dict(
                parent_type=stage.type_,
                child_type=candidate.type_,
                entity_fqn=parent_fqn,
            )

def get_fqn_source_hash_dict(
    self, parent_type: Entity, child_type: Entity, entity_fqn: str
) -> Dict:
    """
    List all the `child_type` entities found under the given parent and
    store them as fqn:sourceHash in the context cache

    :param parent_type: entity type of the parent. Database and
        DatabaseSchema parents are filtered with the `database` param,
        any other type with the `service` param
    :param child_type: entity type to list; also the key of the cache slot
    :param entity_fqn: fully qualified name of the parent, used as the
        filter value
    :return: the fqn:sourceHash dict kept in the context for `child_type`
    """
    filter_key = (
        "database" if parent_type in (Database, DatabaseSchema) else "service"
    )
    entities_list = self.metadata.list_all_entities(
        entity=child_type,
        params={filter_key: entity_fqn},
        fields=["sourceHash"],
    )

    # Create the cache slots on demand so the method does not fail with a
    # KeyError when it is called before the cache has been initialised
    cached_entities = self.context.__dict__.setdefault(CACHED_ENTITIES, {})
    type_cache = cached_entities.setdefault(child_type, {})

    for entity in entities_list:
        # Entities without a source hash are left out of the cache
        if entity.sourceHash:
            type_cache[model_str(entity.fullyQualifiedName)] = entity.sourceHash

    # Honour the declared return type instead of implicitly returning None
    return type_cache

def check_context_and_handle(self, post_process: str):
"""Based on the post_process step, check context and
evaluate if we can run it based on available class attributes
Expand Down Expand Up @@ -165,7 +226,7 @@ def clear_context(self, stage: NodeStage) -> None:
"""
self.context.__dict__[stage.context] = get_ctx_default(stage)

def fqn_from_context(self, stage: NodeStage, entity_request: C) -> str:
def fqn_from_context(self, stage: NodeStage, entity_name: str) -> str:
"""
Read the context
:param stage: Topology node being processed
Expand All @@ -177,7 +238,7 @@ def fqn_from_context(self, stage: NodeStage, entity_request: C) -> str:
for dependency in stage.consumer or [] # root nodes do not have consumers
]
return fqn._build( # pylint: disable=protected-access
*context_names, entity_request.name.__root__
*context_names, entity_name
)

def update_context(
Expand All @@ -192,6 +253,18 @@ def update_context(
if stage.context and stage.cache_all:
self._append_context(key=stage.context, value=context)

def create_patch_request(
    self, original_entity: Entity, create_request: C
) -> PatchRequest:
    """
    Build the PatchRequest for an entity that already exists

    The new entity is a copy of the original one, updated with the
    attributes of the create request.
    Override this method to apply any custom logic to the patch.

    :param original_entity: entity the patch starts from
    :param create_request: create request carrying the updated values
    :return: PatchRequest holding both the original and the new entity
    """
    updated_entity = original_entity.copy(update=create_request.__dict__)
    return PatchRequest(original_entity=original_entity, new_entity=updated_entity)

@singledispatchmethod
def yield_and_update_context(
self,
Expand All @@ -207,7 +280,7 @@ def yield_and_update_context(
"""
entity = None
entity_name = model_str(right.name)
entity_fqn = self.fqn_from_context(stage=stage, entity_request=right)
entity_fqn = self.fqn_from_context(stage=stage, entity_name=entity_name)

# we get entity from OM if we do not want to overwrite existing data in OM
# This will be applicable for service entities since we do not want to overwrite the data
Expand All @@ -217,7 +290,46 @@ def yield_and_update_context(
fqn=entity_fqn,
fields=["*"], # Get all the available data from the Entity
)
if entity is None:
create_entity_request_hash = generate_source_hash(
create_request=entity_request.right,
exclude_fields=SOURCE_HASH_EXCLUDE_FIELDS,
)

if hasattr(entity_request.right, "sourceHash"):
entity_request.right.sourceHash = create_entity_request_hash

skip_processing_entity = False
if entity is None and stage.use_cache:
# check if we find the entity in the entities list
entity_source_hash = self.context.__dict__[CACHED_ENTITIES][
stage.type_
].get(entity_fqn)
if entity_source_hash:
# if the source hash is present, compare it with new hash
if entity_source_hash != create_entity_request_hash:
# the entity has changed, get the entity from server and make a patch request
entity = self.metadata.get_by_name(
entity=stage.type_,
fqn=entity_fqn,
fields=["*"], # Get all the available data from the Entity
)

# we return the entity for a patch update
if entity:
patch_entity = self.create_patch_request(
original_entity=entity, create_request=entity_request.right
)
entity_request.right = patch_entity
else:
# nothing has changed on the source skip the API call
logger.debug(
f"No changes detected for {str(stage.type_.__name__)} '{entity_fqn}'"
)
skip_processing_entity = True

if not skip_processing_entity:
# We store the generated source hash and yield the request

yield entity_request

# We have ack the sink waiting for a response, but got nothing back
Expand Down
120 changes: 120 additions & 0 deletions ingestion/src/metadata/ingestion/models/patch_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pydantic definition for storing entities for patching
"""
from pydantic import BaseModel

from metadata.ingestion.api.models import Entity


class PatchRequest(BaseModel):
    """
    Pair of entities needed to run a patch: the entity as it
    was originally and the entity with the new values applied
    """

    # Entity before the changes
    original_entity: Entity
    # Entity after the changes
    new_entity: Entity


# NOTE(review): these dicts look like pydantic `include` specs, where the
# `__all__` key addresses every item of a list field -- confirm with the
# code that consumes them.


def _for_each_item(allowed_fields: dict) -> dict:
    """Wrap `allowed_fields` under the `__all__` key"""
    return {"__all__": allowed_fields}


# Column attributes that are allowed
ALLOWED_COLUMN_FIELDS = dict.fromkeys(
    (
        "name",
        "displayName",
        "dataType",
        "arrayDataType",
        "dataLength",
        "constraint",
        "children",
        "ordinalPosition",
        "precision",
        "scale",
        "dataTypeDisplay",
        "jsonSchema",
    ),
    True,
)

# Pipeline task attributes that are allowed
ALLOWED_TASK_FIELDS = dict.fromkeys(
    (
        "name",
        "displayName",
        "sourceUrl",
        "downstreamTasks",
        "taskType",
        "taskSQL",
        "startDate",
        "endDate",
    ),
    True,
)

# Entity reference attributes that are allowed
ALLOWED_ENTITY_REFERENCE_FIELDS = dict.fromkeys(("id", "type"), True)

# Container data model attributes that are allowed
ALLOWED_CONTAINER_DATAMODEL_FIELDS = {
    "isPartitioned": True,
    "columns": _for_each_item(ALLOWED_COLUMN_FIELDS),
}

# Common Entity Fields
_COMMON_ENTITY_FIELDS = dict.fromkeys(("name", "displayName", "sourceUrl"), True)

# Table Entity Fields
_TABLE_FIELDS = {
    "tableType": True,
    "columns": _for_each_item(ALLOWED_COLUMN_FIELDS),
    "tableConstraints": True,
    "tablePartition": True,
    "location": True,
    "viewDefinition": True,
    "sampleData": True,
    "retentionPeriod": True,
    "fileFormat": True,
}

# Stored Procedure Fields
_STORED_PROCEDURE_FIELDS = dict.fromkeys(("storedProcedureCode", "code"), True)

# Dashboard Entity Fields
_DASHBOARD_FIELDS = {
    "chartType": True,
    "project": True,
    "dashboardType": True,
    "charts": _for_each_item(ALLOWED_ENTITY_REFERENCE_FIELDS),
    "dataModels": _for_each_item(ALLOWED_ENTITY_REFERENCE_FIELDS),
}

# Pipeline Entity Fields
_PIPELINE_FIELDS = {
    "concurrency": True,
    "pipelineLocation": True,
    "startDate": True,
    "scheduleInterval": True,
    "tasks": _for_each_item(ALLOWED_TASK_FIELDS),
}

# Topic Entity Fields
_TOPIC_FIELDS = dict.fromkeys(
    (
        "messageSchema",
        "partitions",
        "cleanupPolicies",
        "retentionTime",
        "replicationFactor",
        "maximumMessageSize",
        "minimumInSyncReplicas",
        "retentionSize",
        "topicConfig",
    ),
    True,
)

# MlModel Entity Fields
_ML_MODEL_FIELDS = {
    "algorithm": True,
    "mlFeatures": True,
    "mlHyperParameters": True,
    "target": True,
    "dashboard": ALLOWED_ENTITY_REFERENCE_FIELDS,
    "mlStore": True,
    "server": True,
}

# SearchIndex Entity Fields
_SEARCH_INDEX_FIELDS = {
    "fields": _for_each_item(ALLOWED_COLUMN_FIELDS),
    "searchIndexSettings": True,
}

# Container Entity Fields
_CONTAINER_FIELDS = {
    "parent": ALLOWED_ENTITY_REFERENCE_FIELDS,
    "children": _for_each_item(ALLOWED_ENTITY_REFERENCE_FIELDS),
    "dataModel": ALLOWED_CONTAINER_DATAMODEL_FIELDS,
    "prefix": True,
    "numberOfObjects": True,
    "size": True,
    "fileFormats": True,
}

# Every allowed field, grouped by entity type; the groups share no key,
# so merging them keeps each entry and the declaration order.
ALLOWED_COMMON_PATCH_FIELDS = {
    **_COMMON_ENTITY_FIELDS,
    **_TABLE_FIELDS,
    **_STORED_PROCEDURE_FIELDS,
    **_DASHBOARD_FIELDS,
    **_PIPELINE_FIELDS,
    **_TOPIC_FIELDS,
    **_ML_MODEL_FIELDS,
    **_SEARCH_INDEX_FIELDS,
    **_CONTAINER_FIELDS,
}
6 changes: 6 additions & 0 deletions ingestion/src/metadata/ingestion/models/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class Config:
consumer: Optional[
List[str]
] = None # keys in the source context to fetch state from the parent's context
cache_entities: bool = (
False # Cache all the entities which have use_cache set as True
)
use_cache: bool = (
False # enable this to get the entity from cached state in the context
)


class TopologyNode(BaseModel):
Expand Down
35 changes: 29 additions & 6 deletions ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,13 @@ class OMetaPatchMixin(OMetaPatchMixinBase):

client: REST

def patch(self, entity: Type[T], source: T, destination: T) -> Optional[T]:
def patch(
self,
entity: Type[T],
source: T,
destination: T,
allowed_fields: Optional[Dict] = None,
) -> Optional[T]:
"""
Given an Entity type and Source entity and Destination entity,
generate a JSON Patch and apply it.
Expand All @@ -131,11 +137,28 @@ def patch(self, entity: Type[T], source: T, destination: T) -> Optional[T]:
destination.changeDescription = None

# Get the difference between source and destination
patch = jsonpatch.make_patch(
json.loads(source.json(exclude_unset=True, exclude_none=True)),
json.loads(destination.json(exclude_unset=True, exclude_none=True)),
)

if allowed_fields:
patch = jsonpatch.make_patch(
json.loads(
source.json(
exclude_unset=True,
exclude_none=True,
include=allowed_fields,
)
),
json.loads(
destination.json(
exclude_unset=True,
exclude_none=True,
include=allowed_fields,
)
),
)
else:
patch = jsonpatch.make_patch(
json.loads(source.json(exclude_unset=True, exclude_none=True)),
json.loads(destination.json(exclude_unset=True, exclude_none=True)),
)
if not patch:
logger.debug(
"Nothing to update when running the patch. Are you passing `force=True`?"
Expand Down
Loading

0 comments on commit 6196a32

Please sign in to comment.