Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#9269 - Add MSSQL Stored Procedure Support #14739

Merged
merged 9 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bootstrap/sql/migrations/native/1.3.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,8 @@ CREATE TABLE IF NOT EXISTS consumers_dlq (
timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL,
UNIQUE(id, extension)
);

-- MSSQL services: set connection.config.supportsQueryComment to true
UPDATE dbservice_entity
SET json = JSON_SET(json, '$.connection.config.supportsQueryComment', true)
WHERE serviceType = 'Mssql';
Comment on lines +162 to +166
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pmbrull since supportsQueryComment defaults to true, it will be treated as true whenever the key is missing. Do we need a migration in that case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without it the server did not start. I think the problem is not the value itself, but that the server expects the key to be present.

Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@ CREATE TABLE IF NOT EXISTS consumers_dlq (
UNIQUE(id, extension)
);

-- MSSQL services: set connection.config.supportsQueryComment to true
-- (the trailing `true` asks jsonb_set to create the key when it is missing)
UPDATE dbservice_entity
SET json = jsonb_set(json::jsonb, '{connection,config,supportsQueryComment}', 'true', true)
WHERE serviceType = 'Mssql';
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
"""
WITH SP_HISTORY AS (
SELECT
job_id,
query AS query_text,
start_time,
end_time,
Expand All @@ -90,7 +89,6 @@
),
Q_HISTORY AS (
SELECT
job_id,
project_id as database_name,
user_email as user_name,
statement_type as query_type,
Expand All @@ -109,8 +107,6 @@
AND error_result is NULL
)
SELECT
SP.job_id as procedure_id,
Q.job_id as query_id,
Q.query_type as query_type,
SP.query_text as procedure_text,
Q.query_text as query_text,
Expand Down
101 changes: 98 additions & 3 deletions ingestion/src/metadata/ingestion/source/database/mssql/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,39 @@
# limitations under the License.
"""MSSQL source module"""
import traceback
from typing import Iterable, Optional
from typing import Dict, Iterable, List, Optional

from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names

from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import EntityName
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE
from metadata.ingestion.source.database.mssql.models import (
STORED_PROC_LANGUAGE_MAP,
MssqlStoredProcedure,
)
from metadata.ingestion.source.database.mssql.queries import (
MSSQL_GET_DATABASE,
MSSQL_GET_STORED_PROCEDURE_QUERIES,
MSSQL_GET_STORED_PROCEDURES,
)
from metadata.ingestion.source.database.mssql.utils import (
get_columns,
get_foreign_keys,
Expand All @@ -36,8 +54,13 @@
get_view_names,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqa_utils import update_mssql_ischema_names
from metadata.utils.sqlalchemy_utils import (
Expand Down Expand Up @@ -65,7 +88,7 @@
MSDialect.get_view_names = get_view_names


class MssqlSource(CommonDbSourceService, MultiDBSource):
class MssqlSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from MSSQL Source
Expand Down Expand Up @@ -122,3 +145,75 @@ def get_database_names(self) -> Iterable[str]:
logger.error(
f"Error trying to connect to database {new_database}: {exc}"
)

def get_stored_procedures(self) -> Iterable[MssqlStoredProcedure]:
    """
    List the MSSQL stored procedures of the current database and schema.

    Nothing is yielded unless `includeStoredProcedures` is enabled in the
    source config. Each row returned by MSSQL_GET_STORED_PROCEDURES is
    parsed into a MssqlStoredProcedure; rows that fail are reported
    through `self.status.failed` and skipped.
    """
    if self.source_config.includeStoredProcedures:
        results = self.engine.execute(
            MSSQL_GET_STORED_PROCEDURES.format(
                database_name=self.context.database,
                schema_name=self.context.database_schema,
            )
        ).all()
        for row in results:
            try:
                stored_procedure = MssqlStoredProcedure.parse_obj(dict(row))
                yield stored_procedure
            except Exception as exc:
                # The logger needs a message: a bare `logger.error()` raises
                # TypeError here, which would escape the handler and prevent
                # the failure from being recorded in the status below.
                logger.debug(traceback.format_exc())
                logger.error(f"Error parsing Stored Procedure payload: {exc}")
                self.status.failed(
                    error=StackTraceError(
                        name=dict(row).get("name", "UNKNOWN"),
                        error=f"Error parsing Stored Procedure payload: {exc}",
                        stackTrace=traceback.format_exc(),
                    )
                )

def yield_stored_procedure(
    self, stored_procedure: MssqlStoredProcedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
    """
    Build the creation request for a single stored procedure.

    Yields an Either carrying the CreateStoredProcedureRequest on the right
    and then registers that request. If anything raises along the way, an
    Either carrying a StackTraceError on the left is yielded instead.
    """
    try:
        # Pieces are built in the same order the request receives them.
        procedure_name = EntityName(__root__=stored_procedure.name)
        procedure_code = StoredProcedureCode(
            language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language),
            code=stored_procedure.definition,
        )
        schema_fqn = fqn.build(
            metadata=self.metadata,
            entity_type=DatabaseSchema,
            service_name=self.context.database_service,
            database_name=self.context.database,
            schema_name=self.context.database_schema,
        )
        request = CreateStoredProcedureRequest(
            name=procedure_name,
            description=None,
            storedProcedureCode=procedure_code,
            databaseSchema=schema_fqn,
        )
        yield Either(right=request)
        self.register_record_stored_proc_request(request)

    except Exception as exc:
        failure = StackTraceError(
            name=stored_procedure.name,
            error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
            stackTrace=traceback.format_exc(),
        )
        yield Either(left=failure)

def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
    """
    Map each stored procedure to the queries it triggered.

    The lookup window starts at the date derived from the configured
    `queryLogDuration`; the end of the window is not used by the query.
    """
    window_start, _window_end = get_start_and_end(
        self.source_config.queryLogDuration
    )
    return self.procedure_queries_dict(
        query=MSSQL_GET_STORED_PROCEDURE_QUERIES.format(
            start_date=window_start,
        ),
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MSSQL models"""
from typing import Optional

