Skip to content

Commit

Permalink
#9269 - Add MSSQL Stored Procedure Support (#14739)
Browse files Browse the repository at this point in the history
* Remove unnecessary field

* Remove unnecessary field

* Support query comments in MSSQL

* Remove unnecessary field

* Format

* Add external type

* Add MSSQL SP support
  • Loading branch information
pmbrull authored Jan 17, 2024
1 parent cfbb94a commit eadda0e
Show file tree
Hide file tree
Showing 12 changed files with 230 additions and 24 deletions.
5 changes: 5 additions & 0 deletions bootstrap/sql/migrations/native/1.3.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,8 @@ CREATE TABLE IF NOT EXISTS consumers_dlq (
timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL,
UNIQUE(id, extension)
);

-- Add supportsQueryComment to MSSQL
update dbservice_entity
set json = JSON_SET(json, '$.connection.config.supportsQueryComment', true)
where serviceType = 'Mssql';
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ CREATE TABLE IF NOT EXISTS consumers_dlq (
UNIQUE(id, extension)
);

-- Add supportsQueryComment to MSSQL
update dbservice_entity
set json = jsonb_set(json::jsonb, '{connection,config,supportsQueryComment}', 'true', true)
where serviceType = 'Mssql';
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
"""
WITH SP_HISTORY AS (
SELECT
job_id,
query AS query_text,
start_time,
end_time,
Expand All @@ -90,7 +89,6 @@
),
Q_HISTORY AS (
SELECT
job_id,
project_id as database_name,
user_email as user_name,
statement_type as query_type,
Expand All @@ -109,8 +107,6 @@
AND error_result is NULL
)
SELECT
SP.job_id as procedure_id,
Q.job_id as query_id,
Q.query_type as query_type,
SP.query_text as procedure_text,
Q.query_text as query_text,
Expand Down
101 changes: 98 additions & 3 deletions ingestion/src/metadata/ingestion/source/database/mssql/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,39 @@
# limitations under the License.
"""MSSQL source module"""
import traceback
from typing import Iterable, Optional
from typing import Dict, Iterable, List, Optional

from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names

from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import EntityName
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE
from metadata.ingestion.source.database.mssql.models import (
STORED_PROC_LANGUAGE_MAP,
MssqlStoredProcedure,
)
from metadata.ingestion.source.database.mssql.queries import (
MSSQL_GET_DATABASE,
MSSQL_GET_STORED_PROCEDURE_QUERIES,
MSSQL_GET_STORED_PROCEDURES,
)
from metadata.ingestion.source.database.mssql.utils import (
get_columns,
get_foreign_keys,
Expand All @@ -36,8 +54,13 @@
get_view_names,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqa_utils import update_mssql_ischema_names
from metadata.utils.sqlalchemy_utils import (
Expand Down Expand Up @@ -65,7 +88,7 @@
MSDialect.get_view_names = get_view_names


class MssqlSource(CommonDbSourceService, MultiDBSource):
class MssqlSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from MSSQL Source
Expand Down Expand Up @@ -122,3 +145,75 @@ def get_database_names(self) -> Iterable[str]:
logger.error(
f"Error trying to connect to database {new_database}: {exc}"
)

def get_stored_procedures(self) -> Iterable[MssqlStoredProcedure]:
"""List Snowflake stored procedures"""
if self.source_config.includeStoredProcedures:
results = self.engine.execute(
MSSQL_GET_STORED_PROCEDURES.format(
database_name=self.context.database,
schema_name=self.context.database_schema,
)
).all()
for row in results:
try:
stored_procedure = MssqlStoredProcedure.parse_obj(dict(row))
yield stored_procedure
except Exception as exc:
logger.error()
self.status.failed(
error=StackTraceError(
name=dict(row).get("name", "UNKNOWN"),
error=f"Error parsing Stored Procedure payload: {exc}",
stackTrace=traceback.format_exc(),
)
)

def yield_stored_procedure(
self, stored_procedure: MssqlStoredProcedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Prepare the stored procedure payload"""

