From 8d925c46a5a7f1117c76e39bcbff10ce453ae3d3 Mon Sep 17 00:00:00 2001 From: NiharDoshi99 <51595473+NiharDoshi99@users.noreply.github.com> Date: Fri, 8 Dec 2023 12:04:28 +0530 Subject: [PATCH] #13696: add support for dot in schema name to fetch tables (#14246) --- .../source/database/mssql/metadata.py | 10 + .../source/database/mssql/queries.py | 100 +++++ .../ingestion/source/database/mssql/utils.py | 207 +++++++++- .../unit/topology/database/test_mssql.py | 361 ++++++++++++++++++ 4 files changed, 674 insertions(+), 4 deletions(-) create mode 100644 ingestion/tests/unit/topology/database/test_mssql.py diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py index 6f0c4127b52d..e9eb4cce662a 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py @@ -27,8 +27,13 @@ from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE from metadata.ingestion.source.database.mssql.utils import ( get_columns, + get_foreign_keys, + get_pk_constraint, get_table_comment, + get_table_names, + get_unique_constraints, get_view_definition, + get_view_names, ) from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.utils import fqn @@ -53,6 +58,11 @@ MSDialect.get_all_view_definitions = get_all_view_definitions MSDialect.get_all_table_comments = get_all_table_comments MSDialect.get_columns = get_columns +MSDialect.get_pk_constraint = get_pk_constraint +MSDialect.get_unique_constraints = get_unique_constraints +MSDialect.get_foreign_keys = get_foreign_keys +MSDialect.get_table_names = get_table_names +MSDialect.get_view_names = get_view_names class MssqlSource(CommonDbSourceService, MultiDBSource): diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/queries.py b/ingestion/src/metadata/ingestion/source/database/mssql/queries.py index 794a463eb62e..213c7f44f864 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/queries.py @@ -86,3 +86,103 @@ ON db.database_id = t.dbid """ ) + +MSSQL_GET_FOREIGN_KEY = """\ +WITH fk_info AS ( + SELECT + ischema_ref_con.constraint_schema, + ischema_ref_con.constraint_name, + ischema_key_col.ordinal_position, + ischema_key_col.[table_schema], + ischema_key_col.table_name, + ischema_ref_con.unique_constraint_schema, + ischema_ref_con.unique_constraint_name, + ischema_ref_con.match_option, + ischema_ref_con.update_rule, + ischema_ref_con.delete_rule, + ischema_key_col.column_name AS constrained_column + FROM + INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con + INNER JOIN + INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON + ischema_key_col.[table_schema] = ischema_ref_con.constraint_schema + AND ischema_key_col.constraint_name = + ischema_ref_con.constraint_name + WHERE ischema_key_col.table_name = :tablename + AND ischema_key_col.[table_schema] = :owner +), +constraint_info AS ( + SELECT + ischema_key_col.constraint_schema, + ischema_key_col.constraint_name, + ischema_key_col.ordinal_position, + ischema_key_col.[table_schema], + ischema_key_col.table_name, + ischema_key_col.column_name + FROM + INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col +), +index_info AS ( + SELECT + sys.schemas.name AS index_schema, + sys.indexes.name AS index_name, + sys.index_columns.key_ordinal AS ordinal_position, + sys.schemas.name AS [table_schema], + sys.objects.name AS table_name, + sys.columns.name AS column_name + FROM + sys.indexes + INNER JOIN + sys.objects ON + sys.objects.object_id = sys.indexes.object_id + INNER JOIN + sys.schemas ON + sys.schemas.schema_id = sys.objects.schema_id + INNER JOIN + sys.index_columns ON + sys.index_columns.object_id = sys.objects.object_id + AND sys.index_columns.index_id = sys.indexes.index_id + INNER JOIN + sys.columns ON + sys.columns.object_id = sys.indexes.object_id + AND sys.columns.column_id = sys.index_columns.column_id +) + SELECT + fk_info.constraint_schema, + fk_info.constraint_name, + fk_info.ordinal_position, + fk_info.constrained_column, + constraint_info.[table_schema] AS referred_table_schema, + constraint_info.table_name AS referred_table_name, + constraint_info.column_name AS referred_column, + fk_info.match_option, + fk_info.update_rule, + fk_info.delete_rule + FROM + fk_info INNER JOIN constraint_info ON + constraint_info.constraint_schema = + fk_info.unique_constraint_schema + AND constraint_info.constraint_name = + fk_info.unique_constraint_name + AND constraint_info.ordinal_position = fk_info.ordinal_position + UNION + SELECT + fk_info.constraint_schema, + fk_info.constraint_name, + fk_info.ordinal_position, + fk_info.constrained_column, + index_info.[table_schema] AS referred_table_schema, + index_info.table_name AS referred_table_name, + index_info.column_name AS referred_column, + fk_info.match_option, + fk_info.update_rule, + fk_info.delete_rule + FROM + fk_info INNER JOIN index_info ON + index_info.index_schema = fk_info.unique_constraint_schema + AND index_info.index_name = fk_info.unique_constraint_name + AND index_info.ordinal_position = fk_info.ordinal_position + + ORDER BY fk_info.constraint_schema, fk_info.constraint_name, + fk_info.ordinal_position +""" diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/utils.py b/ingestion/src/metadata/ingestion/source/database/mssql/utils.py index 3664126425fb..3b1caed19eca 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/utils.py @@ -12,7 +12,7 @@ MSSQL SQLAlchemy Helper Methods """ -from sqlalchemy import Column, Integer, MetaData, String, Table, alias, sql +from sqlalchemy import Column, Integer, MetaData, String, Table, alias, sql, text from sqlalchemy import types as sqltypes from sqlalchemy import util from sqlalchemy.dialects.mssql import information_schema as ischema @@ -25,7 +25,9 @@ MSString, MSText, MSVarBinary, - _db_plus_owner, + _owner_plus_db, + _switch_db, + update_wrapper, ) from sqlalchemy.engine import reflection from sqlalchemy.sql import func @@ -34,6 +36,7 @@ from metadata.ingestion.source.database.mssql.queries import ( MSSQL_ALL_VIEW_DEFINITIONS, + MSSQL_GET_FOREIGN_KEY, MSSQL_GET_TABLE_COMMENTS, ) from metadata.utils.logger import ingestion_logger @@ -59,8 +62,39 @@ def get_table_comment( ) +def db_plus_owner_listing(fn): + def wrap(dialect, connection, schema=None, **kw): + schema = f"[{schema}]" if schema and "." in schema else schema + dbname, owner = _owner_plus_db(dialect, schema) + return _switch_db( + dbname, connection, fn, dialect, connection, dbname, owner, schema, **kw + ) + + return update_wrapper(wrap, fn) + + +def db_plus_owner(fn): + def wrap(dialect, connection, tablename, schema=None, **kw): + schema = f"[{schema}]" if schema and "." in schema else schema + dbname, owner = _owner_plus_db(dialect, schema) + return _switch_db( + dbname, + connection, + fn, + dialect, + connection, + tablename, + dbname, + owner, + schema, + **kw, + ) + + return update_wrapper(wrap, fn) + + @reflection.cache -@_db_plus_owner +@db_plus_owner def get_columns( self, connection, tablename, dbname, owner, schema, **kw ): # pylint: disable=unused-argument, too-many-locals, disable=too-many-branches, too-many-statements @@ -271,7 +305,7 @@ def get_columns( @reflection.cache -@_db_plus_owner +@db_plus_owner def get_view_definition( self, connection, viewname, dbname, owner, schema, **kw ): # pylint: disable=unused-argument @@ -282,3 +316,168 @@ def get_view_definition( schema=owner, query=MSSQL_ALL_VIEW_DEFINITIONS, ) + + +@reflection.cache +@db_plus_owner +def get_pk_constraint( + self, connection, tablename, dbname, owner=None, schema=None, **kw +): # pylint: disable=unused-argument + """ + This function overrides to get pk constraint + """ + pkeys = [] + tc = ischema.constraints + c = ischema.key_constraints.alias("C") + + # Primary key constraints + s = ( + sql.select(c.c.column_name, tc.c.constraint_type, c.c.constraint_name) + .where( + sql.and_( + tc.c.constraint_name == c.c.constraint_name, + tc.c.table_schema == c.c.table_schema, + c.c.table_name == tablename, + c.c.table_schema == owner, + ), + ) + .order_by(tc.c.constraint_name, c.c.ordinal_position) + ) + cursor = connection.execution_options(future_result=True).execute(s) + constraint_name = None + for row in cursor.mappings(): + if "PRIMARY" in row[tc.c.constraint_type.name]: + pkeys.append(row["COLUMN_NAME"]) + if constraint_name is None: + constraint_name = row[c.c.constraint_name.name] + return {"constrained_columns": pkeys, "name": constraint_name} + + +@reflection.cache +def get_unique_constraints(self, connection, table_name, schema=None, **kw): + raise NotImplementedError() + + +@reflection.cache +@db_plus_owner +def get_foreign_keys( + self, connection, tablename, dbname, owner=None, schema=None, **kw +): # pylint: disable=unused-argument, too-many-locals + """ + This function overrides to get foreign key constraint + """ + s = ( + text(MSSQL_GET_FOREIGN_KEY) + .bindparams( + sql.bindparam("tablename", tablename, ischema.CoerceUnicode()), + sql.bindparam("owner", owner, ischema.CoerceUnicode()), + ) + .columns( + constraint_schema=sqltypes.Unicode(), + constraint_name=sqltypes.Unicode(), + table_schema=sqltypes.Unicode(), + table_name=sqltypes.Unicode(), + constrained_column=sqltypes.Unicode(), + referred_table_schema=sqltypes.Unicode(), + referred_table_name=sqltypes.Unicode(), + referred_column=sqltypes.Unicode(), + ) + ) + + # group rows by constraint ID, to handle multi-column FKs + fkeys = [] + + def fkey_rec(): + return { + "name": None, + "constrained_columns": [], + "referred_schema": None, + "referred_table": None, + "referred_columns": [], + "options": {}, + } + + fkeys = util.defaultdict(fkey_rec) + + for r in connection.execute(s).fetchall(): + ( + _, # constraint schema + rfknm, + _, # ordinal position + scol, + rschema, + rtbl, + rcol, + # TODO: we support match= for foreign keys so + # we can support this also, PG has match=FULL for example + # but this seems to not be a valid value for SQL Server + _, # match rule + fkuprule, + fkdelrule, + ) = r + + rec = fkeys[rfknm] + rec["name"] = rfknm + + if fkuprule != "NO ACTION": + rec["options"]["onupdate"] = fkuprule + + if fkdelrule != "NO ACTION": + rec["options"]["ondelete"] = fkdelrule + + if not rec["referred_table"]: + rec["referred_table"] = rtbl + if schema is not None or owner != rschema: + if dbname: + rschema = dbname + "." + rschema + rec["referred_schema"] = rschema + + local_cols, remote_cols = ( + rec["constrained_columns"], + rec["referred_columns"], + ) + + local_cols.append(scol) + remote_cols.append(rcol) + + return list(fkeys.values()) + + +@reflection.cache +@db_plus_owner_listing +def get_table_names( + self, connection, dbname, owner, schema, **kw +): # pylint: disable=unused-argument + tables = ischema.tables + s = ( + sql.select(tables.c.table_name) + .where( + sql.and_( + tables.c.table_schema == owner, + tables.c.table_type == "BASE TABLE", + ) + ) + .order_by(tables.c.table_name) + ) + table_names = [r[0] for r in connection.execute(s)] + return table_names + + +@reflection.cache +@db_plus_owner_listing +def get_view_names( + self, connection, dbname, owner, schema, **kw +): # pylint: disable=unused-argument + tables = ischema.tables + s = ( + sql.select(tables.c.table_name) + .where( + sql.and_( + tables.c.table_schema == owner, + tables.c.table_type == "VIEW", + ) + ) + .order_by(tables.c.table_name) + ) + view_names = [r[0] for r in connection.execute(s)] + return view_names diff --git a/ingestion/tests/unit/topology/database/test_mssql.py b/ingestion/tests/unit/topology/database/test_mssql.py new file mode 100644 index 000000000000..bff80a7471c1 --- /dev/null +++ b/ingestion/tests/unit/topology/database/test_mssql.py @@ -0,0 +1,361 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test Mssql using the topology +""" + +import types +from unittest import TestCase +from unittest.mock import patch + +from sqlalchemy.types import INTEGER, VARCHAR + +import metadata.ingestion.source.database.mssql.utils as mssql_dialet +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema +from metadata.generated.schema.entity.data.table import ( + Column, + ColumnName, + DataType, + TableType, +) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.source.database.mssql.metadata import MssqlSource + +mock_mssql_config = { + "source": { + "type": "mssql", + "serviceName": "test2", + "serviceConnection": { + "config": { + "type": "Mssql", + "database": "test_database", + "username": "username", + "password": "password", + "hostPort": "localhost:1466", + } + }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc" + "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE" + "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB" + "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN" + "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u" + "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + +MOCK_DATABASE_SERVICE = DatabaseService( + id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", + name="mssql_source_test", + connection=DatabaseConnection(), + serviceType=DatabaseServiceType.Mssql, +) + +MOCK_DATABASE = Database( + id="a58b1856-729c-493b-bc87-6d2269b43ec0", + name="sample_database", + fullyQualifiedName="mssql_source_test.sample_database", + displayName="sample_database", + description="", + service=EntityReference( + id="85811038-099a-11ed-861d-0242ac120002", type="databaseService" + ), +) + +MOCK_DATABASE_SCHEMA = DatabaseSchema( + id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", + name="sample.schema", + fullyQualifiedName="mssql_source_test.sample_database.sample.schema", + service=EntityReference(id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="database"), + database=EntityReference( + id="a58b1856-729c-493b-bc87-6d2269b43ec0", + type="database", + ), +) + +MOCK_COLUMN_VALUE = [ + { + "name": "sample_col_1", + "type": VARCHAR(), + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "varchar(50)", + "comment": None, + }, + { + "name": "sample_col_2", + "type": INTEGER(), + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "int", + "comment": None, + }, + { + "name": "sample_col_3", + "type": VARCHAR(), + "nullable": True, + "default": None, + "autoincrement": False, + "system_data_type": "varchar(50)", + "comment": None, + }, + { + "name": "sample_col_4", + "type": VARCHAR(), + "nullable": True, + "default": None, + "autoincrement": False, + "comment": None, + "system_data_type": "varchar(50)", + }, +] + +EXPECTED_DATABASE = [ + CreateDatabaseRequest( + name=EntityName(__root__="sample_database"), + displayName=None, + description=None, + tags=None, + owner=None, + service=FullyQualifiedEntityName(__root__="mssql_source_test"), + dataProducts=None, + default=False, + retentionPeriod=None, + extension=None, + sourceUrl=None, + domain=None, + lifeCycle=None, + sourceHash=None, + ) +] + +EXPECTED_DATABASE_SCHEMA = [ + CreateDatabaseSchemaRequest( + name=EntityName(__root__="sample.schema"), + displayName=None, + description=None, + owner=None, + database=FullyQualifiedEntityName(__root__="mssql_source_test.sample_database"), + dataProducts=None, + tags=None, + retentionPeriod=None, + extension=None, + sourceUrl=None, + domain=None, + lifeCycle=None, + sourceHash=None, + ) +] + +EXPECTED_TABLE = [ + CreateTableRequest( + name=EntityName(__root__="sample_table"), + displayName=None, + description=None, + tableType=TableType.Regular.name, + columns=[ + Column( + name=ColumnName(__root__="sample_col_1"), + displayName=None, + dataType=DataType.VARCHAR.name, + arrayDataType=None, + dataLength=1, + precision=None, + scale=None, + dataTypeDisplay="varchar(50)", + description=None, + fullyQualifiedName=None, + tags=None, + constraint="NULL", + ordinalPosition=None, + jsonSchema=None, + children=None, + profile=None, + customMetrics=None, + ), + Column( + name=ColumnName(__root__="sample_col_2"), + displayName=None, + dataType=DataType.INT.name, + arrayDataType=None, + dataLength=1, + precision=None, + scale=None, + dataTypeDisplay="int", + description=None, + fullyQualifiedName=None, + tags=None, + constraint="NULL", + ordinalPosition=None, + jsonSchema=None, + children=None, + profile=None, + customMetrics=None, + ), + Column( + name=ColumnName(__root__="sample_col_3"), + displayName=None, + dataType=DataType.VARCHAR.name, + arrayDataType=None, + dataLength=1, + precision=None, + scale=None, + dataTypeDisplay="varchar(50)", + description=None, + fullyQualifiedName=None, + tags=None, + constraint="NULL", + ordinalPosition=None, + jsonSchema=None, + children=None, + profile=None, + customMetrics=None, + ), + Column( + name=ColumnName(__root__="sample_col_4"), + displayName=None, + dataType=DataType.VARCHAR.name, + arrayDataType=None, + dataLength=1, + precision=None, + scale=None, + dataTypeDisplay="varchar(50)", + description=None, + fullyQualifiedName=None, + tags=None, + constraint="NULL", + ordinalPosition=None, + jsonSchema=None, + children=None, + profile=None, + customMetrics=None, + ), + ], + tableConstraints=[], + tablePartition=None, + tableProfilerConfig=None, + owner=None, + databaseSchema=FullyQualifiedEntityName( + __root__='mssql_source_test.sample_database."sample.schema"' + ), + tags=None, + viewDefinition=None, + retentionPeriod=None, + extension=None, + sourceUrl=None, + domain=None, + dataProducts=None, + fileFormat=None, + lifeCycle=None, + sourceHash=None, + ) +] + + +class MssqlUnitTest(TestCase): + """ + Implements the necessary methods to extract + Mssql Unit Test + """ + + @patch( + "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection" + ) + def __init__( + self, + methodName, + test_connection, + ) -> None: + super().__init__(methodName) + test_connection.return_value = False + self.config = OpenMetadataWorkflowConfig.parse_obj(mock_mssql_config) + self.mssql = MssqlSource.create( + mock_mssql_config["source"], + self.config.workflowConfig.openMetadataServerConfig, + ) + self.mssql.context.__dict__[ + "database_service" + ] = MOCK_DATABASE_SERVICE.name.__root__ + self.mssql.inspector = types.SimpleNamespace() + self.mssql.inspector.get_columns = ( + lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE + ) + self.mssql.inspector.get_pk_constraint = lambda table_name, schema_name: [] + self.mssql.inspector.get_unique_constraints = lambda table_name, schema_name: [] + self.mssql.inspector.get_foreign_keys = lambda table_name, schema_name: [] + + def test_yield_database(self): + assert EXPECTED_DATABASE == [ + either.right for either in self.mssql.yield_database(MOCK_DATABASE.name) + ] + + self.mssql.context.__dict__[ + "database_service" + ] = MOCK_DATABASE_SERVICE.name.__root__ + self.mssql.context.__dict__["database"] = MOCK_DATABASE.name.__root__ + + @mssql_dialet.db_plus_owner + def mock_function( + self, connection, tablename, dbname, owner, schema, **kw + ): # pylint: disable=unused-argument + # Mock function for testing + return schema + + def test_schema_with_dot(self): + # Test when the schema contains a dot + result = self.mock_function( # pylint: disable=no-value-for-parameter + "mock_dialect", + "mock_connection", + "your.schema", + ) + self.assertEqual(result, "[your.schema]") + + def test_yield_schema(self): + assert EXPECTED_DATABASE_SCHEMA == [ + either.right + for either in self.mssql.yield_database_schema(MOCK_DATABASE_SCHEMA.name) + ] + + self.mssql.context.__dict__[ + "database_schema" + ] = MOCK_DATABASE_SCHEMA.name.__root__ + + def test_yield_table(self): + assert EXPECTED_TABLE == [ + either.right + for either in self.mssql.yield_table(("sample_table", "Regular")) + ]