Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for external profiler workflow #13887

Merged
merged 6 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"""Azure SQL source module"""

import traceback
from typing import Iterable
from typing import Iterable, Optional

from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names

Expand All @@ -24,12 +24,14 @@
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.azuresql.queries import AZURE_SQL_GET_DATABASES
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.mssql.utils import (
get_columns,
get_table_comment,
get_view_definition,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
Expand All @@ -50,7 +52,7 @@
MSDialect.get_columns = get_columns


class AzuresqlSource(CommonDbSourceService):
class AzuresqlSource(CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Azuresql Source
Expand All @@ -66,19 +68,22 @@ def create(cls, config_dict, metadata: OpenMetadata):
)
return cls(config, metadata)

def get_configured_database(self) -> Optional[str]:
    """
    Return the database set on the service connection, or None
    when the connection is flagged to ingest all databases.
    """
    connection = self.service_connection
    return None if connection.ingestAllDatabases else connection.database

def get_database_names_raw(self) -> Iterable[str]:
    """
    Yield each value produced by running AZURE_SQL_GET_DATABASES
    through the shared _execute_database_query helper.
    """
    for database_name in self._execute_database_query(AZURE_SQL_GET_DATABASES):
        yield database_name

def get_database_names(self) -> Iterable[str]:

if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
configured_db = self.config.serviceConnection.__root__.config.database
self.set_inspector(database_name=configured_db)
yield configured_db
else:
results = self.connection.execute(
"SELECT name FROM master.sys.databases order by name"
)
for res in results:
row = list(res)
new_database = row[0]
for new_database in self.get_database_names_raw():
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SQL Queries used during ingestion
"""

# Lists database names from master.sys.databases, sorted by name.
AZURE_SQL_GET_DATABASES = (
    "SELECT name FROM master.sys.databases order by name"
)
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
Expand Down Expand Up @@ -190,7 +191,7 @@ def _build_formatted_table_id(table):
)


class BigquerySource(StoredProcedureMixin, CommonDbSourceService):
class BigquerySource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Bigquery Source
Expand Down Expand Up @@ -421,6 +422,12 @@ def set_inspector(self, database_name: str):
self.engine = inspector_details.engine
self.inspector = inspector_details.inspector

def get_configured_database(self) -> Optional[str]:
    """
    Always return None: this source reports no single
    configured database.
    """
    return None

def get_database_names_raw(self) -> Iterable[str]:
    """
    Yield every entry of self.project_ids, in order.
    """
    for project_id in self.project_ids:
        yield project_id

def get_database_names(self) -> Iterable[str]:
for project_id in self.project_ids:
database_fqn = fqn.build(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""
import traceback
from collections import namedtuple
from typing import Iterable, Tuple
from typing import Iterable, Optional, Tuple

from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
Expand Down Expand Up @@ -51,6 +51,7 @@
get_table_comment,
get_view_definition,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
Expand Down Expand Up @@ -109,7 +110,7 @@
PGDialect.ischema_names = ischema_names


class GreenplumSource(CommonDbSourceService):
class GreenplumSource(CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Greenplum Source
Expand Down Expand Up @@ -144,16 +145,21 @@ def query_table_names_and_types(
for name, relkind in result
]

def get_configured_database(self) -> Optional[str]:
    """
    Return the database set on the service connection, or None
    when the connection is flagged to ingest all databases.
    """
    connection = self.service_connection
    return None if connection.ingestAllDatabases else connection.database

def get_database_names_raw(self) -> Iterable[str]:
    """
    Yield each value produced by running GREENPLUM_GET_DB_NAMES
    through the shared _execute_database_query helper.
    """
    for database_name in self._execute_database_query(GREENPLUM_GET_DB_NAMES):
        yield database_name

def get_database_names(self) -> Iterable[str]:
if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
configured_db = self.config.serviceConnection.__root__.config.database
self.set_inspector(database_name=configured_db)
yield configured_db
else:
results = self.connection.execute(GREENPLUM_GET_DB_NAMES)
for res in results:
row = list(res)
new_database = row[0]
for new_database in self.get_database_names_raw():
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# limitations under the License.
"""MSSQL source module"""
import traceback
from typing import Iterable
from typing import Iterable, Optional

from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names

Expand All @@ -24,11 +24,13 @@
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE
from metadata.ingestion.source.database.mssql.utils import (
get_columns,
get_table_comment,
get_view_definition,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
Expand All @@ -53,7 +55,7 @@
MSDialect.get_columns = get_columns


class MssqlSource(CommonDbSourceService):
class MssqlSource(CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from MSSQL Source
Expand All @@ -70,19 +72,22 @@ def create(cls, config_dict, metadata: OpenMetadata):
)
return cls(config, metadata)

def get_configured_database(self) -> Optional[str]:
    """
    Return the database set on the service connection, or None
    when the connection is flagged to ingest all databases.
    """
    connection = self.service_connection
    return None if connection.ingestAllDatabases else connection.database

def get_database_names_raw(self) -> Iterable[str]:
    """
    Yield each value produced by running MSSQL_GET_DATABASE
    through the shared _execute_database_query helper.
    """
    for database_name in self._execute_database_query(MSSQL_GET_DATABASE):
        yield database_name

def get_database_names(self) -> Iterable[str]:

if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
configured_db = self.config.serviceConnection.__root__.config.database
self.set_inspector(database_name=configured_db)
yield configured_db
else:
results = self.connection.execute(
"SELECT name FROM master.sys.databases order by name"
)
for res in results:
row = list(res)
new_database = row[0]
for new_database in self.get_database_names_raw():
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Multi DB Source Abstract class
"""

from abc import ABC, abstractmethod
from typing import Iterable, Optional


class MultiDBSource(ABC):
    """
    Abstract base for sources that can report a configured database
    and enumerate the names of all databases.
    """

    @abstractmethod
    def get_configured_database(self) -> Optional[str]:
        """
        Return the name of the default configured database, if one is available.
        """

    @abstractmethod
    def get_database_names_raw(self) -> Iterable[str]:
        """
        Return the names of all databases.
        """

    def _execute_database_query(self, query: str) -> Iterable[str]:
        """
        Run `query` on self.connection and lazily yield the first
        column of every row in the result.
        """
        rows = self.connection.execute(query)  # pylint: disable=no-member
        yield from (list(record)[0] for record in rows)
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""
import traceback
from collections import namedtuple
from typing import Iterable, Tuple
from typing import Iterable, Optional, Tuple

from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
Expand All @@ -40,6 +40,7 @@
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_GET_ALL_TABLE_PG_POLICY,
POSTGRES_GET_DB_NAMES,
Expand Down Expand Up @@ -111,7 +112,7 @@
PGDialect.ischema_names = ischema_names


class PostgresSource(CommonDbSourceService):
class PostgresSource(CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Postgres Source
Expand Down Expand Up @@ -146,16 +147,21 @@ def query_table_names_and_types(
for name, relkind in result
]

def get_configured_database(self) -> Optional[str]:
    """
    Return the database set on the service connection, or None
    when the connection is flagged to ingest all databases.
    """
    connection = self.service_connection
    return None if connection.ingestAllDatabases else connection.database

def get_database_names_raw(self) -> Iterable[str]:
    """
    Yield each value produced by running POSTGRES_GET_DB_NAMES
    through the shared _execute_database_query helper.
    """
    for database_name in self._execute_database_query(POSTGRES_GET_DB_NAMES):
        yield database_name

def get_database_names(self) -> Iterable[str]:
if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
configured_db = self.config.serviceConnection.__root__.config.database
self.set_inspector(database_name=configured_db)
yield configured_db
else:
results = self.connection.execute(POSTGRES_GET_DB_NAMES)
for res in results:
row = list(res)
new_database = row[0]
for new_database in self.get_database_names_raw():
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.redshift.models import RedshiftStoredProcedure
from metadata.ingestion.source.database.redshift.queries import (
REDSHIFT_GET_ALL_RELATION_INFO,
Expand Down Expand Up @@ -101,7 +102,7 @@
)


class RedshiftSource(StoredProcedureMixin, CommonDbSourceService):
class RedshiftSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Redshift Source
Expand Down Expand Up @@ -153,16 +154,21 @@ def query_table_names_and_types(
for name, relkind in result
]

def get_configured_database(self) -> Optional[str]:
    """
    Return the database set on the service connection, or None
    when the connection is flagged to ingest all databases.
    """
    connection = self.service_connection
    return None if connection.ingestAllDatabases else connection.database

def get_database_names_raw(self) -> Iterable[str]:
    """
    Yield each value produced by running REDSHIFT_GET_DATABASE_NAMES
    through the shared _execute_database_query helper.
    """
    for database_name in self._execute_database_query(REDSHIFT_GET_DATABASE_NAMES):
        yield database_name

def get_database_names(self) -> Iterable[str]:
if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
self.inspector = inspect(self.engine)
self.get_partition_details()
yield self.config.serviceConnection.__root__.config.database
else:
results = self.connection.execute(REDSHIFT_GET_DATABASE_NAMES)
for res in results:
row = list(res)
new_database = row[0]
for new_database in self.get_database_names_raw():
database_fqn = fqn.build(
self.metadata,
entity_type=Database,
Expand Down
Loading
Loading