diff --git a/ingestion/src/metadata/ingestion/source/database/db2/metadata.py b/ingestion/src/metadata/ingestion/source/database/db2/metadata.py index 794cb3b85903..44baf862e60d 100644 --- a/ingestion/src/metadata/ingestion/source/database/db2/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/db2/metadata.py @@ -13,6 +13,7 @@ from typing import Iterable, Optional from ibm_db_sa.base import ischema_names +from ibm_db_sa.reflection import DB2Reflector, OS390Reflector from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.row import LegacyRow from sqlalchemy.sql.sqltypes import BOOLEAN @@ -26,6 +27,7 @@ from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.common_db_source import CommonDbSourceService +from metadata.ingestion.source.database.db2.utils import get_unique_constraints from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -34,6 +36,10 @@ ischema_names.update({"BOOLEAN": BOOLEAN}) +DB2Reflector.get_unique_constraints = get_unique_constraints +OS390Reflector.get_unique_constraints = get_unique_constraints + + class Db2Source(CommonDbSourceService): """ Implements the necessary methods to extract diff --git a/ingestion/src/metadata/ingestion/source/database/db2/utils.py b/ingestion/src/metadata/ingestion/source/database/db2/utils.py new file mode 100644 index 000000000000..657942aca471 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/db2/utils.py @@ -0,0 +1,63 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
@reflection.cache
def get_unique_constraints(
    self, connection, table_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Return the unique constraints defined on one table.

    Overrides the dialect default, which does not filter properly by
    schema and table name. The key-column catalog table is joined to the
    table-constraint catalog table on constraint name, schema and table,
    and only rows of type ``"U"`` for the requested table are kept.

    Args:
        self: object providing ``denormalize_name``, ``normalize_name``,
            ``default_schema_name``, ``sys_keycoluse`` and ``sys_tabconst``.
        connection: object whose ``execute`` method runs the query.
        table_name: name of the table to inspect.
        schema: schema holding the table; when falsy,
            ``self.default_schema_name`` is used instead.
        **kw: not read by the body (hence the pylint disable on the signature).

    Returns:
        A list of dicts, one per constraint, each with the keys ``"name"``
        (normalized constraint name) and ``"column_names"`` (list of
        normalized column names). Entries follow the query order, which is
        sorted by constraint name.
    """
    target_schema = self.denormalize_name(schema or self.default_schema_name)
    target_table = self.denormalize_name(table_name)
    key_columns = self.sys_keycoluse
    table_constraints = self.sys_tabconst

    # Tie each key column to its constraint within the same schema and table.
    join_condition = and_(
        key_columns.c.constname == table_constraints.c.constname,
        key_columns.c.tabschema == table_constraints.c.tabschema,
        key_columns.c.tabname == table_constraints.c.tabname,
    )
    # Restrict to type "U" constraints of the requested table only.
    row_filter = and_(
        table_constraints.c.tabname == target_table,
        table_constraints.c.tabschema == target_schema,
        table_constraints.c.type == "U",
    )

    statement = sql.select(key_columns.c.constname, key_columns.c.colname)
    statement = statement.select_from(
        join(key_columns, table_constraints, join_condition)
    )
    statement = statement.where(row_filter)
    # Sorting by constraint name keeps each constraint's rows adjacent,
    # which the grouping loop below relies on.
    statement = statement.order_by(key_columns.c.constname)

    constraints = []
    previous_name = None
    for row in connection.execute(statement):
        if previous_name == row[0]:
            # Same constraint as the previous row: add one more column.
            constraints[-1]["column_names"].append(self.normalize_name(row[1]))
            continue
        previous_name = row[0]
        constraints.append(
            {
                "name": self.normalize_name(previous_name),
                "column_names": [self.normalize_name(row[1])],
            }
        )
    return constraints