Skip to content

Commit

Permalink
#13696: add support for dot in schema name to fetch tables (#14246)
Browse files Browse the repository at this point in the history
  • Loading branch information
NiharDoshi99 authored Dec 8, 2023
1 parent 2cfa562 commit 8d925c4
Show file tree
Hide file tree
Showing 4 changed files with 674 additions and 4 deletions.
10 changes: 10 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/mssql/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE
from metadata.ingestion.source.database.mssql.utils import (
get_columns,
get_foreign_keys,
get_pk_constraint,
get_table_comment,
get_table_names,
get_unique_constraints,
get_view_definition,
get_view_names,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils import fqn
Expand All @@ -53,6 +58,11 @@
# Monkey-patch MSDialect: each assignment binds the project's implementation
# onto the dialect class under the same attribute name, so reflection calls
# made through the dialect resolve to these functions.
# NOTE(review): MSDialect is presumably SQLAlchemy's SQL Server dialect class;
# its import is not visible in this hunk - confirm.
MSDialect.get_all_view_definitions = get_all_view_definitions
MSDialect.get_all_table_comments = get_all_table_comments
MSDialect.get_columns = get_columns
MSDialect.get_pk_constraint = get_pk_constraint
MSDialect.get_unique_constraints = get_unique_constraints
MSDialect.get_foreign_keys = get_foreign_keys
MSDialect.get_table_names = get_table_names
MSDialect.get_view_names = get_view_names


class MssqlSource(CommonDbSourceService, MultiDBSource):
Expand Down
100 changes: 100 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/mssql/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,103 @@
ON db.database_id = t.dbid
"""
)

# Foreign-key lookup query for a single table. It expects two bind parameters:
#   :tablename - name of the table whose foreign keys are wanted
#   :owner     - schema that table lives in
#
# CTEs:
#   fk_info         - REFERENTIAL_CONSTRAINTS joined to KEY_COLUMN_USAGE,
#                     filtered to the target table; one row per constrained
#                     column, carrying the referred (unique) constraint name.
#   constraint_info - every key-constraint column from KEY_COLUMN_USAGE.
#   index_info      - every index column, built from the sys.indexes /
#                     sys.objects / sys.schemas / sys.index_columns /
#                     sys.columns catalog views.
#
# The final statement is a UNION of two lookups for the referred column: one
# matches the referred constraint against constraint_info, the other against
# index_info; both pair columns up by ordinal position.
#
# Result columns, in order: constraint_schema, constraint_name,
# ordinal_position, constrained_column, referred_table_schema,
# referred_table_name, referred_column, match_option, update_rule,
# delete_rule. Code that unpacks rows positionally depends on this order.
MSSQL_GET_FOREIGN_KEY = """\
WITH fk_info AS (
SELECT
ischema_ref_con.constraint_schema,
ischema_ref_con.constraint_name,
ischema_key_col.ordinal_position,
ischema_key_col.[table_schema],
ischema_key_col.table_name,
ischema_ref_con.unique_constraint_schema,
ischema_ref_con.unique_constraint_name,
ischema_ref_con.match_option,
ischema_ref_con.update_rule,
ischema_ref_con.delete_rule,
ischema_key_col.column_name AS constrained_column
FROM
INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
INNER JOIN
INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
ischema_key_col.[table_schema] = ischema_ref_con.constraint_schema
AND ischema_key_col.constraint_name =
ischema_ref_con.constraint_name
WHERE ischema_key_col.table_name = :tablename
AND ischema_key_col.[table_schema] = :owner
),
constraint_info AS (
SELECT
ischema_key_col.constraint_schema,
ischema_key_col.constraint_name,
ischema_key_col.ordinal_position,
ischema_key_col.[table_schema],
ischema_key_col.table_name,
ischema_key_col.column_name
FROM
INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col
),
index_info AS (
SELECT
sys.schemas.name AS index_schema,
sys.indexes.name AS index_name,
sys.index_columns.key_ordinal AS ordinal_position,
sys.schemas.name AS [table_schema],
sys.objects.name AS table_name,
sys.columns.name AS column_name
FROM
sys.indexes
INNER JOIN
sys.objects ON
sys.objects.object_id = sys.indexes.object_id
INNER JOIN
sys.schemas ON
sys.schemas.schema_id = sys.objects.schema_id
INNER JOIN
sys.index_columns ON
sys.index_columns.object_id = sys.objects.object_id
AND sys.index_columns.index_id = sys.indexes.index_id
INNER JOIN
sys.columns ON
sys.columns.object_id = sys.indexes.object_id
AND sys.columns.column_id = sys.index_columns.column_id
)
SELECT
fk_info.constraint_schema,
fk_info.constraint_name,
fk_info.ordinal_position,
fk_info.constrained_column,
constraint_info.[table_schema] AS referred_table_schema,
constraint_info.table_name AS referred_table_name,
constraint_info.column_name AS referred_column,
fk_info.match_option,
fk_info.update_rule,
fk_info.delete_rule
FROM
fk_info INNER JOIN constraint_info ON
constraint_info.constraint_schema =
fk_info.unique_constraint_schema
AND constraint_info.constraint_name =
fk_info.unique_constraint_name
AND constraint_info.ordinal_position = fk_info.ordinal_position
UNION
SELECT
fk_info.constraint_schema,
fk_info.constraint_name,
fk_info.ordinal_position,
fk_info.constrained_column,
index_info.[table_schema] AS referred_table_schema,
index_info.table_name AS referred_table_name,
index_info.column_name AS referred_column,
fk_info.match_option,
fk_info.update_rule,
fk_info.delete_rule
FROM
fk_info INNER JOIN index_info ON
index_info.index_schema = fk_info.unique_constraint_schema
AND index_info.index_name = fk_info.unique_constraint_name
AND index_info.ordinal_position = fk_info.ordinal_position
ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
fk_info.ordinal_position
"""
207 changes: 203 additions & 4 deletions ingestion/src/metadata/ingestion/source/database/mssql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
MSSQL SQLAlchemy Helper Methods
"""

from sqlalchemy import Column, Integer, MetaData, String, Table, alias, sql
from sqlalchemy import Column, Integer, MetaData, String, Table, alias, sql, text
from sqlalchemy import types as sqltypes
from sqlalchemy import util
from sqlalchemy.dialects.mssql import information_schema as ischema
Expand All @@ -25,7 +25,9 @@
MSString,
MSText,
MSVarBinary,
_db_plus_owner,
_owner_plus_db,
_switch_db,
update_wrapper,
)
from sqlalchemy.engine import reflection
from sqlalchemy.sql import func
Expand All @@ -34,6 +36,7 @@

