diff --git a/ingestion/src/metadata/profiler/adaptors/adaptor_factory.py b/ingestion/src/metadata/profiler/adaptors/adaptor_factory.py new file mode 100644 index 000000000000..fb06b9969bb2 --- /dev/null +++ b/ingestion/src/metadata/profiler/adaptors/adaptor_factory.py @@ -0,0 +1,39 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +factory for NoSQL adaptors that are used in the NoSQLProfiler. +""" + +from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import ( + MongoDBConnection, +) +from metadata.profiler.adaptors.mongodb import MongoDB +from metadata.profiler.factory import Factory +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() + + +class NoSQLAdaptorFactory(Factory): + def create(self, interface_type: str, *args, **kwargs) -> any: + logger.debug(f"Creating NoSQL client for {interface_type}") + client_class = self._interface_type.get(interface_type) + if not client_class: + raise ValueError(f"Unknown NoSQL source: {interface_type}") + logger.debug(f"Using NoSQL client constructor: {client_class.__name__}") + return client_class(*args, **kwargs) + + +adaptors = profilers = { + MongoDBConnection.__name__: MongoDB, +} +factory = NoSQLAdaptorFactory() +factory.register_many(adaptors) diff --git a/ingestion/src/metadata/profiler/adaptors/factory.py b/ingestion/src/metadata/profiler/adaptors/factory.py deleted file mode 100644 index 116eab77dd7a..000000000000 --- a/ingestion/src/metadata/profiler/adaptors/factory.py +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright 2024 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -factory for NoSQL adaptors that are used in the NoSQLProfiler. -""" -from typing import Callable, Dict, Type - -from pymongo import MongoClient - -from metadata.profiler.adaptors.mongodb import MongoDB -from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor - -NoSQLAdaptorConstructor = Callable[[any], NoSQLAdaptor] - - -class NoSQLAdaptorFactory: - """ - A factory class for creating NoSQL client instances. - - This class maintains a registry of callable constructors for different NoSQL client types. - The client types are registered with their corresponding constructors, - and can be created using the `construct` method. - - Attributes: - _clients (Dict[str, NoSQLClientConstructor]): A dictionary mapping client type names to their constructors. - - Methods: - register(source_class: Type, target_class: NoSQLClientConstructor): Register a client type with its constructor. - construct(source_client: any) -> NoSQLClient: Create a client instance of the type of the given source client. - """ - - def __init__(self): - """ - Initialize a new instance of NoSQLClientFactory. - """ - self._clients: Dict[str, NoSQLAdaptorConstructor] = {} - - def register(self, source_class: Type, target_class: NoSQLAdaptorConstructor): - """ - Register a client type with its constructor. - - Args: - source_class (Type): The class of the source client. - target_class (NoSQLClientConstructor): The constructor for the target client. - - Returns: - None - """ - self._clients[source_class.__name__] = target_class - - def construct(self, source_client: any) -> NoSQLAdaptor: - """ - Create a client instance of the type of the given source client. - - Args: - source_client (any): The source client instance. - - Returns: - NoSQLAdaptor: The created client instance. - - Raises: - ValueError: If the type of the source client is not registered. - """ - client_class = self._clients.get(type(source_client).__name__) - if not client_class: - raise ValueError(f"Unknown NoSQL source: {source_client.__name__}") - return client_class(source_client) - - -factory = NoSQLAdaptorFactory() -factory.register(MongoClient, MongoDB) diff --git a/ingestion/src/metadata/profiler/adaptors/mongodb.py b/ingestion/src/metadata/profiler/adaptors/mongodb.py index edbb637b263f..5be817ec6e8f 100644 --- a/ingestion/src/metadata/profiler/adaptors/mongodb.py +++ b/ingestion/src/metadata/profiler/adaptors/mongodb.py @@ -13,13 +13,16 @@ """ import json from dataclasses import dataclass, field -from typing import Dict, List, Optional - -from pymongo import MongoClient +from typing import TYPE_CHECKING, Dict, List, Optional from metadata.generated.schema.entity.data.table import Column, Table from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor +if TYPE_CHECKING: + from pymongo import MongoClient +else: + MongoClient = None # pylint: disable=invalid-name + @dataclass class Query: diff --git a/ingestion/src/metadata/profiler/factory.py b/ingestion/src/metadata/profiler/factory.py new file mode 100644 index 000000000000..fa89590401b7 --- /dev/null +++ b/ingestion/src/metadata/profiler/factory.py @@ -0,0 +1,41 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Factory class for creating profiler interface objects +""" +from abc import ABC, abstractmethod + + +class Factory(ABC): + """Creational factory for interface objects""" + + def __init__(self): + self._interface_type = {} + + def register(self, interface_type: str, interface_class): + """Register a new interface""" + self._interface_type[interface_type] = interface_class + + def register_many(self, interface_dict): + """ + Registers multiple profiler interfaces at once. + + Args: + interface_dict: A dictionary mapping connection class names (strings) to their + corresponding profiler interface classes. + """ + for interface_type, interface_class in interface_dict.items(): + self.register(interface_type, interface_class) + + @abstractmethod + def create(self, interface_type: str, *args, **kwargs) -> any: + pass diff --git a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py index 0ec581d22661..2a8766387e2b 100644 --- a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py @@ -21,7 +21,7 @@ from metadata.generated.schema.entity.data.table import TableData from metadata.generated.schema.tests.customMetric import CustomMetric -from metadata.profiler.adaptors.factory import factory +from metadata.profiler.adaptors.adaptor_factory import factory from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor from metadata.profiler.api.models import ThreadPoolMetrics from metadata.profiler.interface.profiler_interface import ProfilerInterface @@ -149,7 +149,9 @@ def _get_sampler(self) -> NoSQLSampler: return sampler_factory_.create( self.service_connection_config.__class__.__name__, table=self.table, - client=factory.construct(self.connection), + client=factory.create( + self.service_connection_config.__class__.__name__, self.connection + ), profile_sample_config=self.profile_sample_config, partition_details=self.partition_details, profile_sample_query=self.profile_query, @@ -171,7 +173,9 @@ def get_all_metrics( ): """get all profiler metrics""" profile_results = {"table": {}, "columns": {}} - runner = factory.construct(self.connection) + runner = factory.create( + self.service_connection_config.__class__.__name__, self.connection + ) metric_list = [ self.compute_metrics(runner, metric_func) for metric_func in metric_funcs ] diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py index 89c4c5a5ae06..3a03a921a9d2 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py +++ b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py @@ -46,6 +46,7 @@ UnityCatalogConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseConnection +from metadata.profiler.factory import Factory from metadata.profiler.interface.nosql.profiler_interface import NoSQLProfilerInterface from metadata.profiler.interface.pandas.profiler_interface import ( PandasProfilerInterface, @@ -80,27 +81,7 @@ ) -class ProfilerInterfaceFactory: - """Creational factory for profiler interface objects""" - - def __init__(self): - self._interface_type = {} - - def register(self, interface_type: str, interface_class): - """Register a new interface""" - self._interface_type[interface_type] = interface_class - - def register_many(self, interface_dict): - """ - Registers multiple profiler interfaces at once. - - Args: - interface_dict: A dictionary mapping connection class names (strings) to their - corresponding profiler interface classes. - """ - for interface_type, interface_class in interface_dict.items(): - self.register(interface_type, interface_class) - +class ProfilerInterfaceFactory(Factory): def create(self, interface_type: str, *args, **kwargs): """Create interface object based on interface type""" interface_class = self._interface_type.get(interface_type) @@ -124,5 +105,4 @@ def create(self, interface_type: str, *args, **kwargs): Db2Connection.__name__: DB2ProfilerInterface, MongoDBConnection.__name__: NoSQLProfilerInterface, } - profiler_interface_factory.register_many(profilers) diff --git a/ingestion/tests/integration/profiler/test_nosql_profiler.py b/ingestion/tests/integration/profiler/test_nosql_profiler.py index 775d225cffe6..580a2c539e45 100644 --- a/ingestion/tests/integration/profiler/test_nosql_profiler.py +++ b/ingestion/tests/integration/profiler/test_nosql_profiler.py @@ -73,13 +73,14 @@ def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str): }, "sink": {"type": "metadata-rest", "config": {}}, "workflowConfig": { + "loggerLevel": "DEBUG", "openMetadataServerConfig": { "hostPort": "http://localhost:8585/api", "authProvider": "openmetadata", "securityConfig": { "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" }, - } + }, }, }