from pydantic import BaseModel, Field

from metadata.generated.schema.entity.data.storedProcedure import Language

# Lookup from the language label reported for a stored procedure
# to the matching stored-procedure Language enum member.
STORED_PROC_LANGUAGE_MAP = dict(
    SQL=Language.SQL,
    EXTERNAL=Language.External,
)


class MssqlStoredProcedure(BaseModel):
    """MSSQL stored procedure list query results"""

    # Procedure name; the only required field.
    name: str = Field(...)
    # Owner of the procedure, when known.
    owner: Optional[str] = Field(None)
    # Language label of the procedure body.
    # NOTE(review): the default is a Language enum member on a `str` field,
    # while STORED_PROC_LANGUAGE_MAP is keyed by plain strings — confirm the
    # default resolves as intended when the column is absent.
    language: str = Field(Language.SQL)
    # Source text of the procedure. Declared Optional because the default is
    # None, so a missing or NULL definition is a valid value for this field.
    definition: Optional[str] = Field(None)
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,81 @@
ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
fk_info.ordinal_position
"""

# Lists the procedures of one database/schema from INFORMATION_SCHEMA.ROUTINES.
# Expects `database_name` and `schema_name` to be filled in via str.format.
# Routines whose names start with 'sp_', 'xp_' or 'ms_' are excluded, and the
# owner column is always returned as NULL.
# NOTE(review): ROUTINE_DEFINITION is documented by SQL Server as returning
# only the first 4000 characters of the definition — confirm whether longer
# procedures need another source.
MSSQL_GET_STORED_PROCEDURES = textwrap.dedent(
"""
SELECT
ROUTINE_NAME AS name,
NULL AS owner,
ROUTINE_BODY AS language,
ROUTINE_DEFINITION AS definition
FROM INFORMATION_SCHEMA.ROUTINES
WHERE ROUTINE_TYPE = 'PROCEDURE'
AND ROUTINE_CATALOG = '{database_name}'
AND ROUTINE_SCHEMA = '{schema_name}'
AND LEFT(ROUTINE_NAME, 3) NOT IN ('sp_', 'xp_', 'ms_')
"""
)

# Pairs stored procedure executions (SP_HISTORY) with the cached queries
# (Q_HISTORY) whose start or end time falls inside the procedure's execution
# window. Expects `start_date` to be filled in via str.format; both CTEs only
# keep rows executed after that date.
#
# Q_HISTORY used to carry a hard-coded `between '2024-01-13' and '2024-01-20'`
# window, which made the query return nothing outside that week. It now relies
# solely on `{start_date}`.
#
# NOTE(review): SQL Server documents total_elapsed_time in microseconds, so
# `total_elapsed_time/1000` is milliseconds while DATEADD is called with the
# seconds datepart (`s`) — confirm the intended unit for end_time.
MSSQL_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
    """
WITH SP_HISTORY (start_time, end_time, procedure_name, query_text) AS (
  select
    s.last_execution_time start_time,
    DATEADD(s, s.total_elapsed_time/1000, s.last_execution_time) end_time,
    OBJECT_NAME(object_id, database_id) as procedure_name,
    text as query_text
  from sys.dm_exec_procedure_stats s
  CROSS APPLY sys.dm_exec_sql_text(s.plan_handle)
  WHERE OBJECT_NAME(object_id, database_id) IS NOT NULL
    AND s.last_execution_time > '{start_date}'
),
Q_HISTORY (database_name, query_text, start_time, end_time, duration,query_type, schema_name, user_name) AS (
  select
    db.NAME database_name,
    t.text query_text,
    s.last_execution_time start_time,
    DATEADD(s, s.total_elapsed_time/1000, s.last_execution_time) end_time,
    s.total_elapsed_time/1000 duration,
    case
        when t.text LIKE '%%MERGE%%' then 'MERGE'
        when t.text LIKE '%%UPDATE%%' then 'UPDATE'
        when t.text LIKE '%%SELECT%%INTO%%' then 'CREATE_TABLE_AS_SELECT'
        when t.text LIKE '%%INSERT%%' then 'INSERT'
    else 'UNKNOWN' end query_type,
    NULL schema_name,
    NULL user_name
  FROM sys.dm_exec_cached_plans AS p
  INNER JOIN sys.dm_exec_query_stats AS s
    ON p.plan_handle = s.plan_handle
  CROSS APPLY sys.dm_exec_sql_text(p.plan_handle) AS t
  INNER JOIN sys.databases db
    ON db.database_id = t.dbid
  WHERE s.last_execution_time > '{start_date}'
    AND t.text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
    AND t.text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
    AND p.objtype NOT IN ('Prepared', 'Proc')
)
select
  Q.query_type AS QUERY_TYPE,
  Q.database_name AS QUERY_DATABASE_NAME,
  Q.schema_name AS QUERY_SCHEMA_NAME,
  Q.query_text AS QUERY_TEXT,
  Q.user_name AS QUERY_USER_NAME,
  Q.start_time AS QUERY_START_TIME,
  Q.duration AS QUERY_DURATION,
  SP.procedure_name AS PROCEDURE_NAME,
  SP.query_text AS PROCEDURE_TEXT,
  SP.start_time AS PROCEDURE_START_TIME,
  SP.end_time AS PROCEDURE_END_TIME
