Skip to content

Commit

Permalink
draft for binder and storage update for running query on native data
Browse files Browse the repository at this point in the history
  • Loading branch information
jiashenC committed Aug 22, 2023
1 parent bb3e372 commit 174b306
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 59 deletions.
116 changes: 68 additions & 48 deletions evadb/binder/binder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,107 +17,127 @@
import re
from typing import TYPE_CHECKING, List

from evadb.catalog.catalog_type import TableType
from evadb.catalog.catalog_type import ColumnType, TableType
from evadb.catalog.catalog_utils import (
get_video_table_column_definitions,
is_document_table,
is_pdf_table,
is_string_col,
is_video_table,
)
from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
from evadb.catalog.models.table_catalog import TableCatalogEntry

if TYPE_CHECKING:
from evadb.binder.statement_binder_context import StatementBinderContext
from evadb.catalog.catalog_manager import CatalogManager

from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
from evadb.parser.alias import Alias
from evadb.parser.create_statement import ColumnDefinition
from evadb.parser.table_ref import TableInfo, TableRef
from evadb.utils.logging_manager import logger
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.logging_manager import logger


class BinderError(Exception):
    """Error raised when binding fails, e.g. for an unknown data source or table."""


def check_data_source_and_table_are_valid(
    catalog: CatalogManager, database_name: str, table_name: str
):
    """
    Validate that the data source is registered in the catalog and that the
    requested table is listed by that data source.
    Arguments:
        catalog (CatalogManager): catalog manager used to look up the data source
        database_name (str): name of the data source
        table_name (str): name of the table expected inside the data source
    Raises:
        BinderError: if the data source has no catalog entry, if the handler
            reports an error while listing tables, or if the table name is
            not among the listed tables
    """
    db_catalog_entry = catalog.get_database_catalog_entry(database_name)

    # Guard clause: an unknown data source cannot be validated any further.
    if db_catalog_entry is None:
        error = "{} data source does not exist. Create the new database source using CREATE DATABASE.".format(
            database_name,
        )
        logger.error(error)
        raise BinderError(error)

    handler = get_database_handler(
        db_catalog_entry.engine, **db_catalog_entry.params
    )
    handler.connect()

    # Get table definition.
    resp = handler.get_tables()
    if resp.error is not None:
        error = "There is no table in data source {}. Create the table using native query.".format(
            database_name,
        )
        logger.error(error)
        raise BinderError(error)

    # Check table existence.
    table_df = resp.data
    if table_name not in table_df["table_name"].values:
        error = "Table {} does not exist in data source {}. Create the table using native query.".format(
            table_name,
            database_name,
        )
        logger.error(error)
        raise BinderError(error)


