Skip to content

Commit

Permalink
Part of #12998 - Add Stored Procedures support for Redshift (#13174)
Browse files Browse the repository at this point in the history
* Part of #12998 - Add Stored Procedures support for Redshift

* Lint

* Fix bq queries

* Update ingestion/src/metadata/ingestion/source/database/bigquery/queries.py
  • Loading branch information
pmbrull authored Sep 14, 2023
1 parent 4396cad commit 5edad7b
Show file tree
Hide file tree
Showing 9 changed files with 401 additions and 277 deletions.
134 changes: 13 additions & 121 deletions ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@
We require Taxonomy Admin permissions to fetch all Policy Tags
"""
import os
import re
import traceback
from collections import defaultdict
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Iterable, List, Optional, Tuple

from google import auth
from google.cloud.datacatalog_v1 import PolicyTagManagerClient
Expand All @@ -30,11 +27,9 @@
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database, EntityName
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
Expand All @@ -55,14 +50,10 @@
GcpCredentialsValues,
SingleProjectId,
)
from metadata.generated.schema.type.basic import SourceUrl, SqlQuery, Timestamp
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.basic import SourceUrl
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.bigquery.models import (
Expand All @@ -80,15 +71,17 @@
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.database_service import QueryByProcedure
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.bigquery_utils import get_bigquery_client
from metadata.utils.credentials import GOOGLE_CREDENTIALS
from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import is_complex_type
from metadata.utils.stored_procedures import get_procedure_name_from_call
from metadata.utils.tag_utils import (
get_ometa_tag_and_classification,
get_tag_label,
Expand Down Expand Up @@ -201,7 +194,7 @@ def _build_formatted_table_id(table):
)


class BigquerySource(CommonDbSourceService):
class BigquerySource(StoredProcedureMixin, CommonDbSourceService):
"""
Implements the necessary methods to extract
Database metadata from Bigquery Source
Expand Down Expand Up @@ -632,66 +625,18 @@ def yield_stored_procedure(
)
)

@lru_cache
def procedure_queries_dict(
    self, schema_name: str, database_name: str
) -> Dict[str, List[QueryByProcedure]]:
    """
    Fetch and group the queries executed by stored procedures.

    The query log is read starting at the date derived from
    `queryLogDuration`, and the result is memoized per distinct
    (schema_name, database_name) pair.

    Args:
        schema_name: schema used to resolve the procedure name from its call.
        database_name: database used to resolve the procedure name from its call.

    Returns:
        Mapping from the procedure name extracted from each CALL statement
        to the list of queries that procedure triggered.

    NOTE(review): `lru_cache` on a method also keys on `self`, so the cache
    keeps the source instance alive for as long as the cache exists.
    """
    start_date, _ = get_start_and_end(self.source_config.queryLogDuration)
    statement = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
        start_date=start_date,
    )
    rows = self.engine.execute(statement).all()

    grouped_queries = defaultdict(list)

    for record in rows:
        try:
            parsed_query = QueryByProcedure.parse_obj(dict(record))
            procedure_key = get_procedure_name_from_call(
                query_text=parsed_query.procedure_text,
                schema_name=schema_name,
                database_name=database_name,
            )
            grouped_queries[procedure_key].append(parsed_query)
        except Exception as exc:
            # A single bad row is reported and skipped so the remaining
            # rows are still processed.
            failure = StackTraceError(
                name="Stored Procedure",
                error=f"Error trying to get procedure name due to [{exc}]",
                stack_trace=traceback.format_exc(),
            )
            self.status.failed(failure)

    return grouped_queries

@staticmethod
def is_lineage_query(query_type: str, query_text: str) -> bool:
    """
    Check if it's worth it to parse the query for lineage.

    Args:
        query_type: statement type reported for the query (e.g. "MERGE").
        query_text: raw SQL text of the query.

    Returns:
        True for MERGE, UPDATE and CREATE_TABLE_AS_SELECT statements, and
        for INSERT statements shaped like `INSERT ... INTO ... SELECT ...`
        (case-insensitive). False otherwise.
    """
    if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"):
        return True

    # re.DOTALL lets `.` cross line breaks: SQL is routinely formatted with
    # INSERT INTO and SELECT on separate lines, and without the flag those
    # statements were never recognised as lineage candidates.
    return query_type == "INSERT" and bool(
        re.search(
            r"insert.*into.*select",
            query_text,
            re.IGNORECASE | re.DOTALL,
        )
    )

