From 460aa12420991f9135b50cf26806a7fb35084de2 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Tue, 7 Nov 2023 21:55:06 +0530 Subject: [PATCH 1/6] Add support for external profiler workflow --- .../source/database/azuresql/metadata.py | 22 +- .../source/database/bigquery/metadata.py | 6 + .../source/database/mssql/metadata.py | 22 +- .../source/database/postgres/metadata.py | 18 +- .../source/database/redshift/metadata.py | 16 +- .../source/database/snowflake/metadata.py | 14 +- .../source/database/vertica/metadata.py | 14 +- .../src/metadata/profiler/source/metadata.py | 7 +- .../metadata/profiler/source/metadata_ext.py | 285 ++++++++++++++++++ ingestion/src/metadata/utils/fqn.py | 34 ++- ingestion/src/metadata/workflow/metadata.py | 5 + ingestion/src/metadata/workflow/profiler.py | 9 +- ingestion/src/metadata/workflow/usage.py | 5 + .../schema/metadataIngestion/workflow.json | 2 +- 14 files changed, 421 insertions(+), 38 deletions(-) create mode 100644 ingestion/src/metadata/profiler/source/metadata_ext.py diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py b/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py index fce9d3dd59b4..871af2113f6a 100644 --- a/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py @@ -11,7 +11,7 @@ """Azure SQL source module""" import traceback -from typing import Iterable +from typing import Iterable, Optional from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names @@ -66,6 +66,19 @@ def create(cls, config_dict, metadata: OpenMetadata): ) return cls(config, metadata) + def get_configured_database(self) -> Optional[str]: + if not self.service_connection.ingestAllDatabases: + return self.service_connection.database + return None + + def get_database_names_raw(self) -> Iterable[str]: + results = self.connection.execute( + "SELECT name FROM master.sys.databases order by name" + ) + for res in results: + row = list(res) + yield row[0] + def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: @@ -73,12 +86,7 @@ def get_database_names(self) -> Iterable[str]: self.set_inspector(database_name=configured_db) yield configured_db else: - results = self.connection.execute( - "SELECT name FROM master.sys.databases order by name" - ) - for res in results: - row = list(res) - new_database = row[0] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 2ab083e12a9d..638afcd5fa3b 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -421,6 +421,12 @@ def set_inspector(self, database_name: str): self.engine = inspector_details.engine self.inspector = inspector_details.inspector + def get_configured_database(self) -> Optional[str]: + return None + + def get_database_names_raw(self) -> Iterable[str]: + yield from self.project_ids + def get_database_names(self) -> Iterable[str]: for project_id in self.project_ids: database_fqn = fqn.build( diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py index c505dc2e4c97..16048f2b9865 100644 --- 
a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py @@ -10,7 +10,7 @@ # limitations under the License. """MSSQL source module""" import traceback -from typing import Iterable +from typing import Iterable, Optional from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names @@ -70,6 +70,19 @@ def create(cls, config_dict, metadata: OpenMetadata): ) return cls(config, metadata) + def get_configured_database(self) -> Optional[str]: + if not self.service_connection.ingestAllDatabases: + return self.service_connection.database + return None + + def get_database_names_raw(self) -> Iterable[str]: + results = self.connection.execute( + "SELECT name FROM master.sys.databases order by name" + ) + for res in results: + row = list(res) + yield row[0] + def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: @@ -77,12 +90,7 @@ def get_database_names(self) -> Iterable[str]: self.set_inspector(database_name=configured_db) yield configured_db else: - results = self.connection.execute( - "SELECT name FROM master.sys.databases order by name" - ) - for res in results: - row = list(res) - new_database = row[0] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index 7ddde065ee25..714c47c0c76c 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -13,7 +13,7 @@ """ import traceback from collections import namedtuple -from typing import Iterable, Tuple +from typing import Iterable, Optional, Tuple from sqlalchemy import sql from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names @@ -146,16 +146,24 @@ def query_table_names_and_types( for name, relkind in result ] + def get_configured_database(self) -> Optional[str]: + if not self.service_connection.ingestAllDatabases: + return self.service_connection.database + return None + + def get_database_names_raw(self) -> Iterable[str]: + results = self.connection.execute(POSTGRES_GET_DB_NAMES) + for res in results: + row = list(res) + yield row[0] + def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: configured_db = self.config.serviceConnection.__root__.config.database self.set_inspector(database_name=configured_db) yield configured_db else: - results = self.connection.execute(POSTGRES_GET_DB_NAMES) - for res in results: - row = list(res) - new_database = row[0] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index 403378c952fe..ef2f6e071686 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -153,16 +153,24 @@ def query_table_names_and_types( for name, relkind in result ] + def get_configured_database(self) -> Optional[str]: + if not self.service_connection.ingestAllDatabases: + return self.service_connection.database + return None + + def get_database_names_raw(self) -> 
Iterable[str]: + results = self.connection.execute(REDSHIFT_GET_DATABASE_NAMES) + for res in results: + row = list(res) + yield row[0] + def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: self.inspector = inspect(self.engine) self.get_partition_details() yield self.config.serviceConnection.__root__.config.database else: - results = self.connection.execute(REDSHIFT_GET_DATABASE_NAMES) - for res in results: - row = list(res) - new_database = row[0] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index d72252d4b080..b0144b641521 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -214,6 +214,15 @@ def get_database_description(self, database_name: str) -> Optional[str]: """ return self.database_desc_map.get(database_name) + def get_configured_database(self) -> Optional[str]: + return self.service_connection.database + + def get_database_names_raw(self) -> Iterable[str]: + results = self.connection.execute(SNOWFLAKE_GET_DATABASES) + for res in results: + row = list(res) + yield row[1] + def get_database_names(self) -> Iterable[str]: configured_db = self.config.serviceConnection.__root__.config.database if configured_db: @@ -224,10 +233,7 @@ def get_database_names(self) -> Iterable[str]: self.set_database_description_map() yield configured_db else: - results = self.connection.execute(SNOWFLAKE_GET_DATABASES) - for res in results: - row = list(res) - new_database = row[1] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py b/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py index f87f6f3667a0..cd81b8184450 100644 --- a/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py @@ -293,6 +293,15 @@ def set_schema_description_map(self) -> None: self.engine, VERTICA_SCHEMA_COMMENTS ) + def get_configured_database(self) -> Optional[str]: + return self.service_connection.database + + def get_database_names_raw(self) -> Iterable[str]: + results = self.connection.execute(VERTICA_LIST_DATABASES) + for res in results: + row = list(res) + yield row[0] + def get_database_names(self) -> Iterable[str]: configured_db = self.config.serviceConnection.__root__.config.database if configured_db: @@ -300,10 +309,7 @@ def get_database_names(self) -> Iterable[str]: self.set_schema_description_map() yield configured_db else: - results = self.connection.execute(VERTICA_LIST_DATABASES) - for res in results: - row = list(res) - new_database = row[0] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/profiler/source/metadata.py b/ingestion/src/metadata/profiler/source/metadata.py index f44d49ed121a..a9a16f677711 100644 --- a/ingestion/src/metadata/profiler/source/metadata.py +++ b/ingestion/src/metadata/profiler/source/metadata.py @@ -64,13 +64,16 @@ class OpenMetadataSource(Source): We do this here as well. 
""" + def init_steps(self): + super().__init__() + + # pylint: disable=super-init-not-called def __init__( self, config: OpenMetadataWorkflowConfig, metadata: OpenMetadata, ): - - super().__init__() + self.init_steps() self.config = config self.metadata = metadata diff --git a/ingestion/src/metadata/profiler/source/metadata_ext.py b/ingestion/src/metadata/profiler/source/metadata_ext.py new file mode 100644 index 000000000000..c46a5e0cb2f7 --- /dev/null +++ b/ingestion/src/metadata/profiler/source/metadata_ext.py @@ -0,0 +1,285 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +OpenMetadata source for the profiler +""" +import traceback +from copy import deepcopy +from typing import Iterable, cast + +from sqlalchemy.inspection import inspect + +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Table, TableType +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( + DatabaseServiceProfilerPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.ingestion.api.models import Either, StackTraceError +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.connections import get_connection +from metadata.profiler.source.metadata import ( + OpenMetadataSource, + ProfilerSourceAndEntity, +) +from metadata.profiler.source.profiler_source_factory import profiler_source_factory +from metadata.utils import fqn +from metadata.utils.class_helper import get_service_type_from_source_type +from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table +from metadata.utils.importer import import_source_class +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() + + +class OpenMetadataSourceExt(OpenMetadataSource): + """ + This source lists and filters the entities that need + to be processed by the profiler workflow. + + Note that in order to manage the following steps we need + to test the connection against the Database Service Source. + We do this here as well. 
+ """ + + # pylint: disable=super-init-not-called + def __init__( + self, + config: OpenMetadataWorkflowConfig, + metadata: OpenMetadata, + ): + self.init_steps() + + self.config = config + self.metadata = metadata + self.test_connection() + + # Init and type the source config + self.service_connection = self.config.source.serviceConnection.__root__.config + self.source_config: DatabaseServiceProfilerPipeline = cast( + DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config + ) # Used to satisfy type checked + source_type = self.config.source.type.lower() + service_type = get_service_type_from_source_type(self.config.source.type) + source_class = import_source_class( + service_type=service_type, source_type=source_type + ) + database_source_config = DatabaseServiceMetadataPipeline() + new_config = deepcopy(self.config.source) + new_config.sourceConfig.config = database_source_config + self.source = source_class.create(new_config.dict(), self.metadata) + self.engine = None + self.inspector = None + self._connection = None + self.set_inspector() + + logger.info( + f"Starting profiler for service {self.config.source.type}" + f":{self.config.source.type.lower()}" + ) + + def set_inspector(self, database_name: str = None) -> None: + """ + When sources override `get_database_names`, they will need + to setup multiple inspectors. They can use this function. + :param database_name: new database to set + """ + new_service_connection = deepcopy(self.service_connection) + if database_name: + logger.info(f"Ingesting from database: {database_name}") + new_service_connection.database = database_name + self.engine = get_connection(new_service_connection) + self.inspector = inspect(self.engine) + self._connection = None # Lazy init as well + + def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]: + for database_name in self.get_database_names() or []: + try: + database_entity = fqn.search_database_from_es( + database_name=database_name, + metadata=self.metadata, + service_name=None, + ) + if not database_entity: + logger.debug( + f"Database Entity for database `{database_name}` not found" + ) + continue + for schema_name in self.get_schema_names() or []: + for table_name in self.get_table_names(schema_name) or []: + table_entity = fqn.search_table_from_es( + metadata=self.metadata, + database_name=database_name, + service_name=None, + schema_name=schema_name, + table_name=table_name, + fields="tableProfilerConfig", + ) + if not table_entity: + logger.debug( + f"Table Entity for table `{database_name}.{schema_name}.{table_name}` not found" + ) + continue + + profiler_source = profiler_source_factory.create( + self.config.source.type.lower(), + self.config, + database_entity, + self.metadata, + ) + yield Either( + right=ProfilerSourceAndEntity( + profiler_source=profiler_source, + entity=table_entity, + ) + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=database_name, + error=f"Error listing source and entities for database due to [{exc}]", + stack_trace=traceback.format_exc(), + ) + ) + + def get_table_names(self, schema_name: str) -> Iterable[str]: + for table_name in self.inspector.get_table_names(schema_name) or []: + if filter_by_table(self.source_config.tableFilterPattern, table_name): + self.status.filter(table_name, "Table pattern not allowed") + continue + yield table_name + + def get_schema_names(self) -> Iterable[str]: + if self.service_connection.__dict__.get("databaseSchema"): + yield self.service_connection.databaseSchema + else: + 
for schema_name in self.inspector.get_schema_names(): + if filter_by_schema( + self.source_config.schemaFilterPattern, schema_name + ): + self.status.filter(schema_name, "Schema pattern not allowed") + continue + yield schema_name + + def get_database_names(self) -> Iterable[str]: + """ + Method to fetch database names from source + """ + try: + if hasattr(self.service_connection, "supportsDatabase"): + configured_db = self.source.get_configured_database() + if configured_db: + yield configured_db + else: + database_names = self.source.get_database_names_raw() + for database in database_names: + if filter_by_database( + self.source_config.databaseFilterPattern, database + ): + self.status.filter(database, "Database pattern not allowed") + continue + self.set_inspector(database_name=database) + yield database + else: + custom_database_name = self.service_connection.__dict__.get( + "databaseName" + ) + database_name = self.service_connection.__dict__.get( + "database", custom_database_name or "default" + ) + yield database_name + except Exception as exc: + logger.debug(f"Failed to fetch database names: {exc}") + logger.debug(traceback.format_exc()) + + def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]: + """ + From a list of tables, apply the SQLSourceConfig + filter patterns. + + We will update the status on the SQLSource Status. + """ + for table in tables: + try: + if filter_by_schema( + self.source_config.schemaFilterPattern, + table.databaseSchema.name, # type: ignore + ): + self.status.filter( + f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}", + "Schema pattern not allowed", + ) + continue + if filter_by_table( + self.source_config.tableFilterPattern, + table.name.__root__, + ): + self.status.filter( + f"Table pattern not allowed: {table.fullyQualifiedName.__root__}", + "Table pattern not allowed", + ) + continue + if ( + table.tableType == TableType.View + and not self.source_config.includeViews + ): + self.status.filter( + table.fullyQualifiedName.__root__, + "View filtered out", + ) + continue + yield table + except Exception as exc: + self.status.failed( + StackTraceError( + name=table.fullyQualifiedName.__root__, + error=f"Unexpected error filtering entities for table [{table}]: {exc}", + stack_trace=traceback.format_exc(), + ) + ) + + def get_table_entities(self, database): + """ + List and filter OpenMetadata tables based on the + source configuration. + + The listing will be based on the entities from the + informed service name in the source configuration. + + Note that users can specify `table_filter_pattern` to + either be `includes` or `excludes`. This means + that we will either use what is specified in `includes` + or we will use everything but the tables excluded. + + Same with `schema_filter_pattern`. 
+ """ + tables = self.metadata.list_all_entities( + entity=Table, + fields=[ + "tableProfilerConfig", + ], + params={ + "service": self.config.source.serviceName, + "database": fqn.build( + self.metadata, + entity_type=Database, + service_name=self.config.source.serviceName, + database_name=database.name.__root__, + ), + }, # type: ignore + ) + + yield from self.filter_entities(tables) diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 2caeddaef9a0..a39345ec9bd8 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -531,12 +531,12 @@ def build_es_fqn_search_string( Returns: FQN search string """ - if not service_name or not table_name: + if not table_name: raise FQNBuildingException( - f"Service Name and Table Name should be informed, but got service=`{service_name}`, table=`{table_name}`" + f"Table Name should be informed, but got table=`{table_name}`" ) fqn_search_string = _build( - service_name, database_name or "*", schema_name or "*", table_name + service_name or "*", database_name or "*", schema_name or "*", table_name ) return fqn_search_string @@ -548,6 +548,7 @@ def search_table_from_es( service_name: str, table_name: str, fetch_multiple_entities: bool = False, + fields: Optional[str] = None, ): fqn_search_string = build_es_fqn_search_string( database_name, schema_name, service_name, table_name @@ -556,6 +557,33 @@ def search_table_from_es( es_result = metadata.es_search_from_fqn( entity_type=Table, fqn_search_string=fqn_search_string, + fields=fields, + ) + + return get_entity_from_es_result( + entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities + ) + + +def search_database_from_es( + metadata: OpenMetadata, + database_name: str, + service_name: Optional[str], + fetch_multiple_entities: Optional[bool] = False, + fields: Optional[str] = None, +): + + if not database_name: + raise FQNBuildingException( + f"Database Name should be informed, but got database=`{database_name}`" + ) + + fqn_search_string = _build(service_name or "*", database_name) + + es_result = metadata.es_search_from_fqn( + entity_type=Database, + fqn_search_string=fqn_search_string, + fields=fields, ) return get_entity_from_es_result( diff --git a/ingestion/src/metadata/workflow/metadata.py b/ingestion/src/metadata/workflow/metadata.py index 97a7faa71a97..a13932e90fec 100644 --- a/ingestion/src/metadata/workflow/metadata.py +++ b/ingestion/src/metadata/workflow/metadata.py @@ -12,6 +12,7 @@ Workflow definition for metadata related ingestions: metadata and lineage. 
""" +from metadata.config.common import WorkflowExecutionError from metadata.ingestion.api.steps import Sink, Source from metadata.utils.importer import ( import_from_module, @@ -40,6 +41,10 @@ def set_steps(self): def _get_source(self) -> Source: # Source that we are ingesting, e.g., mysql, looker or kafka source_type = self.config.source.type.lower() + if not self.config.source.serviceName: + raise WorkflowExecutionError( + "ServiceName is required field for executing the Metadata Workflow" + ) source_class = ( import_from_module( diff --git a/ingestion/src/metadata/workflow/profiler.py b/ingestion/src/metadata/workflow/profiler.py index 54dea4b86e2a..bd3b3aa739cc 100644 --- a/ingestion/src/metadata/workflow/profiler.py +++ b/ingestion/src/metadata/workflow/profiler.py @@ -19,6 +19,7 @@ from metadata.pii.processor import PIIProcessor from metadata.profiler.processor.processor import ProfilerProcessor from metadata.profiler.source.metadata import OpenMetadataSource +from metadata.profiler.source.metadata_ext import OpenMetadataSourceExt from metadata.utils.importer import import_sink_class from metadata.utils.logger import profiler_logger from metadata.workflow.base import BaseWorkflow @@ -40,8 +41,14 @@ def __init__(self, config: OpenMetadataWorkflowConfig): # Validate that we can properly reach the source database self.test_connection() + def _get_source_class(self): + if self.config.source.serviceName: + return OpenMetadataSource + return OpenMetadataSourceExt + def set_steps(self): - self.source = OpenMetadataSource.create(self.config.dict(), self.metadata) + source_class = self._get_source_class() + self.source = source_class.create(self.config.dict(), self.metadata) profiler_processor = self._get_profiler_processor() pii_processor = self._get_pii_processor() diff --git a/ingestion/src/metadata/workflow/usage.py b/ingestion/src/metadata/workflow/usage.py index 7504d2b40a04..96c8eea1643e 100644 --- a/ingestion/src/metadata/workflow/usage.py +++ b/ingestion/src/metadata/workflow/usage.py @@ -12,6 +12,7 @@ Usage Workflow Definition """ +from metadata.config.common import WorkflowExecutionError from metadata.ingestion.api.steps import BulkSink, Processor, Source, Stage from metadata.utils.importer import ( import_bulk_sink_type, @@ -44,6 +45,10 @@ def set_steps(self): def _get_source(self) -> Source: # Source that we are ingesting, e.g., mysql, looker or kafka source_type = self.config.source.type.lower() + if not self.config.source.serviceName: + raise WorkflowExecutionError( + "ServiceName is required field for executing the Metadata Workflow" + ) source_class = ( import_from_module( diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json index 84a6e86bbc77..78b67926eee7 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json @@ -83,7 +83,7 @@ } }, "additionalProperties": false, - "required": ["type", "serviceName", "sourceConfig"] + "required": ["type", "sourceConfig"] }, "processor": { "description": "Configuration for Processor Component in the OpenMetadata Ingestion Framework.", From 2c5e527e00327f146d1223249432ff043daf645c Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Wed, 8 Nov 2023 11:38:46 +0530 Subject: [PATCH 2/6] pylint --- ingestion/src/metadata/utils/fqn.py | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index a39345ec9bd8..a40187042f58 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -572,6 +572,9 @@ def search_database_from_es( fetch_multiple_entities: Optional[bool] = False, fields: Optional[str] = None, ): + """ + Search Database entity from ES + """ if not database_name: raise FQNBuildingException( From 65af59b0e12253ed8bd550c6314cd70a3d68929f Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Wed, 8 Nov 2023 16:27:18 +0530 Subject: [PATCH 3/6] resolve comments --- .../source/database/azuresql/metadata.py | 11 ++---- .../source/database/azuresql/queries.py | 15 ++++++++ .../source/database/bigquery/metadata.py | 3 +- .../source/database/greenplum/metadata.py | 18 ++++++--- .../source/database/mssql/metadata.py | 11 ++---- .../source/database/multi_db_source.py | 38 +++++++++++++++++++ .../source/database/postgres/metadata.py | 8 ++-- .../source/database/redshift/metadata.py | 8 ++-- .../source/database/snowflake/metadata.py | 5 ++- .../source/database/vertica/metadata.py | 8 ++-- .../metadata/profiler/source/metadata_ext.py | 10 ++++- ingestion/src/metadata/workflow/profiler.py | 5 +++ 12 files changed, 102 insertions(+), 38 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/azuresql/queries.py create mode 100644 ingestion/src/metadata/ingestion/source/database/multi_db_source.py diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py b/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py index 871af2113f6a..58b8fb96fa8d 100644 --- a/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py @@ -24,12 +24,14 @@ ) from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.database.azuresql.queries import AZURE_SQL_GET_DATABASES from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.ingestion.source.database.mssql.utils import ( get_columns, get_table_comment, get_view_definition, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger @@ -50,7 +52,7 @@ MSDialect.get_columns = get_columns -class AzuresqlSource(CommonDbSourceService): +class AzuresqlSource(CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from Azuresql Source @@ -72,12 +74,7 @@ def get_configured_database(self) -> Optional[str]: return None def get_database_names_raw(self) -> Iterable[str]: - results = self.connection.execute( - "SELECT name FROM master.sys.databases order by name" - ) - for res in results: - row = list(res) - yield row[0] + yield from self._execute_database_query(AZURE_SQL_GET_DATABASES) def get_database_names(self) -> Iterable[str]: diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql/queries.py b/ingestion/src/metadata/ingestion/source/database/azuresql/queries.py new file mode 100644 index 000000000000..b98f4afec76f --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/azuresql/queries.py @@ -0,0 +1,15 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance 
with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +SQL Queries used during ingestion +""" + +AZURE_SQL_GET_DATABASES = "SELECT name FROM master.sys.databases order by name" diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 638afcd5fa3b..051919dac8d1 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -68,6 +68,7 @@ CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.stored_procedures_mixin import ( QueryByProcedure, StoredProcedureMixin, @@ -190,7 +191,7 @@ def _build_formatted_table_id(table): ) -class BigquerySource(StoredProcedureMixin, CommonDbSourceService): +class BigquerySource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from Bigquery Source diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py index bcc15a4b88f3..a277252c44db 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py @@ -13,7 +13,7 @@ """ import traceback from collections import namedtuple -from typing import Iterable, Tuple +from typing import Iterable, Optional, Tuple from sqlalchemy import sql from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names @@ -51,6 +51,7 @@ get_table_comment, get_view_definition, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger @@ -109,7 +110,7 @@ PGDialect.ischema_names = ischema_names -class GreenplumSource(CommonDbSourceService): +class GreenplumSource(CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from Greenplum Source @@ -144,16 +145,21 @@ def query_table_names_and_types( for name, relkind in result ] + def get_configured_database(self) -> Optional[str]: + if not self.service_connection.ingestAllDatabases: + return self.service_connection.database + return None + + def get_database_names_raw(self) -> Iterable[str]: + yield from self._execute_database_query(GREENPLUM_GET_DB_NAMES) + def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: configured_db = self.config.serviceConnection.__root__.config.database self.set_inspector(database_name=configured_db) yield configured_db else: - results = self.connection.execute(GREENPLUM_GET_DB_NAMES) - for res in results: - row = list(res) - new_database = row[0] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py 
b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py index 16048f2b9865..6aac7b9122f1 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py @@ -24,11 +24,13 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.common_db_source import CommonDbSourceService +from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE from metadata.ingestion.source.database.mssql.utils import ( get_columns, get_table_comment, get_view_definition, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger @@ -53,7 +55,7 @@ MSDialect.get_columns = get_columns -class MssqlSource(CommonDbSourceService): +class MssqlSource(CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from MSSQL Source @@ -76,12 +78,7 @@ def get_configured_database(self) -> Optional[str]: return None def get_database_names_raw(self) -> Iterable[str]: - results = self.connection.execute( - "SELECT name FROM master.sys.databases order by name" - ) - for res in results: - row = list(res) - yield row[0] + yield from self._execute_database_query(MSSQL_GET_DATABASE) def get_database_names(self) -> Iterable[str]: diff --git a/ingestion/src/metadata/ingestion/source/database/multi_db_source.py b/ingestion/src/metadata/ingestion/source/database/multi_db_source.py new file mode 100644 index 000000000000..c978d6fc5710 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/multi_db_source.py @@ -0,0 +1,38 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Multi DB Source Abstract class +""" + +from abc import ABC, abstractmethod +from typing import Iterable, Optional + + +class MultiDBSource(ABC): + + @abstractmethod + def get_configured_database(self) -> Optional[str]: + """ + Method to return the name of default configured database if available + """ + + @abstractmethod + def get_database_names_raw(self) -> Iterable[str]: + """ + Method to return the name of all databases. 
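+ Results are yielded unfiltered; the configured database filter patterns are applied by the callers (the metadata sources and the external profiler source).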
+ """ + + def _execute_database_query(self, query: str) -> Iterable[str]: + results = self.connection.execute(query) # pylint: disable=no-member + for res in results: + row = list(res) + yield row[0] diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index 714c47c0c76c..6494c83878d8 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -40,6 +40,7 @@ CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.postgres.queries import ( POSTGRES_GET_ALL_TABLE_PG_POLICY, POSTGRES_GET_DB_NAMES, @@ -111,7 +112,7 @@ PGDialect.ischema_names = ischema_names -class PostgresSource(CommonDbSourceService): +class PostgresSource(CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from Postgres Source @@ -152,10 +153,7 @@ def get_configured_database(self) -> Optional[str]: return None def get_database_names_raw(self) -> Iterable[str]: - results = self.connection.execute(POSTGRES_GET_DB_NAMES) - for res in results: - row = list(res) - yield row[0] + yield from self._execute_database_query(POSTGRES_GET_DB_NAMES) def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index ef2f6e071686..9109721c1daf 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -50,6 +50,7 @@ CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.redshift.models import RedshiftStoredProcedure from metadata.ingestion.source.database.redshift.queries import ( REDSHIFT_GET_ALL_RELATION_INFO, @@ -101,7 +102,7 @@ ) -class RedshiftSource(StoredProcedureMixin, CommonDbSourceService): +class RedshiftSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from Redshift Source @@ -159,10 +160,7 @@ def get_configured_database(self) -> Optional[str]: return None def get_database_names_raw(self) -> Iterable[str]: - results = self.connection.execute(REDSHIFT_GET_DATABASE_NAMES) - for res in results: - row = list(res) - yield row[0] + yield from self._execute_database_query(REDSHIFT_GET_DATABASE_NAMES) def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index b0144b641521..901b86c3b978 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -52,6 +52,7 @@ from metadata.ingestion.source.database.life_cycle_query_mixin import ( LifeCycleQueryMixin, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.snowflake.models import ( STORED_PROC_LANGUAGE_MAP, SnowflakeStoredProcedure, @@ -122,7 +123,9 
@@ SnowflakeDialect.get_columns = get_columns -class SnowflakeSource(LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService): +class SnowflakeSource( + LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService, MultiDBSource +): """ Implements the necessary methods to extract Database metadata from Snowflake Source diff --git a/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py b/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py index cd81b8184450..c53c3f3709c2 100644 --- a/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py @@ -32,6 +32,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type from metadata.ingestion.source.database.common_db_source import CommonDbSourceService +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.vertica.queries import ( VERTICA_GET_COLUMNS, VERTICA_GET_PRIMARY_KEYS, @@ -262,7 +263,7 @@ def get_table_comment( VerticaDialect.get_table_comment = get_table_comment -class VerticaSource(CommonDbSourceService): +class VerticaSource(CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from Vertica Source @@ -297,10 +298,7 @@ def get_configured_database(self) -> Optional[str]: return self.service_connection.database def get_database_names_raw(self) -> Iterable[str]: - results = self.connection.execute(VERTICA_LIST_DATABASES) - for res in results: - row = list(res) - yield row[0] + yield from self._execute_database_query(VERTICA_LIST_DATABASES) def get_database_names(self) -> Iterable[str]: configured_db = self.config.serviceConnection.__root__.config.database diff --git a/ingestion/src/metadata/profiler/source/metadata_ext.py b/ingestion/src/metadata/profiler/source/metadata_ext.py index c46a5e0cb2f7..ce7e211ddec1 100644 --- a/ingestion/src/metadata/profiler/source/metadata_ext.py +++ b/ingestion/src/metadata/profiler/source/metadata_ext.py @@ -9,7 +9,15 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -OpenMetadata source for the profiler +OpenMetadataExt source for the profiler + +This source is used in cases where the service name +is not provided for the profiler workflow. +In such situations, the profiler will perform a thorough scan +of the entire data source to locate the +corresponding table entity in OpenMetadata. +Subsequently, it will proceed to ingest relevant metrics +and sample data for that identified entity. """ import traceback from copy import deepcopy diff --git a/ingestion/src/metadata/workflow/profiler.py b/ingestion/src/metadata/workflow/profiler.py index bd3b3aa739cc..d72ea384a374 100644 --- a/ingestion/src/metadata/workflow/profiler.py +++ b/ingestion/src/metadata/workflow/profiler.py @@ -44,6 +44,11 @@ def __init__(self, config: OpenMetadataWorkflowConfig): def _get_source_class(self): if self.config.source.serviceName: return OpenMetadataSource + logger.info( + "Database Service name not provided, we will scan all the tables " + "available within the data source and locate the table entity in OpenMetadata " + "to ingest profiler data." 
+ ) + return OpenMetadataSourceExt def set_steps(self): From 45145ad2e4b83127f651f2a5068085072a490a64 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Wed, 8 Nov 2023 16:42:02 +0530 Subject: [PATCH 4/6] resolve comments --- ingestion/src/metadata/workflow/metadata.py | 4 +++- ingestion/src/metadata/workflow/usage.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/workflow/metadata.py b/ingestion/src/metadata/workflow/metadata.py index a13932e90fec..631b48f21da5 100644 --- a/ingestion/src/metadata/workflow/metadata.py +++ b/ingestion/src/metadata/workflow/metadata.py @@ -43,7 +43,9 @@ def _get_source(self) -> Source: source_type = self.config.source.type.lower() if not self.config.source.serviceName: raise WorkflowExecutionError( - "ServiceName is required field for executing the Metadata Workflow" + "serviceName is a required field for executing the Metadata Workflow. " + "You can find more information on how to build the YAML " + "configuration here: https://docs.open-metadata.org/connectors" ) source_class = ( diff --git a/ingestion/src/metadata/workflow/usage.py b/ingestion/src/metadata/workflow/usage.py index 96c8eea1643e..bee82843b34f 100644 --- a/ingestion/src/metadata/workflow/usage.py +++ b/ingestion/src/metadata/workflow/usage.py @@ -47,7 +47,9 @@ def _get_source(self) -> Source: source_type = self.config.source.type.lower() if not self.config.source.serviceName: raise WorkflowExecutionError( - "ServiceName is required field for executing the Metadata Workflow" + "serviceName is a required field for executing the Usage Workflow. " + "You can find more information on how to build the YAML " + "configuration here: https://docs.open-metadata.org/connectors" ) + source_class = ( From d197d2cd2ebb10d0900b4641b3487567460b4002 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Wed, 8 Nov 2023 16:42:57 +0530 Subject: [PATCH 5/6] pyformat --- .../src/metadata/ingestion/source/database/multi_db_source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/database/multi_db_source.py b/ingestion/src/metadata/ingestion/source/database/multi_db_source.py index c978d6fc5710..394bdd8b2edc 100644 --- a/ingestion/src/metadata/ingestion/source/database/multi_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/multi_db_source.py @@ -18,7 +18,6 @@ class MultiDBSource(ABC): - @abstractmethod def get_configured_database(self) -> Optional[str]: """ From 8ff180660cdb3b9648d019d3fd2c36e10956f822 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Wed, 8 Nov 2023 19:19:50 +0530 Subject: [PATCH 6/6] fix code smell --- ingestion/src/metadata/profiler/source/metadata_ext.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ingestion/src/metadata/profiler/source/metadata_ext.py b/ingestion/src/metadata/profiler/source/metadata_ext.py index ce7e211ddec1..67ce7cd1475d 100644 --- a/ingestion/src/metadata/profiler/source/metadata_ext.py +++ b/ingestion/src/metadata/profiler/source/metadata_ext.py @@ -114,7 +114,7 @@ def set_inspector(self, database_name: str = None) -> None: self._connection = None # Lazy init as well def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]: - for database_name in self.get_database_names() or []: + for database_name in self.get_database_names(): try: database_entity = fqn.search_database_from_es( database_name=database_name, @@ -126,8 +126,8 @@ def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]: f"Database Entity for database `{database_name}` not 
found" ) continue - for schema_name in self.get_schema_names() or []: - for table_name in self.get_table_names(schema_name) or []: + for schema_name in self.get_schema_names(): + for table_name in self.get_table_names(schema_name): table_entity = fqn.search_table_from_es( metadata=self.metadata, database_name=database_name,