From bdfc9226d5d31db18cbb846c04fe4bea48021264 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Wed, 8 Nov 2023 12:04:48 +0530 Subject: [PATCH] Added cost analysis producer in DI (#13872) --- .../producer/cost_analysis_producer.py | 62 +++++++++++++++++++ .../data_insight/producer/entity_producer.py | 6 -- .../data_insight/producer/producer_factory.py | 10 ++- .../producer/web_analytics_producer.py | 2 +- 4 files changed, 67 insertions(+), 13 deletions(-) create mode 100644 ingestion/src/metadata/data_insight/producer/cost_analysis_producer.py diff --git a/ingestion/src/metadata/data_insight/producer/cost_analysis_producer.py b/ingestion/src/metadata/data_insight/producer/cost_analysis_producer.py new file mode 100644 index 000000000000..af7381aa7813 --- /dev/null +++ b/ingestion/src/metadata/data_insight/producer/cost_analysis_producer.py @@ -0,0 +1,62 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Producer class for data insight cost analysis reports +""" + +import traceback +from typing import Iterable + +from metadata.data_insight.producer.producer_interface import ProducerInterface +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.utils.logger import data_insight_logger + +logger = data_insight_logger() + + +class CostAnalysisProducer(ProducerInterface): + """cost analysis producer class""" + + def _check_profiler_and_usage_support( + self, database_service: DatabaseService + ) -> bool: + return ( + hasattr(database_service.connection.config, "supportsUsageExtraction") + and database_service.connection.config.supportsUsageExtraction.__root__ + and hasattr(database_service.connection.config, "supportsProfiler") + and database_service.connection.config.supportsProfiler.__root__ + ) + + # pylint: disable=dangerous-default-value + def fetch_data(self, limit=100, fields=["*"]) -> Iterable: + database_services = self.metadata.list_all_entities( + DatabaseService, limit=limit, fields=fields, skip_on_failure=True + ) + entities_list = [] + for database_service in database_services or []: + try: + if self._check_profiler_and_usage_support(database_service): + entities_list.extend( + self.metadata.list_all_entities( + Table, + limit=limit, + fields=fields, + skip_on_failure=True, + params={ + "database": database_service.fullyQualifiedName.__root__ + }, + ) + ) + except Exception as err: + logger.error(f"Error trying to fetch entities -- {err}") + logger.debug(traceback.format_exc()) + return entities_list diff --git a/ingestion/src/metadata/data_insight/producer/entity_producer.py b/ingestion/src/metadata/data_insight/producer/entity_producer.py index 4bab7dd0b2ae..c7f7c6bdf258 100644 --- a/ingestion/src/metadata/data_insight/producer/entity_producer.py +++ b/ingestion/src/metadata/data_insight/producer/entity_producer.py @@ -61,9 +61,3 @@ def 
fetch_data(self, limit=100, fields=["*"]) -> Iterable: except Exception as err: logger.error(f"Error trying to fetch entity -- {err}") logger.debug(traceback.format_exc()) - - -class EntityProducerTable(EntityProducer): - """entity producer class for table""" - - entities = [table.Table] diff --git a/ingestion/src/metadata/data_insight/producer/producer_factory.py b/ingestion/src/metadata/data_insight/producer/producer_factory.py index fbe5e44acc89..8630dd59ffc2 100644 --- a/ingestion/src/metadata/data_insight/producer/producer_factory.py +++ b/ingestion/src/metadata/data_insight/producer/producer_factory.py @@ -14,10 +14,8 @@ from typing import Type -from metadata.data_insight.producer.entity_producer import ( - EntityProducer, - EntityProducerTable, -) +from metadata.data_insight.producer.cost_analysis_producer import CostAnalysisProducer +from metadata.data_insight.producer.entity_producer import EntityProducer from metadata.data_insight.producer.producer_interface import ProducerInterface from metadata.data_insight.producer.web_analytics_producer import WebAnalyticsProducer from metadata.generated.schema.analytics.reportData import ReportDataType @@ -42,10 +40,10 @@ def create(self, producer_type: str, *args, **kwargs) -> ProducerInterface: producer_factory = ProducerFactory() producer_factory.register(ReportDataType.entityReportData.value, EntityProducer) producer_factory.register( - ReportDataType.rawCostAnalysisReportData.value, EntityProducerTable + ReportDataType.rawCostAnalysisReportData.value, CostAnalysisProducer ) producer_factory.register( - ReportDataType.aggregatedCostAnalysisReportData.value, EntityProducerTable + ReportDataType.aggregatedCostAnalysisReportData.value, CostAnalysisProducer ) producer_factory.register( ReportDataType.webAnalyticEntityViewReportData.value, WebAnalyticsProducer diff --git a/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py b/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py 
index c8829e1dbc61..d41f60994984 100644 --- a/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py +++ b/ingestion/src/metadata/data_insight/producer/web_analytics_producer.py @@ -56,7 +56,7 @@ def _cache_events( def _get_events( self, after: Optional[str], limit=100, fields=["*"] ) -> EntityList[WebAnalyticEventData]: - """Try to retrive events from cach otherwise retrieve them from DB""" + """Try to retrieve events from cache, otherwise retrieve them from DB""" events = CACHED_EVENTS.get(str(after)) if not events: events: EntityList[WebAnalyticEventData] = self.metadata.list_entities(