Skip to content

Commit

Permalink
Part of #12998 - Add Stored Procedures support in BigQuery (#13160)
Browse files Browse the repository at this point in the history
* Fix snowflake SP source url

* Update sourceUrl on PUT

* Add BigQuery Stored Procedures support

* Linting

* Linting

* lint

* Linting
  • Loading branch information
pmbrull authored Sep 13, 2023
1 parent 8ed22fa commit 42393f5
Show file tree
Hide file tree
Showing 8 changed files with 377 additions and 14 deletions.
232 changes: 227 additions & 5 deletions ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
We require Taxonomy Admin permissions to fetch all Policy Tags
"""
import os
import re
import traceback
from typing import Iterable, List, Optional, Tuple
from collections import defaultdict
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Tuple

from google import auth
from google.cloud.datacatalog_v1 import PolicyTagManagerClient
Expand All @@ -27,7 +30,13 @@
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database, EntityName
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
IntervalType,
TablePartition,
Expand All @@ -47,11 +56,22 @@
MultipleProjectId,
SingleProjectId,
)
from metadata.generated.schema.type.basic import SourceUrl, SqlQuery, Timestamp
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.database.bigquery.models import (
STORED_PROC_LANGUAGE_MAP,
BigQueryStoredProcedure,
)
from metadata.ingestion.source.database.bigquery.queries import (
BIGQUERY_GET_STORED_PROCEDURE_QUERIES,
BIGQUERY_GET_STORED_PROCEDURES,
BIGQUERY_SCHEMA_DESCRIPTION,
BIGQUERY_TABLE_AND_TYPE,
)
Expand All @@ -60,12 +80,15 @@
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.database_service import QueryByProcedure
from metadata.utils import fqn
from metadata.utils.bigquery_utils import get_bigquery_client
from metadata.utils.credentials import GOOGLE_CREDENTIALS
from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import is_complex_type
from metadata.utils.stored_procedures import get_procedure_name_from_call
from metadata.utils.tag_utils import (
get_ometa_tag_and_classification,
get_tag_label,
Expand Down Expand Up @@ -483,12 +506,12 @@ def close(self):
os.remove(tmp_credentials_file)
del os.environ[GOOGLE_CREDENTIALS]

def get_source_url(
def _get_source_url(
self,
database_name: Optional[str] = None,
schema_name: Optional[str] = None,
table_name: Optional[str] = None,
table_type: Optional[TableType] = None,
type_infix: str = "4m3",
) -> Optional[str]:
"""
Method to get the source url for bigquery
Expand All @@ -502,7 +525,7 @@ def get_source_url(
schema_table_url = f"&ws=!1m4!1m3!3m2!1s{database_name}!2s{schema_name}"
if table_name:
schema_table_url = (
f"&ws=!1m5!1m4!4m3!1s{database_name}"
f"&ws=!1m5!1m4!{type_infix}!1s{database_name}"
f"!2s{schema_name}!3s{table_name}"
)
if schema_table_url:
Expand All @@ -512,3 +535,202 @@ def get_source_url(
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get source url: {exc}")
return None

def get_source_url(
    self,
    database_name: Optional[str] = None,
    schema_name: Optional[str] = None,
    table_name: Optional[str] = None,
    table_type: Optional[TableType] = None,
) -> Optional[str]:
    """
    Get the BigQuery source url, using the URL infix for tables.

    Delegates to `_get_source_url`. `table_type` is part of the
    signature but is not used to build the URL.
    """
    # This infix identifies tables in the URL
    table_infix = "4m3"
    return self._get_source_url(
        database_name=database_name,
        schema_name=schema_name,
        table_name=table_name,
        type_infix=table_infix,
    )

def get_stored_procedure_url(
    self,
    database_name: Optional[str] = None,
    schema_name: Optional[str] = None,
    table_name: Optional[str] = None,
) -> Optional[str]:
    """
    Get the BigQuery source url, using the URL infix for Stored Procedures.

    Delegates to `_get_source_url`; the procedure name travels in
    `table_name`, following the same building strategy as tables.
    """
    # This infix identifies Stored Procedures in the URL
    stored_procedure_infix = "6m3"
    return self._get_source_url(
        database_name=database_name,
        schema_name=schema_name,
        table_name=table_name,
        type_infix=stored_procedure_infix,
    )

def get_stored_procedures(self) -> Iterable[BigQueryStoredProcedure]:
    """
    List the BigQuery Stored Procedures for the database and schema
    currently held in the context.

    Yields nothing unless `includeStoredProcedures` is set in the source config.
    """
    if not self.source_config.includeStoredProcedures:
        return

    listing_query = BIGQUERY_GET_STORED_PROCEDURES.format(
        database_name=self.context.database.name.__root__,
        schema_name=self.context.database_schema.name.__root__,
    )
    # Fetch every row up front, then parse them lazily one by one
    rows = self.engine.execute(listing_query).all()
    for row in rows:
        yield BigQueryStoredProcedure.parse_obj(dict(row))

def yield_stored_procedure(
    self, stored_procedure: BigQueryStoredProcedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
    """
    Prepare the stored procedure payload.

    Yields an `Either` holding the create request on the right, or a
    `StackTraceError` on the left if anything fails while building it.
    """
    try:
        entity_name = EntityName(__root__=stored_procedure.name)
        # Fall back to "SQL" when the routine reports no language
        procedure_code = StoredProcedureCode(
            language=STORED_PROC_LANGUAGE_MAP.get(
                stored_procedure.language or "SQL",
            ),
            code=stored_procedure.definition,
        )
        schema_fqn = self.context.database_schema.fullyQualifiedName
        # Follow the same building strategy as tables
        procedure_url = self.get_stored_procedure_url(
            database_name=self.context.database.name.__root__,
            schema_name=self.context.database_schema.name.__root__,
            table_name=stored_procedure.name,
        )
        source_url = SourceUrl(__root__=procedure_url)

        request = CreateStoredProcedureRequest(
            name=entity_name,
            storedProcedureCode=procedure_code,
            databaseSchema=schema_fqn,
            sourceUrl=source_url,
        )
        yield Either(right=request)
    except Exception as exc:
        failure = StackTraceError(
            name=stored_procedure.name,
            error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
            stack_trace=traceback.format_exc(),
        )
        yield Either(left=failure)

# NOTE(review): `lru_cache` on an instance method includes `self` in the cache
# key and keeps the instance referenced for as long as the cache lives.
@lru_cache
def procedure_queries_dict(
    self, schema_name: str, database_name: str
) -> Dict[str, List[QueryByProcedure]]:
    """
    Cache the queries ran for the stored procedures in the last `queryLogDuration` days.
    We will run this for each different schema and db name.
    The dictionary key is the procedure name extracted from the call text;
    the consumer looks it up with the lower-cased procedure name.
    """
    start, _ = get_start_and_end(self.source_config.queryLogDuration)
    history_query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
        start_date=start,
    )
    rows = self.engine.execute(history_query).all()

    queries_by_procedure = defaultdict(list)

    for row in rows:
        # A row that cannot be parsed is reported and skipped; the rest still load
        try:
            parsed_query = QueryByProcedure.parse_obj(dict(row))
            called_procedure = get_procedure_name_from_call(
                query_text=parsed_query.procedure_text,
                schema_name=schema_name,
                database_name=database_name,
            )
            queries_by_procedure[called_procedure].append(parsed_query)
        except Exception as exc:
            failure = StackTraceError(
                name="Stored Procedure",
                error=f"Error trying to get procedure name due to [{exc}]",
                stack_trace=traceback.format_exc(),
            )
            self.status.failed(failure)

    return queries_by_procedure

@staticmethod
def is_lineage_query(query_type: str, query_text: str) -> bool:
    """
    Check if it's worth it to parse the query for lineage.

    Args:
        query_type: statement type of the query.
        query_text: SQL text of the query.

    Returns:
        True for MERGE, UPDATE and CREATE_TABLE_AS_SELECT statements, and
        for INSERT statements whose text contains `insert ... into ... select`
        (case-insensitive). False otherwise.
    """
    if query_type in ("MERGE", "UPDATE", "CREATE_TABLE_AS_SELECT"):
        return True

    # DOTALL lets `.` cross line breaks: SQL is routinely formatted with
    # INSERT INTO and SELECT on separate lines, and without it those
    # statements would never match.
    if query_type == "INSERT" and re.search(
        r"insert.*into.*select", query_text, re.IGNORECASE | re.DOTALL
    ):
        return True

    return False

def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
    """
    Yield the queries associated with the stored procedure held in the context.

    The procedure name is lower-cased before looking it up in the
    dictionary returned by `procedure_queries_dict`; nothing is yielded
    when there is no entry for it.
    """
    queries_by_procedure = self.procedure_queries_dict(
        schema_name=self.context.database_schema.name.__root__,
        database_name=self.context.database.name.__root__,
    )
    procedure_key = self.context.stored_procedure.name.__root__.lower()

    yield from queries_by_procedure.get(procedure_key) or []

def yield_procedure_lineage(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
    """
    Add procedure lineage from its query.

    The context flag `stored_procedure_query_lineage` is reset to False on
    every call and set to True only when the query is worth parsing for
    lineage (see `is_lineage_query`). For those queries, every result of
    `get_lineage_by_query` is yielded; results that carry lineage details
    get the stored procedure in the context attached as their pipeline.

    Args:
        query_by_procedure: query executed by the stored procedure.

    Yields:
        The `Either` results produced by `get_lineage_by_query`.
    """
    self.update_context(key="stored_procedure_query_lineage", value=False)
    if not self.is_lineage_query(
        query_type=query_by_procedure.query_type,
        query_text=query_by_procedure.query_text,
    ):
        return

    self.update_context(key="stored_procedure_query_lineage", value=True)

    for either_lineage in get_lineage_by_query(
        self.metadata,
        query=query_by_procedure.query_text,
        service_name=self.context.database_service.name.__root__,
        database_name=self.context.database.name.__root__,
        schema_name=self.context.database_schema.name.__root__,
        dialect=ConnectionTypeDialectMapper.dialect_of(
            self.context.database_service.serviceType.value
        ),
        timeout_seconds=self.source_config.queryParsingTimeoutLimit,
        lineage_source=LineageSource.QueryLineage,
    ):
        lineage_request = either_lineage.right
        # An Either may carry only an error (`left`), in which case there is
        # no `right` request to enrich: forward it untouched instead of
        # failing on the attribute access and aborting the generator.
        if lineage_request is not None and lineage_request.edge.lineageDetails:
            lineage_request.edge.lineageDetails.pipeline = EntityReference(
                id=self.context.stored_procedure.id,
                type="storedProcedure",
            )

        yield either_lineage

def yield_procedure_query(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
    """
    Prepare the query payload for a query triggered by the stored procedure
    held in the context.

    `processedLineage` reflects the `stored_procedure_query_lineage` flag
    currently stored in the context.
    """
    sql_query = SqlQuery(__root__=query_by_procedure.query_text)
    query_type = query_by_procedure.query_type
    duration = query_by_procedure.query_duration
    # The start time is truncated to whole seconds, then scaled by 1000
    start_seconds = int(query_by_procedure.query_start_time.timestamp())
    query_date = Timestamp(__root__=start_seconds * 1000)
    procedure_reference = EntityReference(
        id=self.context.stored_procedure.id,
        type="storedProcedure",
    )
    lineage_processed = bool(self.context.stored_procedure_query_lineage)

    request = CreateQueryRequest(
        query=sql_query,
        query_type=query_type,
        duration=duration,
        queryDate=query_date,
        triggeredBy=procedure_reference,
        processedLineage=lineage_processed,
    )
    yield Either(right=request)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BigQuery models
"""
from typing import Optional

from pydantic import BaseModel, Field

from metadata.generated.schema.entity.data.storedProcedure import Language
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()

# Maps the language string of a BigQuery stored procedure to the
# `Language` enum of the stored procedure entity. Strings that are not
# listed here have no entry, so a `.get()` lookup returns None for them.
STORED_PROC_LANGUAGE_MAP = {
    "SQL": Language.SQL,
    "JAVASCRIPT": Language.JavaScript,
}


class BigQueryStoredProcedure(BaseModel):
    """One row of the BigQuery Stored Procedure list query results"""

    # Name of the stored procedure
    name: str
    # Code of the stored procedure
    definition: str
    # Optional: absent (None) unless the listing query reports a language
    language: Optional[str] = Field(
        default=None,
        description="Will only be informed for non-SQL routines.",
    )
Loading

0 comments on commit 42393f5

Please sign in to comment.