from metadata.ingestion.source.database.mssql.queries import (
MSSQL_ALL_VIEW_DEFINITIONS,
MSSQL_GET_FOREIGN_KEY,
MSSQL_GET_TABLE_COMMENTS,
)
from metadata.utils.logger import ingestion_logger
Expand All @@ -59,8 +62,39 @@ def get_table_comment(
)


def db_plus_owner_listing(fn):
    """
    Decorator for schema-level reflection functions (no table argument).

    A schema name that contains a dot is wrapped in square brackets before it
    is handed to ``_owner_plus_db``. The wrapped function is then invoked
    through ``_switch_db`` as ``fn(dialect, connection, dbname, owner, schema, **kw)``,
    where ``schema`` is the (possibly bracketed) value.

    NOTE(review): the bracket wrapping presumably stops ``_owner_plus_db`` from
    splitting the name on the dot into database and owner - confirm against
    the SQLAlchemy helper.
    """

    def wrap(dialect, connection, schema=None, **kw):
        if schema and "." in schema:
            schema = f"[{schema}]"
        database, schema_owner = _owner_plus_db(dialect, schema)
        forwarded = (dialect, connection, database, schema_owner, schema)
        return _switch_db(database, connection, fn, *forwarded, **kw)

    return update_wrapper(wrap, fn)


