Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: postgres integration - create database #932

Merged
merged 8 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from evadb.catalog.models.utils import (
ColumnCatalogEntry,
DatabaseCatalogEntry,
IndexCatalogEntry,
TableCatalogEntry,
UdfCacheCatalogEntry,
Expand All @@ -45,6 +46,7 @@
truncate_catalog_tables,
)
from evadb.catalog.services.column_catalog_service import ColumnCatalogService
from evadb.catalog.services.database_catalog_service import DatabaseCatalogService
from evadb.catalog.services.index_catalog_service import IndexCatalogService
from evadb.catalog.services.table_catalog_service import TableCatalogService
from evadb.catalog.services.udf_cache_catalog_service import UdfCacheCatalogService
Expand All @@ -70,6 +72,7 @@ def __init__(self, db_uri: str, config: ConfigurationManager):
self._sql_config = SQLConfig(db_uri)
self._config = config
self._bootstrap_catalog()
self._db_catalog_service = DatabaseCatalogService(self._sql_config.session)
self._table_catalog_service = TableCatalogService(self._sql_config.session)
self._column_service = ColumnCatalogService(self._sql_config.session)
self._udf_service = UdfCatalogService(self._sql_config.session)
Expand Down Expand Up @@ -121,6 +124,48 @@ def _clear_catalog_contents(self):
# clean up the dataset, index, and cache directories
cleanup_storage(self._config)

"Database catalog services"

def insert_database_catalog_entry(self, name: str, engine: str, params: dict):
"""A new entry is persisted in the database catalog."

Args:
name: database name
engine: engine name
params: required params as a dictionary for the database
"""
self._db_catalog_service.insert_entry(name, engine, params)

def get_database_catalog_entry(self, database_name: str) -> DatabaseCatalogEntry:
"""
Returns the database catalog entry for the given database_name
Arguments:
database_name (str): name of the database

Returns:
DatabaseCatalogEntry
"""

table_entry = self._db_catalog_service.get_entry_by_name(database_name)

return table_entry

def delete_database_catalog_entry(
self, database_entry: DatabaseCatalogEntry
) -> bool:
"""
This method deletes the database from catalog.

Arguments:
database_entry: database catalog entry to remove

Returns:
True if successfully deleted else False
"""
# todo: do we need to remove also the associated tables etc or that will be
# taken care by the underlying db
return self._db_catalog_service.delete_entry(database_entry)

"Table catalog services"