from SP_HISTORY SP
JOIN Q_HISTORY Q
  ON (
    Q.start_time BETWEEN SP.start_time and SP.end_time
    OR Q.end_time BETWEEN SP.start_time and SP.end_time
  )
order by PROCEDURE_START_TIME desc
;
"""
)
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
"""
WITH SP_HISTORY AS (SELECT
SQL_ID,
sql_text AS query_text,
TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') AS start_time,
TO_TIMESTAMP(LAST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') + NUMTODSINTERVAL(ELAPSED_TIME / 1000, 'SECOND') AS end_time,
Expand All @@ -90,7 +89,6 @@
AND TO_TIMESTAMP(FIRST_LOAD_TIME, 'YYYY-MM-DD HH24:MI:SS') >= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
),
Q_HISTORY AS (SELECT
sql_id,
sql_text AS query_text,
CASE
WHEN UPPER(SQL_TEXT) LIKE 'INSERT%' THEN 'INSERT'
Expand All @@ -111,8 +109,6 @@
>= TO_TIMESTAMP('{start_date}', 'YYYY-MM-DD HH24:MI:SS')
)
SELECT
SP.sql_id AS PROCEDURE_ID,
Q.sql_id AS QUERY_ID,
Q.QUERY_TYPE AS QUERY_TYPE,
Q.DATABASE_NAME AS QUERY_DATABASE_NAME,
Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@
"""
with SP_HISTORY as (
select
query as procedure_id,
querytxt as procedure_text,
starttime as procedure_start_time,
endtime as procedure_end_time,
Expand All @@ -311,7 +310,6 @@
),
Q_HISTORY as (
select
query as query_id,
querytxt as query_text,
case
when querytxt ilike '%%MERGE%%' then 'MERGE'
Expand All @@ -334,11 +332,9 @@
and userid <> 1
)
select
sp.procedure_id,
sp.procedure_text,
sp.procedure_start_time,
sp.procedure_end_time,
q.query_id,
q.query_text,
q.query_type,
q.query_database_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@
"""
WITH SP_HISTORY AS (
SELECT
QUERY_ID,
QUERY_TEXT,
SESSION_ID,
START_TIME,
Expand All @@ -198,7 +197,6 @@
),
Q_HISTORY AS (
SELECT
QUERY_ID,
QUERY_TYPE,
QUERY_TEXT,
SESSION_ID,
Expand All @@ -215,8 +213,6 @@
AND START_TIME >= '{start_date}'
)
SELECT
SP.QUERY_ID AS PROCEDURE_ID,
Q.QUERY_ID AS QUERY_ID,
Q.QUERY_TYPE AS QUERY_TYPE,
Q.DATABASE_NAME AS QUERY_DATABASE_NAME,
Q.SCHEMA_NAME AS QUERY_SCHEMA_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class QueryByProcedure(BaseModel):
Query(ies) executed by each stored procedure
"""

procedure_id: str = Field(..., alias="PROCEDURE_ID")
query_id: str = Field(..., alias="QUERY_ID")
procedure_name: str = Field(None, alias="PROCEDURE_NAME")
query_type: str = Field(..., alias="QUERY_TYPE")
query_database_name: str = Field(None, alias="QUERY_DATABASE_NAME")
query_schema_name: str = Field(None, alias="QUERY_SCHEMA_NAME")
Expand Down Expand Up @@ -109,8 +108,11 @@ def procedure_queries_dict(self, query: str) -> Dict[str, List[QueryByProcedure]
for row in results:
try:
query_by_procedure = QueryByProcedure.parse_obj(dict(row))
procedure_name = get_procedure_name_from_call(
query_text=query_by_procedure.procedure_text,
procedure_name = (
query_by_procedure.procedure_name
or get_procedure_name_from_call(
query_text=query_by_procedure.procedure_text,
)
)
queries_dict[procedure_name].append(query_by_procedure)
except Exception as exc:
Expand Down
Loading
Loading