Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: select from native database #942

Merged
merged 5 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ jobs:
name: Run integration tests
command: |
source test_evadb/bin/activate
PYTHONPATH="." python -m pytest test/third_party_tests/test_native_executor.py -k test_should_run_simple_query_in_postgres
PYTHONPATH="." python -m pytest test/third_party_tests/test_native_executor.py -k test_should_run_query_in_postgres

Windows:
executor: win/default
Expand Down
90 changes: 86 additions & 4 deletions evadb/binder/binder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,94 @@
import re
from typing import TYPE_CHECKING, List

from evadb.catalog.catalog_type import TableType
from evadb.catalog.catalog_type import ColumnType, TableType
from evadb.catalog.catalog_utils import (
get_video_table_column_definitions,
is_document_table,
is_pdf_table,
is_string_col,
is_video_table,
)
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
from evadb.catalog.models.table_catalog import TableCatalogEntry

if TYPE_CHECKING:
from evadb.binder.statement_binder_context import StatementBinderContext
from evadb.catalog.catalog_manager import CatalogManager

from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
from evadb.parser.alias import Alias
from evadb.parser.create_statement import ColumnDefinition
from evadb.parser.table_ref import TableInfo, TableRef
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.logging_manager import logger


class BinderError(Exception):
pass


def bind_table_info(
catalog: CatalogManager, table_info: TableInfo
) -> TableCatalogEntry:
def check_data_source_and_table_are_valid(
    catalog: CatalogManager, database_name: str, table_name: str
):
    """
    Ensure the data source is registered and that it contains the table.

    Arguments:
        catalog (CatalogManager): catalog used to look up the data source
        database_name (str): name of the data source
        table_name (str): name of the table expected in the data source

    Raises:
        BinderError: if the data source is not in the catalog, if listing
            its tables reports an error, or if the table is not listed
    """
    source_entry = catalog.get_database_catalog_entry(database_name)

    # Guard: the data source was never registered in the catalog.
    if source_entry is None:
        message = "{} data source does not exist. Create the new database source using CREATE DATABASE.".format(
            database_name,
        )
        logger.error(message)
        raise BinderError(message)

    db_handler = get_database_handler(source_entry.engine, **source_entry.params)
    db_handler.connect()

    # Ask the data source which tables it holds.
    tables_resp = db_handler.get_tables()
    if tables_resp.error is not None:
        message = "There is no table in data source {}. Create the table using native query.".format(
            database_name,
        )
        logger.error(message)
        raise BinderError(message)

    # Check table existence against the reported table names.
    known_tables = tables_resp.data["table_name"].values
    if table_name in known_tables:
        return

    message = "Table {} does not exist in data source {}. Create the table using native query.".format(
        table_name,
        database_name,
    )
    logger.error(message)
    raise BinderError(message)


def create_table_catalog_entry_for_data_source(
    table_name: str, column_name_list: List[str]
):
    """
    Build a table catalog entry describing a table in a native data source.

    Arguments:
        table_name (str): name of the table in the data source
        column_name_list (List[str]): names of the table's columns

    Returns:
        TableCatalogEntry of type NATIVE_DATA with no file url, holding one
        column entry of type ColumnType.ANY per name in column_name_list
    """
    native_columns = [
        ColumnCatalogEntry(name, ColumnType.ANY) for name in column_name_list
    ]
    return TableCatalogEntry(
        name=table_name,
        file_url=None,
        table_type=TableType.NATIVE_DATA,
        columns=native_columns,
    )


def bind_table_info(catalog: CatalogManager, table_info: TableInfo):
"""
Uses catalog to bind the table information .

Expand All @@ -56,6 +115,29 @@ def bind_table_info(
Returns:
TableCatalogEntry - corresponding table catalog entry for the input table info
"""
if table_info.database_name is not None:
bind_native_table_info(catalog, table_info)
else:
bind_evadb_table_info(catalog, table_info)


