Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEN-1552: Postgres stored procedures support #18083

Merged
merged 4 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@
"""
import traceback
from collections import namedtuple
from typing import Iterable, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple

from sqlalchemy import String as SqlAlchemyString
from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
from sqlalchemy.engine import Inspector

from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
PartitionColumnDetails,
PartitionIntervalTypes,
Expand All @@ -36,7 +41,7 @@
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
Expand All @@ -46,10 +51,13 @@
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.mssql.models import STORED_PROC_LANGUAGE_MAP
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.postgres.models import PostgresStoredProcedure
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_GET_ALL_TABLE_PG_POLICY,
POSTGRES_GET_DB_NAMES,
POSTGRES_GET_STORED_PROCEDURES,
POSTGRES_GET_TABLE_NAMES,
POSTGRES_PARTITION_DETAILS,
POSTGRES_SCHEMA_COMMENTS,
Expand All @@ -64,6 +72,10 @@
get_table_owner,
get_view_definition,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.importer import import_side_effects
Expand Down Expand Up @@ -144,7 +156,7 @@
PGDialect.get_foreign_keys = get_foreign_keys


class PostgresSource(CommonDbSourceService, MultiDBSource):
class PostgresSource(CommonDbSourceService, MultiDBSource, StoredProcedureMixin):
"""
Implements the necessary methods to extract
Database metadata from Postgres Source
Expand Down Expand Up @@ -300,3 +312,62 @@ def yield_tag(
stackTrace=traceback.format_exc(),
)
)

def get_stored_procedures(self) -> Iterable[PostgresStoredProcedure]:
"""List stored procedures"""
if self.source_config.includeStoredProcedures:
results = self.engine.execute(POSTGRES_GET_STORED_PROCEDURES).all()
for row in results:
try:
stored_procedure = PostgresStoredProcedure.model_validate(
dict(row._mapping)
)
yield stored_procedure
except Exception as exc:
logger.error()
self.status.failed(
error=StackTraceError(
name=dict(row).get("name", "UNKNOWN"),
error=f"Error parsing Stored Procedure payload: {exc}",
stackTrace=traceback.format_exc(),
)
)

def yield_stored_procedure(
self, stored_procedure
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Prepare the stored procedure payload"""
try:
stored_procedure_request = CreateStoredProcedureRequest(
name=EntityName(stored_procedure.name),
description=None,
storedProcedureCode=StoredProcedureCode(
language=STORED_PROC_LANGUAGE_MAP.get(stored_procedure.language),
code=stored_procedure.definition,
),
databaseSchema=fqn.build(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
),
)
yield Either(right=stored_procedure_request)
self.register_record_stored_proc_request(stored_procedure_request)

except Exception as exc:
yield Either(
left=StackTraceError(
name=stored_procedure.name,
error=f"Error yielding Stored Procedure [{stored_procedure.name}] due to [{exc}]",
stackTrace=traceback.format_exc(),
)
)

def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
return {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Postgres models
"""
from typing import Optional

from pydantic import BaseModel, Field


class PostgresStoredProcedure(BaseModel):
"""Postgres stored procedure list query results"""

name: str = Field(alias="procedure_name")
schema: str = Field(alias="schema_name")
definition: str
language: Optional[str] = None
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,14 @@
FROM
aggregate_hierarchy;
"""

POSTGRES_GET_STORED_PROCEDURES = """
SELECT proname AS procedure_name,
nspname AS schema_name,
proargtypes AS argument_types,
prorettype::regtype AS return_type,
prosrc AS definition
FROM pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE prokind = 'p';
"""
Loading