feat: Add delta format to FileSource, add support for it in ibis/duckdb (#4123)
tokoko authored Apr 20, 2024
1 parent 2a6edea commit 2b6f1d0
Showing 11 changed files with 215 additions and 96 deletions.
4 changes: 4 additions & 0 deletions protos/feast/core/DataFormat.proto
@@ -27,8 +27,12 @@ message FileFormat {
// Defines options for the Parquet data format
message ParquetFormat {}

// Defines options for delta data format
message DeltaFormat {}

oneof format {
ParquetFormat parquet_format = 1;
DeltaFormat delta_format = 2;
}
}
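For context, a minimal sketch of how the new oneof could be exercised from the Python SDK, assuming the generated stubs live under feast.protos.feast.core as the SDK changes below suggest:

from feast.protos.feast.core.DataFormat_pb2 import FileFormat as FileFormatProto

# Build a FileFormat proto carrying the new DeltaFormat option.
proto = FileFormatProto(delta_format=FileFormatProto.DeltaFormat())
assert proto.WhichOneof("format") == "delta_format"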

14 changes: 14 additions & 0 deletions sdk/python/feast/data_format.py
@@ -43,6 +43,8 @@ def from_proto(cls, proto):
fmt = proto.WhichOneof("format")
if fmt == "parquet_format":
return ParquetFormat()
elif fmt == "delta_format":
return DeltaFormat()
if fmt is None:
return None
raise NotImplementedError(f"FileFormat is unsupported: {fmt}")
@@ -66,6 +68,18 @@ def __str__(self):
return "parquet"


class DeltaFormat(FileFormat):
"""
Defines delta data format
"""

def to_proto(self):
return FileFormatProto(delta_format=FileFormatProto.DeltaFormat())

def __str__(self):
return "delta"
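A quick sketch of the intended round trip through the proto layer, assuming the FileFormat/DeltaFormat classes above:

from feast.data_format import DeltaFormat, FileFormat

fmt = DeltaFormat()
assert str(fmt) == "delta"
# to_proto() sets the delta_format oneof; from_proto() maps it back to DeltaFormat.
assert isinstance(FileFormat.from_proto(fmt.to_proto()), DeltaFormat)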


class StreamFormat(ABC):
"""
Defines an abstract streaming data format used to encode feature data in streams
39 changes: 23 additions & 16 deletions sdk/python/feast/infra/offline_stores/file_source.py
@@ -8,7 +8,7 @@
from typeguard import typechecked

from feast import type_map
from feast.data_format import FileFormat, ParquetFormat
from feast.data_format import DeltaFormat, FileFormat, ParquetFormat
from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
@@ -157,24 +157,31 @@ def get_table_column_names_and_types(
filesystem, path = FileSource.create_filesystem_and_path(
self.path, self.file_options.s3_endpoint_override
)
# Adding support for different file format path
# based on S3 filesystem
if filesystem is None:
kwargs = (
{"use_legacy_dataset": False}
if version.parse(pyarrow.__version__) < version.parse("15.0.0")
else {}
)

schema = ParquetDataset(path, **kwargs).schema
if hasattr(schema, "names") and hasattr(schema, "types"):
# Newer versions of pyarrow don't have this method,
# but this field is good enough.
pass
# TODO why None check necessary
if self.file_format is None or isinstance(self.file_format, ParquetFormat):
if filesystem is None:
kwargs = (
{"use_legacy_dataset": False}
if version.parse(pyarrow.__version__) < version.parse("15.0.0")
else {}
)

schema = ParquetDataset(path, **kwargs).schema
if hasattr(schema, "names") and hasattr(schema, "types"):
# Newer versions of pyarrow don't have this method,
# but this field is good enough.
pass
else:
schema = schema.to_arrow_schema()
else:
schema = schema.to_arrow_schema()
schema = ParquetDataset(path, filesystem=filesystem).schema
elif isinstance(self.file_format, DeltaFormat):
from deltalake import DeltaTable

schema = DeltaTable(self.path).schema().to_pyarrow()
else:
schema = ParquetDataset(path, filesystem=filesystem).schema
raise Exception(f"Unknown FileFormat -> {self.file_format}")

return zip(schema.names, map(str, schema.types))
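As a standalone illustration of the Delta branch above, a sketch using deltalake directly; the path is hypothetical and would normally come from FileSource.path:

from deltalake import DeltaTable

# Read the Delta table's schema and convert it to a pyarrow schema.
pa_schema = DeltaTable("data/driver_stats").schema().to_pyarrow()
print(list(zip(pa_schema.names, map(str, pa_schema.types))))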

74 changes: 51 additions & 23 deletions sdk/python/feast/infra/offline_stores/ibis.py
@@ -13,6 +13,7 @@
from ibis.expr.types import Table
from pytz import utc

from feast.data_format import DeltaFormat, ParquetFormat
from feast.data_source import DataSource
from feast.errors import SavedDatasetLocationAlreadyExists
from feast.feature_logging import LoggingConfig, LoggingSource
@@ -105,6 +106,15 @@ def _generate_row_id(

return entity_table

@staticmethod
def _read_data_source(data_source: DataSource) -> Table:
assert isinstance(data_source, FileSource)

if isinstance(data_source.file_format, ParquetFormat):
return ibis.read_parquet(data_source.path)
elif isinstance(data_source.file_format, DeltaFormat):
return ibis.read_delta(data_source.path)
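The two branches map onto ibis' built-in file readers; a minimal sketch with hypothetical local paths (ibis.read_delta requires the deltalake package):

import ibis

pq = ibis.read_parquet("data/driver_stats.parquet")  # ParquetFormat sources
dt = ibis.read_delta("data/driver_stats")            # DeltaFormat sources
print(dt.schema())

Note that the helper has no else branch, so a FileSource with any other file_format would silently return None.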

@staticmethod
def get_historical_features(
config: RepoConfig,
@@ -137,7 +147,9 @@ def read_fv(
def read_fv(
feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
) -> Tuple:
fv_table: Table = ibis.read_parquet(feature_view.batch_source.name)
fv_table: Table = IbisOfflineStore._read_data_source(
feature_view.batch_source
)

for old_name, new_name in feature_view.batch_source.field_mapping.items():
if old_name in fv_table.columns:
@@ -227,7 +239,7 @@ def pull_all_from_table_or_query(
start_date = start_date.astimezone(tz=utc)
end_date = end_date.astimezone(tz=utc)

table = ibis.read_parquet(data_source.path)
table = IbisOfflineStore._read_data_source(data_source)

table = table.select(*fields)

@@ -260,10 +272,9 @@ def write_logged_features(
destination = logging_config.destination
assert isinstance(destination, FileLoggingDestination)

if isinstance(data, Path):
table = ibis.read_parquet(data)
else:
table = ibis.memtable(data)
table = (
ibis.read_parquet(data) if isinstance(data, Path) else ibis.memtable(data)
)

if destination.partition_by:
kwargs = {"partition_by": destination.partition_by}
@@ -294,12 +305,21 @@ def offline_write_batch(
)

file_options = feature_view.batch_source.file_options
prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
if table.schema != prev_table.schema:
table = table.cast(prev_table.schema)
new_table = pyarrow.concat_tables([table, prev_table])

ibis.memtable(new_table).to_parquet(file_options.uri)
if isinstance(feature_view.batch_source.file_format, ParquetFormat):
prev_table = ibis.read_parquet(file_options.uri).to_pyarrow()
if table.schema != prev_table.schema:
table = table.cast(prev_table.schema)
new_table = pyarrow.concat_tables([table, prev_table])

ibis.memtable(new_table).to_parquet(file_options.uri)
elif isinstance(feature_view.batch_source.file_format, DeltaFormat):
from deltalake import DeltaTable

prev_schema = DeltaTable(file_options.uri).schema().to_pyarrow()
if table.schema != prev_schema:
table = table.cast(prev_schema)
ibis.memtable(table).to_delta(file_options.uri, mode="append")
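The same cast-then-append pattern, sketched with deltalake directly rather than through ibis; the uri and columns are hypothetical:

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

uri = "data/driver_stats"
new_rows = pa.table({"driver_id": [1001], "conv_rate": [0.85]})

# Align the incoming batch with the existing table schema, then append.
prev_schema = DeltaTable(uri).schema().to_pyarrow()
if new_rows.schema != prev_schema:
    new_rows = new_rows.cast(prev_schema)
write_deltalake(uri, new_rows, mode="append")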


class IbisRetrievalJob(RetrievalJob):
@@ -338,20 +358,28 @@ def persist(
if not allow_overwrite and os.path.exists(storage.file_options.uri):
raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)

filesystem, path = FileSource.create_filesystem_and_path(
storage.file_options.uri,
storage.file_options.s3_endpoint_override,
)

if path.endswith(".parquet"):
pyarrow.parquet.write_table(
self.to_arrow(), where=path, filesystem=filesystem
if isinstance(storage.file_options.file_format, ParquetFormat):
filesystem, path = FileSource.create_filesystem_and_path(
storage.file_options.uri,
storage.file_options.s3_endpoint_override,
)
else:
# otherwise assume destination is directory
pyarrow.parquet.write_to_dataset(
self.to_arrow(), root_path=path, filesystem=filesystem

if path.endswith(".parquet"):
pyarrow.parquet.write_table(
self.to_arrow(), where=path, filesystem=filesystem
)
else:
# otherwise assume destination is directory
pyarrow.parquet.write_to_dataset(
self.to_arrow(), root_path=path, filesystem=filesystem
)
elif isinstance(storage.file_options.file_format, DeltaFormat):
mode = (
"overwrite"
if allow_overwrite and os.path.exists(storage.file_options.uri)
else "error"
)
self.table.to_delta(storage.file_options.uri, mode=mode)
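In other words, persist only overwrites an existing Delta table when the caller explicitly allows it; otherwise deltalake's "error" mode makes the write fail if the destination already exists. A small standalone sketch of that mode selection (the helper name is hypothetical):

import os

def _delta_write_mode(uri: str, allow_overwrite: bool) -> str:
    # "overwrite" replaces the table contents; "error" fails if the table exists.
    return "overwrite" if allow_overwrite and os.path.exists(uri) else "error"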

@property
def metadata(self) -> Optional[RetrievalMetadata]:
47 changes: 26 additions & 21 deletions sdk/python/requirements/py3.10-ci-requirements.txt
@@ -59,11 +59,11 @@ bidict==0.23.1
# via ibis-framework
bleach==6.1.0
# via nbconvert
boto3==1.34.85
boto3==1.34.88
# via
# feast (setup.py)
# moto
botocore==1.34.85
botocore==1.34.88
# via
# boto3
# moto
@@ -134,11 +134,11 @@ cryptography==42.0.5
# snowflake-connector-python
# types-pyopenssl
# types-redis
dask[array,dataframe]==2024.4.1
dask[array,dataframe]==2024.4.2
# via
# dask-expr
# feast (setup.py)
dask-expr==1.0.11
dask-expr==1.0.12
# via dask
db-dtypes==1.2.0
# via google-cloud-bigquery
@@ -148,6 +148,8 @@ decorator==5.1.1
# via ipython
defusedxml==0.7.1
# via nbconvert
deltalake==0.16.4
# via feast (setup.py)
dill==0.3.8
# via feast (setup.py)
distlib==0.3.8
@@ -158,15 +160,15 @@ docker==7.0.0
# testcontainers
docutils==0.19
# via sphinx
duckdb==0.10.1
duckdb==0.10.2
# via
# duckdb-engine
# ibis-framework
duckdb-engine==0.11.5
# via ibis-framework
entrypoints==0.4
# via altair
exceptiongroup==1.2.0
exceptiongroup==1.2.1
# via
# anyio
# ipython
@@ -175,7 +177,7 @@ execnet==2.1.1
# via pytest-xdist
executing==2.0.1
# via stack-data
fastapi==0.110.1
fastapi==0.110.2
# via feast (setup.py)
fastjsonschema==2.19.1
# via nbformat
@@ -263,7 +265,7 @@ greenlet==3.0.3
# via sqlalchemy
grpc-google-iam-v1==0.13.0
# via google-cloud-bigtable
grpcio==1.62.1
grpcio==1.62.2
# via
# feast (setup.py)
# google-api-core
@@ -275,15 +277,15 @@ grpcio==1.62.1
# grpcio-status
# grpcio-testing
# grpcio-tools
grpcio-health-checking==1.62.1
grpcio-health-checking==1.62.2
# via feast (setup.py)
grpcio-reflection==1.62.1
grpcio-reflection==1.62.2
# via feast (setup.py)
grpcio-status==1.62.1
grpcio-status==1.62.2
# via google-api-core
grpcio-testing==1.62.1
grpcio-testing==1.62.2
# via feast (setup.py)
grpcio-tools==1.62.1
grpcio-tools==1.62.2
# via feast (setup.py)
gunicorn==22.0.0 ; platform_system != "Windows"
# via feast (setup.py)
@@ -482,13 +484,13 @@ nest-asyncio==1.6.0
# via ipykernel
nodeenv==1.8.0
# via pre-commit
notebook==7.1.2
notebook==7.1.3
# via great-expectations
notebook-shim==0.2.4
# via
# jupyterlab
# notebook
numpy==1.24.4
numpy==1.26.4
# via
# altair
# dask
@@ -615,12 +617,15 @@ pyarrow==15.0.2
# via
# dask-expr
# db-dtypes
# deltalake
# feast (setup.py)
# google-cloud-bigquery
# ibis-framework
# snowflake-connector-python
pyarrow-hotfix==0.6
# via ibis-framework
# via
# deltalake
# ibis-framework
pyasn1==0.6.0
# via
# pyasn1-modules
@@ -692,7 +697,7 @@ pytest-ordering==0.6
# via feast (setup.py)
pytest-timeout==1.4.2
# via feast (setup.py)
pytest-xdist==3.5.0
pytest-xdist==3.6.0
# via feast (setup.py)
python-dateutil==2.9.0.post0
# via
@@ -728,7 +733,7 @@ pyyaml==6.0.1
# pre-commit
# responses
# uvicorn
pyzmq==26.0.0
pyzmq==26.0.2
# via
# ipykernel
# jupyter-client
@@ -785,7 +790,7 @@ rsa==4.9
# via google-auth
ruamel-yaml==0.17.17
# via great-expectations
ruff==0.3.7
ruff==0.4.1
# via feast (setup.py)
s3transfer==0.10.1
# via boto3
@@ -812,7 +817,7 @@ sniffio==1.3.1
# httpx
snowballstemmer==2.2.0
# via sphinx
snowflake-connector-python[pandas]==3.8.1
snowflake-connector-python[pandas]==3.9.0
# via feast (setup.py)
sortedcontainers==2.4.0
# via snowflake-connector-python
@@ -895,7 +900,7 @@ tqdm==4.66.2
# via
# feast (setup.py)
# great-expectations
traitlets==5.14.2
traitlets==5.14.3
# via
# comm
# ipykernel