Fix #13790: Cost analysis Data Insights Optimisations #15147

Merged 4 commits on Feb 15, 2024
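
This PR reworks the cost analysis data insight pipeline so that table life cycle and profile data are fetched from the OpenMetadata API once per database service and cached, rather than re-fetched by every processor for every table. The producer now yields per-service dictionaries of prefetched CostAnalysisReportData objects, both cost analysis processors consume those dictionaries directly, and the workflow source owns the shared cache and clears it once the last processor that needs it has run.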
The bulk of the change sits in the cost analysis report data processors. First, the imports:

@@ -18,7 +18,7 @@
 import traceback
 from collections import defaultdict
 from copy import deepcopy
-from typing import Iterable, Optional
+from typing import Dict, Iterable, Optional

 from metadata.data_insight.processor.reports.data_processor import DataProcessor
 from metadata.generated.schema.analytics.reportData import ReportData, ReportDataType

@@ -28,9 +28,11 @@
 from metadata.generated.schema.analytics.reportDataType.rawCostAnalysisReportData import (
     RawCostAnalysisReportData,
 )
-from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.generated.schema.type.lifeCycle import LifeCycle
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.ometa.utils import model_str
+from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
 from metadata.utils.logger import data_insight_logger
 from metadata.utils.time_utils import get_end_of_day_timestamp_mill
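
These imports let the raw processor build an EntityReference locally instead of resolving it through the API. ENTITY_REFERENCE_TYPE_MAP is assumed here to map generated entity class names to entity reference type strings, along these lines (illustrative entries only):

    # Assumed shape of metadata.utils.constants.ENTITY_REFERENCE_TYPE_MAP:
    ENTITY_REFERENCE_TYPE_MAP = {
        "Table": "table",
        "Dashboard": "dashboard",
        "Topic": "topic",
    }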

@@ -98,36 +100,32 @@ def yield_refined_data(self) -> Iterable[ReportData]:
                 data=value,
             )  # type: ignore

-    def refine(self, entity: Table) -> None:
+    def refine(self, entity: Dict) -> None:
         """Aggregate data"""
-
-        try:
-            cost_analysis_data = RawCostAnalysisReportData(
-                entity=self.metadata.get_entity_reference(
-                    entity=type(entity), fqn=entity.fullyQualifiedName
-                )
-            )
-            if entity.lifeCycle:
-                cost_analysis_data.lifeCycle = entity.lifeCycle
-
-            table_profile = self.metadata.get_latest_table_profile(
-                fqn=entity.fullyQualifiedName
-            )
-            if table_profile.profile:
-                cost_analysis_data.sizeInByte = table_profile.profile.sizeInByte
-
-            if cost_analysis_data.lifeCycle or cost_analysis_data.sizeInByte:
-                self._refined_data[
-                    entity.fullyQualifiedName.__root__
-                ] = cost_analysis_data
-
-            self.processor_status.scanned(entity.name.__root__)
-        except Exception as err:
-            logger.debug(traceback.format_exc())
-            logger.error(f"Error trying to fetch cost analysis data -- {err}")
+        for entity_fqn, cost_analysis_report_data in entity.items():
+            try:
+                cost_analysis_data = RawCostAnalysisReportData(
+                    entity=EntityReference(
+                        id=cost_analysis_report_data.entity.id,
+                        fullyQualifiedName=model_str(
+                            cost_analysis_report_data.entity.fullyQualifiedName
+                        ),
+                        type=ENTITY_REFERENCE_TYPE_MAP[
+                            type(cost_analysis_report_data.entity).__name__
+                        ],
+                    ),
+                    lifeCycle=cost_analysis_report_data.life_cycle,
+                    sizeInByte=cost_analysis_report_data.size,
+                )
+
+                self._refined_data[entity_fqn] = cost_analysis_data
+                self.processor_status.scanned(entity_fqn)
+            except Exception as err:
+                logger.debug(traceback.format_exc())
+                logger.error(f"Error trying to fetch cost analysis data -- {err}")

     def get_status(self):
         return self.processor_status
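
The old refine() made two API round trips per table (get_entity_reference and get_latest_table_profile); the new one works purely off the dictionary the producer has already assembled, keyed by fully qualified name. A minimal sketch of the assumed payload, with made-up names and values:

    # Hypothetical payload handed to refine(); the FQN, table object and size
    # are illustrative.
    payload = {
        "mysql_prod.ecommerce.default.orders": CostAnalysisReportData(
            entity=orders_table,                # Table entity fetched by the producer
            life_cycle=orders_table.lifeCycle,  # may be None
            size=1_048_576.0,                   # bytes, from the latest table profile
        ),
    }
    raw_processor.refine(payload)

Moving the try/except inside the loop also means one malformed entry no longer aborts the rest of the batch.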

@@ -142,6 +140,7 @@ def __init__(self, metadata: OpenMetadata):
         super().__init__(metadata)
         self._refined_data = defaultdict(lambda: defaultdict(dict))
         self.post_hook = self._post_hook_fn
+        self.clean_up_cache = True

     def yield_refined_data(self) -> Iterable[ReportData]:
         """Yield refined data"""

@@ -152,27 +151,17 @@ def yield_refined_data(self) -> Iterable[ReportData]:
                 data=data,
             )  # type: ignore

-    def refine(self, entity: Table) -> None:
+    def refine(self, entity: Dict) -> None:
         """Aggregate data"""
         try:
-            life_cycle = None
-            if entity.lifeCycle:
-                life_cycle = entity.lifeCycle
-
-            size = None
-            table_profile = self.metadata.get_latest_table_profile(
-                fqn=entity.fullyQualifiedName
-            )
-            if table_profile.profile:
-                size = table_profile.profile.sizeInByte
-
-            if life_cycle or size:
-                entity_type = str(entity.__class__.__name__)
-                service_type = str(entity.serviceType.name)
-                service_name = str(entity.service.name)
+            for entity_fqn, cost_analysis_report_data in entity.items():
+                entity_type = str(cost_analysis_report_data.entity.__class__.__name__)
+                service_type = str(cost_analysis_report_data.entity.serviceType.name)
+                service_name = str(cost_analysis_report_data.entity.service.name)

                 if not self._refined_data[str(entity_type)][service_type].get(
                     service_name
                 ):

@@ -185,18 +174,18 @@ def refine(self, entity: Table) -> None:
             else:
                 self._refined_data[entity_type][service_type][service_name][
                     TOTAL_SIZE
-                ] += (size or 0)
+                ] += (cost_analysis_report_data.size or 0)
                 self._refined_data[entity_type][service_type][service_name][
                     TOTAL_COUNT
                 ] += 1

                 self._get_data_assets_dict(
-                    life_cycle=life_cycle,
-                    size=size,
+                    life_cycle=cost_analysis_report_data.life_cycle,
+                    size=cost_analysis_report_data.size,
                     data=self._refined_data[entity_type][service_type][service_name],
                 )

-            self.processor_status.scanned(entity.name.__root__)
+                self.processor_status.scanned(entity_fqn)
         except Exception as err:
             logger.debug(traceback.format_exc())
             logger.error(f"Error trying to fetch cost analysis data -- {err}")

@@ -247,7 +236,10 @@ def _get_data_assets_dict(life_cycle: LifeCycle, size: Optional[float], data: dict):
         # Iterate over the different time periods and update the data
         for days, key in DAYS:
             days_before_timestamp = get_end_of_day_timestamp_mill(days=days)
-            if life_cycle.accessed.timestamp.__root__ <= days_before_timestamp:
+            if (
+                life_cycle.accessed
+                and life_cycle.accessed.timestamp.__root__ <= days_before_timestamp
+            ):
                 data[UNUSED_DATA_ASSETS][COUNT][key] += 1
                 data[UNUSED_DATA_ASSETS][SIZE][key] += size or 0
             else:
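
The added None check makes the helper safe for life cycle records that carry no access timestamp: previously life_cycle.accessed.timestamp raised an AttributeError, while now such entries simply fall through to the else branch (not shown in this hunk). A small illustration under that assumption:

    # Hypothetical: a life cycle with no access record no longer crashes the
    # helper; the falsy accessed field routes the entry to the else branch.
    life_cycle = LifeCycle(accessed=None)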

In the DataProcessor base class, the new clean_up_cache flag defaults to False:

@@ -47,6 +47,7 @@ def __init__(self, metadata: OpenMetadata):
         self._refined_data = {}
         self.post_hook: Optional[Callable] = None
         self.pre_hook: Optional[Callable] = None
+        self.clean_up_cache: bool = False

     @classmethod
     def create(cls, _data_processor_type, metadata: OpenMetadata):
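
Putting the flag on the base class lets the workflow source check it uniformly after every processor run. It stays False everywhere except in the aggregated cost analysis processor above, which, judging by the diff, is the last consumer of the shared entities cache and therefore the right place to trigger its cleanup (see the source changes at the end of this diff).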

In the cost analysis producer, a new pydantic model bundles each entity with its prefetched life cycle and size data:

@@ -13,16 +13,31 @@
 """

 import traceback
-from typing import Iterable
+from typing import Dict, Iterable, Optional
+
+from pydantic import BaseModel

 from metadata.data_insight.producer.producer_interface import ProducerInterface
 from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.type.lifeCycle import LifeCycle
+from metadata.ingestion.api.models import Entity
+from metadata.ingestion.ometa.utils import model_str
 from metadata.utils.logger import data_insight_logger

 logger = data_insight_logger()


+class CostAnalysisReportData(BaseModel):
+    """
+    Life cycle and size data fetched for an entity
+    """
+
+    entity: Entity
+    life_cycle: Optional[LifeCycle]
+    size: Optional[float]
+
+
 class CostAnalysisProducer(ProducerInterface):
     """entity producer class"""

@@ -36,27 +51,76 @@ def _check_profiler_and_usage_support(
             and database_service.connection.config.supportsProfiler.__root__
         )

+    def _check_life_cycle_and_size_data(
+        self, table: Table
+    ) -> Optional[CostAnalysisReportData]:
+        """
+        Check whether valid life cycle and table size data are present for the table
+        """
+        cost_analysis_report_data = CostAnalysisReportData(entity=table)
+        if table.lifeCycle and table.lifeCycle.accessed:
+            cost_analysis_report_data.life_cycle = table.lifeCycle
+
+        table_profile = self.metadata.get_latest_table_profile(
+            fqn=table.fullyQualifiedName
+        )
+        if table_profile.profile:
+            cost_analysis_report_data.size = table_profile.profile.sizeInByte
+
+        if cost_analysis_report_data.life_cycle or cost_analysis_report_data.size:
+            return cost_analysis_report_data
+        return None
+
+    def life_cycle_data_dict(
+        self, entities_cache: Optional[Dict], database_service_fqn: str
+    ) -> Iterable[Dict]:
+        """
+        Cache the required life cycle data for the processors and yield the dict
+        """
+        if entities_cache.get(database_service_fqn):
+            yield entities_cache[database_service_fqn]
+        else:
+            tables = self.metadata.list_all_entities(
+                Table,
+                limit=100,
+                skip_on_failure=True,
+                params={"database": database_service_fqn},
+            )
+            entities_cache[database_service_fqn] = {}
+
+            for table in tables:
+                try:
+                    cost_analysis_data = self._check_life_cycle_and_size_data(
+                        table=table
+                    )
+                    if cost_analysis_data:
+                        entities_cache[database_service_fqn][
+                            model_str(table.fullyQualifiedName)
+                        ] = cost_analysis_data
+                except Exception as err:
+                    logger.error(
+                        f"Error trying to fetch cost analysis data for [{model_str(table.fullyQualifiedName)}] -- {err}"
+                    )
+                    logger.debug(traceback.format_exc())
+
+            yield entities_cache[database_service_fqn]
+
     # pylint: disable=dangerous-default-value
-    def fetch_data(self, limit=100, fields=["*"]) -> Iterable:
+    def fetch_data(
+        self, limit=100, fields=["*"], entities_cache=None
+    ) -> Optional[Iterable[Dict]]:
         database_services = self.metadata.list_all_entities(
             DatabaseService, limit=limit, fields=fields, skip_on_failure=True
         )
-        entities_list = []
         for database_service in database_services or []:
             try:
                 if self._check_profiler_and_usage_support(database_service):
-                    entities_list.extend(
-                        self.metadata.list_all_entities(
-                            Table,
-                            limit=limit,
-                            fields=fields,
-                            skip_on_failure=True,
-                            params={
-                                "database": database_service.fullyQualifiedName.__root__
-                            },
-                        )
-                    )
+                    yield from self.life_cycle_data_dict(
+                        entities_cache=entities_cache,
+                        database_service_fqn=model_str(
+                            database_service.fullyQualifiedName
+                        ),
+                    )
             except Exception as err:
                 logger.error(f"Error trying to fetch entities -- {err}")
                 logger.debug(traceback.format_exc())
-        return entities_list
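
life_cycle_data_dict is effectively per-service memoization: the first pass over a database service lists its tables, filters them through _check_life_cycle_and_size_data, and stores the surviving entries; any later pass over the same service is answered from the cache without touching the API. A sketch of the assumed behaviour (the service FQN is illustrative):

    entities_cache: dict = {}
    producer = CostAnalysisProducer(metadata)

    # First pass: lists tables and pulls profiles, populating the cache.
    first = list(
        producer.life_cycle_data_dict(
            entities_cache=entities_cache, database_service_fqn="mysql_prod"
        )
    )

    # Second pass: same service, answered from the cache with no API calls.
    second = list(
        producer.life_cycle_data_dict(
            entities_cache=entities_cache, database_service_fqn="mysql_prod"
        )
    )
    assert first[0] is second[0]  # the very same cached dict is yielded

One caveat visible in the code: a service whose cached dict is empty is falsy, so entities_cache.get() misses and its tables are listed again on the next pass.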

In the generic entity producer, fetch_data only grows the new parameter and does not use the cache:

@@ -52,7 +52,7 @@ class EntityProducer(ProducerInterface):
     ]

     # pylint: disable=dangerous-default-value
-    def fetch_data(self, limit=100, fields=["*"]) -> Iterable:
+    def fetch_data(self, limit=100, fields=["*"], entities_cache=None) -> Iterable:
         for entity in self.entities:
             try:
                 yield from self.metadata.list_all_entities(

The abstract producer interface is widened to match:

@@ -24,6 +24,6 @@ def __init__(self, metadata: OpenMetadata):
         self.metadata = metadata

     @abstractmethod
-    def fetch_data(self, limit, fields):
+    def fetch_data(self, limit, fields, entities_cache=None):
         """fetch data from source"""
         raise NotImplementedError
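
Widening the abstract signature keeps the single call site in the workflow source, producer.fetch_data(fields=..., entities_cache=...), valid for every producer; producers with no use for the cache, like the entity producer above and the web analytics producer below, simply accept and ignore the argument.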

The same applies to the web analytics producer:

@@ -75,7 +75,7 @@ def _clear_cache(self):
         CACHED_EVENTS.clear()

     def fetch_data(
-        self, limit=100, fields=["*"]
+        self, limit=100, fields=["*"], entities_cache=None
     ):  # pylint: disable=dangerous-default-value
         """fetch data for web analytics event"""
         events = self._get_events(None, limit, fields)

Finally, ingestion/src/metadata/data_insight/source/metadata.py (10 additions and 1 deletion) makes the workflow source the owner of the cache:

@@ -74,6 +74,7 @@ def __init__(self, metadata: OpenMetadata):
         super().__init__()
         self.metadata = metadata
         self.date = datetime.utcnow().strftime("%Y-%m-%d")
+        self.entities_cache = {}

         _processors = self._instantiate_processors()
         self._processors: Dict[

@@ -130,11 +131,19 @@ def _iter(self, *_, **__) -> Iterable[Either[DataInsightRecord]]:
             processor = cast(DataProcessor, processor)
             processor.pre_hook() if processor.pre_hook else None  # pylint: disable=expression-not-assigned

-            for data in producer.fetch_data(fields=["owner", "tags"]):
+            for data in (
+                producer.fetch_data(
+                    fields=["owner", "tags"], entities_cache=self.entities_cache
+                )
+                or []
+            ):
                 processor.refine(data)

             processor.post_hook() if processor.post_hook else None  # pylint: disable=expression-not-assigned

+            if processor.clean_up_cache:
+                self.entities_cache.clear()
+
             for data in processor.yield_refined_data():
                 yield Either(left=None, right=DataInsightRecord(data=data))
         except KeyError as key_error:
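
Putting the pieces together, the run loop threads one dictionary through every producer/processor pair. A condensed sketch of the assumed end-to-end flow (pair wiring, error handling and report emission simplified; processor_pairs is illustrative):

    entities_cache = {}  # {database_service_fqn: {table_fqn: CostAnalysisReportData}}

    for producer, processor in processor_pairs:  # raw first, then aggregated
        for data in producer.fetch_data(
            fields=["owner", "tags"], entities_cache=entities_cache
        ) or []:
            processor.refine(data)
        if processor.clean_up_cache:  # True only for the aggregated processor
            entities_cache.clear()

The expensive work of listing every table of a service and pulling its latest profile therefore happens once, during the raw processor's pass, and the aggregated processor is served entirely from memory.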