try:
stored_procedure_request = CreateStoredProcedureRequest(
name=EntityName(__root__=stored_procedure.name),
description=None,
storedProcedureCode=StoredProcedureCode(
language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language),
code=stored_procedure.definition,
),
databaseSchema=fqn.build(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
),
)
yield Either(right=stored_procedure_request)
self.register_record_stored_proc_request(stored_procedure_request)

except Exception as exc:
yield Either(
left=StackTraceError(
name=stored_procedure.name,
error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
stackTrace=traceback.format_exc(),
)
)

def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = MSSQL_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
)

queries_dict = self.procedure_queries_dict(
query=query,
)

return queries_dict
30 changes: 30 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/mssql/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MSSQL models"""
from typing import Optional

from pydantic import BaseModel, Field

from metadata.generated.schema.entity.data.storedProcedure import Language

STORED_PROC_LANGUAGE_MAP = {
"SQL": Language.SQL,
"EXTERNAL": Language.External,
}


class MssqlStoredProcedure(BaseModel):
"""MSSQL stored procedure list query results"""

name: str = Field(...)
owner: Optional[str] = Field(None)
language: str = Field(Language.SQL)
definition: str = Field(None)
78 changes: 78 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/mssql/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,81 @@
ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
fk_info.ordinal_position
"""

MSSQL_GET_STORED_PROCEDURES = textwrap.dedent(
"""
SELECT
ROUTINE_NAME AS name,
NULL AS owner,
ROUTINE_BODY AS language,
ROUTINE_DEFINITION AS definition
FROM INFORMATION_SCHEMA.ROUTINES
WHERE ROUTINE_TYPE = 'PROCEDURE'
AND ROUTINE_CATALOG = '{database_name}'
AND ROUTINE_SCHEMA = '{schema_name}'
AND LEFT(ROUTINE_NAME, 3) NOT IN ('sp_', 'xp_', 'ms_')
"""
)

