From c90d8bf0887b89e3325c583312389489f633fa25 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Mon, 21 Aug 2023 17:21:49 -0400 Subject: [PATCH 1/5] update binder and parser --- evadb/binder/binder_utils.py | 64 ++++++++++++++++++- evadb/catalog/catalog_type.py | 4 ++ .../lark_visitor/_common_clauses_ids.py | 12 +++- test/parser/test_parser.py | 18 ++++++ 4 files changed, 94 insertions(+), 4 deletions(-) diff --git a/evadb/binder/binder_utils.py b/evadb/binder/binder_utils.py index b40521994d..d2b7bfd823 100644 --- a/evadb/binder/binder_utils.py +++ b/evadb/binder/binder_utils.py @@ -26,6 +26,7 @@ is_video_table, ) from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.catalog.models.column_catalog import ColumnCatalogEntry if TYPE_CHECKING: from evadb.binder.statement_binder_context import StatementBinderContext @@ -37,6 +38,7 @@ from evadb.parser.create_statement import ColumnDefinition from evadb.parser.table_ref import TableInfo, TableRef from evadb.utils.logging_manager import logger +from evadb.third_party.databases.interface import get_database_handler class BinderError(Exception): @@ -45,7 +47,7 @@ class BinderError(Exception): def bind_table_info( catalog: CatalogManager, table_info: TableInfo -) -> TableCatalogEntry: +): """ Uses catalog to bind the table information . @@ -56,6 +58,66 @@ def bind_table_info( Returns: TableCatalogEntry - corresponding table catalog entry for the input table info """ + if table_info.database_name is not None: + bind_native_table_info(catalog, table_info) + else: + bind_evadb_table_info(catalog, table_info) + + +def bind_native_table_info( + catalog: CatalogManager, table_info: TableInfo +): + db_catalog_entry = catalog.get_database_catalog_entry(table_info.database_name) + + if db_catalog_entry is not None: + handler = get_database_handler(db_catalog_entry.engine) + + # Get table definition. + resp = handler.get_tables() + if resp.error is not None: + error = "There is no table in data source {}. Create the table using native query.".format( + table_info.database_name, + ) + logger.error(error) + raise BinderError(error) + + # Check table existance. + table_df = resp.data + if table_info.table_name not in table_df["table_name"].values: + error = "Table {} does not exist in data source {}. Create the table using native query.".format( + table_info.table_name, + table_info.database_name, + ) + logger.error(error) + raise BinderError(error) + + # Assemble columns. + column_df = handler.get_columns(table_info.table_name).data + column_list = [] + for column_name in column_df["column_name"]: + column_list.append( + ColumnCatalogEntry(column_name) + ) + + # Assemble table. + table_catalog_entry = TableCatalogEntry( + name=table_info.table_name, + table_type=TableType.NATIVE_DATA, + columns=column_list, + ) + table_info.table_obj = table_catalog_entry + + else: + error = "{} data source does not exist. Create the new database source using CREATE DATABASE.".format( + table_info.database_name, + ) + logger.error(error) + raise BinderError(error) + + +def bind_evadb_table_info( + catalog: CatalogManager, table_info: TableInfo +): obj = catalog.get_table_catalog_entry( table_info.table_name, table_info.database_name, diff --git a/evadb/catalog/catalog_type.py b/evadb/catalog/catalog_type.py index 8ca674c55d..f4643bed04 100644 --- a/evadb/catalog/catalog_type.py +++ b/evadb/catalog/catalog_type.py @@ -31,6 +31,10 @@ class TableType(EvaDBEnum): # cannot be accessed/modified directly by user SYSTEM_STRUCTURED_DATA # noqa: F821 + # Reserved for tables that are stored in native + # database backend. + NATIVE_DATA + class ColumnType(EvaDBEnum): BOOLEAN # noqa: F821 diff --git a/evadb/parser/lark_visitor/_common_clauses_ids.py b/evadb/parser/lark_visitor/_common_clauses_ids.py index e04206924a..dcf22e388e 100644 --- a/evadb/parser/lark_visitor/_common_clauses_ids.py +++ b/evadb/parser/lark_visitor/_common_clauses_ids.py @@ -21,15 +21,21 @@ class CommonClauses: def table_name(self, tree): - table_name = self.visit(tree.children[0]) + database_name, table_name = self.visit(tree.children[0]) if table_name is not None: - return TableInfo(table_name=table_name) + return TableInfo(table_name=table_name, database_name=database_name) else: error = "Invalid Table Name" logger.error(error) def full_id(self, tree): - return self.visit(tree.children[0]) + if len(tree.children) == 1: + # Table only + return [None, self.visit(tree.children[0])] + elif len(tree.children) == 2: + # Data source and table + # Ex. DemoDB.TestTable + return [self.visit(tree.children[0]), self.visit(tree.children[1])] def uid(self, tree): return self.visit(tree.children[0]) diff --git a/test/parser/test_parser.py b/test/parser/test_parser.py index 47cfcbbad7..d232f9c8c3 100644 --- a/test/parser/test_parser.py +++ b/test/parser/test_parser.py @@ -51,6 +51,24 @@ class ParserTests(unittest.TestCase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + def test_select_from_data_source(self): + parser = Parser() + + query = "SELECT * FROM DemoDB.DemoTable" + evadb_stmt_list = parser.parse(query) + + # check stmt itself + self.assertIsInstance(evadb_stmt_list, list) + self.assertEqual(len(evadb_stmt_list), 1) + self.assertEqual(evadb_stmt_list[0].stmt_type, StatementType.SELECT) + + # from_table + select_stmt = evadb_stmt_list[0] + self.assertIsNotNone(select_stmt.from_table) + self.assertIsInstance(select_stmt.from_table, TableRef) + self.assertEqual(select_stmt.from_table.table.table_name, "DemoTable") + self.assertEqual(select_stmt.from_table.table.database_name, "DemoDB") + def test_use_statement(self): parser = Parser() From f421ea126d78c42a85050bca642a9d2f6eb5b200 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Mon, 21 Aug 2023 23:15:05 -0400 Subject: [PATCH 2/5] draft for binder and storage update for running query on native data --- evadb/binder/binder_utils.py | 116 ++++++++++-------- evadb/binder/statement_binder.py | 2 +- evadb/binder/statement_binder_context.py | 40 +++++- evadb/catalog/catalog_type.py | 2 +- evadb/executor/storage_executor.py | 4 +- .../lark_visitor/_common_clauses_ids.py | 11 +- evadb/storage/native_storage_engine.py | 54 ++++++++ evadb/storage/storage_engine.py | 2 + 8 files changed, 172 insertions(+), 59 deletions(-) create mode 100644 evadb/storage/native_storage_engine.py diff --git a/evadb/binder/binder_utils.py b/evadb/binder/binder_utils.py index d2b7bfd823..9458b716b1 100644 --- a/evadb/binder/binder_utils.py +++ b/evadb/binder/binder_utils.py @@ -17,7 +17,7 @@ import re from typing import TYPE_CHECKING, List -from evadb.catalog.catalog_type import TableType +from evadb.catalog.catalog_type import ColumnType, TableType from evadb.catalog.catalog_utils import ( get_video_table_column_definitions, is_document_table, @@ -25,99 +25,119 @@ is_string_col, is_video_table, ) -from evadb.catalog.models.table_catalog import TableCatalogEntry from evadb.catalog.models.column_catalog import ColumnCatalogEntry +from evadb.catalog.models.table_catalog import TableCatalogEntry if TYPE_CHECKING: from evadb.binder.statement_binder_context import StatementBinderContext from evadb.catalog.catalog_manager import CatalogManager + from evadb.expression.abstract_expression import AbstractExpression, ExpressionType from evadb.expression.function_expression import FunctionExpression from evadb.expression.tuple_value_expression import TupleValueExpression from evadb.parser.alias import Alias from evadb.parser.create_statement import ColumnDefinition from evadb.parser.table_ref import TableInfo, TableRef -from evadb.utils.logging_manager import logger from evadb.third_party.databases.interface import get_database_handler +from evadb.utils.logging_manager import logger class BinderError(Exception): pass -def bind_table_info( - catalog: CatalogManager, table_info: TableInfo +def check_data_source_and_table_are_valid( + catalog: CatalogManager, database_name: str, table_name: str ): """ - Uses catalog to bind the table information . - - Arguments: - catalog (CatalogManager): catalog manager to use - table_info (TableInfo): table information obtained from SQL query - - Returns: - TableCatalogEntry - corresponding table catalog entry for the input table info + Validate the database is valid and the requested table in database is + also valid. """ - if table_info.database_name is not None: - bind_native_table_info(catalog, table_info) - else: - bind_evadb_table_info(catalog, table_info) - - -def bind_native_table_info( - catalog: CatalogManager, table_info: TableInfo -): - db_catalog_entry = catalog.get_database_catalog_entry(table_info.database_name) + db_catalog_entry = catalog.get_database_catalog_entry(database_name) if db_catalog_entry is not None: - handler = get_database_handler(db_catalog_entry.engine) + handler = get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) + handler.connect() # Get table definition. resp = handler.get_tables() if resp.error is not None: error = "There is no table in data source {}. Create the table using native query.".format( - table_info.database_name, + database_name, ) logger.error(error) raise BinderError(error) # Check table existance. table_df = resp.data - if table_info.table_name not in table_df["table_name"].values: + if table_name not in table_df["table_name"].values: error = "Table {} does not exist in data source {}. Create the table using native query.".format( - table_info.table_name, - table_info.database_name, + table_name, + database_name, ) logger.error(error) raise BinderError(error) - - # Assemble columns. - column_df = handler.get_columns(table_info.table_name).data - column_list = [] - for column_name in column_df["column_name"]: - column_list.append( - ColumnCatalogEntry(column_name) - ) - - # Assemble table. - table_catalog_entry = TableCatalogEntry( - name=table_info.table_name, - table_type=TableType.NATIVE_DATA, - columns=column_list, - ) - table_info.table_obj = table_catalog_entry - else: error = "{} data source does not exist. Create the new database source using CREATE DATABASE.".format( - table_info.database_name, + database_name, ) logger.error(error) raise BinderError(error) -def bind_evadb_table_info( - catalog: CatalogManager, table_info: TableInfo +def create_table_catalog_entry_for_data_source( + table_name: str, column_name_list: List[str] ): + column_list = [] + for column_name in column_name_list: + column_list.append(ColumnCatalogEntry(column_name, ColumnType.ANY)) + + # Assemble table. + table_catalog_entry = TableCatalogEntry( + name=table_name, + file_url=None, + table_type=TableType.NATIVE_DATA, + columns=column_list, + ) + return table_catalog_entry + + +def bind_table_info(catalog: CatalogManager, table_info: TableInfo): + """ + Uses catalog to bind the table information . + + Arguments: + catalog (CatalogManager): catalog manager to use + table_info (TableInfo): table information obtained from SQL query + + Returns: + TableCatalogEntry - corresponding table catalog entry for the input table info + """ + if table_info.database_name is not None: + bind_native_table_info(catalog, table_info) + else: + bind_evadb_table_info(catalog, table_info) + + +def bind_native_table_info(catalog: CatalogManager, table_info: TableInfo): + check_data_source_and_table_are_valid( + catalog, table_info.database_name, table_info.table_name + ) + + db_catalog_entry = catalog.get_database_catalog_entry(table_info.database_name) + handler = get_database_handler(db_catalog_entry.engine, **db_catalog_entry.params) + handler.connect() + + # Assemble columns. + column_df = handler.get_columns(table_info.table_name).data + table_info.table_obj = create_table_catalog_entry_for_data_source( + table_info.table_name, list(column_df["column_name"]) + ) + + +def bind_evadb_table_info(catalog: CatalogManager, table_info: TableInfo): obj = catalog.get_table_catalog_entry( table_info.table_name, table_info.database_name, diff --git a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index fd80ca38b8..f57dbf6291 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -212,7 +212,7 @@ def _bind_tableref(self, node: TableRef): if node.is_table_atom(): # Table self._binder_context.add_table_alias( - node.alias.alias_name, node.table.table_name + node.alias.alias_name, node.table.database_name, node.table.table_name ) bind_table_info(self._catalog(), node.table) elif node.is_select(): diff --git a/evadb/binder/statement_binder_context.py b/evadb/binder/statement_binder_context.py index 73cc25c8fe..84ee310fe1 100644 --- a/evadb/binder/statement_binder_context.py +++ b/evadb/binder/statement_binder_context.py @@ -14,12 +14,18 @@ # limitations under the License. from typing import Callable, Dict, List, Tuple, Union -from evadb.binder.binder_utils import BinderError +from evadb.binder.binder_utils import ( + BinderError, + check_data_source_and_table_are_valid, + create_table_catalog_entry_for_data_source, +) +from evadb.catalog.catalog_type import TableType from evadb.catalog.models.column_catalog import ColumnCatalogEntry from evadb.catalog.models.table_catalog import TableCatalogEntry from evadb.catalog.models.udf_io_catalog import UdfIOCatalogEntry from evadb.expression.function_expression import FunctionExpression from evadb.expression.tuple_value_expression import TupleValueExpression +from evadb.third_party.databases.interface import get_database_handler from evadb.utils.logging_manager import logger CatalogColumnType = Union[ColumnCatalogEntry, UdfIOCatalogEntry] @@ -64,7 +70,7 @@ def _check_duplicate_alias(self, alias: str): logger.error(err_msg) raise BinderError(err_msg) - def add_table_alias(self, alias: str, table_name: str): + def add_table_alias(self, alias: str, database_name: str, table_name: str): """ Add a alias -> table_name mapping Arguments: @@ -72,7 +78,26 @@ def add_table_alias(self, alias: str, table_name: str): table_name (str): name of the table """ self._check_duplicate_alias(alias) - table_obj = self._catalog().get_table_catalog_entry(table_name) + + if database_name is not None: + check_data_source_and_table_are_valid( + self._catalog(), database_name, table_name + ) + + db_catalog_entry = self._catalog().get_database_catalog_entry(database_name) + handler = get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) + handler.connect() + + # Assemble columns. + column_df = handler.get_columns(table_name).data + table_obj = create_table_catalog_entry_for_data_source( + table_name, list(column_df["column_name"]) + ) + else: + table_obj = self._catalog().get_table_catalog_entry(table_name) + self._table_alias_map[alias] = table_obj def add_derived_table_alias( @@ -143,8 +168,13 @@ def _check_table_alias_map(self, alias, col_name) -> ColumnCatalogEntry: column object """ table_obj = self._table_alias_map.get(alias, None) - if table_obj: - return self._catalog().get_column_catalog_entry(table_obj, col_name) + if table_obj is not None: + if table_obj.table_type == TableType.NATIVE_DATA: + for column_catalog_entry in table_obj.columns: + if column_catalog_entry.name == col_name: + return column_catalog_entry + else: + return self._catalog().get_column_catalog_entry(table_obj, col_name) def _check_derived_table_alias_map(self, alias, col_name) -> CatalogColumnType: """ diff --git a/evadb/catalog/catalog_type.py b/evadb/catalog/catalog_type.py index f4643bed04..eadaa50da5 100644 --- a/evadb/catalog/catalog_type.py +++ b/evadb/catalog/catalog_type.py @@ -33,7 +33,7 @@ class TableType(EvaDBEnum): # Reserved for tables that are stored in native # database backend. - NATIVE_DATA + NATIVE_DATA # noqa: F821 class ColumnType(EvaDBEnum): diff --git a/evadb/executor/storage_executor.py b/evadb/executor/storage_executor.py index d4b0149337..38181cd472 100644 --- a/evadb/executor/storage_executor.py +++ b/evadb/executor/storage_executor.py @@ -48,11 +48,13 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: return storage_engine.read(self.node.table, self.node.chunk_params) elif self.node.table.table_type == TableType.STRUCTURED_DATA: return storage_engine.read(self.node.table, self.node.batch_mem_size) + elif self.node.table.table_type == TableType.NATIVE_DATA: + return storage_engine.read(self.node.table_ref.table.database_name, self.node.table) elif self.node.table.table_type == TableType.PDF_DATA: return storage_engine.read(self.node.table) else: raise ExecutorError( - f"Unsupported TableType {self.node.table.table_type} encountered" + f"Unsupported TableType {self.node.table.table_type} encountered" ) except Exception as e: logger.error(e) diff --git a/evadb/parser/lark_visitor/_common_clauses_ids.py b/evadb/parser/lark_visitor/_common_clauses_ids.py index dcf22e388e..4dd3080dd3 100644 --- a/evadb/parser/lark_visitor/_common_clauses_ids.py +++ b/evadb/parser/lark_visitor/_common_clauses_ids.py @@ -21,7 +21,12 @@ class CommonClauses: def table_name(self, tree): - database_name, table_name = self.visit(tree.children[0]) + child = self.visit(tree.children[0]) + if isinstance(child, tuple): + database_name, table_name = child[0], child[1] + else: + database_name, table_name = None, child + if table_name is not None: return TableInfo(table_name=table_name, database_name=database_name) else: @@ -31,11 +36,11 @@ def table_name(self, tree): def full_id(self, tree): if len(tree.children) == 1: # Table only - return [None, self.visit(tree.children[0])] + return self.visit(tree.children[0]) elif len(tree.children) == 2: # Data source and table # Ex. DemoDB.TestTable - return [self.visit(tree.children[0]), self.visit(tree.children[1])] + return (self.visit(tree.children[0]), self.visit(tree.children[1])) def uid(self, tree): return self.visit(tree.children[0]) diff --git a/evadb/storage/native_storage_engine.py b/evadb/storage/native_storage_engine.py new file mode 100644 index 0000000000..517c42bc73 --- /dev/null +++ b/evadb/storage/native_storage_engine.py @@ -0,0 +1,54 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Iterator + +import pandas as pd + +from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.catalog.sql_config import IDENTIFIER_COLUMN +from evadb.database import EvaDBDatabase +from evadb.models.storage.batch import Batch +from evadb.storage.abstract_storage_engine import AbstractStorageEngine +from evadb.utils.logging_manager import logger +from evadb.third_party.databases.interface import get_database_handler + + +class NativeStorageEngine(AbstractStorageEngine): + def __init__(self, db: EvaDBDatabase): + super().__init__(db) + + def create(self, table: TableCatalogEntry): + pass + + def write(self, table: TableCatalogEntry, rows: Batch): + pass + + def read( + self, database_name: str, table: TableCatalogEntry + ) -> Iterator[Batch]: + try: + db_catalog_entry = self.db.catalog().get_database_catalog_entry(database_name) + handler = get_database_handler(db_catalog_entry.engine, **db_catalog_entry.params) + handler.connect() + + data_df = handler.execute_native_query(f"SELECT * FROM {table.name}").data + pk_df = pd.DataFrame({IDENTIFIER_COLUMN: [i + 1 for i in range(len(data_df))]}) + data_df = pd.concat([data_df, pk_df], axis=1) + yield Batch(pd.DataFrame(data_df)) + + except Exception as e: + err_msg = f"Failed to read the table {table.name} in data source {database_name} with exception {str(e)}" + logger.exception(err_msg) + raise Exception(err_msg) diff --git a/evadb/storage/storage_engine.py b/evadb/storage/storage_engine.py index bdba0f38a0..5d325fde9c 100644 --- a/evadb/storage/storage_engine.py +++ b/evadb/storage/storage_engine.py @@ -21,6 +21,7 @@ from evadb.storage.pdf_storage_engine import PDFStorageEngine from evadb.storage.sqlite_storage_engine import SQLStorageEngine from evadb.storage.video_storage_engine import DecordStorageEngine +from evadb.storage.native_storage_engine import NativeStorageEngine class StorageEngine: @@ -35,6 +36,7 @@ def _lazy_initialize_storages(cls, db: EvaDBDatabase): TableType.IMAGE_DATA: ImageStorageEngine, TableType.DOCUMENT_DATA: DocumentStorageEngine, TableType.PDF_DATA: PDFStorageEngine, + TableType.NATIVE_DATA: NativeStorageEngine, } @classmethod From 8a52af7f15c3250513530f0f0c274bc2f4d11dc8 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Tue, 22 Aug 2023 13:53:23 -0400 Subject: [PATCH 3/5] add test cases and fix binder test --- evadb/executor/storage_executor.py | 4 +- evadb/storage/native_storage_engine.py | 17 +++-- evadb/storage/storage_engine.py | 2 +- test/binder/test_statement_binder.py | 5 +- test/binder/test_statement_binder_context.py | 7 +- .../third_party_tests/test_native_executor.py | 71 ++++++++++++------- 6 files changed, 65 insertions(+), 41 deletions(-) diff --git a/evadb/executor/storage_executor.py b/evadb/executor/storage_executor.py index 38181cd472..ce2853d686 100644 --- a/evadb/executor/storage_executor.py +++ b/evadb/executor/storage_executor.py @@ -49,7 +49,9 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: elif self.node.table.table_type == TableType.STRUCTURED_DATA: return storage_engine.read(self.node.table, self.node.batch_mem_size) elif self.node.table.table_type == TableType.NATIVE_DATA: - return storage_engine.read(self.node.table_ref.table.database_name, self.node.table) + return storage_engine.read( + self.node.table_ref.table.database_name, self.node.table + ) elif self.node.table.table_type == TableType.PDF_DATA: return storage_engine.read(self.node.table) else: diff --git a/evadb/storage/native_storage_engine.py b/evadb/storage/native_storage_engine.py index 517c42bc73..d56557ed9f 100644 --- a/evadb/storage/native_storage_engine.py +++ b/evadb/storage/native_storage_engine.py @@ -17,12 +17,11 @@ import pandas as pd from evadb.catalog.models.table_catalog import TableCatalogEntry -from evadb.catalog.sql_config import IDENTIFIER_COLUMN from evadb.database import EvaDBDatabase from evadb.models.storage.batch import Batch from evadb.storage.abstract_storage_engine import AbstractStorageEngine -from evadb.utils.logging_manager import logger from evadb.third_party.databases.interface import get_database_handler +from evadb.utils.logging_manager import logger class NativeStorageEngine(AbstractStorageEngine): @@ -35,17 +34,17 @@ def create(self, table: TableCatalogEntry): def write(self, table: TableCatalogEntry, rows: Batch): pass - def read( - self, database_name: str, table: TableCatalogEntry - ) -> Iterator[Batch]: + def read(self, database_name: str, table: TableCatalogEntry) -> Iterator[Batch]: try: - db_catalog_entry = self.db.catalog().get_database_catalog_entry(database_name) - handler = get_database_handler(db_catalog_entry.engine, **db_catalog_entry.params) + db_catalog_entry = self.db.catalog().get_database_catalog_entry( + database_name + ) + handler = get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) handler.connect() data_df = handler.execute_native_query(f"SELECT * FROM {table.name}").data - pk_df = pd.DataFrame({IDENTIFIER_COLUMN: [i + 1 for i in range(len(data_df))]}) - data_df = pd.concat([data_df, pk_df], axis=1) yield Batch(pd.DataFrame(data_df)) except Exception as e: diff --git a/evadb/storage/storage_engine.py b/evadb/storage/storage_engine.py index 5d325fde9c..95c5c76f48 100644 --- a/evadb/storage/storage_engine.py +++ b/evadb/storage/storage_engine.py @@ -18,10 +18,10 @@ from evadb.storage.abstract_storage_engine import AbstractStorageEngine from evadb.storage.document_storage_engine import DocumentStorageEngine from evadb.storage.image_storage_engine import ImageStorageEngine +from evadb.storage.native_storage_engine import NativeStorageEngine from evadb.storage.pdf_storage_engine import PDFStorageEngine from evadb.storage.sqlite_storage_engine import SQLStorageEngine from evadb.storage.video_storage_engine import DecordStorageEngine -from evadb.storage.native_storage_engine import NativeStorageEngine class StorageEngine: diff --git a/test/binder/test_statement_binder.py b/test/binder/test_statement_binder.py index de942bc6b4..ae6227bcff 100644 --- a/test/binder/test_statement_binder.py +++ b/test/binder/test_statement_binder.py @@ -48,7 +48,9 @@ def test_bind_tableref(self, mock_bind_table_info): tableref.is_table_atom.return_value = True binder._bind_tableref(tableref) mock.assert_called_with( - tableref.alias.alias_name, tableref.table.table_name + tableref.alias.alias_name, + tableref.table.database_name, + tableref.table.table_name, ) mock_bind_table_info.assert_called_once_with(catalog(), tableref.table) @@ -176,7 +178,6 @@ def test_bind_func_expr(self, mock_load_udf_class_from_file): udf_obj.impl_file_path, udf_obj.name ) self.assertEqual(func_expr.output_objs, [obj1]) - print(str(func_expr.alias)) self.assertEqual( func_expr.alias, Alias("func_expr", ["out1"]), diff --git a/test/binder/test_statement_binder_context.py b/test/binder/test_statement_binder_context.py index 53a93f7180..5b7e2225a6 100644 --- a/test/binder/test_statement_binder_context.py +++ b/test/binder/test_statement_binder_context.py @@ -50,7 +50,7 @@ def test_add_table_alias(self): ctx = StatementBinderContext(mock_catalog) mock_check = ctx._check_duplicate_alias = MagicMock() - ctx.add_table_alias("alias", "table_name") + ctx.add_table_alias("alias", None, "table_name") mock_check.assert_called_with("alias") mock_get.assert_called_with("table_name") self.assertEqual(ctx._table_alias_map["alias"], "table_obj") @@ -127,9 +127,10 @@ def test_check_table_alias_map(self): mock_get_column_object.return_value = "catalog_value" # key exists ctx = StatementBinderContext(mock_catalog) - ctx._table_alias_map["alias"] = "table_obj" + table_obj = MagicMock() + ctx._table_alias_map["alias"] = table_obj result = ctx._check_table_alias_map("alias", "col_name") - mock_get_column_object.assert_called_once_with("table_obj", "col_name") + mock_get_column_object.assert_called_once_with(table_obj, "col_name") self.assertEqual(result, "catalog_value") # key does not exist diff --git a/test/third_party_tests/test_native_executor.py b/test/third_party_tests/test_native_executor.py index e2a619a1c3..8ac803fd57 100644 --- a/test/third_party_tests/test_native_executor.py +++ b/test/third_party_tests/test_native_executor.py @@ -29,54 +29,75 @@ def setUp(self): def tearDown(self): shutdown_ray() + self._drop_table_in_native_database() - def _simple_execute(self): - # Create table. + def _create_table_in_native_database(self): execute_query_fetch_all( self.evadb, """USE test_data_source { CREATE TABLE test_table ( name VARCHAR(10), age INT, - comment VARCHAR(100) + comment VARCHAR (100) ) - };""", + }""", ) + + def _insert_value_into_native_database(self, col1, col2, col3): execute_query_fetch_all( self.evadb, - """USE test_data_source { + f"""USE test_data_source {{ INSERT INTO test_table ( name, age, comment ) VALUES ( - 'aa', 1, 'aaaa' + '{col1}', {col2}, '{col3}' ) - } - """, + }}""", + ) + + def _drop_table_in_native_database(self): + execute_query_fetch_all( + self.evadb, + """USE test_data_source { + DROP TABLE IF EXISTS test_table + }""", ) - # Select. + def _execute_evadb_query(self): + self._create_table_in_native_database() + self._insert_value_into_native_database("aa", 1, "aaaa") + self._insert_value_into_native_database("bb", 2, "bbbb") + + res_batch = execute_query_fetch_all( + self.evadb, + "SELECT * FROM test_data_source.test_table", + ) + self.assertEqual(len(res_batch), 2) + self.assertEqual(res_batch.frames["test_table.name"][0], "aa") + self.assertEqual(res_batch.frames["test_table.age"][0], 1) + self.assertEqual(res_batch.frames["test_table.name"][1], "bb") + self.assertEqual(res_batch.frames["test_table.age"][1], 2) + + self._drop_table_in_native_database() + + def _execute_native_query(self): + self._create_table_in_native_database() + self._insert_value_into_native_database("aa", 1, "aaaa") + res_batch = execute_query_fetch_all( self.evadb, """USE test_data_source { SELECT * FROM test_table - } - """, + }""", ) self.assertEqual(len(res_batch), 1) self.assertEqual(res_batch.frames["name"][0], "aa") self.assertEqual(res_batch.frames["age"][0], 1) self.assertEqual(res_batch.frames["comment"][0], "aaaa") - # DROP table. - execute_query_fetch_all( - self.evadb, - """USE test_data_source { - DROP TABLE test_table - } - """, - ) + self._drop_table_in_native_database() - def test_should_run_simple_query_in_postgres(self): + def test_should_run_query_in_postgres(self): # Create database. params = { "user": "eva", @@ -85,11 +106,11 @@ def test_should_run_simple_query_in_postgres(self): "port": "5432", "database": "evadb", } - query = """CREATE DATABASE test_data_source + query = f"""CREATE DATABASE test_data_source WITH ENGINE = "postgres", - PARAMETERS = {};""".format( - params - ) + PARAMETERS = {params};""" execute_query_fetch_all(self.evadb, query) - self._simple_execute() + # Test executions. + self._execute_native_query() + self._execute_evadb_query() From 3074eac1689e58476f72cb594de7f7669c2fee54 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Tue, 22 Aug 2023 14:02:46 -0400 Subject: [PATCH 4/5] add a binder test for non-existent data source --- test/integration_tests/test_select_executor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/integration_tests/test_select_executor.py b/test/integration_tests/test_select_executor.py index b92def493e..030a12585c 100644 --- a/test/integration_tests/test_select_executor.py +++ b/test/integration_tests/test_select_executor.py @@ -118,6 +118,12 @@ def test_should_load_and_select_in_table(self): expected_batch = list(create_dummy_batches()) self.assertEqual([actual_batch], expected_batch) + def test_should_raise_binder_error_on_native_datasource(self): + select_query = "SELECT * FROM test.MyVideo" + self.assertRaises( + BinderError, execute_query_fetch_all, self.evadb, select_query + ) + def test_should_select_star_in_table(self): select_query = "SELECT * FROM MyVideo;" actual_batch = execute_query_fetch_all(self.evadb, select_query) From 9c8df5fad7b0ce34d5b4526a43da24a116b727c2 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Tue, 22 Aug 2023 17:29:53 -0400 Subject: [PATCH 5/5] update circle ci configuration --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 1a6ac2ace6..76dfac659c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -249,7 +249,7 @@ jobs: name: Run integration tests command: | source test_evadb/bin/activate - PYTHONPATH="." python -m pytest test/third_party_tests/test_native_executor.py -k test_should_run_simple_query_in_postgres + PYTHONPATH="." python -m pytest test/third_party_tests/test_native_executor.py -k test_should_run_query_in_postgres Windows: executor: win/default