def bind_native_table_info(catalog: CatalogManager, table_info: TableInfo):
    """
    Bind a table that lives in a native data source.

    Validates the data source and table, fetches the table's column names
    from the data source, and stores a freshly built table catalog entry on
    ``table_info.table_obj``.

    Arguments:
        catalog (CatalogManager): catalog used to look up the data source
        table_info (TableInfo): table to bind; its ``database_name`` and
            ``table_name`` are read and its ``table_obj`` is set

    Raises:
        BinderError: if the data source or table is invalid, or if the data
            source reports an error while listing the table's columns
    """
    check_data_source_and_table_are_valid(
        catalog, table_info.database_name, table_info.table_name
    )

    db_catalog_entry = catalog.get_database_catalog_entry(table_info.database_name)
    handler = get_database_handler(db_catalog_entry.engine, **db_catalog_entry.params)
    handler.connect()

    # Assemble columns.
    resp = handler.get_columns(table_info.table_name)

    # A failed lookup must surface as a BinderError rather than as an opaque
    # failure when subscripting the missing data below.
    # NOTE(review): assumes the get_columns response carries `error` like the
    # get_tables response does; getattr keeps this safe if it does not.
    if getattr(resp, "error", None) is not None:
        error = "Failed to get columns of table {} in data source {}: {}".format(
            table_info.table_name,
            table_info.database_name,
            resp.error,
        )
        logger.error(error)
        raise BinderError(error)

    table_info.table_obj = create_table_catalog_entry_for_data_source(
        table_info.table_name, list(resp.data["column_name"])
    )


