Skip to content

Commit

Permalink
Add offline retrival integration tests using the universal repo (#1769)
Browse files Browse the repository at this point in the history
* Add offline retrival integration tests using the universal repo

Signed-off-by: Achal Shah <achals@gmail.com>

* make format

Signed-off-by: Achal Shah <achals@gmail.com>

* Refactor refactor refactor

Signed-off-by: Achal Shah <achals@gmail.com>

* comment new test and format

Signed-off-by: Achal Shah <achals@gmail.com>

* Skip new test

Signed-off-by: Achal Shah <achals@gmail.com>

* format

Signed-off-by: Achal Shah <achals@gmail.com>

* fix local provider tests

Signed-off-by: Achal Shah <achals@gmail.com>

* make format

Signed-off-by: Achal Shah <achals@gmail.com>

* remove more unused imports

Signed-off-by: Achal Shah <achals@gmail.com>

* CR comments

Signed-off-by: Achal Shah <achals@gmail.com>

* CR comments

Signed-off-by: Achal Shah <achals@gmail.com>

* Get all the tests working

Signed-off-by: Achal Shah <achals@gmail.com>

* get query version to work

Signed-off-by: Achal Shah <achals@gmail.com>

* make format

Signed-off-by: Achal Shah <achals@gmail.com>

* fix e2e tests

Signed-off-by: Achal Shah <achals@gmail.com>

* fix lint

Signed-off-by: Achal Shah <achals@gmail.com>

* drop table if exists

Signed-off-by: Achal Shah <achals@gmail.com>

* remove commented out tests

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals authored Aug 16, 2021
1 parent 7c5b42c commit 351b913
Show file tree
Hide file tree
Showing 11 changed files with 616 additions and 83 deletions.
23 changes: 18 additions & 5 deletions sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,24 @@
from pytz import utc

from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.test_repo_configuration import parametrize_e2e_test
from tests.integration.feature_repos.test_repo_configuration import (
Environment,
parametrize_e2e_test,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import driver_feature_view


@parametrize_e2e_test
def test_e2e_consistency(fs: FeatureStore):
run_offline_online_store_consistency_test(fs)
def test_e2e_consistency(test_environment: Environment):
fs, fv = (
test_environment.feature_store,
driver_feature_view(test_environment.data_source),
)
entity = driver()
fs.apply([fv, entity])

run_offline_online_store_consistency_test(fs, fv)


def check_offline_and_online_features(
Expand Down Expand Up @@ -63,10 +75,11 @@ def check_offline_and_online_features(
assert math.isnan(df.to_dict()["value"][0])


def run_offline_online_store_consistency_test(fs: FeatureStore,) -> None:
def run_offline_online_store_consistency_test(
fs: FeatureStore, fv: FeatureView
) -> None:
now = datetime.utcnow()

fv = fs.get_feature_view("test_correctness")
full_feature_names = True
check_offline_store: bool = True

Expand Down
227 changes: 204 additions & 23 deletions sdk/python/tests/integration/feature_repos/test_repo_configuration.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import tempfile
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, replace
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Union
from typing import Dict, List, Optional, Union

import pytest
from attr import dataclass

from feast import FeatureStore, RepoConfig, importer
from feast import FeatureStore, FeatureView, RepoConfig, driver_test_data, importer
from feast.data_source import DataSource
from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.entities import customer, driver
from tests.integration.feature_repos.universal.feature_views import (
correctness_feature_view,
create_customer_daily_profile_feature_view,
create_driver_hourly_stats_feature_view,
)


@dataclass
@dataclass(frozen=True, repr=True)
class TestRepoConfig:
"""
This class should hold all possible parameters that may need to be varied by individual tests.
Expand All @@ -30,20 +33,21 @@ class TestRepoConfig:
offline_store_creator: str = "tests.integration.feature_repos.universal.data_sources.file.FileDataSourceCreator"

full_feature_names: bool = True
infer_event_timestamp_col: bool = True


FULL_REPO_CONFIGS: List[TestRepoConfig] = [
TestRepoConfig(), # Local
TestRepoConfig(
provider="aws",
offline_store_creator="tests.integration.feature_repos.universal.data_sources.redshift.RedshiftDataSourceCreator",
online_store={"type": "dynamodb", "region": "us-west-2"},
),
TestRepoConfig(
provider="gcp",
offline_store_creator="tests.integration.feature_repos.universal.data_sources.bigquery.BigQueryDataSourceCreator",
online_store="datastore",
),
TestRepoConfig(
provider="aws",
offline_store_creator="tests.integration.feature_repos.universal.data_sources.redshift.RedshiftDataSourceCreator",
online_store={"type": "dynamodb", "region": "us-west-2"},
),
]


Expand All @@ -52,8 +56,128 @@ class TestRepoConfig:
PROVIDERS: List[str] = []


@dataclass
class Environment:
name: str
test_repo_config: TestRepoConfig
feature_store: FeatureStore
data_source: DataSource
data_source_creator: DataSourceCreator

end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=7)
before_start_date = end_date - timedelta(days=365)
after_end_date = end_date + timedelta(days=365)

customer_entities = list(range(1001, 1110))
customer_df = driver_test_data.create_customer_daily_profile_df(
customer_entities, start_date, end_date
)
_customer_feature_view: Optional[FeatureView] = None

driver_entities = list(range(5001, 5110))
driver_df = driver_test_data.create_driver_hourly_stats_df(
driver_entities, start_date, end_date
)
_driver_stats_feature_view: Optional[FeatureView] = None

orders_df = driver_test_data.create_orders_df(
customers=customer_entities,
drivers=driver_entities,
start_date=before_start_date,
end_date=after_end_date,
order_count=1000,
)
_orders_table: Optional[str] = None

def customer_feature_view(self) -> FeatureView:
if self._customer_feature_view is None:
customer_table_id = self.data_source_creator.get_prefixed_table_name(
self.name, "customer_profile"
)
ds = self.data_source_creator.create_data_sources(
customer_table_id,
self.customer_df,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
self._customer_feature_view = create_customer_daily_profile_feature_view(ds)
return self._customer_feature_view

def driver_stats_feature_view(self) -> FeatureView:
if self._driver_stats_feature_view is None:
driver_table_id = self.data_source_creator.get_prefixed_table_name(
self.name, "driver_hourly"
)
ds = self.data_source_creator.create_data_sources(
driver_table_id,
self.driver_df,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
self._driver_stats_feature_view = create_driver_hourly_stats_feature_view(
ds
)
return self._driver_stats_feature_view

def orders_table(self) -> Optional[str]:
if self._orders_table is None:
orders_table_id = self.data_source_creator.get_prefixed_table_name(
self.name, "orders"
)
ds = self.data_source_creator.create_data_sources(
orders_table_id,
self.orders_df,
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
)
if hasattr(ds, "table_ref"):
self._orders_table = ds.table_ref
elif hasattr(ds, "table"):
self._orders_table = ds.table
return self._orders_table


def vary_full_feature_names(configs: List[TestRepoConfig]) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
true_c = replace(c, full_feature_names=True)
false_c = replace(c, full_feature_names=False)
new_configs.extend([true_c, false_c])
return new_configs


def vary_infer_event_timestamp_col(
configs: List[TestRepoConfig],
) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
true_c = replace(c, infer_event_timestamp_col=True)
false_c = replace(c, infer_event_timestamp_col=False)
new_configs.extend([true_c, false_c])
return new_configs


def vary_providers_for_offline_stores(
configs: List[TestRepoConfig],
) -> List[TestRepoConfig]:
new_configs = []
for c in configs:
if "FileDataSourceCreator" in c.offline_store_creator:
new_configs.append(c)
elif "RedshiftDataSourceCreator" in c.offline_store_creator:
for p in ["local", "aws"]:
new_configs.append(replace(c, provider=p))
elif "BigQueryDataSourceCreator" in c.offline_store_creator:
for p in ["local", "gcp"]:
new_configs.append(replace(c, provider=p))
return new_configs


@contextmanager
def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
def construct_test_environment(
test_repo_config: TestRepoConfig, create_and_apply: bool = False
) -> Environment:
"""
This method should take in the parameters from the test repo config and created a feature repo, apply it,
and return the constructed feature store object to callers.
Expand All @@ -74,8 +198,10 @@ def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:

offline_creator: DataSourceCreator = importer.get_class_from_type(
module_name, config_class_name, "DataSourceCreator"
)()
ds = offline_creator.create_data_source(project, df)
)(project)
ds = offline_creator.create_data_sources(
project, df, field_mapping={"ts_1": "ts", "id": "driver_id"}
)
offline_store = offline_creator.create_offline_store_config()
online_store = test_repo_config.online_store

Expand All @@ -89,21 +215,76 @@ def construct_feature_store(test_repo_config: TestRepoConfig) -> FeatureStore:
repo_path=repo_dir_name,
)
fs = FeatureStore(config=config)
fv = correctness_feature_view(ds)
entity = driver()
fs.apply([fv, entity])
environment = Environment(
name=project,
test_repo_config=test_repo_config,
feature_store=fs,
data_source=ds,
data_source_creator=offline_creator,
)

yield fs
fvs = []
entities = []
try:
if create_and_apply:
entities.extend([driver(), customer()])
fvs.extend(
[
environment.driver_stats_feature_view(),
environment.customer_feature_view(),
]
)
fs.apply(fvs + entities)

fs.teardown()
offline_creator.teardown(project)
yield environment
finally:
offline_creator.teardown()
fs.teardown()


def parametrize_e2e_test(e2e_test):
"""
This decorator should be used for end-to-end tests. These tests are expected to be parameterized,
and receive an empty feature repo created for all supported configurations.
The decorator also ensures that sample data needed for the test is available in the relevant offline store.
Decorated tests should create and apply the objects needed by the tests, and perform any operations needed
(such as materialization and looking up feature values).
The decorator takes care of tearing down the feature store, as well as the sample data.
"""

@pytest.mark.integration
@pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: str(v))
def inner_test(config):
with construct_test_environment(config) as environment:
e2e_test(environment)

return inner_test


def parametrize_offline_retrieval_test(offline_retrieval_test):
"""
This decorator should be used for end-to-end tests. These tests are expected to be parameterized,
and receive an empty feature repo created for all supported configurations.
The decorator also ensures that sample data needed for the test is available in the relevant offline store.
Decorated tests should create and apply the objects needed by the tests, and perform any operations needed
(such as materialization and looking up feature values).
The decorator takes care of tearing down the feature store, as well as the sample data.
"""

configs = vary_providers_for_offline_stores(FULL_REPO_CONFIGS)
configs = vary_full_feature_names(configs)
configs = vary_infer_event_timestamp_col(configs)

@pytest.mark.integration
@pytest.mark.parametrize("config", FULL_REPO_CONFIGS, ids=lambda v: v.provider)
@pytest.mark.parametrize("config", configs, ids=lambda v: str(v))
def inner_test(config):
with construct_feature_store(config) as fs:
e2e_test(fs)
with construct_test_environment(config, create_and_apply=True) as environment:
offline_retrieval_test(environment)

return inner_test
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from typing import Dict

import pandas as pd

Expand All @@ -8,12 +9,13 @@

class DataSourceCreator(ABC):
@abstractmethod
def create_data_source(
def create_data_sources(
self,
name: str,
destination: str,
df: pd.DataFrame,
event_timestamp_column="ts",
created_timestamp_column="created_ts",
field_mapping: Dict[str, str] = None,
) -> DataSource:
...

Expand All @@ -22,5 +24,9 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
...

@abstractmethod
def teardown(self, name: str):
def teardown(self):
...

@abstractmethod
def get_prefixed_table_name(self, name: str, suffix: str) -> str:
...
Loading

0 comments on commit 351b913

Please sign in to comment.