feat: Validating logged features via Python SDK #2640

Merged 4 commits on May 16, 2022
Changes from all commits
38 changes: 35 additions & 3 deletions sdk/python/feast/dqm/profilers/ge_profiler.py
@@ -21,6 +21,7 @@
from feast.protos.feast.core.ValidationProfile_pb2 import (
    GEValidationProfiler as GEValidationProfilerProto,
)
+from feast.protos.feast.serving.ServingService_pb2 import FieldStatus


def _prepare_dataset(dataset: PandasDataset) -> PandasDataset:
@@ -41,6 +42,23 @@ def _prepare_dataset(dataset: PandasDataset) -> PandasDataset:
    return dataset_copy


def _add_feature_metadata(dataset: PandasDataset) -> PandasDataset:
    for column in dataset.columns:
        if "__" not in column:
            # not a feature column
            continue

        if "event_timestamp" in dataset.columns:
            dataset[f"{column}__timestamp"] = dataset["event_timestamp"]

        dataset[f"{column}__status"] = FieldStatus.PRESENT
        dataset[f"{column}__status"] = dataset[f"{column}__status"].mask(
            dataset[column].isna(), FieldStatus.NOT_FOUND
        )

    return dataset


class GEProfile(Profile):
    """
    GEProfile is an implementation of abstract Profile for integration with Great Expectations.
@@ -96,9 +114,12 @@ class GEProfiler(Profiler):
    """

    def __init__(
-        self, user_defined_profiler: Callable[[pd.DataFrame], ExpectationSuite]
+        self,
+        user_defined_profiler: Callable[[pd.DataFrame], ExpectationSuite],
+        with_feature_metadata: bool = False,
    ):
        self.user_defined_profiler = user_defined_profiler
+        self.with_feature_metadata = with_feature_metadata

    def analyze_dataset(self, df: pd.DataFrame) -> Profile:
        """
@@ -113,6 +134,9 @@ def analyze_dataset(self, df: pd.DataFrame) -> Profile:

        dataset = _prepare_dataset(dataset)

        if self.with_feature_metadata:
            dataset = _add_feature_metadata(dataset)

        return GEProfile(expectation_suite=self.user_defined_profiler(dataset))

    def to_proto(self):
@@ -158,5 +182,13 @@ def __repr__(self):
        return json.dumps(failed_expectations, indent=2)


-def ge_profiler(func):
-    return GEProfiler(user_defined_profiler=func)
+def ge_profiler(*args, with_feature_metadata=False):
+    def wrapper(fun):
+        return GEProfiler(
+            user_defined_profiler=fun, with_feature_metadata=with_feature_metadata
+        )
+
+    if args:
+        return wrapper(args[0])
+
+    return wrapper
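
With this change ge_profiler can be used both as a bare decorator and as a decorator factory. A minimal sketch of both forms (the profiler bodies and column names are illustrative, not taken from the PR):

from great_expectations.core import ExpectationSuite
from great_expectations.dataset import PandasDataset

from feast.dqm.profilers.ge_profiler import ge_profiler
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus


@ge_profiler
def plain_profiler(dataset: PandasDataset) -> ExpectationSuite:
    # bare form: equivalent to GEProfiler(user_defined_profiler=plain_profiler)
    dataset.expect_column_values_to_be_between(
        "customer_profile__current_balance", 0, 100_000
    )
    return dataset.get_expectation_suite()


@ge_profiler(with_feature_metadata=True)
def metadata_profiler(dataset: PandasDataset) -> ExpectationSuite:
    # factory form: the profiled dataset additionally carries
    # "<feature>__status" (and "<feature>__timestamp") columns
    dataset.expect_column_values_to_be_in_set(
        "customer_profile__current_balance__status", {FieldStatus.PRESENT}, mostly=0.9
    )
    return dataset.get_expectation_suite()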
5 changes: 5 additions & 0 deletions sdk/python/feast/feature_logging.py
@@ -11,6 +11,7 @@
    FeatureViewNotFoundException,
    OnDemandFeatureViewNotFoundException,
)
+from feast.feature_view import DUMMY_ENTITY_NAME
from feast.protos.feast.core.FeatureService_pb2 import (
    LoggingConfig as LoggingConfigProto,
)
@@ -77,7 +78,11 @@ def get_schema(self, registry: "Registry") -> pa.Schema:

            else:
                for entity_name in feature_view.entities:
+                    if entity_name == DUMMY_ENTITY_NAME:
+                        continue
+
                    entity = registry.get_entity(entity_name, self._project)
+
                    join_key = projection.join_key_map.get(
                        entity.join_key, entity.join_key
                    )
55 changes: 54 additions & 1 deletion sdk/python/feast/feature_store.py
@@ -44,6 +44,7 @@
from feast.data_source import DataSource
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.dqm.errors import ValidationFailed
from feast.entity import Entity
from feast.errors import (
    EntityNotFoundException,
@@ -82,7 +83,7 @@
from feast.repo_config import RepoConfig, load_repo_config
from feast.repo_contents import RepoContents
from feast.request_feature_view import RequestFeatureView
-from feast.saved_dataset import SavedDataset, SavedDatasetStorage
+from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference
from feast.type_map import (
    feast_value_type_to_python_type,
    python_values_to_proto_values,
@@ -2044,6 +2045,58 @@ def write_logged_features(
            registry=self._registry,
        )

    def validate_logged_features(
        self,
        source: Union[FeatureService],
        start: datetime,
        end: datetime,
        reference: ValidationReference,
        throw_exception: bool = True,
    ) -> Optional[ValidationFailed]:
        """
        Load logged features from an offline store and validate them against the
        provided validation reference.

        Args:
            source: logs source object (currently only feature services are supported)
            start: lower bound for loading logged features
            end: upper bound for loading logged features
            reference: validation reference
            throw_exception: whether to raise the exception or return it as a result

        Returns:
            None if validation was successful; otherwise raises (or returns,
            depending on throw_exception) a ValidationFailed exception.
        """
        warnings.warn(
            "Logged features validation is an experimental feature. "
            "This API is unstable and most probably will be changed in the future. "
            "We do not guarantee that future changes will maintain backward compatibility.",
            RuntimeWarning,
        )

        if not isinstance(source, FeatureService):
            raise ValueError("Only feature service is currently supported as a source")

        j = self._get_provider().retrieve_feature_service_logs(
            feature_service=source,
            start_date=start,
            end_date=end,
            config=self.config,
            registry=self.registry,
        )

        # read logs and run validation
        try:
            j.to_arrow(validation_reference=reference)
        except ValidationFailed as exc:
            if throw_exception:
                raise

            return exc

        return None


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
    set_of_row_lengths = {len(v) for v in join_key_values.values()}
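
Putting the pieces together, a minimal usage sketch of the new validation API (the repo path is an illustrative assumption; the service and dataset names come from the test below, where profiler_with_feature_metadata is also defined):

from datetime import datetime, timedelta

from feast import FeatureStore
from feast.dqm.errors import ValidationFailed

store = FeatureStore(repo_path=".")
feature_service = store.get_feature_service("test_service")
reference = store.get_saved_dataset(
    "reference_for_validating_logged_features"
).as_reference(profiler=profiler_with_feature_metadata)

error = store.validate_logged_features(
    source=feature_service,
    start=datetime.utcnow() - timedelta(days=1),
    end=datetime.utcnow(),
    reference=reference,
    throw_exception=False,  # return ValidationFailed instead of raising it
)
print("validation passed" if error is None else error)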
1 change: 1 addition & 0 deletions sdk/python/feast/feature_view.py
@@ -285,6 +285,7 @@ def __copy__(self):
            online=self.online,
        )
        fv.projection = copy.copy(self.projection)
+        fv.entities = self.entities
        return fv

    def __eq__(self, other):
16 changes: 12 additions & 4 deletions sdk/python/feast/infra/offline_stores/file.py
@@ -73,6 +73,7 @@ def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
    def _to_df_internal(self) -> pd.DataFrame:
        # Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
        df = self.evaluation_function().compute()
+        df = df.reset_index(drop=True)
        return df

    @log_exceptions_and_usage
@@ -556,11 +557,18 @@ def _filter_ttl(
    # Filter rows by defined timestamp tolerance
    if feature_view.ttl and feature_view.ttl.total_seconds() != 0:
        df_to_join = df_to_join[
-            (
-                df_to_join[timestamp_field]
-                >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl
-            )
-            & (df_to_join[timestamp_field] <= df_to_join[entity_df_event_timestamp_col])
+            # do not drop entity rows if one of the sources returns NaNs
+            df_to_join[timestamp_field].isna()
+            | (
+                (
+                    df_to_join[timestamp_field]
+                    >= df_to_join[entity_df_event_timestamp_col] - feature_view.ttl
+                )
+                & (
+                    df_to_join[timestamp_field]
+                    <= df_to_join[entity_df_event_timestamp_col]
+                )
+            )
        ]

        df_to_join = df_to_join.persist()
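
The reworked predicate keeps entity rows whose source timestamp is NaN instead of dropping them along with rows outside the TTL window. A standalone pandas sketch of the same logic (toy data and column names, not from the PR):

from datetime import datetime, timedelta

import pandas as pd

ttl = timedelta(hours=1)
df = pd.DataFrame(
    {
        "event_timestamp": [datetime(2022, 5, 16, 12)] * 3,
        "source_timestamp": [
            datetime(2022, 5, 16, 11, 30),  # inside the TTL window -> kept
            datetime(2022, 5, 16, 9, 0),  # older than TTL -> dropped
            None,  # source returned NaN -> now kept
        ],
    }
)

mask = df["source_timestamp"].isna() | (
    (df["source_timestamp"] >= df["event_timestamp"] - ttl)
    & (df["source_timestamp"] <= df["event_timestamp"])
)
print(df[mask])  # rows 0 and 2 survive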
130 changes: 130 additions & 0 deletions sdk/python/tests/integration/e2e/test_validation.py
@@ -1,10 +1,21 @@
import datetime

import pandas as pd
import pyarrow as pa
import pytest
from great_expectations.core import ExpectationSuite
from great_expectations.dataset import PandasDataset

from feast import FeatureService
from feast.dqm.errors import ValidationFailed
from feast.dqm.profilers.ge_profiler import ge_profiler
from feast.feature_logging import (
    LOG_TIMESTAMP_FIELD,
    FeatureServiceLoggingSource,
    LoggingConfig,
)
from feast.protos.feast.serving.ServingService_pb2 import FieldStatus
from feast.wait import wait_retry_backoff
from tests.integration.feature_repos.repo_configuration import (
    construct_universal_feature_views,
)
@@ -13,6 +24,7 @@
    driver,
    location,
)
from tests.utils.logged_features import prepare_logs

_features = [
    "customer_profile:current_balance",
@@ -32,6 +44,39 @@ def configurable_profiler(dataset: PandasDataset) -> ExpectationSuite:

    return UserConfigurableProfiler(
        profile_dataset=dataset,
        ignored_columns=["event_timestamp"],
        excluded_expectations=[
            "expect_table_columns_to_match_ordered_list",
            "expect_table_row_count_to_be_between",
        ],
        value_set_threshold="few",
    ).build_suite()


@ge_profiler(with_feature_metadata=True)
def profiler_with_feature_metadata(dataset: PandasDataset) -> ExpectationSuite:
    from great_expectations.profile.user_configurable_profiler import (
        UserConfigurableProfiler,
    )

    # always present
    dataset.expect_column_values_to_be_in_set(
        "global_stats__avg_ride_length__status", {FieldStatus.PRESENT}
    )

    # present in at least 70% of rows
    dataset.expect_column_values_to_be_in_set(
        "customer_profile__current_balance__status", {FieldStatus.PRESENT}, mostly=0.7
    )

    return UserConfigurableProfiler(
        profile_dataset=dataset,
        ignored_columns=["event_timestamp"]
        + [
            c
            for c in dataset.columns
            if c.endswith("__timestamp") or c.endswith("__status")
        ],
        excluded_expectations=[
            "expect_table_columns_to_match_ordered_list",
            "expect_table_row_count_to_be_between",
@@ -127,3 +172,88 @@ def test_historical_retrieval_fails_on_validation(environment, universal_data_sources):

    assert failed_expectations[1].check_name == "expect_column_values_to_be_in_set"
    assert failed_expectations[1].column_name == "avg_passenger_count"


@pytest.mark.integration
def test_logged_features_validation(environment, universal_data_sources):
    store = environment.feature_store

    (_, datasets, data_sources) = universal_data_sources
    feature_views = construct_universal_feature_views(data_sources)
    feature_service = FeatureService(
        name="test_service",
        features=[
            feature_views.customer[
                ["current_balance", "avg_passenger_count", "lifetime_trip_count"]
            ],
            feature_views.order[["order_is_success"]],
            feature_views.global_fv[["num_rides", "avg_ride_length"]],
        ],
        logging_config=LoggingConfig(
            destination=environment.data_source_creator.create_logged_features_destination()
        ),
    )

    store.apply(
        [driver(), customer(), location(), feature_service, *feature_views.values()]
    )

    entity_df = datasets.entity_df.drop(
        columns=["order_id", "origin_id", "destination_id"]
    )

    # add some non-existing entities to check NOT_FOUND feature handling
    for i in range(5):
        entity_df = entity_df.append(
            {
                "customer_id": 2000 + i,
                "driver_id": 6000 + i,
                "event_timestamp": datetime.datetime.now(),
            },
            ignore_index=True,
        )

    reference_dataset = store.create_saved_dataset(
        from_=store.get_historical_features(
            entity_df=entity_df, features=feature_service, full_feature_names=True
        ),
        name="reference_for_validating_logged_features",
        storage=environment.data_source_creator.create_saved_dataset_destination(),
    )

    log_source_df = store.get_historical_features(
        entity_df=entity_df, features=feature_service, full_feature_names=False
    ).to_df()
    logs_df = prepare_logs(log_source_df, feature_service, store)

    schema = FeatureServiceLoggingSource(
        feature_service=feature_service, project=store.project
    ).get_schema(store._registry)
    store.write_logged_features(
        pa.Table.from_pandas(logs_df, schema=schema), source=feature_service
    )

    def validate():
        """
        Return Tuple[success, completed].
        Success is True if no ValidationFailed exception was raised.
        """
        try:
            store.validate_logged_features(
                feature_service,
                start=logs_df[LOG_TIMESTAMP_FIELD].min(),
                end=logs_df[LOG_TIMESTAMP_FIELD].max() + datetime.timedelta(seconds=1),
                reference=reference_dataset.as_reference(
                    profiler=profiler_with_feature_metadata
                ),
            )
        except ValidationFailed:
            return False, True
        except Exception:
            # log table is still being created
            return False, False

        return True, True

    success = wait_retry_backoff(validate, timeout_secs=30)
    assert success, "Validation failed (unexpectedly)"