Skip to content

Commit

Permalink
GEN-1552: Postgres stored procedures support (#18083)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshsoni2024 authored Oct 7, 2024
1 parent a748114 commit 421ad60
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
"""
import traceback
from collections import namedtuple
from typing import Iterable, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple

from sqlalchemy import String as SqlAlchemyString
from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
from sqlalchemy.engine import Inspector

from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
PartitionColumnDetails,
PartitionIntervalTypes,
Expand All @@ -36,7 +41,7 @@
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
Expand All @@ -46,10 +51,13 @@
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.mssql.models import STORED_PROC_LANGUAGE_MAP
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.postgres.models import PostgresStoredProcedure
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_GET_ALL_TABLE_PG_POLICY,
POSTGRES_GET_DB_NAMES,
POSTGRES_GET_STORED_PROCEDURES,
POSTGRES_GET_TABLE_NAMES,
POSTGRES_PARTITION_DETAILS,
POSTGRES_SCHEMA_COMMENTS,
Expand All @@ -64,6 +72,10 @@
get_table_owner,
get_view_definition,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.importer import import_side_effects
Expand Down Expand Up @@ -144,7 +156,7 @@
PGDialect.get_foreign_keys = get_foreign_keys


class PostgresSource(CommonDbSourceService, MultiDBSource):
class PostgresSource(CommonDbSourceService, MultiDBSource, StoredProcedureMixin):
"""
Implements the necessary methods to extract
Database metadata from Postgres Source
Expand Down Expand Up @@ -300,3 +312,62 @@ def yield_tag(
stackTrace=traceback.format_exc(),
)
)

def get_stored_procedures(self) -> Iterable[PostgresStoredProcedure]:
"""List stored procedures"""
if self.source_config.includeStoredProcedures:
results = self.engine.execute(POSTGRES_GET_STORED_PROCEDURES).all()
for row in results:
try:
stored_procedure = PostgresStoredProcedure.model_validate(
dict(row._mapping)
)
yield stored_procedure
except Exception as exc:
logger.error()
self.status.failed(
error=StackTraceError(
name=dict(row).get("name", "UNKNOWN"),
error=f"Error parsing Stored Procedure payload: {exc}",
stackTrace=traceback.format_exc(),
)
)

def yield_stored_procedure(
self, stored_procedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Prepare the stored procedure payload"""
try:
stored_procedure_request = CreateStoredProcedureRequest(
name=EntityName(stored_procedure.name),
description=None,
storedProcedureCode=StoredProcedureCode(
language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language),
code=stored_procedure.definition,
),
databaseSchema=fqn.build(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
),
)
yield Either(right=stored_procedure_request)
self.register_record_stored_proc_request(stored_procedure_request)

except Exception as exc:
yield Either(
left=StackTraceError(
name=stored_procedure.name,
error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
stackTrace=traceback.format_exc(),
)
)

def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
return {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Postgres models
"""
from typing import Optional

from pydantic import BaseModel, Field


class PostgresStoredProcedure(BaseModel):
"""Postgres stored procedure list query results"""

name: str = Field(alias="procedure_name")
schema: str = Field(alias="schema_name")
definition: str
language: Optional[str] = None
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,14 @@
FROM
aggregate_hierarchy;
"""

POSTGRES_GET_STORED_PROCEDURES = """
SELECT proname AS procedure_name,
nspname AS schema_name,
proargtypes AS argument_types,
prorettype::regtype AS return_type,
prosrc AS definition
FROM pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE prokind = 'p';
"""

0 comments on commit 421ad60

Please sign in to comment.