Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Spark+Petastorm with Sqlite+SqlAlchemy #445

Merged
merged 24 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api-docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,3 @@ def setup(sphinx):
category=UserWarning,
message=r".*Container node skipped.*",
)

13 changes: 0 additions & 13 deletions eva/catalog/df_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,13 @@
from typing import List

from eva.catalog.models.df_column import DataFrameColumn
from eva.catalog.schema_utils import SchemaUtils


class DataFrameSchema(object):
    """Schema for a stored dataframe: a name plus its ordered column definitions.

    The Petastorm/PySpark schema derivation that used to happen here was
    removed in the SQLite+SQLAlchemy migration — the SQL storage engine
    only needs the raw column metadata.
    """

    def __init__(self, name: str, column_list: List[DataFrameColumn]):
        # name: identifier of the dataframe this schema describes
        # column_list: catalog column definitions, in declaration order
        self._name = name
        self._column_list = column_list
def __str__(self):
schema_str = "SCHEMA:: (" + self._name + ")\n"
Expand All @@ -42,14 +37,6 @@ def name(self):
def column_list(self):
return self._column_list

@property
def petastorm_schema(self):
return self._petastorm_schema

@property
def pyspark_schema(self):
return self._pyspark_schema

def __eq__(self, other):
return self.name == other.name and self._column_list == other.column_list

Expand Down
2 changes: 1 addition & 1 deletion eva/catalog/models/base_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class CustomModel:
"""

query = db_session.query_property()
_id = Column("id", Integer, primary_key=True)
_id = Column("_row_id", Integer, primary_key=True)

def __init__(self, **kwargs):
cls_ = type(self)
Expand Down
2 changes: 1 addition & 1 deletion eva/catalog/models/df_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class DataFrameColumn(BaseModel):
_is_nullable = Column("is_nullable", Boolean, default=False)
_array_type = Column("array_type", Enum(NdArrayType), nullable=True)
_array_dimensions = Column("array_dimensions", String(100))
_metadata_id = Column("metadata_id", Integer, ForeignKey("df_metadata.id"))
_metadata_id = Column("metadata_id", Integer, ForeignKey("df_metadata._row_id"))

_dataset = relationship("DataFrameMetadata", back_populates="_columns")

Expand Down
2 changes: 1 addition & 1 deletion eva/catalog/models/udf_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class UdfIO(BaseModel):
_array_type = Column("array_type", Enum(NdArrayType), nullable=True)
_array_dimensions = Column("array_dimensions", String(100))
_is_input = Column("is_input", Boolean, default=True)
_udf_id = Column("udf_id", Integer, ForeignKey("udf.id"))
_udf_id = Column("udf_id", Integer, ForeignKey("udf._row_id"))
_udf = relationship("UdfMetadata", back_populates="_cols")

__table_args__ = (UniqueConstraint("name", "udf_id"), {})
Expand Down
90 changes: 24 additions & 66 deletions eva/catalog/schema_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,89 +12,47 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pandas as pd
from petastorm.codecs import NdarrayCodec, ScalarCodec
from petastorm.unischema import Unischema, UnischemaField
from pyspark.sql.types import FloatType, IntegerType, StringType
from typing import Dict, List

from eva.catalog.column_type import ColumnType, NdArrayType
from sqlalchemy import TEXT, Column, Float, Integer, LargeBinary

from eva.catalog.column_type import ColumnType
from eva.catalog.models.df_column import DataFrameColumn
from eva.utils.logging_manager import logger


class SchemaUtils(object):
    """Helpers that map EVA catalog column definitions to SQLAlchemy columns.

    Replaces the former Petastorm/Unischema conversion utilities: catalog
    columns are now persisted through SQLAlchemy, with NDARRAY payloads
    serialized into opaque blobs.
    """

    @staticmethod
    def get_sqlalchemy_column(df_column: DataFrameColumn) -> Column:
        """Convert a single catalog column into a SQLAlchemy ``Column``.

        Args:
            df_column (DataFrameColumn): catalog column definition to convert.

        Returns:
            Column: SQLAlchemy column of the matching SQL type, or ``None``
            for an unrecognized column type (the error is logged).
        """
        column_type = df_column.type

        sqlalchemy_column = None
        if column_type == ColumnType.INTEGER:
            sqlalchemy_column = Column(Integer)
        elif column_type == ColumnType.FLOAT:
            sqlalchemy_column = Column(Float)
        elif column_type == ColumnType.TEXT:
            sqlalchemy_column = Column(TEXT)
        elif column_type == ColumnType.NDARRAY:
            # Arrays are serialized by the storage engine before insertion,
            # so they are stored as opaque binary blobs.
            sqlalchemy_column = Column(LargeBinary)
        else:
            logger.error("Invalid column type: " + str(column_type))

        return sqlalchemy_column

    @staticmethod
    def get_sqlalchemy_schema(
        column_list: List[DataFrameColumn],
    ) -> Dict[str, Column]:
        """Converts the list of DataFrameColumns to SQLAlchemyColumns

        Args:
            column_list (List[DataFrameColumn]): columns to be converted

        Returns:
            Dict[str, Column]: mapping from column_name to sqlalchemy column object
        """
        return {
            column.name: SchemaUtils.get_sqlalchemy_column(column)
            for column in column_list
        }
1 change: 1 addition & 0 deletions eva/catalog/services/df_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def drop_dataset_by_name(self, database_name: str, dataset_name: str):
try:
dataset = self.dataset_object_by_name(database_name, dataset_name)
dataset.delete()
return True
except Exception as e:
err_msg = "Delete dataset failed for name {} with error {}".format(
dataset_name, str(e)
Expand Down
15 changes: 1 addition & 14 deletions eva/eva.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,10 @@ executor:

storage:
upload_dir: ""
engine: "eva.storage.petastorm_storage_engine.PetastormStorageEngine"
engine: "eva.storage.sqlite_storage_engine.SQLStorageEngine"
video_engine: "eva.storage.opencv_storage_engine.OpenCVStorageEngine"
video_engine_version: 0

# https://petastorm.readthedocs.io/en/latest/api.html#module-petastorm.reader
petastorm: {'cache_type' : 'local-disk',
'cache_location' : '.cache',
'cache_size_limit' : 4000000000, #4gb
'cache_row_size_estimate' : 512}

pyspark:
property: {'spark.logConf': 'true',
'spark.driver.memory': '10g',
'spark.sql.execution.arrow.pyspark.enabled': 'true'}
coalesce: 2


server:
host: "0.0.0.0"
port: 5432
Expand Down
2 changes: 2 additions & 0 deletions eva/executor/drop_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from eva.catalog.catalog_manager import CatalogManager
from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import ExecutorError
from eva.models.storage.batch import Batch
from eva.planner.drop_plan import DropPlan
from eva.storage.storage_engine import StorageEngine, VideoStorageEngine
Expand Down Expand Up @@ -57,6 +58,7 @@ def exec(self):
if not success:
err_msg = "Failed to drop {}".format(table_ref)
logger.exception(err_msg)
raise ExecutorError(err_msg)

yield Batch(
pd.DataFrame(
Expand Down
80 changes: 0 additions & 80 deletions eva/readers/petastorm_reader.py

This file was deleted.

15 changes: 0 additions & 15 deletions eva/spark/__init__.py

This file was deleted.

Loading