From 4ba5cf3905e225f1a59a755cfcf0341959062a5d Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Sun, 31 Jul 2022 12:47:14 -0400 Subject: [PATCH 01/18] docs: clean up --- .pre-commit-config.yaml | 18 ----- CONTRIBUTING.md | 116 ---------------------------- README.md | 2 +- api-docs/conf.py | 11 ++- api-docs/source/contribute/index.md | 4 +- 5 files changed, 8 insertions(+), 143 deletions(-) delete mode 100644 .pre-commit-config.yaml delete mode 100644 CONTRIBUTING.md diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml deleted file mode 100644 index f93de8729..000000000 --- a/.pre-commit-config.yaml +++ /dev/null @@ -1,18 +0,0 @@ -repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v3.1.0 - hooks: - - id: trailing-whitespace - - id: end-of-file-fixer - - id: check-added-large-files -- repo: https://github.com/pre-commit/mirrors-autopep8 - rev: v1.5.3 - hooks: - - id: autopep8 - args: ['-i', '--select=E,F', '--max-line-length=88', '--exclude=eva/filters, eva/parser/evaql'] -- repo: https://gitlab.com/pycqa/flake8 - rev: 3.8.3 - hooks: - - id: flake8 - additional_dependencies: [flake8-typing-imports==1.6.0] - args: ['--select=E,F', '--max-line-length=88', '--exclude=eva/filters, eva/parser/evaql'] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md deleted file mode 100644 index dba205ee3..000000000 --- a/CONTRIBUTING.md +++ /dev/null @@ -1,116 +0,0 @@ -# Contributing to EVA - -## Setting up Development Environment - -### Installation - -Installation of EVA involves setting a virtual environment using [miniconda](https://conda.io/projects/conda/en/latest/user-guide/install/index.html) and configuring git hooks. - -1. Clone the repository -```shell -git clone https://github.com/georgia-tech-db/eva.git -``` - -2. Install the dependencies. -```shell -sh script/install/before_install.sh -export PATH="$HOME/miniconda/bin:$PATH" -sh script/install/install.sh -``` - - - -### Client Testing - -1. Set up the server and client - -- Activate the conda environment: `conda activate eva` - -- Launch EVA database Server: `python eva/eva_server.py` - -- Launch CLI: `python eva/eva_cmd_client.py` - -2. Run the `UPLOAD` command in the client terminal: -```mysql -UPLOAD INFILE 'data/ua_detrac/ua_detrac.mp4' PATH 'test_video.mp4'; -``` - -3. Run the `LOAD` command in the client terminal: (may take a while) -```mysql -LOAD DATA INFILE 'test_video.mp4' INTO MyVideo; -``` - -4. Below is a basic query that should work on the client -```mysql -SELECT id, data FROM MyVideo WHERE id < 5; -``` - -### Configure GPU (Recommended) - -1. If your workstation has a GPU, you need to first set it up and configure it. You can run the following command first to check your hardware capabilities. - - ``` - ubuntu-drivers devices - ``` - - If you do have an NVIDIA GPU, and its not been configured yet, follow all the steps in this link carefully. `https://towardsdatascience.com/deep-learning-gpu-installation-on-ubuntu-18-4-9b12230a1d31`. - - Some pointers: - - When installing NVIDIA drivers, check the correct driver version for your GPU to avoid compatibiility issues. - - When installing cuDNN, you will have to create an account. Make sure you get the correct deb files for your OS and architecture. - -2. You can run the following code in a jupyter instance to verify your GPU is working well along with PyTorch. - - ``` - import torch - device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') - print(device) - ``` - - Output of `cuda:0` indicates the presence of a GPU. 
(Note: 0 indicates the index of the GPU in system. Incase you have multiple GPUs, the index needs to be accordingly changed) - -2. Now configure the `executor` section in `eva/eva.yml` as follows: - - ``` - gpus: {'127.0.1.1': [0]} - ``` - - `127.0.1.1` is the loopback address on which the eva server is started. 0 refers to the GPU index to be used. - -## Commiting and Testing - -1. Install git hooks in your .git/ directory. [optional, but recommended] -```shell -conda activate eva -pre-commit install -``` - -2. Ensure that all the unit test cases (including the ones you have added) run succesfully and the coding style conventions are followed. -```shell -bash script/test/test.sh -``` - -## Packaging New Version of EVA - -1. Generate EVA grammar files. -```shell -bash script/antlr4/generate_parser.sh -``` - -2. Bump up version number in `setup.cfg` along with any additional dependencies. - -3. Create a new build locally. -```shell -python -m build -``` - -4. Upload build to pypi using credentials. -```shell -python -m twine upload dist/* -``` - - -## Issues and PR's - -To file a bug or request a feature, please file a GitHub issue. Pull requests are welcome. diff --git a/README.md b/README.md index c9cca1366..dbe625d05 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,7 @@ python3 -m venv env38 # to create a virtual envir pip install --upgrade pip sudo -E apt install -y openjdk-8-jdk openjdk-8-jre # to install JAVA sh script/antlr4/generate_parser.sh # to generate the EVA parser -python -m pip install install -e .[dev] +python -m pip install -e . ```
diff --git a/api-docs/conf.py b/api-docs/conf.py index e1b6798b6..88a764afe 100644 --- a/api-docs/conf.py +++ b/api-docs/conf.py @@ -14,7 +14,6 @@ import warnings import os import sys - sys.path.append(os.path.abspath('.')) sys.path.append(os.path.abspath('../')) @@ -26,13 +25,13 @@ # -- Project information ----------------------------------------------------- - -project = 'EVA' -copyright = str(datetime.now().year) + ', Georgia Tech Database Group' -author = 'Georgia Tech Database Group' +project = "evadb" +author = "Georgia Tech Database Group" +copyright = str(datetime.now().year) + f', {author}' # The full version, including alpha/beta/rc tags -release = '0.0.1' +from eva.version import __version__ as version +release = version master_doc = 'index' diff --git a/api-docs/source/contribute/index.md b/api-docs/source/contribute/index.md index a65fe3109..eb1d65e71 100644 --- a/api-docs/source/contribute/index.md +++ b/api-docs/source/contribute/index.md @@ -16,7 +16,7 @@ python3 -m venv env38 # to create a virtual envir pip install --upgrade pip sudo -E apt install -y openjdk-8-jdk openjdk-8-jre # to install JAVA sh script/antlr4/generate_parser.sh # to generate the EVA parser -python -m pip install install -e .[dev] +python -m pip install -e . ``` #### Submitting a contribution @@ -69,4 +69,4 @@ On your local machine, run the following script to auto-format using `black` ``` python script/formatting/formatter.py -``` \ No newline at end of file +``` From cd27791ad4ef03ee260a433d4dc5be0f2de424e9 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Sun, 31 Jul 2022 15:32:46 -0400 Subject: [PATCH 02/18] docs: development guide clean up and version add to docs --- README.md | 2 +- api-docs/conf.py | 25 ++++++++++++++++++------- api-docs/requirements.txt | 4 ++-- api-docs/source/contribute/index.md | 2 +- setup.py | 2 +- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index dbe625d05..8e04cc3c8 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,7 @@ python3 -m venv env38 # to create a virtual envir pip install --upgrade pip sudo -E apt install -y openjdk-8-jdk openjdk-8-jre # to install JAVA sh script/antlr4/generate_parser.sh # to generate the EVA parser -python -m pip install -e . +pip install -e ".[dev]" ```
diff --git a/api-docs/conf.py b/api-docs/conf.py index 88a764afe..b2fc1a8bb 100644 --- a/api-docs/conf.py +++ b/api-docs/conf.py @@ -1,19 +1,29 @@ -# Configuration file for the Sphinx documentation builder. +# coding=utf-8 +# Copyright 2018-2022 EVA # -# This file only contains a selection of the most common options. For a full -# list see the documentation: -# https://www.sphinx-doc.org/en/master/usage/configuration.html +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # -- Path setup -------------------------------------------------------------- +import os +import sys +import warnings # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. # from datetime import datetime -import warnings -import os -import sys + sys.path.append(os.path.abspath('.')) sys.path.append(os.path.abspath('../')) @@ -31,6 +41,7 @@ # The full version, including alpha/beta/rc tags from eva.version import __version__ as version + release = version master_doc = 'index' diff --git a/api-docs/requirements.txt b/api-docs/requirements.txt index e7044d183..f2bd0e30e 100644 --- a/api-docs/requirements.txt +++ b/api-docs/requirements.txt @@ -11,9 +11,9 @@ commonmark==0.8.1 recommonmark==0.5.0 sphinx==4.3.2 readthedocs-sphinx-ext<1.1 -sphinx-book-theme==0.1.7 +sphinx-book-theme==0.3.3 sphinx-sitemap==2.2.0 -sphinx-external-toc==0.2.3 +sphinx-external-toc==0.3.0 sphinx-thebe==0.1.1 sphinx-jsonschema==1.17.2 diff --git a/api-docs/source/contribute/index.md b/api-docs/source/contribute/index.md index eb1d65e71..40f104758 100644 --- a/api-docs/source/contribute/index.md +++ b/api-docs/source/contribute/index.md @@ -16,7 +16,7 @@ python3 -m venv env38 # to create a virtual envir pip install --upgrade pip sudo -E apt install -y openjdk-8-jdk openjdk-8-jre # to install JAVA sh script/antlr4/generate_parser.sh # to generate the EVA parser -python -m pip install -e . +pip install -e ".[dev]" ``` #### Submitting a contribution diff --git a/setup.py b/setup.py index 6faac69b5..9425dc2ad 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ def version(path): "pandas==1.2.3", "torch==1.7.1", "torchvision==0.8.2", - "Pillow==8.1.2", + "Pillow==9.0.1", "sqlalchemy==1.3.20", "sqlalchemy-utils==0.36.6", "pyspark==3.0.2", From f74b81a963f120aeefbacc4ca150e353d204e0ff Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Sun, 31 Jul 2022 15:35:11 -0400 Subject: [PATCH 03/18] style: fix --- api-docs/conf.py | 93 +++++++++++++++++++++++++++++------------------- 1 file changed, 56 insertions(+), 37 deletions(-) diff --git a/api-docs/conf.py b/api-docs/conf.py index b2fc1a8bb..b146aedab 100644 --- a/api-docs/conf.py +++ b/api-docs/conf.py @@ -18,33 +18,35 @@ import os import sys import warnings + # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. 
If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. # from datetime import datetime -sys.path.append(os.path.abspath('.')) -sys.path.append(os.path.abspath('../')) +from eva.version import __version__ as version + +sys.path.append(os.path.abspath(".")) +sys.path.append(os.path.abspath("../")) # Temp. workaround for # https://github.com/agronholm/sphinx-autodoc-typehints/issues/133 warnings.filterwarnings( - 'ignore', message='sphinx.util.inspect.Signature\(\) is deprecated') + "ignore", message="sphinx.util.inspect.Signature() is deprecated" +) # -- Project information ----------------------------------------------------- project = "evadb" author = "Georgia Tech Database Group" -copyright = str(datetime.now().year) + f', {author}' +copyright = str(datetime.now().year) + f", {author}" # The full version, including alpha/beta/rc tags -from eva.version import __version__ as version +release = version -release = version - -master_doc = 'index' +master_doc = "index" # -- General configuration --------------------------------------------------- @@ -54,17 +56,17 @@ # ones. extensions = [ "sphinx_external_toc", - 'sphinx.ext.autosummary', - 'sphinx.ext.autodoc', - 'sphinx.ext.doctest', - 'sphinx.ext.intersphinx', - 'sphinx.ext.todo', - 'sphinx.ext.mathjax', - 'sphinx.ext.viewcode', - 'sphinx.ext.napoleon', - 'sphinx.ext.graphviz', + "sphinx.ext.autosummary", + "sphinx.ext.autodoc", + "sphinx.ext.doctest", + "sphinx.ext.intersphinx", + "sphinx.ext.todo", + "sphinx.ext.mathjax", + "sphinx.ext.viewcode", + "sphinx.ext.napoleon", + "sphinx.ext.graphviz", "myst_nb", - "sphinx-jsonschema" + "sphinx-jsonschema", ] myst_enable_extensions = [ @@ -92,27 +94,37 @@ # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This pattern also affects html_static_path and html_extra_path. -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] +exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"] # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" # highlight_language = 'python' # If true, `todo` and `todoList` produce output, else they produce nothing. todo_include_todos = True -autodoc_mock_imports = ["numpy", "sqlalchemy", "sqlalchemy_utils", - "sqlalchemy.orm", "sqlalchemy.orm.exc", - "sqlalchemy.types", - "petastorm", "yaml", "pyspark", "torch", - "pandas", "cv2", "eva.catalog"] +autodoc_mock_imports = [ + "numpy", + "sqlalchemy", + "sqlalchemy_utils", + "sqlalchemy.orm", + "sqlalchemy.orm.exc", + "sqlalchemy.types", + "petastorm", + "yaml", + "pyspark", + "torch", + "pandas", + "cv2", + "eva.catalog", +] # -- Options for HTML output ------------------------------------------------- @@ -130,7 +142,7 @@ "use_issues_button": True, "use_edit_page_button": True, "path_to_docs": "api-docs/", - "home_page_in_toc": False + "home_page_in_toc": False, } # Add any paths that contain custom themes here, relative to this directory. @@ -140,22 +152,29 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". 
-html_static_path = ['_static'] - -autodoc_default_flags = ['members', 'private-members', 'special-members', - # 'undoc-members', - 'show-inheritance'] +html_static_path = ["_static"] + +autodoc_default_flags = [ + "members", + "private-members", + "special-members", + # 'undoc-members', + "show-inheritance", +] def autodoc_skip_member(app, what, name, obj, skip, options): # Ref: https://stackoverflow.com/a/21449475/ - exclusions = ('__weakref__', # special-members - '__doc__', '__module__', '__dict__', # undoc-members - ) + exclusions = ( + "__weakref__", # special-members + "__doc__", + "__module__", + "__dict__", # undoc-members + ) exclude = name in exclusions return True if exclude else None def setup(app): - app.connect('autodoc-skip-member', autodoc_skip_member) - app.add_css_file('custom.css') + app.connect("autodoc-skip-member", autodoc_skip_member) + app.add_css_file("custom.css") From d88fae5fe9c69cb5d3d15a21ed70613aafc9eca1 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Sun, 31 Jul 2022 16:02:19 -0400 Subject: [PATCH 04/18] style: only style the defualt dir --- script/formatting/formatter.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/script/formatting/formatter.py b/script/formatting/formatter.py index 08f274554..0236afcd6 100755 --- a/script/formatting/formatter.py +++ b/script/formatting/formatter.py @@ -19,6 +19,7 @@ import re import subprocess import sys +from pathlib import Path import pkg_resources @@ -258,7 +259,6 @@ def format_dir(dir_path, add_header, strip_header, format_code): is_tool(BLACK_BINARY) is_tool(FLAKE_BINARY) is_tool(ISORT_BINARY) - if args.file_name: LOG.info("Scanning file: " + "".join(args.file_name)) format_file( @@ -272,7 +272,7 @@ def format_dir(dir_path, add_header, strip_header, format_code): format_dir(args.dir_name, args.add_header, args.strip_header, args.format_code) # BY DEFAULT, WE FIX THE MODIFIED FILES else: - LOG.info("Default fix modified files") + # LOG.info("Default fix modified files") MERGEBASE = subprocess.check_output( "git merge-base origin/master HEAD", shell=True, text=True ).rstrip() @@ -285,14 +285,21 @@ def format_dir(dir_path, add_header, strip_header, format_code): .rstrip() .split("\n") ) - for file in files: - if file != "script/formatting/formatter.py": + valid = False + ## only format the defualt directories + file_path = str(Path(file).absolute()) + for source_dir in DEFAULT_DIRS: + source_path = str(Path(source_dir).resolve()) + if file_path.startswith(source_path): + valid = True + + if valid: LOG.info("Stripping headers : " + file) format_file(file, False, True, False) LOG.info("Adding headers : " + file) format_file(file, True, False, False) - LOG.info("Formatting File : " + file) - format_file(file, False, False, True) + LOG.info("Formatting File : " + file) + format_file(file, False, False, True) From e555aae12d92acc72e96d1a82b683bd34322c8c8 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Sun, 31 Jul 2022 16:10:10 -0400 Subject: [PATCH 05/18] merge --- api-docs/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api-docs/conf.py b/api-docs/conf.py index b146aedab..982b45196 100644 --- a/api-docs/conf.py +++ b/api-docs/conf.py @@ -25,7 +25,7 @@ # from datetime import datetime -from eva.version import __version__ as version +from eva.version import VERSION as version sys.path.append(os.path.abspath(".")) sys.path.append(os.path.abspath("../")) From dc95ee1fc0ed21e71ba86049896c40f5360b1ae7 Mon Sep 17 00:00:00 2001 From: Rajveer 
Bachkaniwala Date: Sat, 22 Oct 2022 16:55:45 -0400 Subject: [PATCH 06/18] remove spark --- eva/catalog/schema_utils.py | 60 +++++++++++- eva/storage/storage_engine.py | 2 +- eva/storage/structured_storage_engine.py | 115 +++++++++++++++++++++++ 3 files changed, 173 insertions(+), 4 deletions(-) create mode 100644 eva/storage/structured_storage_engine.py diff --git a/eva/catalog/schema_utils.py b/eva/catalog/schema_utils.py index 6985e8965..4b2c74184 100644 --- a/eva/catalog/schema_utils.py +++ b/eva/catalog/schema_utils.py @@ -12,13 +12,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import Dict, List import numpy as np import pandas as pd from petastorm.codecs import NdarrayCodec, ScalarCodec from petastorm.unischema import Unischema, UnischemaField from pyspark.sql.types import FloatType, IntegerType, StringType +from sqlalchemy import ( + FLOAT, + INTEGER, + TEXT, + Column, + Float, + Integer, + LargeBinary, +) from eva.catalog.column_type import ColumnType, NdArrayType +from eva.catalog.models.df_column import DataFrameColumn from eva.utils.logging_manager import logger @@ -55,7 +66,11 @@ def get_petastorm_column(df_column): ) elif column_type == ColumnType.TEXT: petastorm_column = UnischemaField( - column_name, np.str_, (), ScalarCodec(StringType()), column_is_nullable + column_name, + np.str_, + (), + ScalarCodec(StringType()), + column_is_nullable, ) elif column_type == ColumnType.NDARRAY: np_type = NdArrayType.to_numpy_type(column_array_type) @@ -82,7 +97,9 @@ def get_petastorm_schema(name, column_list): return petastorm_schema @staticmethod - def petastorm_type_cast(schema: Unischema, df: pd.DataFrame) -> pd.DataFrame: + def petastorm_type_cast( + schema: Unischema, df: pd.DataFrame + ) -> pd.DataFrame: """ Try to cast the type if schema defined in UnischemeField for Petastorm is not consistent with panda DataFrame provided. 
@@ -96,5 +113,42 @@ def petastorm_type_cast(schema: Unischema, df: pd.DataFrame) -> pd.DataFrame: try: df[col] = df[col].apply(lambda x: x.astype(dtype, copy=False)) except Exception: - logger.exception("Failed to cast %s to %s for Petastorm" % (col, dtype)) + logger.exception( + "Failed to cast %s to %s for Petastorm" % (col, dtype) + ) return df + + @staticmethod + def get_sqlalchemy_column(df_column: DataFrameColumn) -> Column: + column_type = df_column.type + + sqlalchemy_column = None + if column_type == ColumnType.INTEGER: + sqlalchemy_column = Column(Integer) + elif column_type == ColumnType.FLOAT: + sqlalchemy_column = Column(Float) + elif column_type == ColumnType.TEXT: + sqlalchemy_column = Column(TEXT) + elif column_type == ColumnType.NDARRAY: + sqlalchemy_column = Column(LargeBinary) + else: + logger.error("Invalid column type: " + str(column_type)) + + return sqlalchemy_column + + @staticmethod + def get_sqlalchemy_schema( + column_list: List[DataFrameColumn], + ) -> Dict[str, Column]: + """Converts the list of DataFrameColumns to SQLAlchemyColumns + + Args: + column_list (List[DataFrameColumn]): columns to be converted + + Returns: + Dict[str, Column]: mapping from column_name to sqlalchemy column object + """ + return { + column.name: SchemaUtils.get_sqlalchemy_column(column) + for column in column_list + } diff --git a/eva/storage/storage_engine.py b/eva/storage/storage_engine.py index daa2cf413..93927e6a9 100644 --- a/eva/storage/storage_engine.py +++ b/eva/storage/storage_engine.py @@ -15,7 +15,7 @@ from eva.configuration.configuration_manager import ConfigurationManager from eva.utils.generic_utils import str_to_class -StorageEngine = str_to_class(ConfigurationManager().get_value("storage", "engine"))() +StorageEngine = str_to_class("eva.storage.structured_storage_engine.SQLStorageEngine")() VideoStorageEngine = str_to_class( ConfigurationManager().get_value("storage", "video_engine") )() diff --git a/eva/storage/structured_storage_engine.py b/eva/storage/structured_storage_engine.py new file mode 100644 index 000000000..3c83bcbd1 --- /dev/null +++ b/eva/storage/structured_storage_engine.py @@ -0,0 +1,115 @@ +# coding=utf-8 +# Copyright 2018-2022 EVA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import shutil +from pathlib import Path +from typing import Iterator, List + +from petastorm.etl.dataset_metadata import materialize_dataset +from petastorm.predicates import in_lambda +from petastorm.unischema import dict_to_spark_row + +from eva.catalog.models.df_metadata import DataFrameMetadata +from eva.catalog.sql_config import SQLConfig +from eva.configuration.configuration_manager import ConfigurationManager +from eva.models.storage.batch import Batch +from eva.readers.petastorm_reader import PetastormReader +from eva.storage.abstract_storage_engine import AbstractStorageEngine +from eva.utils.logging_manager import logger +from eva.catalog.models.base_model import BaseModel +from eva.catalog.schema_utils import SchemaUtils + +# Leveraging Dynamic schema in SQLAlchemy +# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html + + +class SQLStorageEngine(AbstractStorageEngine): + def __init__(self): + """ + Grab the existing sql session + """ + self._sql_session = SQLConfig().session + + def create(self, table: DataFrameMetadata, **kwargs): + """ + Create an empty table in sql. + """ + attr_dict = {"__tablename__": table.name} + sqlalchemy_schema = SchemaUtils.get_sqlalchemy_schema(table.columns) + attr_dict.update(sqlalchemy_schema) + new_table = type("new_table", (BaseModel,), attr_dict) + BaseModel.metadata.create_table(tables=[new_table.__table__]) + + def drop(self, table: DataFrameMetadata): + dir_path = self._spark_url(table) + try: + shutil.rmtree(str(dir_path)) + except Exception as e: + logger.exception(f"Failed to drop the video table {e}") + + def write(self, table: DataFrameMetadata, rows: Batch): + """ + Write rows into the sql table. + + Arguments: + table: table metadata object to write into + rows : batch to be persisted in the storage. + """ + attr_dict = {"__tablename__": table.name} + sqlalchemy_schema = SchemaUtils.get_sqlalchemy_schema(table.columns) + attr_dict.update(sqlalchemy_schema) + new_table = type("new_table", (BaseModel,), attr_dict) + + columns = rows.frames.keys() + data = [] + for record in rows.frames.values: + row_data = { col : record[col] for col in columns} + data.append(row_data) + self._sql_session.bulk_insert_mappings(new_table, data) + self._sql_session.commit() + + def read( + self, + table: DataFrameMetadata, + batch_mem_size: int, + columns: List[str] = None, + predicate_func=None, + ) -> Iterator[Batch]: + """ + Reads the table and return a batch iterator for the + tuples that passes the predicate func. + + Argument: + table: table metadata object to write into + batch_mem_size (int): memory size of the batch read from storage + columns (List[str]): A list of column names to be + considered in predicate_func + predicate_func: customized predicate function returns bool + + Return: + Iterator of Batch read. + """ + predicate = None + if predicate_func and columns: + predicate = in_lambda(columns, predicate_func) + + # ToDo: Handle the sharding logic. 
We might have to maintain a + # context for deciding which shard to read + reader = PetastormReader( + self._spark_url(table), + batch_mem_size=batch_mem_size, + predicate=predicate, + ) + for batch in reader.read(): + yield batch From 5b9cdd668ac3f93638e6b3f88fd314fe0166477f Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 03:29:00 -0400 Subject: [PATCH 07/18] feat: replace spark+petastorm with sqlalchemy+sqlite --- eva/catalog/models/base_model.py | 2 +- eva/catalog/models/df_column.py | 2 +- eva/catalog/models/udf_io.py | 2 +- eva/readers/petastorm_reader.py | 80 ------------- eva/spark/__init__.py | 15 --- eva/spark/session.py | 81 ------------- eva/storage/petastorm_storage_engine.py | 133 ---------------------- eva/storage/sqlite_storage_engine.py | 139 +++++++++++++++++++++++ eva/storage/storage_engine.py | 2 +- eva/storage/structured_storage_engine.py | 115 ------------------- eva/utils/generic_utils.py | 11 ++ test/spark/__init__.py | 14 --- test/spark/test_session.py | 39 ------- 13 files changed, 154 insertions(+), 481 deletions(-) delete mode 100644 eva/readers/petastorm_reader.py delete mode 100644 eva/spark/__init__.py delete mode 100644 eva/spark/session.py delete mode 100644 eva/storage/petastorm_storage_engine.py create mode 100644 eva/storage/sqlite_storage_engine.py delete mode 100644 eva/storage/structured_storage_engine.py delete mode 100644 test/spark/__init__.py delete mode 100644 test/spark/test_session.py diff --git a/eva/catalog/models/base_model.py b/eva/catalog/models/base_model.py index 3d194432d..c32af1afd 100644 --- a/eva/catalog/models/base_model.py +++ b/eva/catalog/models/base_model.py @@ -33,7 +33,7 @@ class CustomModel: """ query = db_session.query_property() - _id = Column("id", Integer, primary_key=True) + _id = Column("_row_id", Integer, primary_key=True) def __init__(self, **kwargs): cls_ = type(self) diff --git a/eva/catalog/models/df_column.py b/eva/catalog/models/df_column.py index 95b348a9e..04164820d 100644 --- a/eva/catalog/models/df_column.py +++ b/eva/catalog/models/df_column.py @@ -31,7 +31,7 @@ class DataFrameColumn(BaseModel): _is_nullable = Column("is_nullable", Boolean, default=False) _array_type = Column("array_type", Enum(NdArrayType), nullable=True) _array_dimensions = Column("array_dimensions", String(100)) - _metadata_id = Column("metadata_id", Integer, ForeignKey("df_metadata.id")) + _metadata_id = Column("metadata_id", Integer, ForeignKey("df_metadata._row_id")) _dataset = relationship("DataFrameMetadata", back_populates="_columns") diff --git a/eva/catalog/models/udf_io.py b/eva/catalog/models/udf_io.py index 2b5b74a80..f0da3bea4 100644 --- a/eva/catalog/models/udf_io.py +++ b/eva/catalog/models/udf_io.py @@ -32,7 +32,7 @@ class UdfIO(BaseModel): _array_type = Column("array_type", Enum(NdArrayType), nullable=True) _array_dimensions = Column("array_dimensions", String(100)) _is_input = Column("is_input", Boolean, default=True) - _udf_id = Column("udf_id", Integer, ForeignKey("udf.id")) + _udf_id = Column("udf_id", Integer, ForeignKey("udf._row_id")) _udf = relationship("UdfMetadata", back_populates="_cols") __table_args__ = (UniqueConstraint("name", "udf_id"), {}) diff --git a/eva/readers/petastorm_reader.py b/eva/readers/petastorm_reader.py deleted file mode 100644 index 41f3f9073..000000000 --- a/eva/readers/petastorm_reader.py +++ /dev/null @@ -1,80 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file 
except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import Dict, Iterator - -from petastorm import make_reader - -from eva.configuration.configuration_manager import ConfigurationManager -from eva.readers.abstract_reader import AbstractReader - - -class PetastormReader(AbstractReader): - def __init__( - self, *args, cur_shard=None, shard_count=None, predicate=None, **kwargs - ): - """ - Reads data from the petastorm parquet stores. Note this won't - work for any arbitary parquet store apart from one materialized - using petastorm. In order to generalize, we might have to replace - `make_reader` with `make_batch_reader`. - https://petastorm.readthedocs.io/en/latest/api.html#module-petastorm.reader - - Attributes: - cur_shard (int, optional): Shard number to load from if sharded - shard_count (int, optional): Specify total number of shards if - applicable - predicate (PredicateBase, optional): instance of predicate object - to filter rows to be returned by reader - cache_type (str): the cache type, if desired. - Options are [None, ‘null’, ‘local-disk’] to either have a - null/noop cache or a cache implemented using diskcache. - cache_location (int): the location or path of the cache. - cache_size_limit (int): the size limit of the cache in bytes - cache_row_size_estimate (int): the estimated size of a row - """ - self.cur_shard = cur_shard - self.shard_count = shard_count - self.predicate = predicate - petastorm_config = ConfigurationManager().get_value("storage", "petastorm") - # cache not allowed with predicates - if self.predicate or petastorm_config is None: - petastorm_config = {} - self.cache_type = petastorm_config.get("cache_type", None) - self.cache_location = petastorm_config.get("cache_location", None) - self.cache_size_limit = petastorm_config.get("cache_size_limit", None) - self.cache_row_size_estimate = petastorm_config.get( - "cache_row_size_estimate", None - ) - super().__init__(*args, **kwargs) - if self.cur_shard is not None and self.cur_shard <= 0: - self.cur_shard = None - - if self.shard_count is not None and self.shard_count <= 0: - self.shard_count = None - - def _read(self) -> Iterator[Dict]: - # `Todo`: Generalize this reader - with make_reader( - self.file_url, - shard_count=self.shard_count, - cur_shard=self.cur_shard, - predicate=self.predicate, - cache_type=self.cache_type, - cache_location=self.cache_location, - cache_size_limit=self.cache_size_limit, - cache_row_size_estimate=self.cache_row_size_estimate, - ) as reader: - for row in reader: - yield row._asdict() diff --git a/eva/spark/__init__.py b/eva/spark/__init__.py deleted file mode 100644 index 3765a7c41..000000000 --- a/eva/spark/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""creates a spark session""" diff --git a/eva/spark/session.py b/eva/spark/session.py deleted file mode 100644 index b62c44528..000000000 --- a/eva/spark/session.py +++ /dev/null @@ -1,81 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from pyspark.conf import SparkConf -from pyspark.sql import SparkSession - -from eva.configuration.configuration_manager import ConfigurationManager - - -class Session(object): - """ - Wrapper around Spark Session - """ - - _instance = None - _session = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super(Session, cls).__new__(cls) - return cls._instance - - def __init__(self): - self._config = ConfigurationManager() - name = self._config.get_value("core", "application") - self.init_spark_session(name) - - def init_spark_session(self, application_name, spark_master=None): - """Setup a spark session. - - :param spark_master: A master parameter used by spark session builder. - Use default value (None) to use system - environment configured spark cluster. - Use 'local[*]' to run on a local box. - - :return: spark_session: A spark session - """ - - eva_spark_conf = SparkConf() - pyspark_config = self._config.get_value("pyspark", "property") - for key, value in pyspark_config.items(): - eva_spark_conf.set(key, value) - - session_builder = SparkSession.builder.appName(application_name).config( - conf=eva_spark_conf - ) - - if spark_master: - session_builder.master(spark_master) - - # Gets an existing SparkSession or, - # if there is no existing one, creates a new one based - # on the options set in this builder. - self._session = session_builder.getOrCreate() - - # Configure logging - spark_context = self._session.sparkContext - spark_context.setLogLevel("OFF") - - def get_session(self): - return self._session - - def get_context(self): - return self._session.sparkContext - - def stop(self): - self._session.stop() - - def __del__(self): - self._session.stop() diff --git a/eva/storage/petastorm_storage_engine.py b/eva/storage/petastorm_storage_engine.py deleted file mode 100644 index ba4937168..000000000 --- a/eva/storage/petastorm_storage_engine.py +++ /dev/null @@ -1,133 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import shutil -from pathlib import Path -from typing import Iterator, List - -from petastorm.etl.dataset_metadata import materialize_dataset -from petastorm.predicates import in_lambda -from petastorm.unischema import dict_to_spark_row - -from eva.catalog.models.df_metadata import DataFrameMetadata -from eva.configuration.configuration_manager import ConfigurationManager -from eva.models.storage.batch import Batch -from eva.readers.petastorm_reader import PetastormReader -from eva.spark.session import Session -from eva.storage.abstract_storage_engine import AbstractStorageEngine -from eva.utils.logging_manager import logger - - -class PetastormStorageEngine(AbstractStorageEngine): - def __init__(self): - """ - Maintain a long live spark session and context. - """ - self._spark = Session() - self.spark_session = self._spark.get_session() - self.spark_context = self._spark.get_context() - self.coalesce = ConfigurationManager().get_value("pyspark", "coalesce") - - def _spark_url(self, table: DataFrameMetadata) -> str: - """ - Generate a spark/petastorm url given a table - """ - return Path(table.file_url).resolve().as_uri() - - def create(self, table: DataFrameMetadata, **kwargs): - """ - Create an empty dataframe in petastorm. - """ - empty_rdd = self.spark_context.emptyRDD() - - with materialize_dataset( - self.spark_session, self._spark_url(table), table.schema.petastorm_schema - ): - - self.spark_session.createDataFrame( - empty_rdd, table.schema.pyspark_schema - ).coalesce(self.coalesce).write.mode("overwrite").parquet( - self._spark_url(table) - ) - - def drop(self, table: DataFrameMetadata): - dir_path = self._spark_url(table) - try: - shutil.rmtree(str(dir_path)) - except Exception as e: - logger.exception(f"Failed to drop the video table {e}") - - def write(self, table: DataFrameMetadata, rows: Batch): - """ - Write rows into the dataframe. - - Arguments: - table: table metadata object to write into - rows : batch to be persisted in the storage. - """ - - if rows.empty(): - return - # ToDo - # Throw an error if the row schema doesn't match the table schema - - with materialize_dataset( - self.spark_session, self._spark_url(table), table.schema.petastorm_schema - ): - - records = rows.frames - columns = records.keys() - rows_rdd = ( - self.spark_context.parallelize(records.values) - .map(lambda x: dict(zip(columns, x))) - .map(lambda x: dict_to_spark_row(table.schema.petastorm_schema, x)) - ) - self.spark_session.createDataFrame( - rows_rdd, table.schema.pyspark_schema - ).coalesce(self.coalesce).write.mode("append").parquet( - self._spark_url(table) - ) - - def read( - self, - table: DataFrameMetadata, - batch_mem_size: int, - columns: List[str] = None, - predicate_func=None, - ) -> Iterator[Batch]: - """ - Reads the table and return a batch iterator for the - tuples that passes the predicate func. 
- - Argument: - table: table metadata object to write into - batch_mem_size (int): memory size of the batch read from storage - columns (List[str]): A list of column names to be - considered in predicate_func - predicate_func: customized predicate function returns bool - - Return: - Iterator of Batch read. - """ - predicate = None - if predicate_func and columns: - predicate = in_lambda(columns, predicate_func) - - # ToDo: Handle the sharding logic. We might have to maintain a - # context for deciding which shard to read - reader = PetastormReader( - self._spark_url(table), batch_mem_size=batch_mem_size, predicate=predicate - ) - for batch in reader.read(): - yield batch diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py new file mode 100644 index 000000000..10b3dcb96 --- /dev/null +++ b/eva/storage/sqlite_storage_engine.py @@ -0,0 +1,139 @@ +# coding=utf-8 +# Copyright 2018-2022 EVA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import shutil +from typing import Iterator, List +import pandas as pd + +from eva.catalog.column_type import ColumnType +from eva.catalog.models.df_column import DataFrameColumn + +from eva.catalog.models.df_metadata import DataFrameMetadata +from eva.catalog.sql_config import SQLConfig +from eva.models.storage.batch import Batch +from eva.storage.abstract_storage_engine import AbstractStorageEngine +from eva.utils.generic_utils import PickleSerializer, get_size +from eva.utils.logging_manager import logger +from eva.catalog.models.base_model import BaseModel +from eva.catalog.schema_utils import SchemaUtils +from sqlalchemy.ext.declarative import declarative_base + +# Leveraging Dynamic schema in SQLAlchemy +# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html + +Base = declarative_base() + + +class SQLStorageEngine(AbstractStorageEngine): + def __init__(self): + """ + Grab the existing sql session + """ + self._sql_session = SQLConfig().session + self._sql_engine = SQLConfig().engine + self._serializer = PickleSerializer() + + def _dict_to_sql_row(self, dict_row: dict, columns: List[DataFrameColumn]): + # Serialize numpy data + for col in columns: + if col.type == ColumnType.NDARRAY: + dict_row[col.name] = self._serializer.serialize(dict_row[col.name]) + return dict_row + + def _sql_row_to_dict(self, sql_row: tuple, columns: List[DataFrameColumn]): + # Deserialize numpy data + dict_row = {} + for idx, col in enumerate(columns): + if col.type == ColumnType.NDARRAY: + dict_row[col.name] = self._serializer.deserialize(sql_row[idx]) + else: + dict_row[col.name] = sql_row[idx] + return dict_row + + def create(self, table: DataFrameMetadata, **kwargs): + """ + Create an empty table in sql. 
+ It dynamically constructs schema in sqlaclchemy + to create the table + """ + attr_dict = {"__tablename__": table.name} + sqlalchemy_schema = SchemaUtils.get_sqlalchemy_schema(table.columns) + attr_dict.update(sqlalchemy_schema) + # dynamic schema generation + # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html + new_table = type("__placeholder_class_name", (BaseModel,), attr_dict)() + BaseModel.metadata.tables[table.name].create(self._sql_engine) + self._sql_session.commit() + return new_table + + def drop(self, table: DataFrameMetadata): + dir_path = self._spark_url(table) + try: + shutil.rmtree(str(dir_path)) + except Exception as e: + logger.exception(f"Failed to drop the video table {e}") + + def write(self, table: DataFrameMetadata, rows: Batch): + """ + Write rows into the sql table. + + Arguments: + table: table metadata object to write into + rows : batch to be persisted in the storage. + """ + new_table = BaseModel.metadata.tables[table.name] + columns = rows.frames.keys() + data = [] + # ToDo: validate the data type before inserting into the table + for record in rows.frames.values: + row_data = {col: record[idx] for idx, col in enumerate(columns)} + data.append(self._dict_to_sql_row(row_data, table.columns)) + self._sql_engine.execute(new_table.insert(), data) + self._sql_session.commit() + + def read( + self, + table: DataFrameMetadata, + batch_mem_size: int, + ) -> Iterator[Batch]: + """ + Reads the table and return a batch iterator for the + tuples. + + Argument: + table: table metadata object of teh table to read + batch_mem_size (int): memory size of the batch read from storage + Return: + Iterator of Batch read. + """ + + new_table = BaseModel.metadata.tables[table.name] + result = self._sql_engine.execute( + new_table.select() + ) + data_batch = [] + row_size = None + for row in result: + # Todo: Verfiy the order of columns in row matches the table.columns + # ignore the first dummy (_row_id) primary column + data_batch.append(self._sql_row_to_dict(row[1:], table.columns)) + if row_size is None: + row_size = 0 + row_size = get_size(data_batch) + if len(data_batch) * row_size >= batch_mem_size: + yield Batch(pd.DataFrame(data_batch)) + data_batch = [] + if data_batch: + yield Batch(pd.DataFrame(data_batch)) + diff --git a/eva/storage/storage_engine.py b/eva/storage/storage_engine.py index 93927e6a9..fa54e7da8 100644 --- a/eva/storage/storage_engine.py +++ b/eva/storage/storage_engine.py @@ -15,7 +15,7 @@ from eva.configuration.configuration_manager import ConfigurationManager from eva.utils.generic_utils import str_to_class -StorageEngine = str_to_class("eva.storage.structured_storage_engine.SQLStorageEngine")() +StorageEngine = str_to_class("eva.storage.sqlite_storage_engine.SQLStorageEngine")() VideoStorageEngine = str_to_class( ConfigurationManager().get_value("storage", "video_engine") )() diff --git a/eva/storage/structured_storage_engine.py b/eva/storage/structured_storage_engine.py deleted file mode 100644 index 3c83bcbd1..000000000 --- a/eva/storage/structured_storage_engine.py +++ /dev/null @@ -1,115 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import shutil -from pathlib import Path -from typing import Iterator, List - -from petastorm.etl.dataset_metadata import materialize_dataset -from petastorm.predicates import in_lambda -from petastorm.unischema import dict_to_spark_row - -from eva.catalog.models.df_metadata import DataFrameMetadata -from eva.catalog.sql_config import SQLConfig -from eva.configuration.configuration_manager import ConfigurationManager -from eva.models.storage.batch import Batch -from eva.readers.petastorm_reader import PetastormReader -from eva.storage.abstract_storage_engine import AbstractStorageEngine -from eva.utils.logging_manager import logger -from eva.catalog.models.base_model import BaseModel -from eva.catalog.schema_utils import SchemaUtils - -# Leveraging Dynamic schema in SQLAlchemy -# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html - - -class SQLStorageEngine(AbstractStorageEngine): - def __init__(self): - """ - Grab the existing sql session - """ - self._sql_session = SQLConfig().session - - def create(self, table: DataFrameMetadata, **kwargs): - """ - Create an empty table in sql. - """ - attr_dict = {"__tablename__": table.name} - sqlalchemy_schema = SchemaUtils.get_sqlalchemy_schema(table.columns) - attr_dict.update(sqlalchemy_schema) - new_table = type("new_table", (BaseModel,), attr_dict) - BaseModel.metadata.create_table(tables=[new_table.__table__]) - - def drop(self, table: DataFrameMetadata): - dir_path = self._spark_url(table) - try: - shutil.rmtree(str(dir_path)) - except Exception as e: - logger.exception(f"Failed to drop the video table {e}") - - def write(self, table: DataFrameMetadata, rows: Batch): - """ - Write rows into the sql table. - - Arguments: - table: table metadata object to write into - rows : batch to be persisted in the storage. - """ - attr_dict = {"__tablename__": table.name} - sqlalchemy_schema = SchemaUtils.get_sqlalchemy_schema(table.columns) - attr_dict.update(sqlalchemy_schema) - new_table = type("new_table", (BaseModel,), attr_dict) - - columns = rows.frames.keys() - data = [] - for record in rows.frames.values: - row_data = { col : record[col] for col in columns} - data.append(row_data) - self._sql_session.bulk_insert_mappings(new_table, data) - self._sql_session.commit() - - def read( - self, - table: DataFrameMetadata, - batch_mem_size: int, - columns: List[str] = None, - predicate_func=None, - ) -> Iterator[Batch]: - """ - Reads the table and return a batch iterator for the - tuples that passes the predicate func. - - Argument: - table: table metadata object to write into - batch_mem_size (int): memory size of the batch read from storage - columns (List[str]): A list of column names to be - considered in predicate_func - predicate_func: customized predicate function returns bool - - Return: - Iterator of Batch read. - """ - predicate = None - if predicate_func and columns: - predicate = in_lambda(columns, predicate_func) - - # ToDo: Handle the sharding logic. 
We might have to maintain a - # context for deciding which shard to read - reader = PetastormReader( - self._spark_url(table), - batch_mem_size=batch_mem_size, - predicate=predicate, - ) - for batch in reader.read(): - yield batch diff --git a/eva/utils/generic_utils.py b/eva/utils/generic_utils.py index f852fe7cb..968df0261 100644 --- a/eva/utils/generic_utils.py +++ b/eva/utils/generic_utils.py @@ -16,6 +16,8 @@ import importlib import sys import uuid +import pickle + from pathlib import Path from eva.configuration.configuration_manager import ConfigurationManager @@ -130,3 +132,12 @@ def get_size(obj, seen=None): elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)): size += sum([get_size(i, seen) for i in obj]) return size + + +class PickleSerializer(object): + + def serialize(self, data): + return pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL) + + def deserialize(self, data): + return pickle.loads(data) \ No newline at end of file diff --git a/test/spark/__init__.py b/test/spark/__init__.py deleted file mode 100644 index ccbb30dee..000000000 --- a/test/spark/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/test/spark/test_session.py b/test/spark/test_session.py deleted file mode 100644 index 35f034102..000000000 --- a/test/spark/test_session.py +++ /dev/null @@ -1,39 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import unittest - -from pyspark.sql import SparkSession - -from eva.spark.session import Session - - -class SparkSessionTest(unittest.TestCase): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def setUp(self): - self.session = Session() - - # def tearDown(self): - # self.session = Session() - # self.session.stop() - - def test_session(self): - - spark_session = self.session.get_session() - - session2 = Session() - self.assertEqual(self.session, session2) - self.assertIsInstance(spark_session, SparkSession) From 7ea159e6b886ddeaedadc8977d937160872fcba1 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 03:39:03 -0400 Subject: [PATCH 08/18] feat: remove petastorm related code --- eva/catalog/df_schema.py | 13 --- eva/catalog/schema_utils.py | 102 +------------------ eva/eva.yml | 15 +-- eva/storage/sqlite_storage_engine.py | 34 +++---- eva/storage/storage_engine.py | 2 +- eva/utils/generic_utils.py | 6 +- test/catalog/test_schema.py | 96 ----------------- test/integration_tests/test_load_executor.py | 7 ++ 8 files changed, 30 insertions(+), 245 deletions(-) diff --git a/eva/catalog/df_schema.py b/eva/catalog/df_schema.py index 2536945db..0c7a2090c 100644 --- a/eva/catalog/df_schema.py +++ b/eva/catalog/df_schema.py @@ -15,7 +15,6 @@ from typing import List from eva.catalog.models.df_column import DataFrameColumn -from eva.catalog.schema_utils import SchemaUtils class DataFrameSchema(object): @@ -23,10 +22,6 @@ def __init__(self, name: str, column_list: List[DataFrameColumn]): self._name = name self._column_list = column_list - self._petastorm_schema = SchemaUtils.get_petastorm_schema( - self._name, self._column_list - ) - self._pyspark_schema = self._petastorm_schema.as_spark_schema() def __str__(self): schema_str = "SCHEMA:: (" + self._name + ")\n" @@ -42,14 +37,6 @@ def name(self): def column_list(self): return self._column_list - @property - def petastorm_schema(self): - return self._petastorm_schema - - @property - def pyspark_schema(self): - return self._pyspark_schema - def __eq__(self, other): return self.name == other.name and self._column_list == other.column_list diff --git a/eva/catalog/schema_utils.py b/eva/catalog/schema_utils.py index 4b2c74184..08c58cdfb 100644 --- a/eva/catalog/schema_utils.py +++ b/eva/catalog/schema_utils.py @@ -13,111 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
from typing import Dict, List -import numpy as np -import pandas as pd -from petastorm.codecs import NdarrayCodec, ScalarCodec -from petastorm.unischema import Unischema, UnischemaField -from pyspark.sql.types import FloatType, IntegerType, StringType -from sqlalchemy import ( - FLOAT, - INTEGER, - TEXT, - Column, - Float, - Integer, - LargeBinary, -) -from eva.catalog.column_type import ColumnType, NdArrayType +from sqlalchemy import TEXT, Column, Float, Integer, LargeBinary + +from eva.catalog.column_type import ColumnType from eva.catalog.models.df_column import DataFrameColumn from eva.utils.logging_manager import logger class SchemaUtils(object): - @staticmethod - def get_petastorm_column(df_column): - - column_type = df_column.type - column_name = df_column.name - column_is_nullable = df_column.is_nullable - column_array_type = df_column.array_type - column_array_dimensions = df_column.array_dimensions - - # Reference: - # https://github.com/uber/petastorm/blob/master/petastorm/ - # tests/test_common.py - - petastorm_column = None - if column_type == ColumnType.INTEGER: - petastorm_column = UnischemaField( - column_name, - np.int64, - (), - ScalarCodec(IntegerType()), - column_is_nullable, - ) - elif column_type == ColumnType.FLOAT: - petastorm_column = UnischemaField( - column_name, - np.float32, - (), - ScalarCodec(FloatType()), - column_is_nullable, - ) - elif column_type == ColumnType.TEXT: - petastorm_column = UnischemaField( - column_name, - np.str_, - (), - ScalarCodec(StringType()), - column_is_nullable, - ) - elif column_type == ColumnType.NDARRAY: - np_type = NdArrayType.to_numpy_type(column_array_type) - petastorm_column = UnischemaField( - column_name, - np_type, - column_array_dimensions, - NdarrayCodec(), - column_is_nullable, - ) - else: - logger.error("Invalid column type: " + str(column_type)) - - return petastorm_column - - @staticmethod - def get_petastorm_schema(name, column_list): - petastorm_column_list = [] - for _column in column_list: - petastorm_column = SchemaUtils.get_petastorm_column(_column) - petastorm_column_list.append(petastorm_column) - - petastorm_schema = Unischema(name, petastorm_column_list) - return petastorm_schema - - @staticmethod - def petastorm_type_cast( - schema: Unischema, df: pd.DataFrame - ) -> pd.DataFrame: - """ - Try to cast the type if schema defined in UnischemeField for - Petastorm is not consistent with panda DataFrame provided. 
- """ - for unischema in schema.fields.values(): - if not isinstance(unischema.codec, NdarrayCodec): - continue - # We only care when the cell data is np.ndarray - col = unischema.name - dtype = unischema.numpy_dtype - try: - df[col] = df[col].apply(lambda x: x.astype(dtype, copy=False)) - except Exception: - logger.exception( - "Failed to cast %s to %s for Petastorm" % (col, dtype) - ) - return df - @staticmethod def get_sqlalchemy_column(df_column: DataFrameColumn) -> Column: column_type = df_column.type diff --git a/eva/eva.yml b/eva/eva.yml index f53143754..fafae81f8 100644 --- a/eva/eva.yml +++ b/eva/eva.yml @@ -17,23 +17,10 @@ executor: storage: upload_dir: "" - engine: "eva.storage.petastorm_storage_engine.PetastormStorageEngine" + engine: "eva.storage.sqlite_storage_engine.SQLStorageEngine" video_engine: "eva.storage.opencv_storage_engine.OpenCVStorageEngine" video_engine_version: 0 - # https://petastorm.readthedocs.io/en/latest/api.html#module-petastorm.reader - petastorm: {'cache_type' : 'local-disk', - 'cache_location' : '.cache', - 'cache_size_limit' : 4000000000, #4gb - 'cache_row_size_estimate' : 512} - -pyspark: - property: {'spark.logConf': 'true', - 'spark.driver.memory': '10g', - 'spark.sql.execution.arrow.pyspark.enabled': 'true'} - coalesce: 2 - - server: host: "0.0.0.0" port: 5432 diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py index 10b3dcb96..7b12c46f9 100644 --- a/eva/storage/sqlite_storage_engine.py +++ b/eva/storage/sqlite_storage_engine.py @@ -12,22 +12,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import shutil from typing import Iterator, List + import pandas as pd +from sqlalchemy.ext.declarative import declarative_base from eva.catalog.column_type import ColumnType +from eva.catalog.models.base_model import BaseModel from eva.catalog.models.df_column import DataFrameColumn - from eva.catalog.models.df_metadata import DataFrameMetadata +from eva.catalog.schema_utils import SchemaUtils from eva.catalog.sql_config import SQLConfig from eva.models.storage.batch import Batch from eva.storage.abstract_storage_engine import AbstractStorageEngine from eva.utils.generic_utils import PickleSerializer, get_size from eva.utils.logging_manager import logger -from eva.catalog.models.base_model import BaseModel -from eva.catalog.schema_utils import SchemaUtils -from sqlalchemy.ext.declarative import declarative_base # Leveraging Dynamic schema in SQLAlchemy # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html @@ -49,7 +48,7 @@ def _dict_to_sql_row(self, dict_row: dict, columns: List[DataFrameColumn]): for col in columns: if col.type == ColumnType.NDARRAY: dict_row[col.name] = self._serializer.serialize(dict_row[col.name]) - return dict_row + return dict_row def _sql_row_to_dict(self, sql_row: tuple, columns: List[DataFrameColumn]): # Deserialize numpy data @@ -59,8 +58,8 @@ def _sql_row_to_dict(self, sql_row: tuple, columns: List[DataFrameColumn]): dict_row[col.name] = self._serializer.deserialize(sql_row[idx]) else: dict_row[col.name] = sql_row[idx] - return dict_row - + return dict_row + def create(self, table: DataFrameMetadata, **kwargs): """ Create an empty table in sql. 
@@ -78,11 +77,13 @@ def create(self, table: DataFrameMetadata, **kwargs): return new_table def drop(self, table: DataFrameMetadata): - dir_path = self._spark_url(table) try: - shutil.rmtree(str(dir_path)) + BaseModel.metadata.tables[table.name].drop() + self._sql_session.commit() except Exception as e: - logger.exception(f"Failed to drop the video table {e}") + logger.exception( + f"Failed to drop the table {table.name} with Exception {str(e)}" + ) def write(self, table: DataFrameMetadata, rows: Batch): """ @@ -112,21 +113,19 @@ def read( tuples. Argument: - table: table metadata object of teh table to read + table: table metadata object of teh table to read batch_mem_size (int): memory size of the batch read from storage Return: Iterator of Batch read. """ - + new_table = BaseModel.metadata.tables[table.name] - result = self._sql_engine.execute( - new_table.select() - ) + result = self._sql_engine.execute(new_table.select()) data_batch = [] row_size = None for row in result: # Todo: Verfiy the order of columns in row matches the table.columns - # ignore the first dummy (_row_id) primary column + # ignore the first dummy (_row_id) primary column data_batch.append(self._sql_row_to_dict(row[1:], table.columns)) if row_size is None: row_size = 0 @@ -136,4 +135,3 @@ def read( data_batch = [] if data_batch: yield Batch(pd.DataFrame(data_batch)) - diff --git a/eva/storage/storage_engine.py b/eva/storage/storage_engine.py index fa54e7da8..daa2cf413 100644 --- a/eva/storage/storage_engine.py +++ b/eva/storage/storage_engine.py @@ -15,7 +15,7 @@ from eva.configuration.configuration_manager import ConfigurationManager from eva.utils.generic_utils import str_to_class -StorageEngine = str_to_class("eva.storage.sqlite_storage_engine.SQLStorageEngine")() +StorageEngine = str_to_class(ConfigurationManager().get_value("storage", "engine"))() VideoStorageEngine = str_to_class( ConfigurationManager().get_value("storage", "video_engine") )() diff --git a/eva/utils/generic_utils.py b/eva/utils/generic_utils.py index 968df0261..51029811d 100644 --- a/eva/utils/generic_utils.py +++ b/eva/utils/generic_utils.py @@ -14,10 +14,9 @@ # limitations under the License. import hashlib import importlib +import pickle import sys import uuid -import pickle - from pathlib import Path from eva.configuration.configuration_manager import ConfigurationManager @@ -135,9 +134,8 @@ def get_size(obj, seen=None): class PickleSerializer(object): - def serialize(self, data): return pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL) def deserialize(self, data): - return pickle.loads(data) \ No newline at end of file + return pickle.loads(data) diff --git a/test/catalog/test_schema.py b/test/catalog/test_schema.py index b536290f9..afad977b6 100644 --- a/test/catalog/test_schema.py +++ b/test/catalog/test_schema.py @@ -13,112 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import unittest -from decimal import Decimal -from unittest.mock import MagicMock, call, patch - -import numpy as np -from petastorm.codecs import NdarrayCodec, ScalarCodec -from petastorm.unischema import UnischemaField -from pyspark.sql.types import FloatType, IntegerType, StringType from eva.catalog.column_type import ColumnType, NdArrayType from eva.catalog.df_schema import DataFrameSchema from eva.catalog.models.df_column import DataFrameColumn -from eva.catalog.schema_utils import SchemaUtils class SchemaTests(unittest.TestCase): # TEST SCHEMA UTILS START - def test_get_petastorm_column(self): - col_name = "frame_id" - col = DataFrameColumn(col_name, ColumnType.INTEGER, False) - petastorm_col = UnischemaField( - col_name, np.int64, (), ScalarCodec(IntegerType()), False - ) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - - col = DataFrameColumn(col_name, ColumnType.FLOAT, True) - petastorm_col = UnischemaField( - col_name, np.float32, (), ScalarCodec(FloatType()), True - ) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - - col = DataFrameColumn(col_name, ColumnType.TEXT, False) - petastorm_col = UnischemaField( - col_name, np.str_, (), ScalarCodec(StringType()), False - ) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - - col = DataFrameColumn(col_name, None, True, [10, 10]) - self.assertEqual(SchemaUtils.get_petastorm_column(col), None) - - def test_get_petastorm_column_ndarray(self): - expected_type = [ - np.int8, - np.uint8, - np.int16, - np.int32, - np.int64, - np.unicode_, - np.bool_, - np.float32, - np.float64, - Decimal, - np.str_, - np.datetime64, - ] - col_name = "frame_id" - for array_type, np_type in zip(NdArrayType, expected_type): - col = DataFrameColumn( - col_name, ColumnType.NDARRAY, True, array_type, [10, 10] - ) - petastorm_col = UnischemaField( - col_name, np_type, [10, 10], NdarrayCodec(), True - ) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - - def test_raise_exception_when_unkown_array_type(self): - col_name = "frame_id" - col = DataFrameColumn( - col_name, ColumnType.NDARRAY, True, ColumnType.TEXT, [10, 10] - ) - self.assertRaises(ValueError, SchemaUtils.get_petastorm_column, col) - - @patch("eva.catalog.schema_utils.Unischema") - @patch("eva.catalog.schema_utils.SchemaUtils.get_petastorm_column") - def test_get_petastorm_schema(self, mock_get_pc, mock_uni): - cols = [MagicMock() for i in range(2)] - mock_get_pc.side_effect = [1, 2] - self.assertEqual( - SchemaUtils.get_petastorm_schema("name", cols), mock_uni.return_value - ) - mock_get_pc.assert_has_calls([call(cols[0]), call(cols[1])]) - mock_uni.assert_called_once_with("name", [1, 2]) - - # TEST SCHEMA UTILS END - - # TEST DF_SCHEMA START - def test_df_schema(self): - schema_name = "foo" - column_1 = DataFrameColumn("frame_id", ColumnType.INTEGER, False) - column_2 = DataFrameColumn( - "frame_data", ColumnType.NDARRAY, False, NdArrayType.UINT8, [28, 28] - ) - column_3 = DataFrameColumn("frame_label", ColumnType.INTEGER, False) - col_list = [column_1, column_2, column_3] - schema = DataFrameSchema(schema_name, col_list) - expected_schema = SchemaUtils.get_petastorm_schema(schema_name, col_list) - self.assertEqual(schema.name, schema_name) - self.assertEqual(schema.column_list, col_list) - self.assertEqual(schema.petastorm_schema.fields, expected_schema.fields) - for field1, field2 in zip( - schema.petastorm_schema.fields, expected_schema.fields - ): - self.assertEqual(field1, field2) - 
self.assertEqual(schema.pyspark_schema, expected_schema.as_spark_schema()) - def test_schema_equality(self): schema_name = "foo" column_1 = DataFrameColumn("frame_id", ColumnType.INTEGER, False) diff --git a/test/integration_tests/test_load_executor.py b/test/integration_tests/test_load_executor.py index f48fc24b9..b53be07eb 100644 --- a/test/integration_tests/test_load_executor.py +++ b/test/integration_tests/test_load_executor.py @@ -127,3 +127,10 @@ def test_should_load_csv_with_columns_in_table(self): expected_batch = create_dummy_csv_batches(target_columns=select_columns) expected_batch.modify_column_alias("myvideocsv") self.assertEqual(actual_batch, expected_batch) + + +if __name__ == "__main__": + suite = unittest.TestSuite() + suite.addTest(LoadExecutorTest("test_should_load_csv_in_table")) + runner = unittest.TextTestRunner() + runner.run(suite) From 0f37cd0a028cf08bb8d28b02ff80c396afed7da2 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 03:39:28 -0400 Subject: [PATCH 09/18] feat: remove spark+petastorm dependency --- setup.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/setup.py b/setup.py index 4f1bcaf45..8ffcb6326 100644 --- a/setup.py +++ b/setup.py @@ -44,8 +44,6 @@ def read(path, encoding="utf-8"): "Pillow==9.0.1", "sqlalchemy==1.3.20", "sqlalchemy-utils==0.36.6", - "pyspark==3.1.3", - "petastorm==0.12.0", "antlr4-python3-runtime==4.10", "pyyaml==5.1", "importlib-metadata<5.0", From 7eb3d5d34916198fb17a5d567e7f0d79e25f7f59 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 03:56:25 -0400 Subject: [PATCH 10/18] test: add sqlite test case --- ...ngine.py => test_sqlite_storage_engine.py} | 42 +++++-------------- 1 file changed, 10 insertions(+), 32 deletions(-) rename test/storage/{test_petastorm_storage_engine.py => test_sqlite_storage_engine.py} (60%) diff --git a/test/storage/test_petastorm_storage_engine.py b/test/storage/test_sqlite_storage_engine.py similarity index 60% rename from test/storage/test_petastorm_storage_engine.py rename to test/storage/test_sqlite_storage_engine.py index f6ce4defd..c3f8c124b 100644 --- a/test/storage/test_petastorm_storage_engine.py +++ b/test/storage/test_sqlite_storage_engine.py @@ -14,15 +14,15 @@ # limitations under the License. 
import shutil import unittest -from test.util import NUM_FRAMES, create_dummy_batches +from test.util import create_dummy_batches from eva.catalog.column_type import ColumnType, NdArrayType from eva.catalog.models.df_column import DataFrameColumn from eva.catalog.models.df_metadata import DataFrameMetadata -from eva.storage.petastorm_storage_engine import PetastormStorageEngine +from eva.storage.sqlite_storage_engine import SQLStorageEngine -class PetastormStorageEngineTest(unittest.TestCase): +class SQLStorageEngineTest(unittest.TestCase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.table = None @@ -47,41 +47,19 @@ def tearDown(self): pass def test_should_create_empty_table(self): - petastorm = PetastormStorageEngine() - petastorm.create(self.table) - records = list(petastorm.read(self.table, batch_mem_size=3000)) + sqlengine = SQLStorageEngine() + sqlengine.create(self.table) + records = list(sqlengine.read(self.table, batch_mem_size=3000)) self.assertEqual(records, []) def test_should_write_rows_to_table(self): dummy_batches = list(create_dummy_batches()) - petastorm = PetastormStorageEngine() - petastorm.create(self.table) + sqlengine = SQLStorageEngine() + sqlengine.create(self.table) for batch in dummy_batches: batch.drop_column_alias() - petastorm.write(self.table, batch) + sqlengine.write(self.table, batch) - read_batch = list(petastorm.read(self.table, batch_mem_size=3000)) + read_batch = list(sqlengine.read(self.table, batch_mem_size=3000)) self.assertTrue(read_batch, dummy_batches) - - def test_should_return_even_frames(self): - dummy_batches = list(create_dummy_batches()) - - petastorm = PetastormStorageEngine() - petastorm.create(self.table) - for batch in dummy_batches: - batch.drop_column_alias() - petastorm.write(self.table, batch) - - read_batch = list( - petastorm.read( - self.table, - batch_mem_size=3000, - columns=["id"], - predicate_func=lambda id: id % 2 == 0, - ) - ) - expected_batch = list( - create_dummy_batches(filters=[i for i in range(NUM_FRAMES) if i % 2 == 0]) - ) - self.assertTrue(read_batch, expected_batch) From d033383bc92791140ffd0e151276b32f47f94e02 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 03:57:43 -0400 Subject: [PATCH 11/18] feat: remove petastorm test case --- test/readers/test_petastorm_reader.py | 102 -------------------------- 1 file changed, 102 deletions(-) delete mode 100644 test/readers/test_petastorm_reader.py diff --git a/test/readers/test_petastorm_reader.py b/test/readers/test_petastorm_reader.py deleted file mode 100644 index 61b75b65d..000000000 --- a/test/readers/test_petastorm_reader.py +++ /dev/null @@ -1,102 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import os -import unittest -from test.util import upload_dir_from_config -from unittest.mock import patch - -import numpy as np - -from eva.configuration.configuration_manager import ConfigurationManager -from eva.readers.petastorm_reader import PetastormReader - - -class PetastormLoaderTest(unittest.TestCase): - class DummyRow: - def __init__(self, frame_id, frame_data): - self.frame_id = frame_id - self.frame_data = frame_data - - def _asdict(self): - return {"id": self.frame_id, "data": self.frame_data} - - class DummyReader: - def __init__(self, data): - self.data = data - - def __enter__(self): - return self - - def __iter__(self): - return self.data - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - @patch("eva.readers.petastorm_reader.make_reader") - def test_should_call_petastorm_make_reader_with_correct_params(self, mock): - petastorm_reader = PetastormReader( - file_url=os.path.join(upload_dir_from_config, "dummy.avi"), - batch_mem_size=3000, - cur_shard=2, - shard_count=3, - predicate="pred", - ) - list(petastorm_reader._read()) - mock.assert_called_once_with( - os.path.join(upload_dir_from_config, "dummy.avi"), - shard_count=3, - cur_shard=2, - predicate="pred", - cache_type=None, - cache_location=None, - cache_size_limit=None, - cache_row_size_estimate=None, - ) - - @patch("eva.readers.petastorm_reader.make_reader") - def test_should_call_petastorm_make_reader_with_negative_shards(self, mock): - petastorm_reader = PetastormReader( - file_url=os.path.join(upload_dir_from_config, "dummy.avi"), - batch_mem_size=3000, - cur_shard=-1, - shard_count=-2, - ) - list(petastorm_reader._read()) - petastorm_config = ConfigurationManager().get_value("storage", "petastorm") - mock.assert_called_once_with( - os.path.join(upload_dir_from_config, "dummy.avi"), - shard_count=None, - cur_shard=None, - predicate=None, - cache_location=petastorm_config.get("cache_location", None), - cache_row_size_estimate=petastorm_config.get( - "cache_row_size_estimate", None - ), - cache_size_limit=petastorm_config.get("cache_size_limit", None), - cache_type=petastorm_config.get("cache_type", None), - ) - - @patch("eva.readers.petastorm_reader.make_reader") - def test_should_read_data_using_petastorm_reader(self, mock): - petastorm_reader = PetastormReader( - file_url=os.path.join(upload_dir_from_config, "dummy.avi"), - batch_mem_size=3000, - ) - dummy_values = map(lambda i: self.DummyRow(i, np.ones((2, 2, 3)) * i), range(3)) - mock.return_value = self.DummyReader(dummy_values) - actual = list(petastorm_reader._read()) - expected = list(dummy_values) - self.assertTrue(all([np.allclose(i, j) for i, j in zip(actual, expected)])) From e9c92b7c04f45b8aaa0bc67f1c91a1deb64a1b45 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 11:12:33 -0400 Subject: [PATCH 12/18] feat: enable drop table for sqlite --- eva/catalog/services/df_service.py | 1 + eva/executor/drop_executor.py | 2 ++ eva/storage/sqlite_storage_engine.py | 7 ++++++- test/integration_tests/test_load_executor.py | 13 ++++++++----- test/integration_tests/test_select_executor.py | 2 +- 5 files changed, 18 insertions(+), 7 deletions(-) diff --git a/eva/catalog/services/df_service.py b/eva/catalog/services/df_service.py index f0368a400..66c171d1a 100644 --- a/eva/catalog/services/df_service.py +++ b/eva/catalog/services/df_service.py @@ -104,6 +104,7 @@ def drop_dataset_by_name(self, database_name: str, dataset_name: str): try: dataset = self.dataset_object_by_name(database_name, dataset_name) dataset.delete() + return True 
except Exception as e: err_msg = "Delete dataset failed for name {} with error {}".format( dataset_name, str(e) diff --git a/eva/executor/drop_executor.py b/eva/executor/drop_executor.py index 73fd3e31b..e94593a01 100644 --- a/eva/executor/drop_executor.py +++ b/eva/executor/drop_executor.py @@ -16,6 +16,7 @@ from eva.catalog.catalog_manager import CatalogManager from eva.executor.abstract_executor import AbstractExecutor +from eva.executor.executor_utils import ExecutorError from eva.models.storage.batch import Batch from eva.planner.drop_plan import DropPlan from eva.storage.storage_engine import StorageEngine, VideoStorageEngine @@ -57,6 +58,7 @@ def exec(self): if not success: err_msg = "Failed to drop {}".format(table_ref) logger.exception(err_msg) + raise ExecutorError(err_msg) yield Batch( pd.DataFrame( diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py index 7b12c46f9..f6b665e10 100644 --- a/eva/storage/sqlite_storage_engine.py +++ b/eva/storage/sqlite_storage_engine.py @@ -78,8 +78,13 @@ def create(self, table: DataFrameMetadata, **kwargs): def drop(self, table: DataFrameMetadata): try: - BaseModel.metadata.tables[table.name].drop() + table_to_remove = BaseModel.metadata.tables[table.name] + table_to_remove.drop() self._sql_session.commit() + # In memory metadata does not automatically sync with the database + # therefore manually removing the table from the in-memory metadata + # https://github.com/sqlalchemy/sqlalchemy/issues/5112 + BaseModel.metadata.remove(table_to_remove) except Exception as e: logger.exception( f"Failed to drop the table {table.name} with Exception {str(e)}" diff --git a/test/integration_tests/test_load_executor.py b/test/integration_tests/test_load_executor.py index b53be07eb..9e76271db 100644 --- a/test/integration_tests/test_load_executor.py +++ b/test/integration_tests/test_load_executor.py @@ -91,6 +91,10 @@ def test_should_load_csv_in_table(self): expected_batch = create_dummy_csv_batches() expected_batch.modify_column_alias("myvideocsv") self.assertEqual(actual_batch, expected_batch) + + # clean up + drop_query = "DROP TABLE MyVideoCSV;" + execute_query_fetch_all(drop_query) def test_should_load_csv_with_columns_in_table(self): @@ -101,12 +105,8 @@ def test_should_load_csv_with_columns_in_table(self): id INTEGER UNIQUE, frame_id INTEGER NOT NULL, video_id INTEGER NOT NULL, - dataset_name TEXT(30) NOT NULL, - label TEXT(30), - bbox NDARRAY FLOAT32(4), - object_id INTEGER + dataset_name TEXT(30) NOT NULL ); - """ execute_query_fetch_all(create_table_query) @@ -128,6 +128,9 @@ def test_should_load_csv_with_columns_in_table(self): expected_batch.modify_column_alias("myvideocsv") self.assertEqual(actual_batch, expected_batch) + # clean up + drop_query = "DROP TABLE MyVideoCSV;" + execute_query_fetch_all(drop_query) if __name__ == "__main__": suite = unittest.TestSuite() diff --git a/test/integration_tests/test_select_executor.py b/test/integration_tests/test_select_executor.py index 66c51cec6..bcc863864 100644 --- a/test/integration_tests/test_select_executor.py +++ b/test/integration_tests/test_select_executor.py @@ -248,7 +248,7 @@ def test_select_and_sample(self): # Disabling it for time being self.assertEqual(actual_batch, expected_batch[0]) - def test_aaselect_and_sample_with_predicate(self): + def test_select_and_sample_with_predicate(self): select_query = ( "SELECT name, id,data FROM MyVideo SAMPLE 2 WHERE id > 5 ORDER BY id;" ) From 177a637535ac5262c68deb8de324fb2ffafdedd7 Mon Sep 17 00:00:00 2001 From: Gaurav 
Tarlok Kakkar Date: Thu, 27 Oct 2022 11:14:28 -0400 Subject: [PATCH 13/18] style: ran black --- eva/storage/sqlite_storage_engine.py | 2 +- test/integration_tests/test_load_executor.py | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py index f6b665e10..8b8185ec9 100644 --- a/eva/storage/sqlite_storage_engine.py +++ b/eva/storage/sqlite_storage_engine.py @@ -83,7 +83,7 @@ def drop(self, table: DataFrameMetadata): self._sql_session.commit() # In memory metadata does not automatically sync with the database # therefore manually removing the table from the in-memory metadata - # https://github.com/sqlalchemy/sqlalchemy/issues/5112 + # https://github.com/sqlalchemy/sqlalchemy/issues/5112 BaseModel.metadata.remove(table_to_remove) except Exception as e: logger.exception( diff --git a/test/integration_tests/test_load_executor.py b/test/integration_tests/test_load_executor.py index 9e76271db..4929502f4 100644 --- a/test/integration_tests/test_load_executor.py +++ b/test/integration_tests/test_load_executor.py @@ -91,7 +91,7 @@ def test_should_load_csv_in_table(self): expected_batch = create_dummy_csv_batches() expected_batch.modify_column_alias("myvideocsv") self.assertEqual(actual_batch, expected_batch) - + # clean up drop_query = "DROP TABLE MyVideoCSV;" execute_query_fetch_all(drop_query) @@ -131,9 +131,3 @@ def test_should_load_csv_with_columns_in_table(self): # clean up drop_query = "DROP TABLE MyVideoCSV;" execute_query_fetch_all(drop_query) - -if __name__ == "__main__": - suite = unittest.TestSuite() - suite.addTest(LoadExecutorTest("test_should_load_csv_in_table")) - runner = unittest.TextTestRunner() - runner.run(suite) From 40656da01d113b575a1e414f8d3dd2ee1db037e6 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 12:42:46 -0400 Subject: [PATCH 14/18] docs: remove old file --- api-docs/source/contribute/index.md | 72 ----------------------------- 1 file changed, 72 deletions(-) delete mode 100644 api-docs/source/contribute/index.md diff --git a/api-docs/source/contribute/index.md b/api-docs/source/contribute/index.md deleted file mode 100644 index 40f104758..000000000 --- a/api-docs/source/contribute/index.md +++ /dev/null @@ -1,72 +0,0 @@ -### Contributing -We welcome all kinds of contributions to EVA. -- New features -- Code reviewing of PR -- Documentation -- Tutorials and Applications - -#### Setting up the development environment -To hack on EVA, you need to checkout the repository and build EVA from the source. -Follow the following instructions to build EVA and test your changes locally. -We recommend using a virtual environment and the pip package manager. EVA requires JAVA 8 for generating the parser. -``` -git clone https://github.com/georgia-tech-db/eva.git && cd eva -python3 -m venv env38 # to create a virtual environment -. env38/bin/activate -pip install --upgrade pip -sudo -E apt install -y openjdk-8-jdk openjdk-8-jre # to install JAVA -sh script/antlr4/generate_parser.sh # to generate the EVA parser -pip install -e ".[dev]" -``` - -#### Submitting a contribution -Follow the following steps to contribute to EVA: -* Merge the most recent changes from the master branch -``` - git remote add origin git@github.com:georgia-tech-db/eva.git - git pull . origin/master -``` -* Run the [test script](#testing) to ensure all the test cases pass. -* Run the `setup_git_hooks.sh` to add a git pre-push hook so that it runs the linter before pushing any changes. 
-* If you are adding a new SQL command, please add the example usage to the documentation. - -#### Testing - -Before merging the PR, the code must pass all the unit test cases. You can use the following script to run all the test cases locally. -``` -bash script/test/test.sh -``` -If you want to run a specific test file, use the following command. -``` -python -m pytest test/integration_tests/test_select_executor.py -``` -Use the following command to run a specific test case within a test file. -``` -python -m pytest test/integration_tests/test_select_executor.py -k 'test_should_load_and_select_in_table' -``` - -#### Code Style -We use the [black](https://github.com/psf/black) code style for formatting our python code. For docstrings and documentation, we use [Google pydoc format](https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html). - -``` -def function_with_types_in_docstring(param1, param2) -> bool: - """Example function with types documented in the docstring. - - Additional explanatory text can be added in paragraphs. - - Args: - param1 (int): The first parameter. - param2 (str): The second parameter. - - Returns: - bool: The return value. True for success, False otherwise. - -``` - -##### Lint and Formatting -Before merging, the PR must pass the code formatting and linting test case. -On your local machine, run the following script to auto-format using `black` - -``` -python script/formatting/formatter.py -``` From 06c21209e4acae50f2ed2a85c4e35dfbfd5afd7b Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 13:17:45 -0400 Subject: [PATCH 15/18] fix: sqlalchemy does not support numpy data types --- eva/storage/sqlite_storage_engine.py | 5 +++++ test/storage/test_sqlite_storage_engine.py | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py index 8b8185ec9..6d51c8ad7 100644 --- a/eva/storage/sqlite_storage_engine.py +++ b/eva/storage/sqlite_storage_engine.py @@ -14,6 +14,7 @@ # limitations under the License. 
from typing import Iterator, List +import numpy as np import pandas as pd from sqlalchemy.ext.declarative import declarative_base @@ -48,6 +49,10 @@ def _dict_to_sql_row(self, dict_row: dict, columns: List[DataFrameColumn]): for col in columns: if col.type == ColumnType.NDARRAY: dict_row[col.name] = self._serializer.serialize(dict_row[col.name]) + elif isinstance(dict_row[col.name], (np.generic,)): + # SqlAlchemy does not understand numpy geenric data types + # convert numpy datatype to python generic datatype + dict_row[col.name] = dict_row[col.name].tolist() return dict_row def _sql_row_to_dict(self, sql_row: tuple, columns: List[DataFrameColumn]): diff --git a/test/storage/test_sqlite_storage_engine.py b/test/storage/test_sqlite_storage_engine.py index c3f8c124b..88dbf9087 100644 --- a/test/storage/test_sqlite_storage_engine.py +++ b/test/storage/test_sqlite_storage_engine.py @@ -51,6 +51,8 @@ def test_should_create_empty_table(self): sqlengine.create(self.table) records = list(sqlengine.read(self.table, batch_mem_size=3000)) self.assertEqual(records, []) + # clean up + sqlengine.drop(self.table) def test_should_write_rows_to_table(self): dummy_batches = list(create_dummy_batches()) @@ -63,3 +65,5 @@ def test_should_write_rows_to_table(self): read_batch = list(sqlengine.read(self.table, batch_mem_size=3000)) self.assertTrue(read_batch, dummy_batches) + # clean up + sqlengine.drop(self.table) From bdf8c6ee58c561bc311d8e4bd23e1f08af62956e Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Thu, 27 Oct 2022 13:51:02 -0400 Subject: [PATCH 16/18] docs: improve docs --- eva/storage/sqlite_storage_engine.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py index 6d51c8ad7..390166833 100644 --- a/eva/storage/sqlite_storage_engine.py +++ b/eva/storage/sqlite_storage_engine.py @@ -50,8 +50,9 @@ def _dict_to_sql_row(self, dict_row: dict, columns: List[DataFrameColumn]): if col.type == ColumnType.NDARRAY: dict_row[col.name] = self._serializer.serialize(dict_row[col.name]) elif isinstance(dict_row[col.name], (np.generic,)): - # SqlAlchemy does not understand numpy geenric data types + # SqlAlchemy does not consume numpy generic data types # convert numpy datatype to python generic datatype + # eg. 
np.int64 -> int dict_row[col.name] = dict_row[col.name].tolist() return dict_row @@ -86,7 +87,7 @@ def drop(self, table: DataFrameMetadata): table_to_remove = BaseModel.metadata.tables[table.name] table_to_remove.drop() self._sql_session.commit() - # In memory metadata does not automatically sync with the database + # In-memory metadata does not automatically sync with the database # therefore manually removing the table from the in-memory metadata # https://github.com/sqlalchemy/sqlalchemy/issues/5112 BaseModel.metadata.remove(table_to_remove) From b649a474789c9e607f3fed5297f3aa3ef62be558 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Mon, 31 Oct 2022 23:08:17 -0400 Subject: [PATCH 17/18] fix: address PR comments --- eva/storage/sqlite_storage_engine.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py index 390166833..a8a1d089a 100644 --- a/eva/storage/sqlite_storage_engine.py +++ b/eva/storage/sqlite_storage_engine.py @@ -16,7 +16,6 @@ import numpy as np import pandas as pd -from sqlalchemy.ext.declarative import declarative_base from eva.catalog.column_type import ColumnType from eva.catalog.models.base_model import BaseModel @@ -32,8 +31,6 @@ # Leveraging Dynamic schema in SQLAlchemy # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html -Base = declarative_base() - class SQLStorageEngine(AbstractStorageEngine): def __init__(self): @@ -51,8 +48,9 @@ def _dict_to_sql_row(self, dict_row: dict, columns: List[DataFrameColumn]): dict_row[col.name] = self._serializer.serialize(dict_row[col.name]) elif isinstance(dict_row[col.name], (np.generic,)): # SqlAlchemy does not consume numpy generic data types - # convert numpy datatype to python generic datatype + # convert numpy datatype to python generic datatype using tolist() # eg. np.int64 -> int + # https://stackoverflow.com/a/53067954 dict_row[col.name] = dict_row[col.name].tolist() return dict_row From 570081a5cc163203e4274671c449f8fc07b5f165 Mon Sep 17 00:00:00 2001 From: Gaurav Tarlok Kakkar Date: Mon, 31 Oct 2022 23:15:02 -0400 Subject: [PATCH 18/18] fix: merge issues --- setup.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/setup.py b/setup.py index 026a954db..1394542c7 100644 --- a/setup.py +++ b/setup.py @@ -44,13 +44,7 @@ def read(path, encoding="utf-8"): "Pillow==9.0.1", "sqlalchemy==1.3.20", "sqlalchemy-utils==0.36.6", -<<<<<<< HEAD - "antlr4-python3-runtime==4.10", -======= - "pyspark==3.0.2", - "petastorm==0.11.5", "antlr4-python3-runtime==4.8", ->>>>>>> c832a21359d0515d7a5346bdc03740545f7bd65a "pyyaml==5.1", "importlib-metadata<5.0" ]
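
Taken together, the patches above replace the Petastorm/Spark row store with a thin SQLite-backed engine: table schemas are built dynamically from catalog columns via SQLAlchemy, NDARRAY cells are pickled before insertion, numpy scalars are unwrapped to plain Python types so SQLite will accept them, and dropping a table also detaches it from the in-memory MetaData so the name can be reused. The sketch below is a minimal, self-contained illustration of that pattern; it is not EVA's actual `SQLStorageEngine`, and the table and column names are invented for the example.

```python
import pickle

import numpy as np
from sqlalchemy import Column, Integer, LargeBinary, MetaData, Table, create_engine

engine = create_engine("sqlite:///:memory:")
metadata = MetaData()

# Dynamic schema: in EVA the column list comes from catalog metadata at runtime;
# here it is hard-coded to keep the example small.
frames = Table(
    "frames",
    metadata,
    Column("_row_id", Integer, primary_key=True, autoincrement=True),
    Column("id", Integer),
    Column("data", LargeBinary),  # NDARRAY cells stored as pickled bytes
)
metadata.create_all(engine)


def to_sql_row(row: dict) -> dict:
    """Same idea as _dict_to_sql_row: serialize arrays, unwrap numpy scalars."""
    out = {}
    for name, value in row.items():
        if isinstance(value, np.ndarray):
            out[name] = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
        elif isinstance(value, np.generic):
            out[name] = value.tolist()  # e.g. np.int64 -> int
        else:
            out[name] = value
    return out


with engine.begin() as conn:
    rows = [{"id": np.int64(i), "data": np.ones((2, 2)) * i} for i in range(3)]
    conn.execute(frames.insert(), [to_sql_row(r) for r in rows])
    for row in conn.execute(frames.select()):
        # skip the _row_id primary key, then deserialize the ndarray cell
        print(row[1], pickle.loads(row[2]))

# Dropping mirrors the patch series: drop in the database, then detach the
# table from the in-memory MetaData so it can be re-created later.
frames.drop(engine)
metadata.remove(frames)
```

Pickling each array cell keeps arbitrary shapes and dtypes without extra schema bookkeeping, at the cost of opaque blobs: SQL cannot filter or index on array contents, so any such filtering has to happen after deserialization, outside the storage layer.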