def bind_evadb_table_info(
catalog: CatalogManager, table_info: TableInfo
def create_table_catalog_entry_for_data_source(
    table_name: str, column_name_list: List[str]
):
    """
    Build a table catalog entry describing a table of a native data source.
    Arguments:
        table_name (str): name of the table
        column_name_list (List[str]): names of the table's columns, in order
    Returns:
        TableCatalogEntry - entry of type NATIVE_DATA with no file url and one
        ColumnType.ANY column per given name
    """
    # Every column is registered with ColumnType.ANY.
    columns = [
        ColumnCatalogEntry(name, ColumnType.ANY) for name in column_name_list
    ]

    return TableCatalogEntry(
        name=table_name,
        file_url=None,
        table_type=TableType.NATIVE_DATA,
        columns=columns,
    )


def bind_table_info(catalog: CatalogManager, table_info: TableInfo):
    """
    Bind the table information, dispatching on whether the table is
    qualified with a data source name.
    Arguments:
        catalog (CatalogManager): catalog manager to use
        table_info (TableInfo): table information obtained from SQL query
    Returns:
        None - the selected binder is responsible for updating table_info
    """
    # No data source qualifier: bind through bind_evadb_table_info.
    if table_info.database_name is None:
        bind_evadb_table_info(catalog, table_info)
        return

    # Qualified with a data source: bind through bind_native_table_info.
    bind_native_table_info(catalog, table_info)


def bind_native_table_info(catalog: CatalogManager, table_info: TableInfo):
    """
    Bind a table that lives in a native data source.
    Validates the data source and table, fetches the column names through
    the data source handler and stores the resulting catalog entry in
    table_info.table_obj.
    Arguments:
        catalog (CatalogManager): catalog manager to use
        table_info (TableInfo): table information obtained from SQL query
    """
    database_name = table_info.database_name
    table_name = table_info.table_name

    # Raises BinderError when the data source or the table is missing.
    check_data_source_and_table_are_valid(catalog, database_name, table_name)

    entry = catalog.get_database_catalog_entry(database_name)
    handler = get_database_handler(entry.engine, **entry.params)
    handler.connect()

    # Assemble columns.
    columns_resp = handler.get_columns(table_name)
    column_names = list(columns_resp.data["column_name"])
    table_info.table_obj = create_table_catalog_entry_for_data_source(
        table_name, column_names
    )


def bind_evadb_table_info(catalog: CatalogManager, table_info: TableInfo):
obj = catalog.get_table_catalog_entry(
table_info.table_name,
table_info.database_name,
Expand Down
2 changes: 1 addition & 1 deletion evadb/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def _bind_tableref(self, node: TableRef):
if node.is_table_atom():
# Table
self._binder_context.add_table_alias(
node.alias.alias_name, node.table.table_name
node.alias.alias_name, node.table.database_name, node.table.table_name
)
bind_table_info(self._catalog(), node.table)
elif node.is_select():
Expand Down
40 changes: 35 additions & 5 deletions evadb/binder/statement_binder_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
# limitations under the License.
from typing import Callable, Dict, List, Tuple, Union

from evadb.binder.binder_utils import BinderError
from evadb.binder.binder_utils import (
BinderError,
check_data_source_and_table_are_valid,
create_table_catalog_entry_for_data_source,
)
from evadb.catalog.catalog_type import TableType
from evadb.catalog.models.column_catalog import ColumnCatalogEntry
from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.models.udf_io_catalog import UdfIOCatalogEntry
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.logging_manager import logger

CatalogColumnType = Union[ColumnCatalogEntry, UdfIOCatalogEntry]
Expand Down Expand Up @@ -64,15 +70,34 @@ def _check_duplicate_alias(self, alias: str):
logger.error(err_msg)
raise BinderError(err_msg)

def add_table_alias(self, alias: str, database_name: str, table_name: str):
    """
    Add an alias -> table catalog entry mapping
    Arguments:
        alias (str): name of alias
        database_name (str): name of the data source holding the table, or
            None when the table is looked up directly in the catalog
        table_name (str): name of the table
    Raises:
        BinderError: if the alias is already in use, or if the data source
            or table fails validation
    """
    self._check_duplicate_alias(alias)

    if database_name is not None:
        # Table of a native data source: validate first, then build the
        # catalog entry from the column names reported by the handler.
        check_data_source_and_table_are_valid(
            self._catalog(), database_name, table_name
        )

        db_catalog_entry = self._catalog().get_database_catalog_entry(database_name)
        handler = get_database_handler(
            db_catalog_entry.engine, **db_catalog_entry.params
        )
        handler.connect()

        # Assemble columns.
        column_df = handler.get_columns(table_name).data
        table_obj = create_table_catalog_entry_for_data_source(
            table_name, list(column_df["column_name"])
        )
    else:
        # Only unqualified tables are looked up in the catalog.
        table_obj = self._catalog().get_table_catalog_entry(table_name)

    self._table_alias_map[alias] = table_obj

def add_derived_table_alias(
Expand Down Expand Up @@ -143,8 +168,13 @@ def _check_table_alias_map(self, alias, col_name) -> ColumnCatalogEntry:
column object
"""
table_obj = self._table_alias_map.get(alias, None)
if table_obj:
return self._catalog().get_column_catalog_entry(table_obj, col_name)
if table_obj is not None:
if table_obj.table_type == TableType.NATIVE_DATA:
for column_catalog_entry in table_obj.columns:
if column_catalog_entry.name == col_name:
return column_catalog_entry
else:
return self._catalog().get_column_catalog_entry(table_obj, col_name)

def _check_derived_table_alias_map(self, alias, col_name) -> CatalogColumnType:
"""
Expand Down
2 changes: 1 addition & 1 deletion evadb/catalog/catalog_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TableType(EvaDBEnum):

# Reserved for tables that are stored in native
# database backend.
NATIVE_DATA
NATIVE_DATA # noqa: F821


class ColumnType(EvaDBEnum):
Expand Down
4 changes: 3 additions & 1 deletion evadb/executor/storage_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
return storage_engine.read(self.node.table, self.node.chunk_params)
elif self.node.table.table_type == TableType.STRUCTURED_DATA:
return storage_engine.read(self.node.table, self.node.batch_mem_size)
elif self.node.table.table_type == TableType.NATIVE_DATA:
return storage_engine.read(self.node.table_ref.table.database_name, self.node.table)
elif self.node.table.table_type == TableType.PDF_DATA:
return storage_engine.read(self.node.table)
else:
raise ExecutorError(
f"Unsupported TableType {self.node.table.table_type} encountered"
f"Unsupported TableType {self.node.table.table_type} encountered"
)
except Exception as e:
logger.error(e)
Expand Down
11 changes: 8 additions & 3 deletions evadb/parser/lark_visitor/_common_clauses_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@

class CommonClauses:
def table_name(self, tree):
database_name, table_name = self.visit(tree.children[0])
child = self.visit(tree.children[0])
if isinstance(child, tuple):
database_name, table_name = child[0], child[1]
else:
database_name, table_name = None, child

if table_name is not None:
return TableInfo(table_name=table_name, database_name=database_name)
else:
Expand All @@ -31,11 +36,11 @@ def table_name(self, tree):
def full_id(self, tree):
    """
    Resolve a (possibly qualified) identifier node.
    Arguments:
        tree: parse tree node whose children are the identifier parts
    Returns:
        the visited child when there is a single part (table only), a
        (data source, table) tuple when there are two parts, and None for
        any other number of children
    """
    children = tree.children
    if len(children) == 1:
        # Table only
        return self.visit(children[0])
    elif len(children) == 2:
        # Data source and table
        # Ex. DemoDB.TestTable
        # A tuple (not a list) so table_name can tell it apart with isinstance.
        return (self.visit(children[0]), self.visit(children[1]))
    # Any other shape is not a recognised identifier.
    return None

def uid(self, tree):
    """Return the result of visiting the first child of the given node."""
    first_child = tree.children[0]
    return self.visit(first_child)
Expand Down
54 changes: 54 additions & 0 deletions evadb/storage/native_storage_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator

import pandas as pd

from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
from evadb.utils.logging_manager import logger
from evadb.third_party.databases.interface import get_database_handler


class NativeStorageEngine(AbstractStorageEngine):
    """
    Storage engine for tables that live in a native data source.
    Reads go through the data source handler; create and write are
    currently no-ops.
    """

    def __init__(self, db: EvaDBDatabase):
        super().__init__(db)

    def create(self, table: TableCatalogEntry):
        # Intentionally a no-op: nothing is created through this engine yet.
        pass

    def write(self, table: TableCatalogEntry, rows: Batch):
        # Intentionally a no-op: nothing is written through this engine yet.
        pass

    def read(self, database_name: str, table: TableCatalogEntry) -> Iterator[Batch]:
        """
        Read the whole table from the data source as a single batch.
        Arguments:
            database_name (str): name of the data source holding the table
            table (TableCatalogEntry): catalog entry of the table to read
        Returns:
            Iterator[Batch] - yields one batch containing every row, with an
            extra IDENTIFIER_COLUMN numbering the rows from 1
        Raises:
            Exception: if the read fails for any reason; the original error
                is chained as the cause
        """
        try:
            db_catalog_entry = self.db.catalog().get_database_catalog_entry(
                database_name
            )
            handler = get_database_handler(
                db_catalog_entry.engine, **db_catalog_entry.params
            )
            handler.connect()

            # NOTE(review): table.name is interpolated into the query as-is;
            # presumably it was validated during binding - confirm.
            resp = handler.execute_native_query(f"SELECT * FROM {table.name}")

            # Surface an error reported by the handler instead of failing
            # later on a missing data frame.
            resp_error = getattr(resp, "error", None)
            if resp_error is not None:
                raise Exception(resp_error)
            data_df = resp.data

            # Build the row-id column on the data's own index so that the
            # column-wise concat lines up row for row.
            pk_df = pd.DataFrame(
                {IDENTIFIER_COLUMN: list(range(1, len(data_df) + 1))},
                index=data_df.index,
            )
            data_df = pd.concat([data_df, pk_df], axis=1)
            yield Batch(data_df)

        except Exception as e:
            err_msg = f"Failed to read the table {table.name} in data source {database_name} with exception {str(e)}"
            logger.exception(err_msg)
            # Chain the cause so the original traceback is preserved.
            raise Exception(err_msg) from e
2 changes: 2 additions & 0 deletions evadb/storage/storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from evadb.storage.pdf_storage_engine import PDFStorageEngine
from evadb.storage.sqlite_storage_engine import SQLStorageEngine
from evadb.storage.video_storage_engine import DecordStorageEngine
from evadb.storage.native_storage_engine import NativeStorageEngine


class StorageEngine:
Expand All @@ -35,6 +36,7 @@ def _lazy_initialize_storages(cls, db: EvaDBDatabase):
TableType.IMAGE_DATA: ImageStorageEngine,
TableType.DOCUMENT_DATA: DocumentStorageEngine,
TableType.PDF_DATA: PDFStorageEngine,
TableType.NATIVE_DATA: NativeStorageEngine,
}

@classmethod
Expand Down

0 comments on commit 174b306

Please sign in to comment.