Skip to content

Commit

Permalink
Fix #12167: Support for Stored Procedures as another entity under Dat…
Browse files Browse the repository at this point in the history
…abase Schema (#12999)

* Add Stored Procedure Entity

* Stored Procedure repository

* Stored Procedure repository

* Fix #12998: Support for Stored Procedures as another entity under Database Schema

* Fix #12998: Support for Stored Procedures as another entity under Database Schema
  • Loading branch information
harshach authored Aug 25, 2023
1 parent 76119ae commit 19b5c94
Show file tree
Hide file tree
Showing 14 changed files with 1,158 additions and 28 deletions.
14 changes: 14 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,17 @@ CREATE TABLE IF NOT EXISTS search_index_entity (
-- Reset the Airflow retry count to 0, but only on ingestion pipelines whose
-- JSON document already contains an `airflowConfig.retries` entry.
-- JSON_REPLACE only overwrites existing paths, and the WHERE clause skips
-- rows where the path is absent.
UPDATE ingestion_pipeline_entity
SET
    json = JSON_REPLACE(json, '$.airflowConfig.retries', 0)
WHERE
    (json -> '$.airflowConfig.retries') IS NOT NULL;


-- Stored procedure entity table (MySQL).
-- The entity is persisted as a JSON document in `json`; the scalar columns
-- below (except fqnHash) are generated from that document and are never
-- written directly.
CREATE TABLE IF NOT EXISTS stored_procedure_entity (
    -- Entity id taken from $.id; STORED because it backs the primary key.
    id        VARCHAR(36)     GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
    -- Entity name taken from $.name (no STORED keyword: MySQL default applies).
    name      VARCHAR(256)    GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
    -- Hash of the fully qualified name, supplied by the writer; binary ASCII collation.
    fqnHash   VARCHAR(256)    NOT NULL COLLATE ascii_bin,
    -- Full entity document.
    json      JSON            NOT NULL,
    -- Last-update value taken from $.updatedAt.
    updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
    -- Last updater taken from $.updatedBy.
    updatedBy VARCHAR(256)    GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
    -- Deletion flag taken from $.deleted; nullable.
    deleted   BOOLEAN         GENERATED ALWAYS AS (json -> '$.deleted'),
    PRIMARY KEY (id),
    UNIQUE (fqnHash)
);
13 changes: 13 additions & 0 deletions bootstrap/sql/migrations/native/1.2.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,16 @@ CREATE TABLE IF NOT EXISTS search_index_entity (
-- We were hardcoding retries to 0. Since we are now using the IngestionPipeline
-- to set them, keep existing ones to 0.
-- There is no WHERE clause: the statement is applied to every row of
-- ingestion_pipeline_entity.
-- NOTE(review): the MySQL migration only touches rows that already have
-- airflowConfig.retries; here the last argument (create_missing = true) can
-- also add the key. Confirm the difference is intended.
UPDATE ingestion_pipeline_entity
SET
    json = jsonb_set(
        json::jsonb,
        '{airflowConfig,retries}',
        '0',
        true
    );

-- Stored procedure entity table (PostgreSQL).
-- The entity is persisted as a JSONB document in `json`; the scalar columns
-- below (except fqnHash) are STORED generated columns derived from it.
CREATE TABLE IF NOT EXISTS stored_procedure_entity (
    -- Entity id taken from json ->> 'id'; backs the primary key.
    id        VARCHAR(36)  GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
    -- Entity name taken from json ->> 'name'.
    name      VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
    -- Hash of the fully qualified name, supplied by the writer; must be unique.
    fqnHash   VARCHAR(256) NOT NULL,
    -- Full entity document.
    json      JSONB        NOT NULL,
    -- Last-update value taken from json ->> 'updatedAt', cast to bigint.
    updatedAt BIGINT       GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
    -- Last updater taken from json ->> 'updatedBy'.
    updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
    -- Deletion flag taken from json ->> 'deleted', cast to boolean; nullable.
    deleted   BOOLEAN      GENERATED ALWAYS AS ((json ->> 'deleted')::boolean) STORED,
    PRIMARY KEY (id),
    UNIQUE (fqnHash)
);
60 changes: 60 additions & 0 deletions ingestion/examples/sample_data/datasets/stored_procedures.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"storedProcedures": [
{
"name": "update_dim_address_table",
"description": "This stored procedure updates dim_address table",
"version": 0.1,
"updatedAt": 1638354087391,
"updatedBy": "anonymous",
"href": "http://localhost:8585/api/v1/tables/3cda8ecb-f4c6-4ed4-8506-abe965b54b86",
"storedProcedureCode": {
"langauge": "SQL",
"code": "CREATE OR REPLACE PROCEDURE output_message(message VARCHAR)\nRETURNS VARCHAR NOT NULL\nLANGUAGE SQL\nAS\n$$\nBEGIN\n RETURN message;\nEND;\n$$\n;"
},
"database": {
"id": "50da1ff8-4e1d-4967-8931-45edbf4dd908",
"type": "database",
"name": "sample_data.ecommerce_db",
"description": "This **mock** database contains tables related to shopify sales and orders with related dimension tables.",
"href": "http://localhost:8585/api/v1/databases/50da1ff8-4e1d-4967-8931-45edbf4dd908"
},
"tags": [],
"followers": [],
"databaseSchema": {
"id": "d7be1e2c-b3dc-11ec-b909-0242ac120002",
"type": "databaseSchema",
"name": "sample_data.ecommerce_db.shopify",
"description": "This **mock** Schema contains tables related to shopify sales and orders with related dimension tables.",
"href": "http://localhost:8585/api/v1/databaseSchemas/d7be1e2c-b3dc-11ec-b909-0242ac120002"
}
},
{
"name": "update_orders_table",
"description": "This stored procedure is written in JavaScript to update the orders table",
"version": 0.1,
"updatedAt": 1638354087391,
"updatedBy": "anonymous",
"href": "http://localhost:8585/api/v1/tables/3cda8ecb-f4c6-4ed4-8506-abe965b54b86",
"storedProcedureCode": {
"langauge": "JavaScript",
"code": "create or replace procedure read_result_set()\n returns float not null\n language javascript\n as \n $$ \n var my_sql_command = \"select * from table1\";\n var statement1 = snowflake.createStatement( {sqlText: my_sql_command} );\n var result_set1 = statement1.execute();\n // Loop through the results, processing one row at a time... \n while (result_set1.next()) {\n var column1 = result_set1.getColumnValue(1);\n var column2 = result_set1.getColumnValue(2);\n // Do something with the retrieved values...\n }\n return 0.0; // Replace with something more useful.\n $$\n ;"
},
"database": {
"id": "50da1ff8-4e1d-4967-8931-45edbf4dd908",
"type": "database",
"name": "sample_data.ecommerce_db",
"description": "This **mock** database contains tables related to shopify sales and orders with related dimension tables.",
"href": "http://localhost:8585/api/v1/databases/50da1ff8-4e1d-4967-8931-45edbf4dd908"
},
"tags": [],
"followers": [],
"databaseSchema": {
"id": "d7be1e2c-b3dc-11ec-b909-0242ac120002",
"type": "databaseSchema",
"name": "sample_data.ecommerce_db.shopify",
"description": "This **mock** Schema contains tables related to shopify sales and orders with related dimension tables.",
"href": "http://localhost:8585/api/v1/databaseSchemas/d7be1e2c-b3dc-11ec-b909-0242ac120002"
}
}
]
}
24 changes: 2 additions & 22 deletions ingestion/src/metadata/ingestion/ometa/ometa_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,6 @@
T = TypeVar("T", bound=BaseModel)
C = TypeVar("C", bound=BaseModel)

# Helps us dynamically load the Entity class path in the
# generated module.
MODULE_PATH = {
"policy": "policies",
"service": "services",
"tag": "classification",
"classification": "classification",
"test": "tests",
"user": "teams",
"role": "teams",
"team": "teams",
"workflow": "automations",
}


class MissingEntityTypeException(Exception):
"""
Expand Down Expand Up @@ -195,12 +181,7 @@ def get_module_path(self, entity: Type[T]) -> str:
Based on the entity, return the module path
it is found inside generated
"""

for key, value in MODULE_PATH.items():
if key in entity.__name__.lower():
return value

return self.data_path
return entity.__module__.split(".")[-2]

def get_create_entity_type(self, entity: Type[T]) -> Type[C]:
"""
Expand Down Expand Up @@ -247,8 +228,8 @@ def get_entity_from_create(self, create: Type[C]) -> Type[T]:
.replace("testdefinition", "testDefinition")
.replace("testcase", "testCase")
.replace("searchindex", "searchIndex")
.replace("storedprocedure", "storedProcedure")
)

class_path = ".".join(
filter(
None,
Expand All @@ -260,7 +241,6 @@ def get_entity_from_create(self, create: Type[C]) -> Type[T]:
],
)
)

entity_class = getattr(
__import__(class_path, globals(), locals(), [class_name]), class_name
)
Expand Down
6 changes: 6 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
Expand Down Expand Up @@ -98,6 +101,7 @@
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.report import Report
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.policies.policy import Policy
Expand Down Expand Up @@ -150,6 +154,8 @@
CreateContainerRequest.__name__: "/containers",
SearchIndex.__name__: "/searchIndexes",
CreateSearchIndexRequest.__name__: "/searchIndexes",
StoredProcedure.__name__: "/storedProcedures",
CreateStoredProcedureRequest.__name__: "/storedProcedures",
# Classifications
Tag.__name__: "/tags",
CreateTagRequest.__name__: "/tags",
Expand Down
71 changes: 71 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createTableProfile import (
CreateTableProfileRequest,
Expand All @@ -60,6 +63,7 @@
MlStore,
)
from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.table import (
ColumnProfile,
SystemProfile,
Expand Down Expand Up @@ -244,6 +248,13 @@ def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnecti
encoding=UTF_8,
)
)
self.stored_procedures = json.load(
open( # pylint: disable=consider-using-with
sample_data_folder + "/datasets/stored_procedures.json",
"r",
encoding=UTF_8,
)
)
self.database_service_json = json.load(
open( # pylint: disable=consider-using-with
sample_data_folder + "/datasets/service.json",
Expand Down Expand Up @@ -509,6 +520,7 @@ def next_record(self) -> Iterable[Entity]:
yield from self.ingest_users()
yield from self.ingest_glue()
yield from self.ingest_tables()
yield from self.ingest_stored_procedures()
yield from self.ingest_topics()
yield from self.ingest_charts()
yield from self.ingest_data_models()
Expand Down Expand Up @@ -687,6 +699,65 @@ def ingest_tables(self):
),
)

def ingest_stored_procedures(self):
    """
    Ingest Sample Stored Procedures

    Yields, in order:
      1. the CreateDatabaseRequest for the sample database,
      2. the CreateDatabaseSchemaRequest for the sample schema,
      3. one CreateStoredProcedureRequest per entry under
         "storedProcedures" in the loaded stored_procedures sample data.

    Also refreshes ``self.user_entity`` with up to 5 users listed from
    the metadata server.
    """

    db = CreateDatabaseRequest(
        name=self.database["name"],
        description=self.database["description"],
        service=self.database_service.fullyQualifiedName.__root__,
    )
    yield db

    database_entity = fqn.build(
        self.metadata,
        entity_type=Database,
        service_name=self.database_service.name.__root__,
        database_name=db.name.__root__,
    )

    # Looked up after `yield db`; presumably the consumer has already
    # created the database by the time the generator resumes - TODO confirm.
    database_object = self.metadata.get_by_name(
        entity=Database, fqn=database_entity
    )

    schema = CreateDatabaseSchemaRequest(
        name=self.database_schema["name"],
        description=self.database_schema["description"],
        database=database_object.fullyQualifiedName,
    )
    yield schema

    database_schema_entity = fqn.build(
        self.metadata,
        entity_type=DatabaseSchema,
        service_name=self.database_service.name.__root__,
        database_name=db.name.__root__,
        schema_name=schema.name.__root__,
    )

    database_schema_object = self.metadata.get_by_name(
        entity=DatabaseSchema, fqn=database_schema_entity
    )

    # Not read again in this method; kept because it updates instance state
    # that other code may rely on.
    resp = self.metadata.list_entities(entity=User, limit=5)
    self.user_entity = resp.entities

    for sample in self.stored_procedures["storedProcedures"]:
        # Use a separate name for the request so the sample dict is not
        # shadowed while it is still being read.
        request = CreateStoredProcedureRequest(
            name=sample["name"],
            description=sample["description"],
            storedProcedureCode=StoredProcedureCode(
                **sample["storedProcedureCode"]
            ),
            databaseSchema=database_schema_object.fullyQualifiedName,
            tags=sample["tags"],
        )

        # Log the plain name from the sample data: request.name is a wrapper
        # model (see the `.name.__root__` accesses above), so formatting it
        # directly would not print the bare name.
        self.status.scanned(f"StoredProcedure Scanned: {sample['name']}")
        yield request

def ingest_topics(self) -> Iterable[CreateTopicRequest]:
"""
Ingest Sample Topics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public final class Entity {
// Data asset entities
//
public static final String TABLE = "table";
public static final String STORED_PROCEDURE = "storedProcedure";
public static final String DATABASE = "database";
public static final String DATABASE_SCHEMA = "databaseSchema";
public static final String METRICS = "metrics";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.data.Report;
import org.openmetadata.schema.entity.data.SearchIndex;
import org.openmetadata.schema.entity.data.StoredProcedure;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.data.Topic;
import org.openmetadata.schema.entity.domains.DataProduct;
Expand Down Expand Up @@ -167,6 +168,9 @@ public interface CollectionDAO {
@CreateSqlObject
TableDAO tableDAO();

@CreateSqlObject
QueryDAO queryDAO();

@CreateSqlObject
UsageDAO usageDAO();

Expand Down Expand Up @@ -249,7 +253,7 @@ public interface CollectionDAO {
FeedDAO feedDAO();

@CreateSqlObject
QueryDAO queryDAO();
StoredProcedureDAO storedProcedureDAO();

@CreateSqlObject
ChangeEventDAO changeEventDAO();
Expand Down Expand Up @@ -1809,6 +1813,23 @@ default List<String> listAfter(ListFilter filter, int limit, String after) {
}
}

/**
 * DAO for {@link StoredProcedure} entities, backed by the {@code stored_procedure_entity} table.
 */
interface StoredProcedureDAO extends EntityDAO<StoredProcedure> {
  /** Entity class handled by this DAO. */
  @Override
  default Class<StoredProcedure> getEntityClass() {
    return StoredProcedure.class;
  }

  /** Name of the table that holds stored procedure entities. */
  @Override
  default String getTableName() {
    return "stored_procedure_entity";
  }

  /** Column holding the name hash; this table uses {@code fqnHash}. */
  @Override
  default String getNameHashColumn() {
    return "fqnHash";
  }
}

interface QueryDAO extends EntityDAO<Query> {
@Override
default String getTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
import org.openmetadata.service.util.JsonUtils;

public class SearchIndexRepository extends EntityRepository<SearchIndex> {

public SearchIndexRepository(CollectionDAO dao) {
super(
SearchIndexResource.COLLECTION_PATH, Entity.SEARCH_INDEX, SearchIndex.class, dao.searchIndexDAO(), dao, "", "");
}

@Override
public void setFullyQualifiedName(SearchIndex searchIndex) {
searchIndex.setFullyQualifiedName(
Expand All @@ -63,11 +69,6 @@ public void setFullyQualifiedName(SearchIndex searchIndex) {
}
}

public SearchIndexRepository(CollectionDAO dao) {
super(
SearchIndexResource.COLLECTION_PATH, Entity.SEARCH_INDEX, SearchIndex.class, dao.searchIndexDAO(), dao, "", "");
}

@Override
public void prepare(SearchIndex searchIndex) {
SearchService searchService = Entity.getEntity(searchIndex.getService(), "", ALL);
Expand Down
Loading

0 comments on commit 19b5c94

Please sign in to comment.