diff --git a/api-docs/conf.py b/api-docs/conf.py index a6aec9136..1388cd359 100644 --- a/api-docs/conf.py +++ b/api-docs/conf.py @@ -114,4 +114,3 @@ def setup(sphinx): category=UserWarning, message=r".*Container node skipped.*", ) - diff --git a/eva/catalog/df_schema.py b/eva/catalog/df_schema.py index 2536945db..0c7a2090c 100644 --- a/eva/catalog/df_schema.py +++ b/eva/catalog/df_schema.py @@ -15,7 +15,6 @@ from typing import List from eva.catalog.models.df_column import DataFrameColumn -from eva.catalog.schema_utils import SchemaUtils class DataFrameSchema(object): @@ -23,10 +22,6 @@ def __init__(self, name: str, column_list: List[DataFrameColumn]): self._name = name self._column_list = column_list - self._petastorm_schema = SchemaUtils.get_petastorm_schema( - self._name, self._column_list - ) - self._pyspark_schema = self._petastorm_schema.as_spark_schema() def __str__(self): schema_str = "SCHEMA:: (" + self._name + ")\n" @@ -42,14 +37,6 @@ def name(self): def column_list(self): return self._column_list - @property - def petastorm_schema(self): - return self._petastorm_schema - - @property - def pyspark_schema(self): - return self._pyspark_schema - def __eq__(self, other): return self.name == other.name and self._column_list == other.column_list diff --git a/eva/catalog/models/base_model.py b/eva/catalog/models/base_model.py index 3d194432d..c32af1afd 100644 --- a/eva/catalog/models/base_model.py +++ b/eva/catalog/models/base_model.py @@ -33,7 +33,7 @@ class CustomModel: """ query = db_session.query_property() - _id = Column("id", Integer, primary_key=True) + _id = Column("_row_id", Integer, primary_key=True) def __init__(self, **kwargs): cls_ = type(self) diff --git a/eva/catalog/models/df_column.py b/eva/catalog/models/df_column.py index 95b348a9e..04164820d 100644 --- a/eva/catalog/models/df_column.py +++ b/eva/catalog/models/df_column.py @@ -31,7 +31,7 @@ class DataFrameColumn(BaseModel): _is_nullable = Column("is_nullable", Boolean, default=False) _array_type = Column("array_type", Enum(NdArrayType), nullable=True) _array_dimensions = Column("array_dimensions", String(100)) - _metadata_id = Column("metadata_id", Integer, ForeignKey("df_metadata.id")) + _metadata_id = Column("metadata_id", Integer, ForeignKey("df_metadata._row_id")) _dataset = relationship("DataFrameMetadata", back_populates="_columns") diff --git a/eva/catalog/models/udf_io.py b/eva/catalog/models/udf_io.py index 2b5b74a80..f0da3bea4 100644 --- a/eva/catalog/models/udf_io.py +++ b/eva/catalog/models/udf_io.py @@ -32,7 +32,7 @@ class UdfIO(BaseModel): _array_type = Column("array_type", Enum(NdArrayType), nullable=True) _array_dimensions = Column("array_dimensions", String(100)) _is_input = Column("is_input", Boolean, default=True) - _udf_id = Column("udf_id", Integer, ForeignKey("udf.id")) + _udf_id = Column("udf_id", Integer, ForeignKey("udf._row_id")) _udf = relationship("UdfMetadata", back_populates="_cols") __table_args__ = (UniqueConstraint("name", "udf_id"), {}) diff --git a/eva/catalog/schema_utils.py b/eva/catalog/schema_utils.py index 6985e8965..08c58cdfb 100644 --- a/eva/catalog/schema_utils.py +++ b/eva/catalog/schema_utils.py @@ -12,89 +12,47 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import numpy as np -import pandas as pd -from petastorm.codecs import NdarrayCodec, ScalarCodec -from petastorm.unischema import Unischema, UnischemaField -from pyspark.sql.types import FloatType, IntegerType, StringType +from typing import Dict, List -from eva.catalog.column_type import ColumnType, NdArrayType +from sqlalchemy import TEXT, Column, Float, Integer, LargeBinary + +from eva.catalog.column_type import ColumnType +from eva.catalog.models.df_column import DataFrameColumn from eva.utils.logging_manager import logger class SchemaUtils(object): @staticmethod - def get_petastorm_column(df_column): - + def get_sqlalchemy_column(df_column: DataFrameColumn) -> Column: column_type = df_column.type - column_name = df_column.name - column_is_nullable = df_column.is_nullable - column_array_type = df_column.array_type - column_array_dimensions = df_column.array_dimensions - - # Reference: - # https://github.com/uber/petastorm/blob/master/petastorm/ - # tests/test_common.py - petastorm_column = None + sqlalchemy_column = None if column_type == ColumnType.INTEGER: - petastorm_column = UnischemaField( - column_name, - np.int64, - (), - ScalarCodec(IntegerType()), - column_is_nullable, - ) + sqlalchemy_column = Column(Integer) elif column_type == ColumnType.FLOAT: - petastorm_column = UnischemaField( - column_name, - np.float32, - (), - ScalarCodec(FloatType()), - column_is_nullable, - ) + sqlalchemy_column = Column(Float) elif column_type == ColumnType.TEXT: - petastorm_column = UnischemaField( - column_name, np.str_, (), ScalarCodec(StringType()), column_is_nullable - ) + sqlalchemy_column = Column(TEXT) elif column_type == ColumnType.NDARRAY: - np_type = NdArrayType.to_numpy_type(column_array_type) - petastorm_column = UnischemaField( - column_name, - np_type, - column_array_dimensions, - NdarrayCodec(), - column_is_nullable, - ) + sqlalchemy_column = Column(LargeBinary) else: logger.error("Invalid column type: " + str(column_type)) - return petastorm_column + return sqlalchemy_column @staticmethod - def get_petastorm_schema(name, column_list): - petastorm_column_list = [] - for _column in column_list: - petastorm_column = SchemaUtils.get_petastorm_column(_column) - petastorm_column_list.append(petastorm_column) + def get_sqlalchemy_schema( + column_list: List[DataFrameColumn], + ) -> Dict[str, Column]: + """Converts the list of DataFrameColumns to SQLAlchemyColumns - petastorm_schema = Unischema(name, petastorm_column_list) - return petastorm_schema + Args: + column_list (List[DataFrameColumn]): columns to be converted - @staticmethod - def petastorm_type_cast(schema: Unischema, df: pd.DataFrame) -> pd.DataFrame: - """ - Try to cast the type if schema defined in UnischemeField for - Petastorm is not consistent with panda DataFrame provided. 
+ Returns: + Dict[str, Column]: mapping from column_name to sqlalchemy column object """ - for unischema in schema.fields.values(): - if not isinstance(unischema.codec, NdarrayCodec): - continue - # We only care when the cell data is np.ndarray - col = unischema.name - dtype = unischema.numpy_dtype - try: - df[col] = df[col].apply(lambda x: x.astype(dtype, copy=False)) - except Exception: - logger.exception("Failed to cast %s to %s for Petastorm" % (col, dtype)) - return df + return { + column.name: SchemaUtils.get_sqlalchemy_column(column) + for column in column_list + } diff --git a/eva/catalog/services/df_service.py b/eva/catalog/services/df_service.py index f0368a400..66c171d1a 100644 --- a/eva/catalog/services/df_service.py +++ b/eva/catalog/services/df_service.py @@ -104,6 +104,7 @@ def drop_dataset_by_name(self, database_name: str, dataset_name: str): try: dataset = self.dataset_object_by_name(database_name, dataset_name) dataset.delete() + return True except Exception as e: err_msg = "Delete dataset failed for name {} with error {}".format( dataset_name, str(e) diff --git a/eva/eva.yml b/eva/eva.yml index 7497007a5..b7d72e0b7 100644 --- a/eva/eva.yml +++ b/eva/eva.yml @@ -17,23 +17,10 @@ executor: storage: upload_dir: "" - engine: "eva.storage.petastorm_storage_engine.PetastormStorageEngine" + engine: "eva.storage.sqlite_storage_engine.SQLStorageEngine" video_engine: "eva.storage.opencv_storage_engine.OpenCVStorageEngine" video_engine_version: 0 - # https://petastorm.readthedocs.io/en/latest/api.html#module-petastorm.reader - petastorm: {'cache_type' : 'local-disk', - 'cache_location' : '.cache', - 'cache_size_limit' : 4000000000, #4gb - 'cache_row_size_estimate' : 512} - -pyspark: - property: {'spark.logConf': 'true', - 'spark.driver.memory': '10g', - 'spark.sql.execution.arrow.pyspark.enabled': 'true'} - coalesce: 2 - - server: host: "0.0.0.0" port: 5432 diff --git a/eva/executor/drop_executor.py b/eva/executor/drop_executor.py index 73fd3e31b..e94593a01 100644 --- a/eva/executor/drop_executor.py +++ b/eva/executor/drop_executor.py @@ -16,6 +16,7 @@ from eva.catalog.catalog_manager import CatalogManager from eva.executor.abstract_executor import AbstractExecutor +from eva.executor.executor_utils import ExecutorError from eva.models.storage.batch import Batch from eva.planner.drop_plan import DropPlan from eva.storage.storage_engine import StorageEngine, VideoStorageEngine @@ -57,6 +58,7 @@ def exec(self): if not success: err_msg = "Failed to drop {}".format(table_ref) logger.exception(err_msg) + raise ExecutorError(err_msg) yield Batch( pd.DataFrame( diff --git a/eva/readers/petastorm_reader.py b/eva/readers/petastorm_reader.py deleted file mode 100644 index 41f3f9073..000000000 --- a/eva/readers/petastorm_reader.py +++ /dev/null @@ -1,80 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from typing import Dict, Iterator - -from petastorm import make_reader - -from eva.configuration.configuration_manager import ConfigurationManager -from eva.readers.abstract_reader import AbstractReader - - -class PetastormReader(AbstractReader): - def __init__( - self, *args, cur_shard=None, shard_count=None, predicate=None, **kwargs - ): - """ - Reads data from the petastorm parquet stores. Note this won't - work for any arbitary parquet store apart from one materialized - using petastorm. In order to generalize, we might have to replace - `make_reader` with `make_batch_reader`. - https://petastorm.readthedocs.io/en/latest/api.html#module-petastorm.reader - - Attributes: - cur_shard (int, optional): Shard number to load from if sharded - shard_count (int, optional): Specify total number of shards if - applicable - predicate (PredicateBase, optional): instance of predicate object - to filter rows to be returned by reader - cache_type (str): the cache type, if desired. - Options are [None, ‘null’, ‘local-disk’] to either have a - null/noop cache or a cache implemented using diskcache. - cache_location (int): the location or path of the cache. - cache_size_limit (int): the size limit of the cache in bytes - cache_row_size_estimate (int): the estimated size of a row - """ - self.cur_shard = cur_shard - self.shard_count = shard_count - self.predicate = predicate - petastorm_config = ConfigurationManager().get_value("storage", "petastorm") - # cache not allowed with predicates - if self.predicate or petastorm_config is None: - petastorm_config = {} - self.cache_type = petastorm_config.get("cache_type", None) - self.cache_location = petastorm_config.get("cache_location", None) - self.cache_size_limit = petastorm_config.get("cache_size_limit", None) - self.cache_row_size_estimate = petastorm_config.get( - "cache_row_size_estimate", None - ) - super().__init__(*args, **kwargs) - if self.cur_shard is not None and self.cur_shard <= 0: - self.cur_shard = None - - if self.shard_count is not None and self.shard_count <= 0: - self.shard_count = None - - def _read(self) -> Iterator[Dict]: - # `Todo`: Generalize this reader - with make_reader( - self.file_url, - shard_count=self.shard_count, - cur_shard=self.cur_shard, - predicate=self.predicate, - cache_type=self.cache_type, - cache_location=self.cache_location, - cache_size_limit=self.cache_size_limit, - cache_row_size_estimate=self.cache_row_size_estimate, - ) as reader: - for row in reader: - yield row._asdict() diff --git a/eva/spark/__init__.py b/eva/spark/__init__.py deleted file mode 100644 index 3765a7c41..000000000 --- a/eva/spark/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-"""creates a spark session""" diff --git a/eva/spark/session.py b/eva/spark/session.py deleted file mode 100644 index b62c44528..000000000 --- a/eva/spark/session.py +++ /dev/null @@ -1,81 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from pyspark.conf import SparkConf -from pyspark.sql import SparkSession - -from eva.configuration.configuration_manager import ConfigurationManager - - -class Session(object): - """ - Wrapper around Spark Session - """ - - _instance = None - _session = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super(Session, cls).__new__(cls) - return cls._instance - - def __init__(self): - self._config = ConfigurationManager() - name = self._config.get_value("core", "application") - self.init_spark_session(name) - - def init_spark_session(self, application_name, spark_master=None): - """Setup a spark session. - - :param spark_master: A master parameter used by spark session builder. - Use default value (None) to use system - environment configured spark cluster. - Use 'local[*]' to run on a local box. - - :return: spark_session: A spark session - """ - - eva_spark_conf = SparkConf() - pyspark_config = self._config.get_value("pyspark", "property") - for key, value in pyspark_config.items(): - eva_spark_conf.set(key, value) - - session_builder = SparkSession.builder.appName(application_name).config( - conf=eva_spark_conf - ) - - if spark_master: - session_builder.master(spark_master) - - # Gets an existing SparkSession or, - # if there is no existing one, creates a new one based - # on the options set in this builder. - self._session = session_builder.getOrCreate() - - # Configure logging - spark_context = self._session.sparkContext - spark_context.setLogLevel("OFF") - - def get_session(self): - return self._session - - def get_context(self): - return self._session.sparkContext - - def stop(self): - self._session.stop() - - def __del__(self): - self._session.stop() diff --git a/eva/storage/petastorm_storage_engine.py b/eva/storage/petastorm_storage_engine.py deleted file mode 100644 index ba4937168..000000000 --- a/eva/storage/petastorm_storage_engine.py +++ /dev/null @@ -1,133 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import shutil -from pathlib import Path -from typing import Iterator, List - -from petastorm.etl.dataset_metadata import materialize_dataset -from petastorm.predicates import in_lambda -from petastorm.unischema import dict_to_spark_row - -from eva.catalog.models.df_metadata import DataFrameMetadata -from eva.configuration.configuration_manager import ConfigurationManager -from eva.models.storage.batch import Batch -from eva.readers.petastorm_reader import PetastormReader -from eva.spark.session import Session -from eva.storage.abstract_storage_engine import AbstractStorageEngine -from eva.utils.logging_manager import logger - - -class PetastormStorageEngine(AbstractStorageEngine): - def __init__(self): - """ - Maintain a long live spark session and context. - """ - self._spark = Session() - self.spark_session = self._spark.get_session() - self.spark_context = self._spark.get_context() - self.coalesce = ConfigurationManager().get_value("pyspark", "coalesce") - - def _spark_url(self, table: DataFrameMetadata) -> str: - """ - Generate a spark/petastorm url given a table - """ - return Path(table.file_url).resolve().as_uri() - - def create(self, table: DataFrameMetadata, **kwargs): - """ - Create an empty dataframe in petastorm. - """ - empty_rdd = self.spark_context.emptyRDD() - - with materialize_dataset( - self.spark_session, self._spark_url(table), table.schema.petastorm_schema - ): - - self.spark_session.createDataFrame( - empty_rdd, table.schema.pyspark_schema - ).coalesce(self.coalesce).write.mode("overwrite").parquet( - self._spark_url(table) - ) - - def drop(self, table: DataFrameMetadata): - dir_path = self._spark_url(table) - try: - shutil.rmtree(str(dir_path)) - except Exception as e: - logger.exception(f"Failed to drop the video table {e}") - - def write(self, table: DataFrameMetadata, rows: Batch): - """ - Write rows into the dataframe. - - Arguments: - table: table metadata object to write into - rows : batch to be persisted in the storage. - """ - - if rows.empty(): - return - # ToDo - # Throw an error if the row schema doesn't match the table schema - - with materialize_dataset( - self.spark_session, self._spark_url(table), table.schema.petastorm_schema - ): - - records = rows.frames - columns = records.keys() - rows_rdd = ( - self.spark_context.parallelize(records.values) - .map(lambda x: dict(zip(columns, x))) - .map(lambda x: dict_to_spark_row(table.schema.petastorm_schema, x)) - ) - self.spark_session.createDataFrame( - rows_rdd, table.schema.pyspark_schema - ).coalesce(self.coalesce).write.mode("append").parquet( - self._spark_url(table) - ) - - def read( - self, - table: DataFrameMetadata, - batch_mem_size: int, - columns: List[str] = None, - predicate_func=None, - ) -> Iterator[Batch]: - """ - Reads the table and return a batch iterator for the - tuples that passes the predicate func. - - Argument: - table: table metadata object to write into - batch_mem_size (int): memory size of the batch read from storage - columns (List[str]): A list of column names to be - considered in predicate_func - predicate_func: customized predicate function returns bool - - Return: - Iterator of Batch read. - """ - predicate = None - if predicate_func and columns: - predicate = in_lambda(columns, predicate_func) - - # ToDo: Handle the sharding logic. 
We might have to maintain a - # context for deciding which shard to read - reader = PetastormReader( - self._spark_url(table), batch_mem_size=batch_mem_size, predicate=predicate - ) - for batch in reader.read(): - yield batch diff --git a/eva/storage/sqlite_storage_engine.py b/eva/storage/sqlite_storage_engine.py new file mode 100644 index 000000000..a8a1d089a --- /dev/null +++ b/eva/storage/sqlite_storage_engine.py @@ -0,0 +1,146 @@ +# coding=utf-8 +# Copyright 2018-2022 EVA +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Iterator, List + +import numpy as np +import pandas as pd + +from eva.catalog.column_type import ColumnType +from eva.catalog.models.base_model import BaseModel +from eva.catalog.models.df_column import DataFrameColumn +from eva.catalog.models.df_metadata import DataFrameMetadata +from eva.catalog.schema_utils import SchemaUtils +from eva.catalog.sql_config import SQLConfig +from eva.models.storage.batch import Batch +from eva.storage.abstract_storage_engine import AbstractStorageEngine +from eva.utils.generic_utils import PickleSerializer, get_size +from eva.utils.logging_manager import logger + +# Leveraging Dynamic schema in SQLAlchemy +# https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html + + +class SQLStorageEngine(AbstractStorageEngine): + def __init__(self): + """ + Grab the existing sql session + """ + self._sql_session = SQLConfig().session + self._sql_engine = SQLConfig().engine + self._serializer = PickleSerializer() + + def _dict_to_sql_row(self, dict_row: dict, columns: List[DataFrameColumn]): + # Serialize numpy data + for col in columns: + if col.type == ColumnType.NDARRAY: + dict_row[col.name] = self._serializer.serialize(dict_row[col.name]) + elif isinstance(dict_row[col.name], (np.generic,)): + # SqlAlchemy does not consume numpy generic data types + # convert numpy datatype to python generic datatype using tolist() + # eg. np.int64 -> int + # https://stackoverflow.com/a/53067954 + dict_row[col.name] = dict_row[col.name].tolist() + return dict_row + + def _sql_row_to_dict(self, sql_row: tuple, columns: List[DataFrameColumn]): + # Deserialize numpy data + dict_row = {} + for idx, col in enumerate(columns): + if col.type == ColumnType.NDARRAY: + dict_row[col.name] = self._serializer.deserialize(sql_row[idx]) + else: + dict_row[col.name] = sql_row[idx] + return dict_row + + def create(self, table: DataFrameMetadata, **kwargs): + """ + Create an empty table in sql. 
+ It dynamically constructs the schema in SQLAlchemy + to create the table + """ + attr_dict = {"__tablename__": table.name} + sqlalchemy_schema = SchemaUtils.get_sqlalchemy_schema(table.columns) + attr_dict.update(sqlalchemy_schema) + # dynamic schema generation + # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html + new_table = type("__placeholder_class_name", (BaseModel,), attr_dict)() + BaseModel.metadata.tables[table.name].create(self._sql_engine) + self._sql_session.commit() + return new_table + + def drop(self, table: DataFrameMetadata): + try: + table_to_remove = BaseModel.metadata.tables[table.name] + table_to_remove.drop() + self._sql_session.commit() + # In-memory metadata does not automatically sync with the database + # therefore manually removing the table from the in-memory metadata + # https://github.com/sqlalchemy/sqlalchemy/issues/5112 + BaseModel.metadata.remove(table_to_remove) + except Exception as e: + logger.exception( + f"Failed to drop the table {table.name} with Exception {str(e)}" + ) + + def write(self, table: DataFrameMetadata, rows: Batch): + """ + Write rows into the SQL table. + + Arguments: + table: table metadata object to write into + rows: batch to be persisted in the storage. + """ + new_table = BaseModel.metadata.tables[table.name] + columns = rows.frames.keys() + data = [] + # ToDo: validate the data type before inserting into the table + for record in rows.frames.values: + row_data = {col: record[idx] for idx, col in enumerate(columns)} + data.append(self._dict_to_sql_row(row_data, table.columns)) + self._sql_engine.execute(new_table.insert(), data) + self._sql_session.commit() + + def read( + self, + table: DataFrameMetadata, + batch_mem_size: int, + ) -> Iterator[Batch]: + """ + Reads the table and returns a batch iterator for the + tuples. + + Arguments: + table: table metadata object of the table to read + batch_mem_size (int): memory size of the batch read from storage + Return: + Iterator of Batch read. + """ + + new_table = BaseModel.metadata.tables[table.name] + result = self._sql_engine.execute(new_table.select()) + data_batch = [] + row_size = None + for row in result: + # Todo: Verify the order of columns in row matches the table.columns + # ignore the first dummy (_row_id) primary column + data_batch.append(self._sql_row_to_dict(row[1:], table.columns)) + if row_size is None: + row_size = 0 + row_size = get_size(data_batch) + if len(data_batch) * row_size >= batch_mem_size: + yield Batch(pd.DataFrame(data_batch)) + data_batch = [] + if data_batch: + yield Batch(pd.DataFrame(data_batch)) diff --git a/eva/utils/generic_utils.py b/eva/utils/generic_utils.py index f852fe7cb..51029811d 100644 --- a/eva/utils/generic_utils.py +++ b/eva/utils/generic_utils.py @@ -14,6 +14,7 @@ # limitations under the License.
import hashlib import importlib +import pickle import sys import uuid from pathlib import Path @@ -130,3 +131,11 @@ def get_size(obj, seen=None): elif hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes, bytearray)): size += sum([get_size(i, seen) for i in obj]) return size + + +class PickleSerializer(object): + def serialize(self, data): + return pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL) + + def deserialize(self, data): + return pickle.loads(data) diff --git a/setup.py b/setup.py index ab9e628a3..1394542c7 100644 --- a/setup.py +++ b/setup.py @@ -44,8 +44,6 @@ def read(path, encoding="utf-8"): "Pillow==9.0.1", "sqlalchemy==1.3.20", "sqlalchemy-utils==0.36.6", - "pyspark==3.0.2", - "petastorm==0.11.5", "antlr4-python3-runtime==4.8", "pyyaml==5.1", "importlib-metadata<5.0" @@ -165,4 +163,4 @@ def read(path, encoding="utf-8"): extras_require=EXTRA_REQUIRES, include_package_data=True, package_data={"eva": ["eva.yml"]} -) +) \ No newline at end of file diff --git a/test/catalog/test_schema.py b/test/catalog/test_schema.py index b536290f9..afad977b6 100644 --- a/test/catalog/test_schema.py +++ b/test/catalog/test_schema.py @@ -13,112 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import unittest -from decimal import Decimal -from unittest.mock import MagicMock, call, patch - -import numpy as np -from petastorm.codecs import NdarrayCodec, ScalarCodec -from petastorm.unischema import UnischemaField -from pyspark.sql.types import FloatType, IntegerType, StringType from eva.catalog.column_type import ColumnType, NdArrayType from eva.catalog.df_schema import DataFrameSchema from eva.catalog.models.df_column import DataFrameColumn -from eva.catalog.schema_utils import SchemaUtils class SchemaTests(unittest.TestCase): # TEST SCHEMA UTILS START - def test_get_petastorm_column(self): - col_name = "frame_id" - col = DataFrameColumn(col_name, ColumnType.INTEGER, False) - petastorm_col = UnischemaField( - col_name, np.int64, (), ScalarCodec(IntegerType()), False - ) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - - col = DataFrameColumn(col_name, ColumnType.FLOAT, True) - petastorm_col = UnischemaField( - col_name, np.float32, (), ScalarCodec(FloatType()), True - ) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - - col = DataFrameColumn(col_name, ColumnType.TEXT, False) - petastorm_col = UnischemaField( - col_name, np.str_, (), ScalarCodec(StringType()), False - ) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - - col = DataFrameColumn(col_name, None, True, [10, 10]) - self.assertEqual(SchemaUtils.get_petastorm_column(col), None) - - def test_get_petastorm_column_ndarray(self): - expected_type = [ - np.int8, - np.uint8, - np.int16, - np.int32, - np.int64, - np.unicode_, - np.bool_, - np.float32, - np.float64, - Decimal, - np.str_, - np.datetime64, - ] - col_name = "frame_id" - for array_type, np_type in zip(NdArrayType, expected_type): - col = DataFrameColumn( - col_name, ColumnType.NDARRAY, True, array_type, [10, 10] - ) - petastorm_col = UnischemaField( - col_name, np_type, [10, 10], NdarrayCodec(), True - ) - self.assertEqual(SchemaUtils.get_petastorm_column(col), petastorm_col) - - def test_raise_exception_when_unkown_array_type(self): - col_name = "frame_id" - col = DataFrameColumn( - col_name, ColumnType.NDARRAY, True, ColumnType.TEXT, [10, 10] - ) - self.assertRaises(ValueError, SchemaUtils.get_petastorm_column, col) - - 
@patch("eva.catalog.schema_utils.Unischema") - @patch("eva.catalog.schema_utils.SchemaUtils.get_petastorm_column") - def test_get_petastorm_schema(self, mock_get_pc, mock_uni): - cols = [MagicMock() for i in range(2)] - mock_get_pc.side_effect = [1, 2] - self.assertEqual( - SchemaUtils.get_petastorm_schema("name", cols), mock_uni.return_value - ) - mock_get_pc.assert_has_calls([call(cols[0]), call(cols[1])]) - mock_uni.assert_called_once_with("name", [1, 2]) - - # TEST SCHEMA UTILS END - - # TEST DF_SCHEMA START - def test_df_schema(self): - schema_name = "foo" - column_1 = DataFrameColumn("frame_id", ColumnType.INTEGER, False) - column_2 = DataFrameColumn( - "frame_data", ColumnType.NDARRAY, False, NdArrayType.UINT8, [28, 28] - ) - column_3 = DataFrameColumn("frame_label", ColumnType.INTEGER, False) - col_list = [column_1, column_2, column_3] - schema = DataFrameSchema(schema_name, col_list) - expected_schema = SchemaUtils.get_petastorm_schema(schema_name, col_list) - self.assertEqual(schema.name, schema_name) - self.assertEqual(schema.column_list, col_list) - self.assertEqual(schema.petastorm_schema.fields, expected_schema.fields) - for field1, field2 in zip( - schema.petastorm_schema.fields, expected_schema.fields - ): - self.assertEqual(field1, field2) - self.assertEqual(schema.pyspark_schema, expected_schema.as_spark_schema()) - def test_schema_equality(self): schema_name = "foo" column_1 = DataFrameColumn("frame_id", ColumnType.INTEGER, False) diff --git a/test/integration_tests/test_load_executor.py b/test/integration_tests/test_load_executor.py index f48fc24b9..4929502f4 100644 --- a/test/integration_tests/test_load_executor.py +++ b/test/integration_tests/test_load_executor.py @@ -92,6 +92,10 @@ def test_should_load_csv_in_table(self): expected_batch.modify_column_alias("myvideocsv") self.assertEqual(actual_batch, expected_batch) + # clean up + drop_query = "DROP TABLE MyVideoCSV;" + execute_query_fetch_all(drop_query) + def test_should_load_csv_with_columns_in_table(self): # loading a csv requires a table to be created first @@ -101,12 +105,8 @@ def test_should_load_csv_with_columns_in_table(self): id INTEGER UNIQUE, frame_id INTEGER NOT NULL, video_id INTEGER NOT NULL, - dataset_name TEXT(30) NOT NULL, - label TEXT(30), - bbox NDARRAY FLOAT32(4), - object_id INTEGER + dataset_name TEXT(30) NOT NULL ); - """ execute_query_fetch_all(create_table_query) @@ -127,3 +127,7 @@ def test_should_load_csv_with_columns_in_table(self): expected_batch = create_dummy_csv_batches(target_columns=select_columns) expected_batch.modify_column_alias("myvideocsv") self.assertEqual(actual_batch, expected_batch) + + # clean up + drop_query = "DROP TABLE MyVideoCSV;" + execute_query_fetch_all(drop_query) diff --git a/test/integration_tests/test_select_executor.py b/test/integration_tests/test_select_executor.py index 5feeb1724..8a1f3370c 100644 --- a/test/integration_tests/test_select_executor.py +++ b/test/integration_tests/test_select_executor.py @@ -247,7 +247,7 @@ def test_select_and_sample(self): # Disabling it for time being self.assertEqual(actual_batch, expected_batch[0]) - def test_aaselect_and_sample_with_predicate(self): + def test_select_and_sample_with_predicate(self): select_query = ( "SELECT name, id,data FROM MyVideo SAMPLE 2 WHERE id > 5 ORDER BY id;" ) diff --git a/test/readers/test_petastorm_reader.py b/test/readers/test_petastorm_reader.py deleted file mode 100644 index 61b75b65d..000000000 --- a/test/readers/test_petastorm_reader.py +++ /dev/null @@ -1,102 +0,0 @@ -# coding=utf-8 -# 
Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import unittest -from test.util import upload_dir_from_config -from unittest.mock import patch - -import numpy as np - -from eva.configuration.configuration_manager import ConfigurationManager -from eva.readers.petastorm_reader import PetastormReader - - -class PetastormLoaderTest(unittest.TestCase): - class DummyRow: - def __init__(self, frame_id, frame_data): - self.frame_id = frame_id - self.frame_data = frame_data - - def _asdict(self): - return {"id": self.frame_id, "data": self.frame_data} - - class DummyReader: - def __init__(self, data): - self.data = data - - def __enter__(self): - return self - - def __iter__(self): - return self.data - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - @patch("eva.readers.petastorm_reader.make_reader") - def test_should_call_petastorm_make_reader_with_correct_params(self, mock): - petastorm_reader = PetastormReader( - file_url=os.path.join(upload_dir_from_config, "dummy.avi"), - batch_mem_size=3000, - cur_shard=2, - shard_count=3, - predicate="pred", - ) - list(petastorm_reader._read()) - mock.assert_called_once_with( - os.path.join(upload_dir_from_config, "dummy.avi"), - shard_count=3, - cur_shard=2, - predicate="pred", - cache_type=None, - cache_location=None, - cache_size_limit=None, - cache_row_size_estimate=None, - ) - - @patch("eva.readers.petastorm_reader.make_reader") - def test_should_call_petastorm_make_reader_with_negative_shards(self, mock): - petastorm_reader = PetastormReader( - file_url=os.path.join(upload_dir_from_config, "dummy.avi"), - batch_mem_size=3000, - cur_shard=-1, - shard_count=-2, - ) - list(petastorm_reader._read()) - petastorm_config = ConfigurationManager().get_value("storage", "petastorm") - mock.assert_called_once_with( - os.path.join(upload_dir_from_config, "dummy.avi"), - shard_count=None, - cur_shard=None, - predicate=None, - cache_location=petastorm_config.get("cache_location", None), - cache_row_size_estimate=petastorm_config.get( - "cache_row_size_estimate", None - ), - cache_size_limit=petastorm_config.get("cache_size_limit", None), - cache_type=petastorm_config.get("cache_type", None), - ) - - @patch("eva.readers.petastorm_reader.make_reader") - def test_should_read_data_using_petastorm_reader(self, mock): - petastorm_reader = PetastormReader( - file_url=os.path.join(upload_dir_from_config, "dummy.avi"), - batch_mem_size=3000, - ) - dummy_values = map(lambda i: self.DummyRow(i, np.ones((2, 2, 3)) * i), range(3)) - mock.return_value = self.DummyReader(dummy_values) - actual = list(petastorm_reader._read()) - expected = list(dummy_values) - self.assertTrue(all([np.allclose(i, j) for i, j in zip(actual, expected)])) diff --git a/test/spark/__init__.py b/test/spark/__init__.py deleted file mode 100644 index ccbb30dee..000000000 --- a/test/spark/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this 
file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/test/spark/test_session.py b/test/spark/test_session.py deleted file mode 100644 index 35f034102..000000000 --- a/test/spark/test_session.py +++ /dev/null @@ -1,39 +0,0 @@ -# coding=utf-8 -# Copyright 2018-2022 EVA -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import unittest - -from pyspark.sql import SparkSession - -from eva.spark.session import Session - - -class SparkSessionTest(unittest.TestCase): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def setUp(self): - self.session = Session() - - # def tearDown(self): - # self.session = Session() - # self.session.stop() - - def test_session(self): - - spark_session = self.session.get_session() - - session2 = Session() - self.assertEqual(self.session, session2) - self.assertIsInstance(spark_session, SparkSession) diff --git a/test/storage/test_petastorm_storage_engine.py b/test/storage/test_sqlite_storage_engine.py similarity index 60% rename from test/storage/test_petastorm_storage_engine.py rename to test/storage/test_sqlite_storage_engine.py index f6ce4defd..88dbf9087 100644 --- a/test/storage/test_petastorm_storage_engine.py +++ b/test/storage/test_sqlite_storage_engine.py @@ -14,15 +14,15 @@ # limitations under the License. 
import shutil import unittest -from test.util import NUM_FRAMES, create_dummy_batches +from test.util import create_dummy_batches from eva.catalog.column_type import ColumnType, NdArrayType from eva.catalog.models.df_column import DataFrameColumn from eva.catalog.models.df_metadata import DataFrameMetadata -from eva.storage.petastorm_storage_engine import PetastormStorageEngine +from eva.storage.sqlite_storage_engine import SQLStorageEngine -class PetastormStorageEngineTest(unittest.TestCase): +class SQLStorageEngineTest(unittest.TestCase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.table = None @@ -47,41 +47,23 @@ def tearDown(self): pass def test_should_create_empty_table(self): - petastorm = PetastormStorageEngine() - petastorm.create(self.table) - records = list(petastorm.read(self.table, batch_mem_size=3000)) + sqlengine = SQLStorageEngine() + sqlengine.create(self.table) + records = list(sqlengine.read(self.table, batch_mem_size=3000)) self.assertEqual(records, []) + # clean up + sqlengine.drop(self.table) def test_should_write_rows_to_table(self): dummy_batches = list(create_dummy_batches()) - petastorm = PetastormStorageEngine() - petastorm.create(self.table) + sqlengine = SQLStorageEngine() + sqlengine.create(self.table) for batch in dummy_batches: batch.drop_column_alias() - petastorm.write(self.table, batch) + sqlengine.write(self.table, batch) - read_batch = list(petastorm.read(self.table, batch_mem_size=3000)) + read_batch = list(sqlengine.read(self.table, batch_mem_size=3000)) self.assertTrue(read_batch, dummy_batches) - - def test_should_return_even_frames(self): - dummy_batches = list(create_dummy_batches()) - - petastorm = PetastormStorageEngine() - petastorm.create(self.table) - for batch in dummy_batches: - batch.drop_column_alias() - petastorm.write(self.table, batch) - - read_batch = list( - petastorm.read( - self.table, - batch_mem_size=3000, - columns=["id"], - predicate_func=lambda id: id % 2 == 0, - ) - ) - expected_batch = list( - create_dummy_batches(filters=[i for i in range(NUM_FRAMES) if i % 2 == 0]) - ) - self.assertTrue(read_batch, expected_batch) + # clean up + sqlengine.drop(self.table)
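For context on the pattern the new SQLStorageEngine and SchemaUtils rely on, here is a minimal, self-contained sketch (not part of the change above) of dynamic table creation with SQLAlchemy plus pickle round-tripping of an ndarray through a LargeBinary column, assuming SQLAlchemy 1.3.x as pinned in setup.py. The class, table, and column names below are hypothetical, chosen only for illustration.

```python
# Illustrative sketch: dynamic declarative model + pickled ndarray storage.
import pickle

import numpy as np
from sqlalchemy import Column, Integer, LargeBinary, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

# Dynamic schema generation, as in SchemaUtils.get_sqlalchemy_schema:
# map column names to SQLAlchemy Column objects, then build the class at runtime.
attr_dict = {
    "__tablename__": "demo_frames",           # hypothetical table name
    "_row_id": Column(Integer, primary_key=True),
    "id": Column(Integer),
    "data": Column(LargeBinary),               # NDARRAY columns are stored as pickled bytes
}
DemoFrame = type("DemoFrame", (Base,), attr_dict)

engine = create_engine("sqlite://")            # in-memory SQLite for illustration
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Write: serialize the ndarray before insert (what _dict_to_sql_row does for NDARRAY).
frame = np.ones((2, 2, 3), dtype=np.uint8)
session.add(DemoFrame(id=1, data=pickle.dumps(frame, protocol=pickle.HIGHEST_PROTOCOL)))
session.commit()

# Read: deserialize on the way out (what _sql_row_to_dict does for NDARRAY).
row = session.query(DemoFrame).first()
restored = pickle.loads(row.data)
assert np.array_equal(frame, restored)
```

Pickling arrays into a BLOB keeps the generated schema generic, at the cost of NDARRAY columns being opaque to SQL predicates, which is consistent with the new read() path that materializes whole rows and deserializes them in Python.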