Skip to content

Commit

Permalink
Part of open-metadata#12998 - Prep Stored Procedures Skeleton for Snowflake (open-metadata#13121)
Browse files Browse the repository at this point in the history

* Prep Stored Procedures Skeleton for Snowflake

* Update pylint and add migrations

* Fix test

* Reuse source url computation
  • Loading branch information
pmbrull authored Sep 12, 2023
1 parent 1960c68 commit f0995cb
Show file tree
Hide file tree
Showing 27 changed files with 883 additions and 70 deletions.
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fail-under=6.0
init-hook='from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))'
extension-pkg-allow-list=pydantic
load-plugins=ingestion.plugins.print_checker
max-public-methods=25

[MESSAGES CONTROL]
disable=no-name-in-module,import-error,duplicate-code
Expand Down
10 changes: 10 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,13 @@ CREATE TABLE IF NOT EXISTS table_entity_extension (

ALTER TABLE entity_relationship ADD INDEX from_entity_type_index(fromId, fromEntity), ADD INDEX to_entity_type_index(toId, toEntity);
ALTER TABLE tag DROP CONSTRAINT fqnHash, ADD CONSTRAINT UNIQUE(fqnHash), ADD PRIMARY KEY(id);


-- Rename viewParsingTimeoutLimit to queryParsingTimeoutLimit.
-- Only rows that actually carry the old key are rewritten: for any other
-- 'metadata' pipeline JSON_EXTRACT returns SQL NULL, and feeding that into
-- JSON_INSERT would write a NULL/null value instead of leaving the row alone.
UPDATE ingestion_pipeline_entity
SET json = JSON_INSERT(
    JSON_REMOVE(json, '$.sourceConfig.config.viewParsingTimeoutLimit'),
    '$.sourceConfig.config.queryParsingTimeoutLimit',
    JSON_EXTRACT(json, '$.sourceConfig.config.viewParsingTimeoutLimit')
)
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata'
  AND JSON_CONTAINS_PATH(json, 'one', '$.sourceConfig.config.viewParsingTimeoutLimit');
11 changes: 11 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,14 @@ ALTER TABLE tag DROP CONSTRAINT IF EXISTS tag_fqnhash_key;
ALTER TABLE tag ADD CONSTRAINT unique_fqnHash UNIQUE (fqnHash);

ALTER TABLE tag ADD CONSTRAINT tag_pk PRIMARY KEY (id);


-- Rename viewParsingTimeoutLimit to queryParsingTimeoutLimit.
-- jsonb_set is STRICT: if the old key is absent, the extracted value is SQL NULL
-- and jsonb_set returns NULL, which would overwrite the whole json column.
-- Restrict the update to rows that actually contain the old key.
UPDATE ingestion_pipeline_entity
SET json = jsonb_set(
    json::jsonb #- '{sourceConfig,config,viewParsingTimeoutLimit}',
    '{sourceConfig,config,queryParsingTimeoutLimit}',
    (json #> '{sourceConfig,config,viewParsingTimeoutLimit}')::jsonb,
    true
)
WHERE json #>> '{pipelineType}' = 'metadata'
  AND (json #> '{sourceConfig,config,viewParsingTimeoutLimit}') IS NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def build_datamodel_name(model_name: str, explore_name: str) -> str:
return clean_dashboard_name(model_name + "_" + explore_name)


# pylint: disable=too-many-public-methods
class LookerSource(DashboardServiceSource):
"""
Looker Source Class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import traceback
from abc import ABC
from copy import deepcopy
from typing import Iterable, List, Optional, Tuple
from typing import Any, Iterable, List, Optional, Tuple

from pydantic import BaseModel
from sqlalchemy.engine import Connection
Expand All @@ -26,6 +26,10 @@
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import (
Expand All @@ -49,7 +53,10 @@
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
from metadata.ingestion.source.models import TableView
Expand Down Expand Up @@ -356,6 +363,27 @@ def yield_tag(
Each source should implement its own when needed
"""

def get_stored_procedures(self) -> Iterable[Any]:
    """
    Placeholder: this source does not list stored procedures.

    There is no body beyond this docstring, so the call returns None
    instead of an iterable.
    """

def yield_stored_procedure(
    self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
    """
    Placeholder: no stored procedure requests are produced here.

    ``stored_procedure`` is ignored and the call returns None.
    """

def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
    """
    Placeholder: this source does not list stored procedure queries.

    There is no body beyond this docstring, so the call returns None.
    """

def yield_procedure_query(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
    """
    Placeholder: no query requests are produced for stored procedures.

    ``query_by_procedure`` is ignored and the call returns None.
    """

def yield_procedure_lineage(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
    """
    Placeholder: no lineage is produced from stored procedure queries.

    ``query_by_procedure`` is ignored and the call returns None.
    """

@calculate_execution_time_generator
def yield_table(
self, table_name_and_type: Tuple[str, str]
Expand Down Expand Up @@ -452,7 +480,7 @@ def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
metadata=self.metadata,
service_name=self.context.database_service.name.__root__,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.viewParsingTimeoutLimit,
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
)

def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

import traceback
from abc import ABC, abstractmethod
from typing import Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

from pandas import json_normalize

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
Expand All @@ -39,7 +43,10 @@
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.datalake.metadata import DatalakeSource
from metadata.utils import fqn
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, DEFAULT_DATABASE
Expand Down Expand Up @@ -246,6 +253,27 @@ def yield_tag(
tags are not supported with NoSQL
"""

def get_stored_procedures(self) -> Iterable[Any]:
    """
    Placeholder: this source does not list stored procedures.

    There is no body beyond this docstring, so the call returns None
    instead of an iterable.
    """

def yield_stored_procedure(
    self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
    """
    Placeholder: no stored procedure requests are produced here.

    ``stored_procedure`` is ignored and the call returns None.
    """

def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
    """
    Placeholder: this source does not list stored procedure queries.

    There is no body beyond this docstring, so the call returns None.
    """

def yield_procedure_query(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
    """
    Placeholder: no query requests are produced for stored procedures.

    ``query_by_procedure`` is ignored and the call returns None.
    """

def yield_procedure_lineage(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
    """
    Placeholder: no lineage is produced from stored procedure queries.

    ``query_by_procedure`` is ignored and the call returns None.
    """

def get_source_url(
self,
database_name: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,29 @@
Base class for ingesting database services
"""
from abc import ABC, abstractmethod
from typing import Iterable, List, Optional, Set, Tuple
from datetime import datetime
from typing import Any, Iterable, List, Optional, Set, Tuple

from pydantic import BaseModel
from pydantic import BaseModel, Field
from sqlalchemy.engine import Inspector

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import (
Column,
DataModel,
Expand Down Expand Up @@ -77,6 +84,23 @@ class DataModelLink(BaseModel):
datamodel: DataModel


class QueryByProcedure(BaseModel):
    """
    One query executed by a stored procedure, together with the
    procedure call it belongs to.

    Every field is populated through an upper-case alias.
    NOTE(review): the aliases presumably mirror the column names of the
    result rows this model is built from - confirm against the producer.
    """

    # Identifiers and classification of the procedure call and its query
    procedure_id: str = Field(..., alias="PROCEDURE_ID")
    query_id: str = Field(..., alias="QUERY_ID")
    query_type: str = Field(..., alias="QUERY_TYPE")
    # Text of the stored procedure
    procedure_text: str = Field(..., alias="PROCEDURE_TEXT")
    # Time window of the procedure execution
    procedure_start_time: datetime = Field(..., alias="PROCEDURE_START_TIME")
    procedure_end_time: datetime = Field(..., alias="PROCEDURE_END_TIME")
    # Timing of the individual query (duration unit not visible here - TODO confirm)
    query_start_time: datetime = Field(..., alias="QUERY_START_TIME")
    query_duration: float = Field(..., alias="QUERY_DURATION")
    query_text: str = Field(..., alias="QUERY_TEXT")
    # The only optional field: defaults to None when no user name is given
    query_user_name: Optional[str] = Field(None, alias="QUERY_USER_NAME")


class DatabaseServiceTopology(ServiceTopology):
"""
Defines the hierarchy in Database Services.
Expand Down Expand Up @@ -130,7 +154,7 @@ class DatabaseServiceTopology(ServiceTopology):
consumer=["database_service", "database"],
),
],
children=["table"],
children=["table", "stored_procedure"],
post_process=["mark_tables_as_deleted"],
)
table = TopologyNode(
Expand All @@ -151,6 +175,36 @@ class DatabaseServiceTopology(ServiceTopology):
),
],
)
stored_procedure = TopologyNode(
producer="get_stored_procedures",
stages=[
NodeStage(
type_=StoredProcedure,
context="stored_procedure",
processor="yield_stored_procedure",
consumer=["database_service", "database", "database_schema"],
),
],
children=["stored_procedure_queries"],
)
stored_procedure_queries = TopologyNode(
producer="get_stored_procedure_queries",
stages=[
NodeStage(
type_=AddLineageRequest, # TODO: Fix context management for multiple types
processor="yield_procedure_lineage",
context="stored_procedure_query_lineage", # Used to flag if the query has had processed lineage
nullable=True,
ack_sink=False,
),
NodeStage(
type_=Query,
processor="yield_procedure_query",
nullable=True,
ack_sink=False,
),
],
)


class DatabaseServiceSource(
Expand Down Expand Up @@ -274,6 +328,32 @@ def yield_table(
Also, update the self.inspector value to the current db.
"""

@abstractmethod
def get_stored_procedures(self) -> Iterable[Any]:
    """
    Produce the stored procedures that should be processed.

    Abstract: subclasses must override it. Referenced by name as the
    ``producer`` of the ``stored_procedure`` topology node.
    """

@abstractmethod
def yield_stored_procedure(
    self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
    """
    Build the creation request(s) for a single stored procedure.

    Abstract: subclasses must override it. Referenced by name as the
    ``processor`` of the ``stored_procedure`` topology stage.
    """

@abstractmethod
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
    """
    Produce the queries associated to a stored procedure.

    Abstract: subclasses must override it. Referenced by name as the
    ``producer`` of the ``stored_procedure_queries`` topology node.
    """

@abstractmethod
def yield_procedure_query(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
    """
    Build the query creation request(s) for one stored procedure query.

    Abstract: subclasses must override it. Referenced by name as the
    ``processor`` of the ``Query`` stage of ``stored_procedure_queries``.
    """

@abstractmethod
def yield_procedure_lineage(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
    """
    Build the lineage request(s) derived from one stored procedure query.

    Abstract: subclasses must override it. Referenced by name as the
    ``processor`` of the ``AddLineageRequest`` stage of
    ``stored_procedure_queries``.
    """

def get_raw_database_schema_names(self) -> Iterable[str]:
"""
fetch all schema names without any filtering.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""
import json
import traceback
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple

from databricks.sdk.service.catalog import ColumnInfo
from databricks.sdk.service.catalog import TableConstraint as DBTableConstraint
Expand All @@ -23,6 +23,10 @@
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
Expand Down Expand Up @@ -52,7 +56,10 @@
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.database_service import (
DatabaseServiceSource,
QueryByProcedure,
)
from metadata.ingestion.source.database.databricks.connection import get_connection
from metadata.ingestion.source.database.databricks.models import (
ColumnJson,
Expand Down Expand Up @@ -484,5 +491,26 @@ def yield_tag(
) -> Iterable[Either[OMetaTagAndClassification]]:
"""No tags being processed"""

def get_stored_procedures(self) -> Iterable[Any]:
    """
    Placeholder: this source does not list stored procedures.

    There is no body beyond this docstring, so the call returns None
    instead of an iterable.
    """

def yield_stored_procedure(
    self, stored_procedure: Any
) -> Iterable[Either[CreateStoredProcedureRequest]]:
    """
    Placeholder: no stored procedure requests are produced here.

    ``stored_procedure`` is ignored and the call returns None.
    """

def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
    """
    Placeholder: this source does not list stored procedure queries.

    There is no body beyond this docstring, so the call returns None.
    """

def yield_procedure_query(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[CreateQueryRequest]]:
    """
    Placeholder: no query requests are produced for stored procedures.

    ``query_by_procedure`` is ignored and the call returns None.
    """

def yield_procedure_lineage(
    self, query_by_procedure: QueryByProcedure
) -> Iterable[Either[AddLineageRequest]]:
    """
    Placeholder: no lineage is produced from stored procedure queries.

    ``query_by_procedure`` is ignored and the call returns None.
    """

def close(self):
    """
    No-op: there is nothing to close.

    The method has no body beyond this docstring and returns None.
    """
Loading

0 comments on commit f0995cb

Please sign in to comment.