MSSQL_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
"""
WITH SP_HISTORY (start_time, end_time, procedure_name, query_text) AS (
select
s.last_execution_time start_time,
DATEADD(s, s.total_elapsed_time/1000, s.last_execution_time) end_time,
OBJECT_NAME(object_id, database_id) as procedure_name,
text as query_text
from sys.dm_exec_procedure_stats s
CROSS APPLY sys.dm_exec_sql_text(s.plan_handle)
WHERE OBJECT_NAME(object_id, database_id) IS NOT NULL
AND s.last_execution_time > '{start_date}'
),
Q_HISTORY (database_name, query_text, start_time, end_time, duration,query_type, schema_name, user_name) AS (
select
db.NAME database_name,
t.text query_text,
s.last_execution_time start_time,
DATEADD(s, s.total_elapsed_time/1000, s.last_execution_time) end_time,
s.total_elapsed_time/1000 duration,
case
when t.text LIKE '%%MERGE%%' then 'MERGE'
when t.text LIKE '%%UPDATE%%' then 'UPDATE'
when t.text LIKE '%%SELECT%%INTO%%' then 'CREATE_TABLE_AS_SELECT'
when t.text LIKE '%%INSERT%%' then 'INSERT'
else 'UNKNOWN' end query_type,
NULL schema_name,
NULL user_name
FROM sys.dm_exec_cached_plans AS p
INNER JOIN sys.dm_exec_query_stats AS s
ON p.plan_handle = s.plan_handle
CROSS APPLY sys.dm_exec_sql_text(p.plan_handle) AS t
INNER JOIN sys.databases db
ON db.database_id = t.dbid
WHERE s.last_execution_time between '2024-01-13' and '2024-01-20'
AND t.text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND t.text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
AND p.objtype NOT IN ('Prepared', 'Proc')
AND s.last_execution_time > '{start_date}'
)
select
Q.query_type AS QUERY_TYPE,
Q.database_name AS QUERY_DATABASE_NAME,
Q.schema_name AS QUERY_SCHEMA_NAME,
Q.query_text AS QUERY_TEXT,
Q.user_name AS QUERY_USER_NAME,
Q.start_time AS QUERY_START_TIME,
Q.duration AS QUERY_DURATION,
SP.procedure_name AS PROCEDURE_NAME,
SP.query_text AS PROCEDURE_TEXT,
SP.start_time AS PROCEDURE_START_TIME,
SP.end_time AS PROCEDURE_END_TIME
from SP_HISTORY SP
JOIN Q_HISTORY Q
ON (
Q.start_time BETWEEN SP.start_time and SP.end_time
OR Q.end_time BETWEEN SP.start_time and SP.end_time
)
order by PROCEDURE_START_TIME desc
;
"""
)
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
"""
WITH SP_HISTORY AS (SELECT
SQL_ID,
sql_text AS query_text,
TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') AS start_time,
TO_TIMESTAMP(LAST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') + NUMTODSINTERVAL(ELAPSED_TIME / 1000, 'SECOND') AS end_time,
Expand All @@ -90,7 +89,6 @@
AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') >= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
),
Q_HISTORY AS (SELECT
sql_id,
sql_text AS query_text,
CASE
WHEN UPPER(SQL_TEXT) LIKE 'INSERT%' THEN 'INSERT'
Expand All @@ -111,8 +109,6 @@
>= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
)
SELECT
SP.sql_id AS PROCEDURE_ID,
Q.sql_id AS QUERY_ID,
Q.QUERY_TYPE AS QUERY_TYPE,
Q.DATABASE_NAME AS QUERY_DATABASE_NAME,
Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@
"""
with SP_HISTORY as (
select
query as procedure_id,
querytxt as procedure_text,
starttime as procedure_start_time,
endtime as procedure_end_time,
Expand All @@ -311,7 +310,6 @@
),
Q_HISTORY as (
select
query as query_id,
querytxt as query_text,
case
when querytxt ilike '%%MERGE%%' then 'MERGE'
Expand All @@ -334,11 +332,9 @@
and userid <> 1
)
select
sp.procedure_id,
sp.procedure_text,
sp.procedure_start_time,
sp.procedure_end_time,
q.query_id,
q.query_text,
q.query_type,
q.query_database_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@
"""
WITH SP_HISTORY AS (
SELECT
QUERY_ID,
QUERY_TEXT,
SESSION_ID,
START_TIME,
Expand All @@ -198,7 +197,6 @@
),
Q_HISTORY AS (
SELECT
QUERY_ID,
QUERY_TYPE,
QUERY_TEXT,
SESSION_ID,
Expand All @@ -215,8 +213,6 @@
AND START_TIME >= '{start_date}'
)
SELECT
SP.QUERY_ID AS PROCEDURE_ID,
Q.QUERY_ID AS QUERY_ID,
Q.QUERY_TYPE AS QUERY_TYPE,
Q.DATABASE_NAME AS QUERY_DATABASE_NAME,
Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class QueryByProcedure(BaseModel):
Query(ies) executed by each stored procedure
"""

procedure_id: str = Field(..., alias="PROCEDURE_ID")
query_id: str = Field(..., alias="QUERY_ID")
procedure_name: str = Field(None, alias="PROCEDURE_NAME")
query_type: str = Field(..., alias="QUERY_TYPE")
query_database_name: str = Field(None, alias="QUERY_DATABASE_NAME")
query_schema_name: str = Field(None, alias="QUERY_SCHEMA_NAME")
Expand Down Expand Up @@ -109,8 +108,11 @@ def procedure_queries_dict(self, query: str) -> Dict[str, List[QueryByProcedure]
for row in results:
try:
query_by_procedure = QueryByProcedure.parse_obj(dict(row))
procedure_name = get_procedure_name_from_call(
query_text=query_by_procedure.procedure_text,
procedure_name = (
query_by_procedure.procedure_name
or get_procedure_name_from_call(
query_text=query_by_procedure.procedure_text,
)
)
queries_dict[procedure_name].append(query_by_procedure)
except Exception as exc:
Expand Down
Loading

0 comments on commit eadda0e

Please sign in to comment.