feat: Pull duckdb from contribs, add to CI #4059

Merged · 3 commits · Apr 8, 2024
12 changes: 0 additions & 12 deletions Makefile
@@ -176,18 +176,6 @@ test-python-universal-athena:
not s3_registry and \
not test_snowflake" \
sdk/python/tests

- test-python-universal-duckdb:
- PYTHONPATH='.' \
- FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.duckdb_repo_configuration \
- python -m pytest -n 8 --integration \
- -k "not test_nullable_online_store and \
- not gcs_registry and \
- not s3_registry and \
- not test_snowflake and \
- not bigquery and \
- not test_spark_materialization_consistency" \
- sdk/python/tests

test-python-universal-postgres-offline:
PYTHONPATH='.' \
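Note: with duckdb promoted out of contribs, the dedicated Makefile target is dropped; the test repo configuration change further down registers a second ("local", DuckDBDataSourceCreator) entry in AVAILABLE_OFFLINE_STORES, so the standard local universal integration run now exercises duckdb without a separate pytest invocation.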
2 changes: 1 addition & 1 deletion docs/project/development-guide.md
@@ -187,7 +187,7 @@ make lint-python
### Unit Tests
Unit tests (`pytest`) for the Feast Python SDK / CLI can run as follows:
```sh
- make test-python
+ make test-python-unit
```

> :warning: Local configuration can interfere with Unit tests and cause them to fail:
2 changes: 1 addition & 1 deletion environment-setup.md
@@ -19,5 +19,5 @@ make install-python-ci-dependencies PYTHON=3.9
4. start the docker daemon
5. run unit tests:
```bash
- make test-python
+ make test-python-unit
```

This file was deleted.

@@ -1,7 +1,7 @@
import ibis
from pydantic import StrictStr

- from feast.infra.offline_stores.contrib.ibis_offline_store.ibis import IbisOfflineStore
+ from feast.infra.offline_stores.ibis import IbisOfflineStore
from feast.repo_config import FeastConfigBaseModel


@@ -134,7 +134,9 @@ def get_historical_features(
entity_table, feature_views, event_timestamp_col
)

- def read_fv(feature_view, feature_refs, full_feature_names):
+ def read_fv(
+     feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
+ ) -> Tuple:
fv_table: Table = ibis.read_parquet(feature_view.batch_source.name)

for old_name, new_name in feature_view.batch_source.field_mapping.items():
@@ -175,6 +177,7 @@ def read_fv(feature_view, feature_refs, full_feature_names):
return (
fv_table,
feature_view.batch_source.timestamp_field,
+ feature_view.batch_source.created_timestamp_column,
feature_view.projection.join_key_map
or {e.name: e.name for e in feature_view.entity_columns},
feature_refs,
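Note: read_fv now returns the batch source's created_timestamp_column as the third tuple element, widening the per-feature-view tuple from five fields to six; the point_in_time_join signature in the next hunk changes to match.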
@@ -351,12 +354,19 @@ def metadata(self) -> Optional[RetrievalMetadata]:

def point_in_time_join(
entity_table: Table,
- feature_tables: List[Tuple[Table, str, Dict[str, str], List[str], timedelta]],
+ feature_tables: List[Tuple[Table, str, str, Dict[str, str], List[str], timedelta]],
event_timestamp_col="event_timestamp",
):
# TODO handle ttl
all_entities = [event_timestamp_col]
- for feature_table, timestamp_field, join_key_map, _, _ in feature_tables:
+ for (
+     feature_table,
+     timestamp_field,
+     created_timestamp_field,
+     join_key_map,
+     _,
+     _,
+ ) in feature_tables:
all_entities.extend(join_key_map.values())

r = ibis.literal("")
@@ -371,6 +381,7 @@ def point_in_time_join(
for (
feature_table,
timestamp_field,
+ created_timestamp_field,
join_key_map,
feature_refs,
ttl,
@@ -395,9 +406,13 @@

feature_table = feature_table.drop(s.endswith("_y"))

+ order_by_fields = [ibis.desc(feature_table[timestamp_field])]
+ if created_timestamp_field:
+     order_by_fields.append(ibis.desc(feature_table[created_timestamp_field]))

feature_table = (
feature_table.group_by(by="entity_row_id")
- .order_by(ibis.desc(feature_table[timestamp_field]))
+ .order_by(order_by_fields)
.mutate(rn=ibis.row_number())
)

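Net effect of the order_by_fields change: within each entity_row_id group, candidate rows are ranked newest-first by event timestamp, with the created timestamp breaking ties, and the row ranked 0 is the one the join keeps. A minimal standalone sketch of that dedup pattern, using hypothetical data (the real code runs inside point_in_time_join):

```python
import ibis

# Two candidate rows for entity "a" share an event timestamp; the created
# column breaks the tie, so feature1 == 20 should survive for "a".
t = ibis.memtable(
    {
        "entity_row_id": ["a", "a", "b"],
        "event_timestamp": [2, 2, 1],
        "created": [1, 2, 1],
        "feature1": [10, 20, 30],
    }
)

order_by_fields = [ibis.desc(t["event_timestamp"])]
order_by_fields.append(ibis.desc(t["created"]))  # only appended when the source has one

ranked = (
    t.group_by(by="entity_row_id")
    .order_by(order_by_fields)
    .mutate(rn=ibis.row_number())  # rn == 0 marks the newest row per group
)
print(ranked.filter(ranked.rn == 0).drop("rn").to_pyarrow())
```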
2 changes: 1 addition & 1 deletion sdk/python/feast/repo_config.py
@@ -75,7 +75,7 @@
"postgres": "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.PostgreSQLOfflineStore",
"athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
"mssql": "feast.infra.offline_stores.contrib.mssql_offline_store.mssql.MsSqlServerOfflineStore",
"duckdb": "feast.infra.offline_stores.contrib.duckdb_offline_store.duckdb.DuckDBOfflineStore",
"duckdb": "feast.infra.offline_stores.duckdb.DuckDBOfflineStore",
}

FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = {
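With the mapping now pointing at feast.infra.offline_stores.duckdb, a repo config selects the store by its short name. A hedged sketch from Python (the default-constructed config mirrors the DuckDBDataSourceCreator fixture below; the type value is an assumption based on this mapping):

```python
from feast.infra.offline_stores.duckdb import DuckDBOfflineStoreConfig

config = DuckDBOfflineStoreConfig()  # all defaults, as the test fixture uses it
print(config.type)  # expected to be "duckdb", the short name registered above
```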
@@ -31,6 +31,7 @@
BigQueryDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.file import (
+ DuckDBDataSourceCreator,
FileDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.redshift import (
@@ -108,6 +109,7 @@

AVAILABLE_OFFLINE_STORES: List[Tuple[str, Type[DataSourceCreator]]] = [
("local", FileDataSourceCreator),
("local", DuckDBDataSourceCreator),
]

AVAILABLE_ONLINE_STORES: Dict[
@@ -15,6 +15,7 @@
from feast.data_format import ParquetFormat
from feast.data_source import DataSource
from feast.feature_logging import LoggingDestination
+ from feast.infra.offline_stores.duckdb import DuckDBOfflineStoreConfig
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.infra.offline_stores.file_source import (
FileLoggingDestination,
@@ -214,3 +215,10 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
def teardown(self):
self.minio.stop()
self.f.close()


+ # TODO split up DataSourceCreator and OfflineStoreCreator
+ class DuckDBDataSourceCreator(FileDataSourceCreator):
+     def create_offline_store_config(self):
+         self.duckdb_offline_store_config = DuckDBOfflineStoreConfig()
+         return self.duckdb_offline_store_config
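Because DuckDBDataSourceCreator subclasses FileDataSourceCreator, it reuses the same parquet-backed source setup and swaps in only the offline store config; that is why the extra ("local", DuckDBDataSourceCreator) entry in AVAILABLE_OFFLINE_STORES is enough to run the whole universal suite against duckdb.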
@@ -518,7 +518,7 @@ def test_historical_features_with_no_ttl(

@pytest.mark.integration
@pytest.mark.universal_offline_stores
- def test_historical_features_from_bigquery_sources_containing_backfills(environment):
+ def test_historical_features_containing_backfills(environment):
store = environment.feature_store

now = datetime.now().replace(microsecond=0, second=0, minute=0)
58 changes: 46 additions & 12 deletions sdk/python/tests/unit/infra/offline_stores/test_ibis.py
@@ -3,10 +3,9 @@

import ibis
import pyarrow as pa
+ import pyarrow.compute as pc

- from feast.infra.offline_stores.contrib.ibis_offline_store.ibis import (
-     point_in_time_join,
- )
+ from feast.infra.offline_stores.ibis import point_in_time_join


def pa_datetime(year, month, day):
@@ -16,12 +15,13 @@ def pa_datetime(year, month, day):
def customer_table():
return pa.Table.from_arrays(
arrays=[
- pa.array([1, 1, 2]),
+ pa.array([1, 1, 2, 3]),
pa.array(
[
pa_datetime(2024, 1, 1),
pa_datetime(2024, 1, 2),
pa_datetime(2024, 1, 1),
+ pa_datetime(2024, 1, 3),
]
),
],
@@ -32,24 +32,38 @@ def customer_table():
def features_table_1():
return pa.Table.from_arrays(
arrays=[
- pa.array([1, 1, 1, 2]),
+ pa.array([1, 1, 1, 2, 3, 3]),
pa.array(
[
pa_datetime(2023, 12, 31),
pa_datetime(2024, 1, 2),
pa_datetime(2024, 1, 3),
pa_datetime(2023, 1, 3),
+ pa_datetime(2024, 1, 1),
+ pa_datetime(2024, 1, 1),
]
),
- pa.array([11, 22, 33, 22]),
+ pa.array(
+     [
+         pa_datetime(2023, 12, 31),
+         pa_datetime(2024, 1, 2),
+         pa_datetime(2024, 1, 3),
+         pa_datetime(2023, 1, 3),
+         pa_datetime(2024, 1, 3),
+         pa_datetime(2024, 1, 2),
+     ]
+ ),
+ pa.array([11, 22, 33, 22, 10, 20]),
],
names=["customer_id", "event_timestamp", "feature1"],
names=["customer_id", "event_timestamp", "created", "feature1"],
)
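The new fixture rows give customer 3 two feature rows with identical event timestamps (2024-01-01) but different created timestamps, so the created-timestamp tie-breaker, not the event timestamp, determines which feature1 value (10 vs 20) the join must pick.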


def point_in_time_join_brute(
entity_table: pa.Table,
- feature_tables: List[Tuple[pa.Table, str, Dict[str, str], List[str], timedelta]],
+ feature_tables: List[
+     Tuple[pa.Table, str, str, Dict[str, str], List[str], timedelta]
+ ],
event_timestamp_col="event_timestamp",
):
ret_fields = [entity_table.schema.field(n) for n in entity_table.schema.names]
@@ -63,6 +77,7 @@ def point_in_time_join_brute(
for (
feature_table,
timestamp_key,
+ created_timestamp_key,
join_key_map,
feature_refs,
ttl,
@@ -72,7 +87,9 @@
[
feature_table.schema.field(f)
for f in feature_table.schema.names
- if f not in join_key_map.values() and f != timestamp_key
+ if f not in join_key_map.values()
+ and f != timestamp_key
+ and f != created_timestamp_key
]
)

@@ -82,9 +99,11 @@ def check_equality(ft_dict, batch_dict, x, y):
)

ft_dict = feature_table.to_pydict()

found_matches = [
- (j, ft_dict[timestamp_key][j])
- for j in range(entity_table.num_rows)
+ (j, (ft_dict[timestamp_key][j], ft_dict[created_timestamp_key][j]))
+ # (j, ft_dict[timestamp_key][j])
+ for j in range(feature_table.num_rows)
if check_equality(ft_dict, batch_dict, j, i)
and ft_dict[timestamp_key][j] <= row_timestmap
and ft_dict[timestamp_key][j] >= row_timestmap - ttl
@@ -93,6 +112,7 @@ def check_equality(ft_dict, batch_dict, x, y):
index_found = (
max(found_matches, key=itemgetter(1))[0] if found_matches else None
)

for col in ft_dict.keys():
if col not in feature_refs:
continue
@@ -108,13 +128,26 @@ def check_equality(ft_dict, batch_dict, x, y):
return pa.Table.from_pydict(ret, schema=pa.schema(ret_fields))
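Two changes in the brute-force reference: found_matches now ranks candidates by the (event_timestamp, created_timestamp) pair, so max(found_matches, key=itemgetter(1)) applies the same tie-breaking as the ibis implementation, and the comprehension iterates range(feature_table.num_rows) rather than range(entity_table.num_rows), fixing an indexing bug in the loop over candidate feature rows.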


+ def tables_equal_ignore_order(actual: pa.Table, expected: pa.Table):
+     sort_keys = [(name, "ascending") for name in actual.column_names]
+     sort_indices = pc.sort_indices(actual, sort_keys)
+     actual = pc.take(actual, sort_indices)
+
+     sort_keys = [(name, "ascending") for name in expected.column_names]
+     sort_indices = pc.sort_indices(expected, sort_keys)
+     expected = pc.take(expected, sort_indices)
+
+     return actual.equals(expected)
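The helper sorts both tables by all of their columns before comparing, since the join result's row order is not guaranteed. A small usage sketch with hypothetical tables (relies on the helper above being in scope):

```python
import pyarrow as pa

t1 = pa.table({"a": [1, 2], "b": ["x", "y"]})
t2 = pa.table({"a": [2, 1], "b": ["y", "x"]})  # same rows, different order
assert tables_equal_ignore_order(t1, t2)
```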


def test_point_in_time_join():
expected = point_in_time_join_brute(
customer_table(),
feature_tables=[
(
features_table_1(),
"event_timestamp",
"created",
{"customer_id": "customer_id"},
["feature1"],
timedelta(days=10),
@@ -128,11 +161,12 @@ def test_point_in_time_join():
(
ibis.memtable(features_table_1()),
"event_timestamp",
"created",
{"customer_id": "customer_id"},
["feature1"],
timedelta(days=10),
)
],
).to_pyarrow()

- assert actual.equals(expected)
+ assert tables_equal_ignore_order(actual, expected)