From 5edad7b4181eb6e09524ee8e3f48c4e6002e60ea Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 14 Sep 2023 15:32:05 +0200 Subject: [PATCH] Part of #12998 - Add Stored Procedures support for Redshift (#13174) * Part of #12998 - Add Stored Procedures support for Redshift * Lint * Fix bq queries * Update ingestion/src/metadata/ingestion/source/database/bigquery/queries.py --- .../source/database/bigquery/metadata.py | 134 ++----------- .../source/database/bigquery/queries.py | 10 +- .../source/database/database_service.py | 24 +-- .../source/database/redshift/metadata.py | 78 +++++++- .../source/database/redshift/models.py | 24 +++ .../source/database/redshift/queries.py | 79 ++++++++ .../source/database/snowflake/metadata.py | 145 ++------------ .../source/database/snowflake/queries.py | 2 + .../database/stored_procedures_mixin.py | 182 ++++++++++++++++++ 9 files changed, 401 insertions(+), 277 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/source/database/redshift/models.py create mode 100644 ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 4a2846c097fe..2019722d9da0 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -12,11 +12,8 @@ We require Taxonomy Admin permissions to fetch all Policy Tags """ import os -import re import traceback -from collections import defaultdict -from functools import lru_cache -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Iterable, List, Optional, Tuple from google import auth from google.cloud.datacatalog_v1 import PolicyTagManagerClient @@ -30,11 +27,9 @@ from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, ) -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database, EntityName from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.data.table import ( @@ -55,14 +50,10 @@ GcpCredentialsValues, SingleProjectId, ) -from metadata.generated.schema.type.basic import SourceUrl, SqlQuery, Timestamp -from metadata.generated.schema.type.entityLineage import Source as LineageSource -from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.basic import SourceUrl from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.steps import InvalidSourceException -from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper -from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.bigquery.models import ( @@ -80,7 +71,10 @@ CommonDbSourceService, TableNameAndType, ) -from metadata.ingestion.source.database.database_service import QueryByProcedure +from 
metadata.ingestion.source.database.stored_procedures_mixin import ( + QueryByProcedure, + StoredProcedureMixin, +) from metadata.utils import fqn from metadata.utils.bigquery_utils import get_bigquery_client from metadata.utils.credentials import GOOGLE_CREDENTIALS @@ -88,7 +82,6 @@ from metadata.utils.helpers import get_start_and_end from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import is_complex_type -from metadata.utils.stored_procedures import get_procedure_name_from_call from metadata.utils.tag_utils import ( get_ometa_tag_and_classification, get_tag_label, @@ -201,7 +194,7 @@ def _build_formatted_table_id(table): ) -class BigquerySource(CommonDbSourceService): +class BigquerySource(StoredProcedureMixin, CommonDbSourceService): """ Implements the necessary methods to extract Database metadata from Bigquery Source @@ -632,66 +625,18 @@ def yield_stored_procedure( ) ) - @lru_cache - def procedure_queries_dict( - self, schema_name: str, database_name: str - ) -> Dict[str, List[QueryByProcedure]]: - """ - Cache the queries ran for the stored procedures in the last `queryLogDuration` days. - - We will run this for each different and db name. - - The dictionary key will be the case-insensitive procedure name. - """ - start, _ = get_start_and_end(self.source_config.queryLogDuration) - results = self.engine.execute( - BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format( - start_date=start, - ) - ).all() - - queries_dict = defaultdict(list) - - for row in results: - try: - query_by_procedure = QueryByProcedure.parse_obj(dict(row)) - procedure_name = get_procedure_name_from_call( - query_text=query_by_procedure.procedure_text, - schema_name=schema_name, - database_name=database_name, - ) - queries_dict[procedure_name].append(query_by_procedure) - except Exception as exc: - self.status.failed( - StackTraceError( - name="Stored Procedure", - error=f"Error trying to get procedure name due to [{exc}]", - stack_trace=traceback.format_exc(), - ) - ) - - return queries_dict - - @staticmethod - def is_lineage_query(query_type: str, query_text: str) -> bool: - """Check if it's worth it to parse the query for lineage""" - - if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"): - return True - - if query_type == "INSERT" and re.search( - "^.*insert.*into.*select.*$", query_text, re.IGNORECASE - ): - return True - - return False - def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """ Pick the stored procedure name from the context and return the list of associated queries """ + start, _ = get_start_and_end(self.source_config.queryLogDuration) + query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format( + start_date=start, + region=self.service_connection.usageLocation, + ) queries_dict = self.procedure_queries_dict( + query=query, schema_name=self.context.database_schema.name.__root__, database_name=self.context.database.name.__root__, ) @@ -700,56 +645,3 @@ def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or [] ): yield query_by_procedure - - def yield_procedure_lineage( - self, query_by_procedure: QueryByProcedure - ) -> Iterable[Either[AddLineageRequest]]: - """Add procedure lineage from its query""" - - self.update_context(key="stored_procedure_query_lineage", value=False) - if self.is_lineage_query( - query_type=query_by_procedure.query_type, - query_text=query_by_procedure.query_text, - ): - self.update_context(key="stored_procedure_query_lineage", 
value=True) - - for either_lineage in get_lineage_by_query( - self.metadata, - query=query_by_procedure.query_text, - service_name=self.context.database_service.name.__root__, - database_name=self.context.database.name.__root__, - schema_name=self.context.database_schema.name.__root__, - dialect=ConnectionTypeDialectMapper.dialect_of( - self.context.database_service.serviceType.value - ), - timeout_seconds=self.source_config.queryParsingTimeoutLimit, - lineage_source=LineageSource.QueryLineage, - ): - if either_lineage.right.edge.lineageDetails: - either_lineage.right.edge.lineageDetails.pipeline = EntityReference( - id=self.context.stored_procedure.id, - type="storedProcedure", - ) - - yield either_lineage - - def yield_procedure_query( - self, query_by_procedure: QueryByProcedure - ) -> Iterable[Either[CreateQueryRequest]]: - """Check the queries triggered by the procedure and add their lineage, if any""" - - yield Either( - right=CreateQueryRequest( - query=SqlQuery(__root__=query_by_procedure.query_text), - query_type=query_by_procedure.query_type, - duration=query_by_procedure.query_duration, - queryDate=Timestamp( - __root__=int(query_by_procedure.query_start_time.timestamp()) * 1000 - ), - triggeredBy=EntityReference( - id=self.context.stored_procedure.id, - type="storedProcedure", - ), - processedLineage=bool(self.context.stored_procedure_query_lineage), - ) - ) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py index a355e4cf2b98..995515d2fae2 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/queries.py @@ -31,7 +31,7 @@ AND job_type = "QUERY" AND state = "DONE" AND IFNULL(statement_type, "NO") not in ("NO", "DROP_TABLE") - AND query NOT LIKE '/*%%{"app": "OpenMetadata", %%}%%*/%%' + AND query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' AND query NOT LIKE '/* {{"app": "dbt", %%}} */%%' LIMIT {result_limit} """ @@ -63,7 +63,7 @@ routine_name as name, routine_definition as definition, external_language as language -FROM test_omd.INFORMATION_SCHEMA.ROUTINES +FROM `{schema_name}`.INFORMATION_SCHEMA.ROUTINES WHERE routine_type in ('PROCEDURE', 'TABLE FUNCTION') AND routine_catalog = '{database_name}' AND routine_schema = '{schema_name}' @@ -79,7 +79,7 @@ start_time, end_time, user_email as user_name - FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT + FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE statement_type = 'SCRIPT' AND start_time >= '{start_date}' AND job_type = "QUERY" @@ -98,8 +98,10 @@ query as query_text, null as schema_name, total_slot_ms/1000 as duration - FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT + FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE statement_type <> 'SCRIPT' + AND query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' + AND query NOT LIKE '/* {{"app": "dbt", %%}} */%%' AND start_time >= '{start_date}' AND job_type = "QUERY" AND state = "DONE" diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 7e60189414c4..4f9e1479c705 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -12,10 +12,9 @@ Base class for ingesting database services """ from abc import ABC, abstractmethod -from datetime import datetime 
from typing import Any, Iterable, List, Optional, Set, Tuple -from pydantic import BaseModel, Field +from pydantic import BaseModel from sqlalchemy.engine import Inspector from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest @@ -67,6 +66,7 @@ create_source_context, ) from metadata.ingestion.source.connections import get_test_connection_fn +from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure from metadata.utils import fqn from metadata.utils.filters import filter_by_schema from metadata.utils.logger import ingestion_logger @@ -84,26 +84,6 @@ class DataModelLink(BaseModel): datamodel: DataModel -class QueryByProcedure(BaseModel): - """ - Query(ies) executed by each stored procedure - """ - - procedure_id: str = Field(..., alias="PROCEDURE_ID") - query_id: str = Field(..., alias="QUERY_ID") - query_type: str = Field(..., alias="QUERY_TYPE") - procedure_text: str = Field(..., alias="PROCEDURE_TEXT") - procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME") - procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME") - query_start_time: datetime = Field(..., alias="QUERY_START_TIME") - query_duration: Optional[float] = Field(None, alias="QUERY_DURATION") - query_text: str = Field(..., alias="QUERY_TEXT") - query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME") - - class Config: - allow_population_by_field_name = True - - class DatabaseServiceTopology(ServiceTopology): """ Defines the hierarchy in Database Services. diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py index 609303702840..38aa8fdd86e7 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py @@ -21,9 +21,17 @@ from sqlalchemy.engine.reflection import Inspector from sqlalchemy_redshift.dialect import RedshiftDialect, RedshiftDialectMixin +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.storedProcedure import ( + Language, + StoredProcedureCode, +) from metadata.generated.schema.entity.data.table import ( ConstraintType, + EntityName, IntervalType, TableConstraint, TablePartition, @@ -38,14 +46,18 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) +from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.source.database.common_db_source import ( CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.redshift.models import RedshiftStoredProcedure from metadata.ingestion.source.database.redshift.queries import ( REDSHIFT_GET_ALL_RELATION_INFO, REDSHIFT_GET_DATABASE_NAMES, + REDSHIFT_GET_STORED_PROCEDURE_QUERIES, + REDSHIFT_GET_STORED_PROCEDURES, REDSHIFT_PARTITION_DETAILS, ) from metadata.ingestion.source.database.redshift.utils import ( @@ -56,8 +68,13 @@ get_columns, get_table_comment, ) +from metadata.ingestion.source.database.stored_procedures_mixin import ( + QueryByProcedure, + StoredProcedureMixin, +) from metadata.utils import fqn from metadata.utils.filters import filter_by_database +from metadata.utils.helpers import get_start_and_end from metadata.utils.logger import 
ingestion_logger from metadata.utils.sqlalchemy_utils import get_all_table_comments @@ -86,7 +103,7 @@ ) -class RedshiftSource(CommonDbSourceService): +class RedshiftSource(StoredProcedureMixin, CommonDbSourceService): """ Implements the necessary methods to extract Database metadata from Redshift Source @@ -218,3 +235,62 @@ def process_additional_table_constraints( columns=[column.get("name")], ) ) + + def get_stored_procedures(self) -> Iterable[RedshiftStoredProcedure]: + """List Snowflake stored procedures""" + if self.source_config.includeStoredProcedures: + results = self.engine.execute( + REDSHIFT_GET_STORED_PROCEDURES.format( + schema_name=self.context.database_schema.name.__root__, + ) + ).all() + for row in results: + stored_procedure = RedshiftStoredProcedure.parse_obj(dict(row)) + yield stored_procedure + + def yield_stored_procedure( + self, stored_procedure: RedshiftStoredProcedure + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Prepare the stored procedure payload""" + + try: + yield Either( + right=CreateStoredProcedureRequest( + name=EntityName(__root__=stored_procedure.name), + storedProcedureCode=StoredProcedureCode( + language=Language.SQL, + code=stored_procedure.definition, + ), + databaseSchema=self.context.database_schema.fullyQualifiedName, + ) + ) + except Exception as exc: + yield Either( + left=StackTraceError( + name=stored_procedure.name, + error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]", + stack_trace=traceback.format_exc(), + ) + ) + + def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: + """ + Pick the stored procedure name from the context + and return the list of associated queries + """ + start, _ = get_start_and_end(self.source_config.queryLogDuration) + query = REDSHIFT_GET_STORED_PROCEDURE_QUERIES.format( + start_date=start, + database_name=self.context.database.name.__root__, + ) + + queries_dict = self.procedure_queries_dict( + query=query, + schema_name=self.context.database_schema.name.__root__, + database_name=self.context.database.name.__root__, + ) + + for query_by_procedure in ( + queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or [] + ): + yield query_by_procedure diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/models.py b/ingestion/src/metadata/ingestion/source/database/redshift/models.py new file mode 100644 index 000000000000..c9deff2adc89 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/redshift/models.py @@ -0,0 +1,24 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Redshift models +""" +from typing import Optional + +from pydantic import BaseModel + + +class RedshiftStoredProcedure(BaseModel): + """Redshift stored procedure list query results""" + + name: str + owner: Optional[str] + definition: str diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/queries.py b/ingestion/src/metadata/ingestion/source/database/redshift/queries.py index ba5df6fea9a6..bce9e60ce07c 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift/queries.py @@ -24,8 +24,10 @@ WHERE userid > 1 {filters} -- Filter out all automated & cursor queries + AND label NOT IN ('maintenance', 'metrics', 'health') AND querytxt NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' AND querytxt NOT LIKE '/* {{"app": "dbt", %%}} */%%' + AND userid <> 1 AND aborted = 0 AND starttime >= '{start_time}' AND starttime < '{end_time}' @@ -274,3 +276,80 @@ where 1 {schema_clause} {table_clause} ORDER BY "relkind", "schema_oid", "schema"; """ + + +REDSHIFT_GET_STORED_PROCEDURES = textwrap.dedent( + """ +SELECT + p.proname as name, + b.usename as owner, + p.prosrc as definition +FROM + pg_catalog.pg_namespace n +JOIN pg_catalog.pg_proc_info p ON + pronamespace = n.oid +join pg_catalog.pg_user b on + b.usesysid = p.proowner +where nspname = '{schema_name}' + and p.proowner <> 1; + """ +) + + +REDSHIFT_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent( + """ +with SP_HISTORY as ( + select + query as procedure_id, + querytxt as procedure_text, + starttime as procedure_start_time, + endtime as procedure_end_time, + pid as procedure_session_id + from SVL_STORED_PROC_CALL + where database = '{database_name}' + and aborted = 0 + and starttime >= '{start_date}' +), +Q_HISTORY as ( + select + query as query_id, + querytxt as query_text, + case + when querytxt ilike '%%MERGE%%' then 'MERGE' + when querytxt ilike '%%UPDATE%%' then 'UPDATE' + when querytxt ilike '%%CREATE%%AS%%' then 'CREATE_TABLE_AS_SELECT' + when querytxt ilike '%%INSERT%%' then 'INSERT' + else 'UNKNOWN' end query_type, + pid as query_session_id, + starttime as query_start_time, + endtime as query_end_time, + userid as query_user_name + from STL_QUERY q + join pg_catalog.pg_user b + on b.usesysid = q.userid + where label not in ('maintenance', 'metrics', 'health') + and querytxt not like '/* {{"app": "OpenMetadata", %%}} */%%' + and querytxt not like '/* {{"app": "dbt", %%}} */%%' + and database = '{database_name}' + and starttime >= '{start_date}' + and userid <> 1 +) +select + sp.procedure_id, + sp.procedure_text, + sp.procedure_start_time, + sp.procedure_end_time, + q.query_id, + q.query_text, + q.query_type, + q.query_start_time, + q.query_end_time, + q.query_user_name +from SP_HISTORY sp + join Q_HISTORY q + on sp.procedure_session_id = q.query_session_id + and q.query_start_time between sp.procedure_start_time and sp.procedure_end_time + and q.query_end_time between sp.procedure_start_time and sp.procedure_end_time +order by procedure_start_time DESC + """ +) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index 0235b7d16aa4..1b376522c088 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -12,11 +12,8 @@ Snowflake source module """ import json -import re import traceback -from collections import defaultdict -from 
functools import lru_cache -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Iterable, List, Optional, Tuple import sqlparse from requests.utils import quote @@ -25,11 +22,9 @@ from sqlalchemy.engine.reflection import Inspector from sqlparse.sql import Function, Identifier -from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode from metadata.generated.schema.entity.data.table import ( @@ -46,19 +41,10 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.generated.schema.type.basic import ( - EntityName, - SourceUrl, - SqlQuery, - Timestamp, -) -from metadata.generated.schema.type.entityLineage import Source as LineageSource -from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.basic import EntityName, SourceUrl from metadata.generated.schema.type.lifeCycle import Created, Deleted from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.api.steps import InvalidSourceException -from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper -from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.models.life_cycle import OMetaLifeCycleData from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type @@ -66,7 +52,6 @@ CommonDbSourceService, TableNameAndType, ) -from metadata.ingestion.source.database.database_service import QueryByProcedure from metadata.ingestion.source.database.snowflake.constants import ( SNOWFLAKE_REGION_ID_MAP, ) @@ -101,13 +86,16 @@ get_view_names, normalize_names, ) +from metadata.ingestion.source.database.stored_procedures_mixin import ( + QueryByProcedure, + StoredProcedureMixin, +) from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.helpers import get_start_and_end from metadata.utils.life_cycle_utils import init_empty_life_cycle_properties from metadata.utils.logger import ingestion_logger from metadata.utils.sqlalchemy_utils import get_all_table_comments -from metadata.utils.stored_procedures import get_procedure_name_from_call from metadata.utils.tag_utils import get_ometa_tag_and_classification from metadata.utils.time_utils import convert_timestamp_to_milliseconds @@ -138,7 +126,7 @@ SnowflakeDialect.get_columns = get_columns -class SnowflakeSource(CommonDbSourceService): +class SnowflakeSource(StoredProcedureMixin, CommonDbSourceService): """ Implements the necessary methods to extract Database metadata from Snowflake Source @@ -571,55 +559,21 @@ def yield_stored_procedure( ) ) - @lru_cache - def procedure_queries_dict( - self, schema_name: str, database_name: str - ) -> Dict[str, List[QueryByProcedure]]: - """ - Cache the queries ran for the stored procedures in the last `queryLogDuration` days. - - We will run this for each different and db name. - - The dictionary key will be the case-insensitive procedure name. 
- """ - start, _ = get_start_and_end(self.source_config.queryLogDuration) - results = self.engine.execute( - SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES.format( - start_date=start, - warehouse=self.service_connection.warehouse, - schema_name=schema_name, - database_name=database_name, - ) - ).all() - - queries_dict = defaultdict(list) - - for row in results: - try: - query_by_procedure = QueryByProcedure.parse_obj(dict(row)) - procedure_name = get_procedure_name_from_call( - query_text=query_by_procedure.procedure_text, - schema_name=schema_name, - database_name=database_name, - ) - queries_dict[procedure_name].append(query_by_procedure) - except Exception as exc: - self.status.failed( - StackTraceError( - name="Stored Procedure", - error=f"Error trying to get procedure name due to [{exc}]", - stack_trace=traceback.format_exc(), - ) - ) - - return queries_dict - def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: """ Pick the stored procedure name from the context and return the list of associated queries """ + start, _ = get_start_and_end(self.source_config.queryLogDuration) + query = SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES.format( + start_date=start, + warehouse=self.service_connection.warehouse, + schema_name=self.context.database_schema.name.__root__, + database_name=self.context.database.name.__root__, + ) + queries_dict = self.procedure_queries_dict( + query=query, schema_name=self.context.database_schema.name.__root__, database_name=self.context.database.name.__root__, ) @@ -628,70 +582,3 @@ def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]: queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or [] ): yield query_by_procedure - - @staticmethod - def is_lineage_query(query_type: str, query_text: str) -> bool: - """Check if it's worth it to parse the query for lineage""" - - if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"): - return True - - if query_type == "INSERT" and re.search( - "^.*insert.*into.*select.*$", query_text, re.IGNORECASE - ): - return True - - return False - - def yield_procedure_lineage( - self, query_by_procedure: QueryByProcedure - ) -> Iterable[Either[AddLineageRequest]]: - """Add procedure lineage from its query""" - - self.update_context(key="stored_procedure_query_lineage", value=False) - if self.is_lineage_query( - query_type=query_by_procedure.query_type, - query_text=query_by_procedure.query_text, - ): - self.update_context(key="stored_procedure_query_lineage", value=True) - - for either_lineage in get_lineage_by_query( - self.metadata, - query=query_by_procedure.query_text, - service_name=self.context.database_service.name.__root__, - database_name=self.context.database.name.__root__, - schema_name=self.context.database_schema.name.__root__, - dialect=ConnectionTypeDialectMapper.dialect_of( - self.context.database_service.serviceType.value - ), - timeout_seconds=self.source_config.queryParsingTimeoutLimit, - lineage_source=LineageSource.QueryLineage, - ): - if either_lineage.right.edge.lineageDetails: - either_lineage.right.edge.lineageDetails.pipeline = EntityReference( - id=self.context.stored_procedure.id, - type="storedProcedure", - ) - - yield either_lineage - - def yield_procedure_query( - self, query_by_procedure: QueryByProcedure - ) -> Iterable[Either[CreateQueryRequest]]: - """Check the queries triggered by the procedure and add their lineage, if any""" - - yield Either( - right=CreateQueryRequest( - query=SqlQuery(__root__=query_by_procedure.query_text), - 
query_type=query_by_procedure.query_type, - duration=query_by_procedure.query_duration, - queryDate=Timestamp( - __root__=int(query_by_procedure.query_start_time.timestamp()) * 1000 - ), - triggeredBy=EntityReference( - id=self.context.stored_procedure.id, - type="storedProcedure", - ), - processedLineage=bool(self.context.stored_procedure_query_lineage), - ) - ) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py index 39cc082c3a68..42c636e9528e 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/queries.py @@ -200,6 +200,8 @@ USER_NAME FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY SP WHERE QUERY_TYPE <> 'CALL' + AND QUERY_TEXT NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' + AND QUERY_TEXT NOT LIKE '/* {{"app": "dbt", %%}} */%%' AND START_TIME >= '{start_date}' AND WAREHOUSE_NAME = '{warehouse}' AND SCHEMA_NAME = '{schema_name}' diff --git a/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py new file mode 100644 index 000000000000..2acbe327fd51 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/stored_procedures_mixin.py @@ -0,0 +1,182 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Mixin class with common Stored Procedures logic aimed at lineage. 
+""" +import re +import traceback +from collections import defaultdict +from datetime import datetime +from functools import lru_cache +from typing import Dict, Iterable, List, Optional + +from pydantic import BaseModel, Field +from sqlalchemy.engine import Engine + +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, +) +from metadata.generated.schema.type.basic import SqlQuery, Timestamp +from metadata.generated.schema.type.entityLineage import Source as LineageSource +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.models import Either, StackTraceError +from metadata.ingestion.api.status import Status +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query +from metadata.ingestion.models.topology import TopologyContext +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.stored_procedures import get_procedure_name_from_call + + +class QueryByProcedure(BaseModel): + """ + Query(ies) executed by each stored procedure + """ + + procedure_id: str = Field(..., alias="PROCEDURE_ID") + query_id: str = Field(..., alias="QUERY_ID") + query_type: str = Field(..., alias="QUERY_TYPE") + procedure_text: str = Field(..., alias="PROCEDURE_TEXT") + procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME") + procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME") + query_start_time: datetime = Field(..., alias="QUERY_START_TIME") + query_duration: Optional[float] = Field(None, alias="QUERY_DURATION") + query_text: str = Field(..., alias="QUERY_TEXT") + query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME") + + class Config: + allow_population_by_field_name = True + + +class StoredProcedureMixin: + """ + The full flow is: + 1. List Stored Procedures + 2. Yield Stored Procedures + 3. Get the queries related to the Stored Procedures in the last X days + 4. Ingest the Lineage + 5. Ingest the Query + + This Mixin is in charge from 3 - 5 in order to handle this process efficiently. + + It should be inherited in those Sources that implement Stored Procedure ingestion. + """ + + context: TopologyContext + status: Status + source_config: DatabaseServiceMetadataPipeline + engine: Engine + metadata: OpenMetadata + + @lru_cache( + maxsize=1 + ) # Limit the caching since it cannot be repeated due to the topology ordering + def procedure_queries_dict( + self, query: str, schema_name: str, database_name: str + ) -> Dict[str, List[QueryByProcedure]]: + """ + Cache the queries ran for the stored procedures in the last `queryLogDuration` days. + + We will run this for each different and db name. + + The dictionary key will be the case-insensitive procedure name. 
+ """ + results = self.engine.execute(query).all() + queries_dict = defaultdict(list) + + for row in results: + try: + query_by_procedure = QueryByProcedure.parse_obj(dict(row)) + procedure_name = get_procedure_name_from_call( + query_text=query_by_procedure.procedure_text, + schema_name=schema_name, + database_name=database_name, + ) + queries_dict[procedure_name].append(query_by_procedure) + except Exception as exc: + self.status.failed( + StackTraceError( + name="Stored Procedure", + error=f"Error trying to get procedure name due to [{exc}]", + stack_trace=traceback.format_exc(), + ) + ) + + return queries_dict + + @staticmethod + def is_lineage_query(query_type: str, query_text: str) -> bool: + """Check if it's worth it to parse the query for lineage""" + + if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"): + return True + + if query_type == "INSERT" and re.search( + "^.*insert.*into.*select.*$", query_text, re.IGNORECASE + ): + return True + + return False + + def yield_procedure_lineage( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[AddLineageRequest]]: + """Add procedure lineage from its query""" + + self.update_context(key="stored_procedure_query_lineage", value=False) + if self.is_lineage_query( + query_type=query_by_procedure.query_type, + query_text=query_by_procedure.query_text, + ): + self.update_context(key="stored_procedure_query_lineage", value=True) + + for either_lineage in get_lineage_by_query( + self.metadata, + query=query_by_procedure.query_text, + service_name=self.context.database_service.name.__root__, + database_name=self.context.database.name.__root__, + schema_name=self.context.database_schema.name.__root__, + dialect=ConnectionTypeDialectMapper.dialect_of( + self.context.database_service.serviceType.value + ), + timeout_seconds=self.source_config.queryParsingTimeoutLimit, + lineage_source=LineageSource.QueryLineage, + ): + if either_lineage.right.edge.lineageDetails: + either_lineage.right.edge.lineageDetails.pipeline = EntityReference( + id=self.context.stored_procedure.id, + type="storedProcedure", + ) + + yield either_lineage + + def yield_procedure_query( + self, query_by_procedure: QueryByProcedure + ) -> Iterable[Either[CreateQueryRequest]]: + """Check the queries triggered by the procedure and add their lineage, if any""" + + yield Either( + right=CreateQueryRequest( + query=SqlQuery(__root__=query_by_procedure.query_text), + query_type=query_by_procedure.query_type, + duration=query_by_procedure.query_duration, + queryDate=Timestamp( + __root__=int(query_by_procedure.query_start_time.timestamp()) * 1000 + ), + triggeredBy=EntityReference( + id=self.context.stored_procedure.id, + type="storedProcedure", + ), + processedLineage=bool(self.context.stored_procedure_query_lineage), + ) + )