Skip to content

Commit

Permalink
Added cost analysis producer in DI
Browse files Browse the repository at this point in the history
  • Loading branch information
OnkarVO7 committed Nov 7, 2023
1 parent 2fdb1af commit b4afbc4
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Producer class for data insight entity reports
"""

import traceback
from typing import Iterable

from metadata.data_insight.producer.producer_interface import ProducerInterface
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.utils.logger import data_insight_logger

logger = data_insight_logger()


class CostAnalysisProducer(ProducerInterface):
"""entity producer class"""

def _check_profiler_and_usage_support(
self, database_service: DatabaseService
) -> bool:
return (
hasattr(database_service.connection.config, "supportsUsageExtraction")
and database_service.connection.config.supportsUsageExtraction.__root__
and hasattr(database_service.connection.config, "supportsProfiler")
and database_service.connection.config.supportsProfiler.__root__
)

# pylint: disable=dangerous-default-value
def fetch_data(self, limit=100, fields=["*"]) -> Iterable:
database_services = self.metadata.list_all_entities(
DatabaseService, limit=limit, fields=fields, skip_on_failure=True
)
entities_list = []
for database_service in database_services or []:
try:
if self._check_profiler_and_usage_support(database_service):
entities_list.extend(
self.metadata.list_all_entities(
Table,
limit=limit,
fields=fields,
skip_on_failure=True,
params={
"database": database_service.fullyQualifiedName.__root__
},
)
)
except Exception as err:
logger.error(f"Error trying to fetch entities -- {err}")
logger.debug(traceback.format_exc())
return entities_list
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,3 @@ def fetch_data(self, limit=100, fields=["*"]) -> Iterable:
except Exception as err:
logger.error(f"Error trying to fetch entity -- {err}")
logger.debug(traceback.format_exc())


class EntityProducerTable(EntityProducer):
"""entity producer class for table"""

entities = [table.Table]
10 changes: 4 additions & 6 deletions ingestion/src/metadata/data_insight/producer/producer_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@

from typing import Type

from metadata.data_insight.producer.entity_producer import (
EntityProducer,
EntityProducerTable,
)
from metadata.data_insight.producer.cost_analysis_producer import CostAnalysisProducer
from metadata.data_insight.producer.entity_producer import EntityProducer
from metadata.data_insight.producer.producer_interface import ProducerInterface
from metadata.data_insight.producer.web_analytics_producer import WebAnalyticsProducer
from metadata.generated.schema.analytics.reportData import ReportDataType
Expand All @@ -42,10 +40,10 @@ def create(self, producer_type: str, *args, **kwargs) -> ProducerInterface:
producer_factory = ProducerFactory()
producer_factory.register(ReportDataType.entityReportData.value, EntityProducer)
producer_factory.register(
ReportDataType.rawCostAnalysisReportData.value, EntityProducerTable
ReportDataType.rawCostAnalysisReportData.value, CostAnalysisProducer
)
producer_factory.register(
ReportDataType.aggregatedCostAnalysisReportData.value, EntityProducerTable
ReportDataType.aggregatedCostAnalysisReportData.value, CostAnalysisProducer
)
producer_factory.register(
ReportDataType.webAnalyticEntityViewReportData.value, WebAnalyticsProducer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _cache_events(
def _get_events(
self, after: Optional[str], limit=100, fields=["*"]
) -> EntityList[WebAnalyticEventData]:
"""Try to retrive events from cach otherwise retrieve them from DB"""
"""Try to retrieve events from catch otherwise retrieve them from DB"""
events = CACHED_EVENTS.get(str(after))
if not events:
events: EntityList[WebAnalyticEventData] = self.metadata.list_entities(
Expand Down

0 comments on commit b4afbc4

Please sign in to comment.