def bind_evadb_table_info(catalog: CatalogManager, table_info: TableInfo):
obj = catalog.get_table_catalog_entry(
table_info.table_name,
table_info.database_name,
Expand Down
2 changes: 1 addition & 1 deletion evadb/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def _bind_tableref(self, node: TableRef):
if node.is_table_atom():
# Table
self._binder_context.add_table_alias(
node.alias.alias_name, node.table.table_name
node.alias.alias_name, node.table.database_name, node.table.table_name
)
bind_table_info(self._catalog(), node.table)
elif node.is_select():
Expand Down
40 changes: 35 additions & 5 deletions evadb/binder/statement_binder_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
# limitations under the License.
from typing import Callable, Dict, List, Tuple, Union

from evadb.binder.binder_utils import BinderError
from evadb.binder.binder_utils import (
BinderError,
check_data_source_and_table_are_valid,
create_table_catalog_entry_for_data_source,
)
from evadb.catalog.catalog_type import TableType
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.models.udf_io_catalog import UdfIOCatalogEntry
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.logging_manager import logger

CatalogColumnType = Union[ColumnCatalogEntry, UdfIOCatalogEntry]
Expand Down Expand Up @@ -64,15 +70,34 @@ def _check_duplicate_alias(self, alias: str):
logger.error(err_msg)
raise BinderError(err_msg)

def add_table_alias(self, alias: str, table_name: str):
def add_table_alias(self, alias: str, database_name: str, table_name: str):
"""
Add a alias -> table_name mapping
Arguments:
alias (str): name of alias
table_name (str): name of the table
"""
self._check_duplicate_alias(alias)
table_obj = self._catalog().get_table_catalog_entry(table_name)

if database_name is not None:
check_data_source_and_table_are_valid(
self._catalog(), database_name, table_name
)

db_catalog_entry = self._catalog().get_database_catalog_entry(database_name)
handler = get_database_handler(
db_catalog_entry.engine, **db_catalog_entry.params
)
handler.connect()

# Assemble columns.
column_df = handler.get_columns(table_name).data
table_obj = create_table_catalog_entry_for_data_source(
table_name, list(column_df["column_name"])
)
else:
table_obj = self._catalog().get_table_catalog_entry(table_name)

self._table_alias_map[alias] = table_obj

def add_derived_table_alias(
Expand Down Expand Up @@ -143,8 +168,13 @@ def _check_table_alias_map(self, alias, col_name) -> ColumnCatalogEntry:
column object
"""
table_obj = self._table_alias_map.get(alias, None)
if table_obj:
return self._catalog().get_column_catalog_entry(table_obj, col_name)
if table_obj is not None:
if table_obj.table_type == TableType.NATIVE_DATA:
for column_catalog_entry in table_obj.columns:
if column_catalog_entry.name == col_name:
return column_catalog_entry
else:
return self._catalog().get_column_catalog_entry(table_obj, col_name)

def _check_derived_table_alias_map(self, alias, col_name) -> CatalogColumnType:
"""
Expand Down
4 changes: 4 additions & 0 deletions evadb/catalog/catalog_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class TableType(EvaDBEnum):
# cannot be accessed/modified directly by user
SYSTEM_STRUCTURED_DATA # noqa: F821

# Reserved for tables that are stored in native
# database backend.
NATIVE_DATA # noqa: F821


class ColumnType(EvaDBEnum):
BOOLEAN # noqa: F821
Expand Down
6 changes: 5 additions & 1 deletion evadb/executor/storage_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,15 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
return storage_engine.read(self.node.table, self.node.chunk_params)
elif self.node.table.table_type == TableType.STRUCTURED_DATA:
return storage_engine.read(self.node.table, self.node.batch_mem_size)
elif self.node.table.table_type == TableType.NATIVE_DATA:
return storage_engine.read(
self.node.table_ref.table.database_name, self.node.table
)
elif self.node.table.table_type == TableType.PDF_DATA:
return storage_engine.read(self.node.table)
else:
raise ExecutorError(
f"Unsupported TableType {self.node.table.table_type} encountered"
f"Unsupported TableType {self.node.table.table_type} encountered"
)
except Exception as e:
logger.error(e)
Expand Down
17 changes: 14 additions & 3 deletions evadb/parser/lark_visitor/_common_clauses_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,26 @@

class CommonClauses:
def table_name(self, tree):
table_name = self.visit(tree.children[0])
child = self.visit(tree.children[0])
if isinstance(child, tuple):
database_name, table_name = child[0], child[1]
else:
database_name, table_name = None, child

if table_name is not None:
return TableInfo(table_name=table_name)
return TableInfo(table_name=table_name, database_name=database_name)
else:
error = "Invalid Table Name"
logger.error(error)

def full_id(self, tree):
return self.visit(tree.children[0])
if len(tree.children) == 1:
# Table only
return self.visit(tree.children[0])
elif len(tree.children) == 2:
# Data source and table
# Ex. DemoDB.TestTable
return (self.visit(tree.children[0]), self.visit(tree.children[1]))

def uid(self, tree):
return self.visit(tree.children[0])
Expand Down
53 changes: 53 additions & 0 deletions evadb/storage/native_storage_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator

import pandas as pd

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.logging_manager import logger


class NativeStorageEngine(AbstractStorageEngine):
    """
    Storage engine that reads tables directly from a native data source.

    Only ``read`` does any work; ``create`` and ``write`` are no-ops.
    """

    def __init__(self, db: EvaDBDatabase):
        super().__init__(db)

    def create(self, table: TableCatalogEntry):
        """No-op: this engine does not create tables."""
        pass

    def write(self, table: TableCatalogEntry, rows: Batch):
        """No-op: this engine does not write rows."""
        pass

    def read(self, database_name: str, table: TableCatalogEntry) -> Iterator[Batch]:
        """
        Read every row of a table from a native data source.

        Arguments:
            database_name (str): name of the data source in the catalog
            table (TableCatalogEntry): table to read; only its name is used

        Yields:
            A single Batch holding the full result of ``SELECT *``.

        Raises:
            Exception: if the data source cannot be reached or the query
                fails; the original error is chained as the cause
        """
        try:
            db_catalog_entry = self.db.catalog().get_database_catalog_entry(
                database_name
            )
            handler = get_database_handler(
                db_catalog_entry.engine, **db_catalog_entry.params
            )
            handler.connect()

            # NOTE(review): table.name is interpolated unquoted into the
            # query; confirm every caller passes a validated table name.
            resp = handler.execute_native_query(f"SELECT * FROM {table.name}")

            # A failed query must not be turned into an empty batch, which is
            # what pd.DataFrame(None) would silently produce.
            # NOTE(review): assumes the response exposes `error`; getattr
            # keeps this safe if it does not.
            if getattr(resp, "error", None) is not None:
                raise RuntimeError(resp.error)

            batch = Batch(pd.DataFrame(resp.data))

        except Exception as e:
            err_msg = f"Failed to read the table {table.name} in data source {database_name} with exception {str(e)}"
            logger.exception(err_msg)
            # Chain the cause so the original traceback is preserved.
            raise Exception(err_msg) from e

        # Yield outside the try so errors raised by the consumer are not
        # relabelled as read failures.
        yield batch
2 changes: 2 additions & 0 deletions evadb/storage/storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
from evadb.storage.document_storage_engine import DocumentStorageEngine
from evadb.storage.image_storage_engine import ImageStorageEngine
from evadb.storage.native_storage_engine import NativeStorageEngine
from evadb.storage.pdf_storage_engine import PDFStorageEngine
from evadb.storage.sqlite_storage_engine import SQLStorageEngine
from evadb.storage.video_storage_engine import DecordStorageEngine
Expand All @@ -35,6 +36,7 @@ def _lazy_initialize_storages(cls, db: EvaDBDatabase):
TableType.IMAGE_DATA: ImageStorageEngine,
TableType.DOCUMENT_DATA: DocumentStorageEngine,
TableType.PDF_DATA: PDFStorageEngine,
TableType.NATIVE_DATA: NativeStorageEngine,
}

@classmethod
Expand Down
5 changes: 3 additions & 2 deletions test/binder/test_statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ def test_bind_tableref(self, mock_bind_table_info):
tableref.is_table_atom.return_value = True
binder._bind_tableref(tableref)
mock.assert_called_with(
tableref.alias.alias_name, tableref.table.table_name
tableref.alias.alias_name,
tableref.table.database_name,
tableref.table.table_name,
)
mock_bind_table_info.assert_called_once_with(catalog(), tableref.table)

Expand Down Expand Up @@ -176,7 +178,6 @@ def test_bind_func_expr(self, mock_load_udf_class_from_file):
udf_obj.impl_file_path, udf_obj.name
)
self.assertEqual(func_expr.output_objs, [obj1])
print(str(func_expr.alias))
self.assertEqual(
func_expr.alias,
Alias("func_expr", ["out1"]),
Expand Down
7 changes: 4 additions & 3 deletions test/binder/test_statement_binder_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_add_table_alias(self):
ctx = StatementBinderContext(mock_catalog)

mock_check = ctx._check_duplicate_alias = MagicMock()
ctx.add_table_alias("alias", "table_name")
ctx.add_table_alias("alias", None, "table_name")
mock_check.assert_called_with("alias")
mock_get.assert_called_with("table_name")
self.assertEqual(ctx._table_alias_map["alias"], "table_obj")
Expand Down Expand Up @@ -127,9 +127,10 @@ def test_check_table_alias_map(self):
mock_get_column_object.return_value = "catalog_value"
# key exists
ctx = StatementBinderContext(mock_catalog)
ctx._table_alias_map["alias"] = "table_obj"
table_obj = MagicMock()
ctx._table_alias_map["alias"] = table_obj
result = ctx._check_table_alias_map("alias", "col_name")
mock_get_column_object.assert_called_once_with("table_obj", "col_name")
mock_get_column_object.assert_called_once_with(table_obj, "col_name")
self.assertEqual(result, "catalog_value")

# key does not exist
Expand Down
Loading