def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
"""
Pick the stored procedure name from the context
and return the list of associated queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
region=self.service_connection.usageLocation,
)
queries_dict = self.procedure_queries_dict(
query=query,
schema_name=self.context.database_schema.name.__root__,
database_name=self.context.database.name.__root__,
)
Expand All @@ -700,56 +645,3 @@ def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
queries_dict.get(self.context.stored_procedure.name.__root__.lower()) or []
):
yield query_by_procedure

def yield_procedure_lineage(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
    """
    Add procedure lineage from its query.

    The `stored_procedure_query_lineage` context flag is reset to False and
    only switched to True when the query qualifies for lineage parsing.

    Args:
        query_by_procedure: a query executed by the stored procedure
            currently held in the context.

    Yields:
        Every result produced by `get_lineage_by_query`. Results carrying
        lineage details are tagged with the stored procedure as their
        pipeline reference before being yielded.
    """
    self.update_context(key="stored_procedure_query_lineage", value=False)
    if self.is_lineage_query(
        query_type=query_by_procedure.query_type,
        query_text=query_by_procedure.query_text,
    ):
        self.update_context(key="stored_procedure_query_lineage", value=True)

        for either_lineage in get_lineage_by_query(
            self.metadata,
            query=query_by_procedure.query_text,
            service_name=self.context.database_service.name.__root__,
            database_name=self.context.database.name.__root__,
            schema_name=self.context.database_schema.name.__root__,
            dialect=ConnectionTypeDialectMapper.dialect_of(
                self.context.database_service.serviceType.value
            ),
            timeout_seconds=self.source_config.queryParsingTimeoutLimit,
            lineage_source=LineageSource.QueryLineage,
        ):
            # An Either may come back without a `right` payload (presumably
            # an error result - confirm against get_lineage_by_query).
            # Forward it untouched instead of failing on `None.edge`.
            if (
                either_lineage.right is not None
                and either_lineage.right.edge.lineageDetails
            ):
                either_lineage.right.edge.lineageDetails.pipeline = EntityReference(
                    id=self.context.stored_procedure.id,
                    type="storedProcedure",
                )

            yield either_lineage

def yield_procedure_query(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
    """
    Build the query-creation request for a query run by a stored procedure.

    Args:
        query_by_procedure: a query executed by the stored procedure
            currently held in the context.

    Yields:
        A single `Either` whose `right` is the `CreateQueryRequest`, linked
        to the stored procedure through `triggeredBy`.
    """
    sql_text = SqlQuery(__root__=query_by_procedure.query_text)

    # Whole seconds scaled to milliseconds; sub-second precision is dropped.
    start_epoch_ms = int(query_by_procedure.query_start_time.timestamp()) * 1000
    query_date = Timestamp(__root__=start_epoch_ms)

    procedure_reference = EntityReference(
        id=self.context.stored_procedure.id,
        type="storedProcedure",
    )

    # Flag read from the context under `stored_procedure_query_lineage`.
    lineage_processed = bool(self.context.stored_procedure_query_lineage)

    request = CreateQueryRequest(
        query=sql_text,
        query_type=query_by_procedure.query_type,
        duration=query_by_procedure.query_duration,
        queryDate=query_date,
        triggeredBy=procedure_reference,
        processedLineage=lineage_processed,
    )
    yield Either(right=request)
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
AND job_type = "QUERY"
AND state = "DONE"
AND IFNULL(statement_type, "NO") not in ("NO", "DROP_TABLE")
AND query NOT LIKE '/*%%{"app": "OpenMetadata", %%}%%*/%%'
AND query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND query NOT LIKE '/* {{"app": "dbt", %%}} */%%'
LIMIT {result_limit}
"""
Expand Down Expand Up @@ -63,7 +63,7 @@
routine_name as name,
routine_definition as definition,
external_language as language
FROM test_omd.INFORMATION_SCHEMA.ROUTINES
FROM `{schema_name}`.INFORMATION_SCHEMA.ROUTINES
WHERE routine_type in ('PROCEDURE', 'TABLE FUNCTION')
AND routine_catalog = '{database_name}'
AND routine_schema = '{schema_name}'
Expand All @@ -79,7 +79,7 @@
start_time,
end_time,
user_email as user_name
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE statement_type = 'SCRIPT'
AND start_time >= '{start_date}'
AND job_type = "QUERY"
Expand All @@ -98,8 +98,10 @@
query as query_text,
null as schema_name,
total_slot_ms/1000 as duration
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE statement_type <> 'SCRIPT'
AND query NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND query NOT LIKE '/* {{"app": "dbt", %%}} */%%'
AND start_time >= '{start_date}'
AND job_type = "QUERY"
AND state = "DONE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
Base class for ingesting database services
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Iterable, List, Optional, Set, Tuple

from pydantic import BaseModel, Field
from pydantic import BaseModel
from sqlalchemy.engine import Inspector

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
Expand Down Expand Up @@ -67,6 +66,7 @@
create_source_context,
)
from metadata.ingestion.source.connections import get_test_connection_fn
from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure
from metadata.utils import fqn
from metadata.utils.filters import filter_by_schema
from metadata.utils.logger import ingestion_logger
Expand All @@ -84,26 +84,6 @@ class DataModelLink(BaseModel):
datamodel: DataModel


class QueryByProcedure(BaseModel):
    """
    A query executed by a stored procedure, together with the procedure
    call that triggered it.

    Every field is aliased to its upper-case column name, and the model
    can also be populated using the field names themselves.
    """

    # Identifiers of the procedure call and of the query it triggered
    procedure_id: str = Field(default=..., alias="PROCEDURE_ID")
    query_id: str = Field(default=..., alias="QUERY_ID")
    query_type: str = Field(default=..., alias="QUERY_TYPE")

    # Procedure call text and its execution window
    procedure_text: str = Field(default=..., alias="PROCEDURE_TEXT")
    procedure_start_time: datetime = Field(default=..., alias="PROCEDURE_START_TIME")
    procedure_end_time: datetime = Field(default=..., alias="PROCEDURE_END_TIME")

    # Details of the triggered query; duration and user name are optional
    query_start_time: datetime = Field(default=..., alias="QUERY_START_TIME")
    query_duration: Optional[float] = Field(default=None, alias="QUERY_DURATION")
    query_text: str = Field(default=..., alias="QUERY_TEXT")
    query_user_name: Optional[str] = Field(default=None, alias="QUERY_USER_NAME")

    class Config:
        # Accept both the aliases and the plain field names on input
        allow_population_by_field_name = True


class DatabaseServiceTopology(ServiceTopology):
"""
Defines the hierarchy in Database Services.
Expand Down
Loading

0 comments on commit 5edad7b

Please sign in to comment.