def insert_table_catalog_entry(
Expand Down
13 changes: 0 additions & 13 deletions evadb/catalog/catalog_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,6 @@
from evadb.parser.create_statement import ColConstraintInfo, ColumnDefinition
from evadb.utils.generic_utils import get_str_hash, remove_directory_contents

CATALOG_TABLES = [
"column_catalog",
"table_catalog",
"depend_column_and_udf_cache",
"udf_cache",
"udf_catalog",
"depend_udf_and_udf_cache",
"index_catalog",
"udfio_catalog",
"udf_cost_catalog",
"udf_metadata_catalog",
]


def is_video_table(table: TableCatalogEntry):
return table.table_type == TableType.VIDEO_DATA
Expand Down
47 changes: 47 additions & 0 deletions evadb/catalog/models/database_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy import Column, String

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import DatabaseCatalogEntry, TextPickleType


class DatabaseCatalog(BaseModel):
"""The `DatabaseCatalog` catalog stores information about all databases.
`_row_id:` an autogenerated unique identifier.
`_name:` the database name.
`_engine:` database engine
`_param:` parameters specific to the database engine
"""

__tablename__ = "database_catalog"

_name = Column("name", String(100), unique=True)
_engine = Column("engine", String(100))
_params = Column("params", TextPickleType())

def __init__(self, name: str, engine: str, params: dict):
self._name = name
self._engine = engine
self._params = params

def as_dataclass(self) -> "DatabaseCatalogEntry":
return DatabaseCatalogEntry(
row_id=self._row_id,
name=self._name,
engine=self._engine,
params=self._params,
)
40 changes: 40 additions & 0 deletions evadb/catalog/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import json
from dataclasses import dataclass, field
from typing import List, Tuple

import sqlalchemy
from sqlalchemy.engine import Engine
from sqlalchemy.types import TypeDecorator
from sqlalchemy_utils import create_database, database_exists

from evadb.catalog.catalog_type import (
Expand All @@ -31,6 +33,25 @@
from evadb.utils.logging_manager import logger


class TextPickleType(TypeDecorator):
"""Used to handle serialization and deserialization to Text
https://stackoverflow.com/questions/1378325/python-dicts-in-sqlalchemy
"""

impl = sqlalchemy.String(1024)

def process_bind_param(self, value, dialect):
if value is not None:
value = json.dumps(value)

return value

def process_result_value(self, value, dialect):
if value is not None:
value = json.loads(value)
return value


def init_db(engine: Engine):
"""Create database if doesn't exist and create all tables."""
if not database_exists(engine.url):
Expand Down Expand Up @@ -209,3 +230,22 @@ def _to_str(col):
"impl": self.impl_file_path,
"metadata": self.metadata,
}


@dataclass(unsafe_hash=True)
class DatabaseCatalogEntry:
"""Dataclass representing an entry in the `DatabaseCatalog`.
This is done to ensure we don't expose the sqlalchemy dependencies beyond catalog service. Further, sqlalchemy does not allow sharing of objects across threads.
"""

name: str
engine: str
params: dict
row_id: int = None

def display_format(self):
return {
"name": self.name,
"engine": self.engine,
"params": self.params,
}
83 changes: 83 additions & 0 deletions evadb/catalog/services/database_catalog_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select

from evadb.catalog.models.database_catalog import DatabaseCatalog
from evadb.catalog.models.utils import DatabaseCatalogEntry
from evadb.catalog.services.base_service import BaseService
from evadb.utils.errors import CatalogError
from evadb.utils.logging_manager import logger


class DatabaseCatalogService(BaseService):
def __init__(self, db_session: Session):
super().__init__(DatabaseCatalog, db_session)

def insert_entry(
self,
name: str,
engine: str,
params: dict,
):
try:
db_catalog_obj = self.model(
name=name,
engine=engine,
params=params,
)
db_catalog_obj = db_catalog_obj.save(self.session)

except Exception as e:
logger.exception(
f"Failed to insert entry into database catalog with exception {str(e)}"
)
raise CatalogError(e)

def get_entry_by_name(self, database_name: str) -> DatabaseCatalogEntry:
"""
Get the table catalog entry with given table name.
Arguments:
database_name (str): Database name
Returns:
DatabaseCatalogEntry - catalog entry for given database name
"""
entry = self.session.execute(
select(self.model).filter(self.model._name == database_name)
).scalar_one_or_none()
if entry:
return entry.as_dataclass()
return entry

def delete_entry(self, database_entry: DatabaseCatalogEntry):
"""Delete database from the catalog
Arguments:
database (DatabaseCatalogEntry): database to delete
Returns:
True if successfully removed else false
"""
try:
db_catalog_obj = self.session.execute(
select(self.model).filter(self.model._row_id == database_entry.row_id)
).scalar_one_or_none()
db_catalog_obj.delete(self.session)
return True
except Exception as e:
err_msg = (
f"Delete database failed for {database_entry} with error {str(e)}."
)
logger.exception(err_msg)
raise CatalogError(err_msg)
1 change: 1 addition & 0 deletions evadb/catalog/sql_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
CATALOG_TABLES = [
"column_catalog",
"table_catalog",
"database_catalog",
"depend_column_and_udf_cache",
"udf_cache",
"udf_catalog",
Expand Down
48 changes: 48 additions & 0 deletions evadb/executor/create_database_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.models.storage.batch import Batch
from evadb.parser.create_statement import CreateDatabaseStatement
from evadb.utils.logging_manager import logger


class CreateDatabaseExecutor(AbstractExecutor):
def __init__(self, db: EvaDBDatabase, node: CreateDatabaseStatement):
super().__init__(db, node)

def exec(self, *args, **kwargs):
# todo handle if_not_exists

logger.debug(
f"Trying to connect to the provided engine {self.node.engine} with params {self.node.param_dict}"
)
# todo handle if the provided database params are valid

logger.debug(f"Creating database {self.node}")

self.catalog().insert_database_catalog_entry(
self.node.database_name, self.node.engine, self.node.param_dict
)

yield Batch(
pd.DataFrame(
[
f"The database {self.node.database_name} has been successfully created."
]
)
)
13 changes: 11 additions & 2 deletions evadb/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from typing import Iterator, Union

from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.apply_and_merge_executor import ApplyAndMergeExecutor
from evadb.executor.create_database_executor import CreateDatabaseExecutor
from evadb.executor.create_executor import CreateExecutor
from evadb.executor.create_index_executor import CreateIndexExecutor
from evadb.executor.create_udf_executor import CreateUDFExecutor
Expand Down Expand Up @@ -46,6 +47,8 @@
from evadb.executor.union_executor import UnionExecutor
from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor
from evadb.models.storage.batch import Batch
from evadb.parser.create_statement import CreateDatabaseStatement
from evadb.parser.statement import AbstractStatement
from evadb.plan_nodes.abstract_plan import AbstractPlan
from evadb.plan_nodes.types import PlanOprType
from evadb.utils.logging_manager import logger
Expand All @@ -65,7 +68,9 @@ def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan):
self._db = evadb
self._plan = plan

def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
def _build_execution_tree(
self, plan: Union[AbstractPlan, AbstractStatement]
) -> AbstractExecutor:
"""build the execution tree from plan tree

Arguments:
Expand All @@ -78,6 +83,10 @@ def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
if plan is None:
return root

# First handle cases when the plan is actually a parser statement
if isinstance(plan, CreateDatabaseStatement):
return CreateDatabaseExecutor(db=self._db, node=plan)

# Get plan node type
plan_opr_type = plan.opr_type

Expand Down
Loading