Skip to content

Commit

Permalink
MINOR - Fix & Organize topology context (#14838)
Browse files Browse the repository at this point in the history
* MINOR - Fix & Organize topology context

* Handle missing context charts
  • Loading branch information
pmbrull authored Jan 25, 2024
1 parent ac4dc7f commit 85e2058
Show file tree
Hide file tree
Showing 27 changed files with 341 additions and 158 deletions.
108 changes: 14 additions & 94 deletions ingestion/src/metadata/ingestion/api/topology_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

from pydantic import BaseModel

from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
Expand All @@ -35,13 +32,11 @@
ServiceTopology,
TopologyContext,
TopologyNode,
get_ctx_default,
get_topology_node,
get_topology_root,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import model_str
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.source_hash import generate_source_hash

Expand Down Expand Up @@ -112,7 +107,7 @@ def process_nodes(self, nodes: List[TopologyNode]) -> Iterable[Entity]:
# Once we are done processing all the stages,
for stage in node.stages:
if stage.clear_context:
self.clear_context(stage=stage)
self.context.clear_stage(stage=stage)

# process all children from the node being run
yield from self.process_nodes(child_nodes)
Expand Down Expand Up @@ -182,7 +177,7 @@ def _init_cache_dict(
for child_node in child_nodes or []:
for child_stage in child_node.stages or []:
if child_stage.use_cache:
entity_fqn = self.fqn_from_context(
entity_fqn = self.context.fqn_from_stage(
stage=stage,
entity_name=self.context.__dict__[stage.context],
)
Expand Down Expand Up @@ -229,85 +224,6 @@ def _iter(self) -> Iterable[Either]:
"""
yield from self.process_nodes(get_topology_root(self.topology))

def _replace_context(self, key: str, value: Any) -> None:
    """
    Overwrite a single entry of the source context.
    :param key: name of the context entry to overwrite
    :param value: new value stored under that entry
    """
    # Write straight into the instance dict, exactly like the direct
    # ``__dict__`` assignment this replaces.
    context_store = self.context.__dict__
    context_store[key] = value

def _append_context(self, key: str, value: Any) -> None:
    """
    Add a value at the end of a list-valued context entry.
    :param key: name of the context entry that holds the list
    :param value: element appended to that list
    """
    # The entry must already exist and support ``append``; a missing key
    # raises KeyError here.
    stored_values = self.context.__dict__[key]
    stored_values.append(value)

def clear_context(self, stage: NodeStage) -> None:
    """
    Reset the context entry owned by a stage.
    :param stage: stage whose context entry is set back to its default value
    """
    # The default comes from get_ctx_default, so it depends on the stage itself.
    default_value = get_ctx_default(stage)
    self.context.__dict__[stage.context] = default_value

def fqn_from_context(self, stage: NodeStage, entity_name: str) -> str:
    """
    Build an entity FQN out of the values currently held in the context.
    :param stage: Topology stage being processed
    :param entity_name: name of the entity, used as the last FQN part
    :return: Entity FQN derived from context
    """
    parent_names = []
    # Root nodes do not have consumers, so they contribute no parent parts.
    for dependency in stage.consumer or []:
        parent_names.append(self.context.__dict__[dependency])

    return fqn._build(  # pylint: disable=protected-access
        *parent_names, entity_name
    )

def update_context(self, stage: NodeStage, right: C):
    """
    Record the processed entity in the topology context.

    The stored value is the FQN when the stage sets ``store_fqn`` and the
    plain entity name otherwise. Depending on ``store_all_in_context`` the
    value is either appended to the stage's context entry or replaces it.
    :param stage: stage that produced the entity
    :param right: request whose name/FQN is stored
    """
    new_context = (
        self._build_new_context_fqn(right)
        if stage.store_fqn
        else model_str(right.name)
    )

    # Stages without a context key have nowhere to store the value.
    if not stage.context:
        return

    if stage.store_all_in_context:
        self._append_context(key=stage.context, value=new_context)
    else:
        self._replace_context(key=stage.context, value=new_context)

@singledispatchmethod
def _build_new_context_fqn(self, right: C) -> str:
    """
    Build the FQN string stored in the context for a request.

    This is the single-dispatch fallback: it only runs when no overload has
    been registered for the type of ``right``.
    :param right: request to derive the FQN from
    :raises NotImplementedError: always, naming the unsupported request type
    """
    # Report the runtime type of the argument. ``type(C)`` would always
    # print the TypeVar class and hide which request is unsupported.
    raise NotImplementedError(f"Missing implementation for [{type(right)}]")

@_build_new_context_fqn.register
def _(self, right: CreateStoredProcedureRequest) -> str:
    """
    FQN context building for Stored Procedures.

    Stored Procedure lineage is handled at the very end of the service, so
    keeping only the procedure name would lose the database and schema it
    belongs to. The full FQN is built from the current context instead.
    :param right: Stored Procedure creation request
    :return: Stored Procedure FQN
    """
    build_kwargs = {
        "metadata": self.metadata,
        "entity_type": StoredProcedure,
        "service_name": self.context.database_service,
        "database_name": self.context.database,
        "schema_name": self.context.database_schema,
        "procedure_name": right.name.__root__,
    }
    return fqn.build(**build_kwargs)

def create_patch_request(
self, original_entity: Entity, create_request: C
) -> PatchRequest:
Expand Down Expand Up @@ -335,7 +251,7 @@ def yield_and_update_context(
"""
entity = None
entity_name = model_str(right.name)
entity_fqn = self.fqn_from_context(stage=stage, entity_name=entity_name)
entity_fqn = self.context.fqn_from_stage(stage=stage, entity_name=entity_name)

# If we don't want to write data in OM, we'll return what we fetch from the API.
# This will be applicable for service entities since we do not want to overwrite the data
Expand Down Expand Up @@ -405,7 +321,7 @@ def yield_and_update_context(
"for the service connection."
)

self.update_context(stage=stage, right=right)
self.context.update_context_name(stage=stage, right=right)

@yield_and_update_context.register
def _(
Expand All @@ -421,7 +337,7 @@ def _(
lineage has been properly drawn. We'll skip the process for now.
"""
yield entity_request
self.update_context(stage=stage, right=right.edge.fromEntity.name.__root__)
self.context.update_context_name(stage=stage, right=right.edge.fromEntity)

@yield_and_update_context.register
def _(
Expand All @@ -430,11 +346,16 @@ def _(
stage: NodeStage,
entity_request: Either[C],
) -> Iterable[Either[Entity]]:
"""Tag implementation for the context information"""
"""
Tag implementation for the context information.
We need the full OMetaTagAndClassification in the context
to build the TagLabels during the ingestion. We need to bundle
both CreateClassificationRequest and CreateTagRequest.
"""
yield entity_request

# We'll keep the tag fqn in the context and use if required
self.update_context(stage=stage, right=right)
self.context.update_context_value(stage=stage, value=right)

@yield_and_update_context.register
def _(
Expand All @@ -446,8 +367,7 @@ def _(
"""Custom Property implementation for the context information"""
yield entity_request

# We'll keep the tag fqn in the context and use if required
self.update_context(stage=stage, right=right)
self.context.update_context_value(stage=stage, value=right)

def sink_request(
self, stage: NodeStage, entity_request: Either[C]
Expand Down
144 changes: 115 additions & 29 deletions ingestion/src/metadata/ingestion/models/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,20 @@
"""
Defines the topology for ingesting sources
"""

from functools import singledispatchmethod
from typing import Any, Generic, List, Optional, Type, TypeVar

from pydantic import BaseModel, Extra, Field, create_model

from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.ingestion.ometa.utils import model_str
from metadata.utils import fqn

T = TypeVar("T", bound=BaseModel)
C = TypeVar("C", bound=BaseModel)


class NodeStage(BaseModel, Generic[T]):
Expand Down Expand Up @@ -130,6 +138,112 @@ def __repr__(self):
ctx = {key: value.name.__root__ for key, value in self.__dict__.items()}
return f"TopologyContext({ctx})"

@classmethod
def create(cls, topology: ServiceTopology) -> "TopologyContext":
    """
    Build a context instance whose fields mirror the topology stages.

    Every stage that declares a ``context`` key becomes an optional field
    of a dynamically generated Pydantic model, defaulting to None.
    :param topology: ServiceTopology
    :return: TopologyContext
    """
    ctx_fields = {}
    for node in get_topology_nodes(topology):
        for stage in node.stages:
            # Stages without a context key do not store anything.
            if stage.context:
                ctx_fields[stage.context] = (Optional[stage.type_], None)

    generated_model = create_model(
        "GeneratedContext", **ctx_fields, __base__=TopologyContext
    )
    return generated_model()

def upsert(self, key: str, value: Any) -> None:
    """
    Set a context entry, creating it or overwriting the previous value.
    :param key: name of the context entry to set
    :param value: value stored under that entry
    """
    # Written through the instance dict, as in the direct ``__dict__``
    # assignment this replaces.
    context_store = self.__dict__
    context_store[key] = value

def append(self, key: str, value: Any) -> None:
    """
    Add a value to a list-valued context entry.

    When the entry is missing or falsy (None, empty list) a new
    single-element list is stored instead.
    :param key: name of the context entry holding the list
    :param value: element to add
    """
    current = self.__dict__.get(key)
    if not current:
        # Nothing usable stored yet: start a fresh list.
        self.__dict__[key] = [value]
        return
    current.append(value)

def clear_stage(self, stage: NodeStage) -> None:
    """
    Reset the context entry owned by a stage.
    :param stage: stage whose context entry is set back to None
    """
    stage_key = stage.context
    self.__dict__[stage_key] = None

def fqn_from_stage(self, stage: NodeStage, entity_name: str) -> str:
    """
    Build an entity FQN out of the values currently held in the context.
    :param stage: Topology stage being processed
    :param entity_name: name of the entity, used as the last FQN part
    :return: Entity FQN derived from context
    """
    parent_names = []
    # Root nodes do not have consumers, so they contribute no parent parts.
    for dependency in stage.consumer or []:
        parent_names.append(self.__dict__[dependency])

    return fqn._build(  # pylint: disable=protected-access
        *parent_names, entity_name
    )

def update_context_name(self, stage: NodeStage, right: C) -> None:
    """
    Store the entity name or FQN of a request in the topology context.

    The FQN is stored when the stage sets ``store_fqn``; otherwise only the
    name is kept and the FQN is built later by the source when needed.
    :param stage: stage that produced the request
    :param right: request whose name/FQN is stored
    """
    new_context = (
        self._build_new_context_fqn(right)
        if stage.store_fqn
        else model_str(right.name)
    )
    self.update_context_value(stage=stage, value=new_context)

def update_context_value(self, stage: NodeStage, value: Any) -> None:
    """
    Store a value in the context entry of a stage.

    The value is appended when the stage sets ``store_all_in_context`` and
    replaces the entry otherwise. Nothing happens for stages without a
    context key.
    :param stage: stage whose context entry is updated
    :param value: value to store
    """
    if not stage.context:
        return

    if stage.store_all_in_context:
        self.append(key=stage.context, value=value)
    else:
        self.upsert(key=stage.context, value=value)

@singledispatchmethod
def _build_new_context_fqn(self, right: C) -> str:
    """
    Build the FQN string stored in the context for a request.

    This is the single-dispatch fallback: it only runs when no overload has
    been registered for the type of ``right``.
    :param right: request to derive the FQN from
    :raises NotImplementedError: always, naming the unsupported request type
    """
    # Report the runtime type of the argument. ``type(C)`` would always
    # print the TypeVar class and hide which request is unsupported.
    raise NotImplementedError(f"Missing implementation for [{type(right)}]")

@_build_new_context_fqn.register
def _(self, right: CreateStoredProcedureRequest) -> str:
    """
    FQN context building for Stored Procedures.

    Stored Procedure lineage is handled at the very end of the service, so
    keeping only the procedure name would lose the database and schema it
    belongs to. The full FQN is built from the current context instead.
    :param right: Stored Procedure creation request
    :return: Stored Procedure FQN
    """
    build_kwargs = {
        "metadata": None,
        "entity_type": StoredProcedure,
        "service_name": self.__dict__["database_service"],
        "database_name": self.__dict__["database"],
        "schema_name": self.__dict__["database_schema"],
        "procedure_name": right.name.__root__,
    }
    return fqn.build(**build_kwargs)


def get_topology_nodes(topology: ServiceTopology) -> List[TopologyNode]:
"""
Expand Down Expand Up @@ -163,34 +277,6 @@ def get_topology_root(topology: ServiceTopology) -> List[TopologyNode]:
return [node for node in nodes if node_has_no_consumers(node)]


def get_ctx_default(stage: NodeStage) -> Optional[List[Any]]:
    """
    Default context value for a stage.
    :param stage: Node Stage
    :return: a new empty list when the stage stores all values, else None
    """
    if stage.store_all_in_context:
        return []
    return None


def create_source_context(topology: ServiceTopology) -> TopologyContext:
    """
    Build a context instance whose fields mirror the topology stages.

    Every stage that declares a ``context`` key becomes an optional field
    of a dynamically generated Pydantic model, with the default given by
    ``get_ctx_default``.
    :param topology: ServiceTopology
    :return: TopologyContext
    """
    ctx_fields = {}
    for node in get_topology_nodes(topology):
        for stage in node.stages:
            # Stages without a context key do not store anything.
            if stage.context:
                ctx_fields[stage.context] = (
                    Optional[stage.type_],
                    get_ctx_default(stage),
                )

    generated_model = create_model(
        "GeneratedContext", **ctx_fields, __base__=TopologyContext
    )
    return generated_model()


def get_topology_node(name: str, topology: ServiceTopology) -> TopologyNode:
"""
Fetch a topology node by name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyContext,
TopologyNode,
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
Expand Down Expand Up @@ -194,7 +194,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
service_connection: DashboardConnection.__fields__["config"].type_

topology = DashboardServiceTopology()
context = create_source_context(topology)
context = TopologyContext.create(topology)
dashboard_source_state: Set = set()
datamodel_source_state: Set = set()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def yield_dashboard(
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def yield_dashboard(
service_name=self.context.dashboard_service,
chart_name=chart,
)
for chart in self.context.charts
for chart in self.context.charts or []
],
service=self.context.dashboard_service,
owner=self.get_owner_ref(dashboard_details=dashboard_details),
Expand Down
Loading

0 comments on commit 85e2058

Please sign in to comment.