def db_plus_owner(fn):
    """
    Decorator for table-level reflection functions.

    A schema name that contains a dot is wrapped in square brackets before it
    is handed to ``_owner_plus_db``. The wrapped function is then invoked
    through ``_switch_db`` as
    ``fn(dialect, connection, tablename, dbname, owner, schema, **kw)``,
    where ``schema`` is the (possibly bracketed) value.

    NOTE(review): the bracket wrapping presumably stops ``_owner_plus_db`` from
    splitting the name on the dot into database and owner - confirm against
    the SQLAlchemy helper.
    """

    def wrap(dialect, connection, tablename, schema=None, **kw):
        if schema and "." in schema:
            schema = f"[{schema}]"
        database, schema_owner = _owner_plus_db(dialect, schema)
        forwarded = (dialect, connection, tablename, database, schema_owner, schema)
        return _switch_db(database, connection, fn, *forwarded, **kw)

    return update_wrapper(wrap, fn)


@reflection.cache
@_db_plus_owner
@db_plus_owner
def get_columns(
self, connection, tablename, dbname, owner, schema, **kw
): # pylint: disable=unused-argument, too-many-locals, disable=too-many-branches, too-many-statements
Expand Down Expand Up @@ -271,7 +305,7 @@ def get_columns(


@reflection.cache
@_db_plus_owner
@db_plus_owner
def get_view_definition(
self, connection, viewname, dbname, owner, schema, **kw
): # pylint: disable=unused-argument
Expand All @@ -282,3 +316,168 @@ def get_view_definition(
schema=owner,
query=MSSQL_ALL_VIEW_DEFINITIONS,
)


@reflection.cache
@db_plus_owner
def get_pk_constraint(
    self, connection, tablename, dbname, owner=None, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Reflect the primary key of ``tablename`` inside the ``owner`` schema.

    Returns:
        dict with ``constrained_columns`` (primary key column names, in the
        order the query yields them: constraint name, then ordinal position)
        and ``name`` (the constraint name of the first primary key row, or
        None when no such row exists).
    """
    constraints = ischema.constraints
    key_columns = ischema.key_constraints.alias("C")

    # Join the two information-schema tables and restrict to the target table
    join_and_filter = sql.and_(
        constraints.c.constraint_name == key_columns.c.constraint_name,
        constraints.c.table_schema == key_columns.c.table_schema,
        key_columns.c.table_name == tablename,
        key_columns.c.table_schema == owner,
    )
    query = (
        sql.select(
            key_columns.c.column_name,
            constraints.c.constraint_type,
            key_columns.c.constraint_name,
        )
        .where(join_and_filter)
        .order_by(constraints.c.constraint_name, key_columns.c.ordinal_position)
    )

    type_key = constraints.c.constraint_type.name
    name_key = key_columns.c.constraint_name.name
    result = connection.execution_options(future_result=True).execute(query)

    pk_columns = []
    pk_name = None
    for row in result.mappings():
        # The query returns every key constraint; keep the primary key rows only
        if "PRIMARY" not in row[type_key]:
            continue
        pk_columns.append(row["COLUMN_NAME"])
        if pk_name is None:
            pk_name = row[name_key]
    return {"constrained_columns": pk_columns, "name": pk_name}


@reflection.cache
def get_unique_constraints(
    self, connection, table_name, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Unique constraint reflection is not provided by this override.

    Raises:
        NotImplementedError: on every call.
    """
    raise NotImplementedError()


@reflection.cache
@db_plus_owner
def get_foreign_keys(
    self, connection, tablename, dbname, owner=None, schema=None, **kw
):  # pylint: disable=unused-argument, too-many-locals
    """
    Reflect the foreign keys of ``tablename`` inside the ``owner`` schema.

    Runs MSSQL_GET_FOREIGN_KEY, which yields one row per foreign key column,
    and folds those rows into one record per constraint name.

    Returns:
        list of dicts, one per foreign key, with the keys ``name``,
        ``constrained_columns``, ``referred_schema``, ``referred_table``,
        ``referred_columns`` and ``options``. ``options`` carries
        ``onupdate`` / ``ondelete`` only when the rule differs from
        "NO ACTION". ``referred_schema`` stays None when no schema was
        requested and the referred table lives in ``owner``; otherwise it is
        the referred schema, prefixed with ``dbname`` when one is set.
    """
    s = (
        text(MSSQL_GET_FOREIGN_KEY)
        .bindparams(
            sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
            sql.bindparam("owner", owner, ischema.CoerceUnicode()),
        )
        .columns(
            constraint_schema=sqltypes.Unicode(),
            constraint_name=sqltypes.Unicode(),
            table_schema=sqltypes.Unicode(),
            table_name=sqltypes.Unicode(),
            constrained_column=sqltypes.Unicode(),
            referred_table_schema=sqltypes.Unicode(),
            referred_table_name=sqltypes.Unicode(),
            referred_column=sqltypes.Unicode(),
        )
    )

    def fkey_rec():
        return {
            "name": None,
            "constrained_columns": [],
            "referred_schema": None,
            "referred_table": None,
            "referred_columns": [],
            "options": {},
        }

    # group rows by constraint name, to handle multi-column FKs
    fkeys = util.defaultdict(fkey_rec)

    for r in connection.execute(s).fetchall():
        # Positional unpacking: must follow the SELECT column order of the query
        (
            _,  # constraint schema
            rfknm,
            _,  # ordinal position
            scol,
            rschema,
            rtbl,
            rcol,
            # TODO: we support match=<keyword> for foreign keys so
            # we can support this also, PG has match=FULL for example
            # but this seems to not be a valid value for SQL Server
            _,  # match rule
            fkuprule,
            fkdelrule,
        ) = r

        rec = fkeys[rfknm]
        rec["name"] = rfknm

        if fkuprule != "NO ACTION":
            rec["options"]["onupdate"] = fkuprule

        if fkdelrule != "NO ACTION":
            rec["options"]["ondelete"] = fkdelrule

        # The referred table/schema are filled in once, from the first row
        # seen for this constraint
        if not rec["referred_table"]:
            rec["referred_table"] = rtbl
            if schema is not None or owner != rschema:
                if dbname:
                    rschema = dbname + "." + rschema
                rec["referred_schema"] = rschema

        rec["constrained_columns"].append(scol)
        rec["referred_columns"].append(rcol)

    return list(fkeys.values())


@reflection.cache
@db_plus_owner_listing
def get_table_names(
    self, connection, dbname, owner, schema, **kw
):  # pylint: disable=unused-argument
    """
    List the names of the base tables in the ``owner`` schema, ordered by name.
    """
    info_tables = ischema.tables
    in_schema = info_tables.c.table_schema == owner
    is_base_table = info_tables.c.table_type == "BASE TABLE"
    query = (
        sql.select(info_tables.c.table_name)
        .where(sql.and_(in_schema, is_base_table))
        .order_by(info_tables.c.table_name)
    )
    return [row[0] for row in connection.execute(query)]


@reflection.cache
@db_plus_owner_listing
def get_view_names(
    self, connection, dbname, owner, schema, **kw
):  # pylint: disable=unused-argument
    """
    List the names of the views in the ``owner`` schema, ordered by name.
    """
    info_tables = ischema.tables
    in_schema = info_tables.c.table_schema == owner
    is_view = info_tables.c.table_type == "VIEW"
    query = (
        sql.select(info_tables.c.table_name)
        .where(sql.and_(in_schema, is_view))
        .order_by(info_tables.c.table_name)
    )
    return [row[0] for row in connection.execute(query)]
Loading

0 comments on commit 8d925c4

Please sign in to comment.