[FSTORE-1247] Raise DataValidationException on validation failure of a dataframe if the validation ingestion policy is set to strict #1226

Merged · 11 commits · Feb 27, 2024
7 changes: 7 additions & 0 deletions python/hsfs/client/exceptions.py
@@ -62,6 +62,13 @@ class FeatureStoreException(Exception):
     """Generic feature store exception"""
 
 
+class DataValidationException(FeatureStoreException):
+    """Raised when data validation fails while the "STRICT" validation ingestion policy is in use."""
+
+    def __init__(self, message):
+        super().__init__(message)
+
+
 class ExternalClientError(TypeError):
     """Raised when external client cannot be initialized due to missing arguments."""
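Since `DataValidationException` subclasses `FeatureStoreException`, callers that already catch the generic exception keep working, while new code can handle validation failures specifically. A minimal sketch of the resulting behavior (the message text is illustrative):

```python
from hsfs.client.exceptions import DataValidationException, FeatureStoreException

try:
    raise DataValidationException("validation failed, insertion aborted")
except FeatureStoreException as err:
    # Handlers written against the generic base class still catch the new subclass.
    print(f"caught: {err}")
```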
12 changes: 10 additions & 2 deletions python/hsfs/core/external_feature_group_engine.py
@@ -15,7 +15,7 @@
 
 from hsfs import engine, util
 from hsfs import feature_group as fg
-from hsfs.client.exceptions import FeatureStoreException
+from hsfs.client.exceptions import FeatureStoreException, DataValidationException
 from hsfs.core import feature_group_base_engine
 
 
@@ -80,7 +80,15 @@ def insert(
         )
 
         if ge_report is not None and ge_report.ingestion_result == "REJECTED":
-            return None, ge_report
+            feature_group_url = util.get_feature_group_url(
+                feature_store_id=feature_group.feature_store_id,
+                feature_group_id=feature_group.id,
+            )
+            raise DataValidationException(
+                "Data validation failed while the validation ingestion policy is set to strict, "
+                + f"insertion into {feature_group.name} was aborted.\n"
+                + f"You can check a summary or download your report at {feature_group_url}."
+            )
 
         return (
             engine.get_instance().save_dataframe(
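The strict path above only triggers for feature groups whose attached expectation suite uses the `"STRICT"` validation ingestion policy; with the default policy (presumably `"ALWAYS"`), ingestion proceeds regardless of the validation outcome. A hedged sketch of opting in, assuming `save_expectation_suite` accepts the policy as a keyword argument and with illustrative names throughout:

```python
import great_expectations as ge

# connect to the Feature Store
fs = ...

# build a minimal Great Expectations suite
suite = ge.core.ExpectationSuite(expectation_suite_name="sales_checks")
suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "id"},
    )
)

fg = fs.get_feature_group(name="external_sales_records", version=1)

# With "STRICT", a REJECTED validation report now raises
# DataValidationException instead of returning (None, report).
fg.save_expectation_suite(suite, validation_ingestion_policy="STRICT")
```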
28 changes: 14 additions & 14 deletions python/hsfs/core/feature_group_engine.py
@@ -14,7 +14,7 @@
 #
 import warnings
 
-from hsfs import engine, client, util
+from hsfs import engine, util
 from hsfs import feature_group as fg
 from hsfs.client import exceptions
 from hsfs.core import feature_group_base_engine, hudi_engine
@@ -107,7 +107,15 @@ def insert(
         )
 
         if ge_report is not None and ge_report.ingestion_result == "REJECTED":
-            return None, ge_report
+            feature_group_url = util.get_feature_group_url(
+                feature_store_id=feature_group.feature_store_id,
+                feature_group_id=feature_group.id,
+            )
+            raise exceptions.DataValidationException(
+                "Data validation failed while the validation ingestion policy is set to strict, "
+                + f"insertion into {feature_group.name} was aborted.\n"
+                + f"You can check a summary or download your report at {feature_group_url}."
+            )
 
         offline_write_options = write_options
         online_write_options = write_options
@@ -353,16 +361,8 @@ def save_feature_group_metadata(
         self._feature_group_api.save(feature_group)
         print(
             "Feature Group created successfully, explore it at \n"
-            + self._get_feature_group_url(feature_group)
+            + util.get_feature_group_url(
+                feature_store_id=feature_group.feature_store_id,
+                feature_group_id=feature_group.id,
+            )
         )
-
-    def _get_feature_group_url(self, feature_group):
-        sub_path = (
-            "/p/"
-            + str(client.get_instance()._project_id)
-            + "/fs/"
-            + str(feature_group.feature_store_id)
-            + "/fg/"
-            + str(feature_group.id)
-        )
-        return util.get_hostname_replaced_url(sub_path)
69 changes: 69 additions & 0 deletions python/hsfs/feature_group.py
@@ -2496,6 +2496,12 @@ def insert(
 
         # Returns
             (`Job`, `ValidationReport`) A tuple with job information if python engine is used and the validation report if validation is enabled.
+
+        # Raises
+            `hsfs.client.exceptions.RestAPIError`: e.g. failure to create the feature group, a dataframe schema that
+                does not match the existing feature group schema, etc.
+            `hsfs.client.exceptions.DataValidationException`: if data validation fails and the expectation
+                suite `validation_ingestion_policy` is set to `STRICT`. Data is NOT ingested.
         """
         if storage and self.stream:
             warnings.warn(
@@ -3392,6 +3398,69 @@ def insert(
         save_code: Optional[bool] = True,
         wait: bool = False,
     ) -> Tuple[Optional[Job], Optional[ValidationReport]]:
+        """Insert the dataframe feature values ONLY in the online feature store.
+
+        External Feature Groups contain metadata about feature data in an external storage system.
+        External storage systems are usually offline, meaning feature values cannot be retrieved in real-time.
+        To use the feature values for real-time use cases, you can insert them
+        into the Hopsworks Online Feature Store via this method.
+
+        The Online Feature Store keeps a single entry per primary key value, meaning that providing a new value
+        for a given primary key will overwrite the existing value. No record of the previous value is kept.
+
+        !!! example
+            ```python
+            # connect to the Feature Store
+            fs = ...
+
+            # get the External Feature Group instance
+            fg = fs.get_feature_group(name="external_sales_records", version=1)
+
+            # get the feature values, e.g. reading from CSV files in an S3 bucket
+            feature_values = ...
+
+            # insert the feature values in the online feature store
+            fg.insert(feature_values)
+            ```
+
+        !!! note
+            Data validation via Great Expectations is supported if you have attached an expectation suite to
+            your External Feature Group. However, as opposed to regular Feature Groups, this can lead to
+            discrepancies between the data in the external storage system and the online feature store.
+
+        # Arguments
+            features: DataFrame, RDD, Ndarray, list. Features to be saved.
+            write_options: Additional write options as key-value pairs, defaults to `{}`.
+                When using the `python` engine, write_options can contain the
+                following entries:
+                * key `kafka_producer_config` and value an object of type [properties](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html)
+                  used to configure the Kafka client. To optimize for throughput in a high-latency connection, consider
+                  changing the [producer properties](https://docs.confluent.io/cloud/current/client-apps/optimizing/throughput.html#producer).
+                * key `internal_kafka` and value `True` or `False` in case you established
+                  connectivity from your Python environment to the internal advertised
+                  listeners of the Hopsworks Kafka Cluster. Defaults to `False` and
+                  will use external listeners when connecting from outside of Hopsworks.
+            validation_options: Additional validation options as key-value pairs, defaults to `{}`.
+                * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion.
+                * key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks.
+                * key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations.
+                * key `fetch_expectation_suite` a boolean value, by default `True`, to control whether the expectation
+                  suite of the feature group should be fetched before every insert.
+            save_code: When running HSFS on Hopsworks or Databricks, HSFS can save the code/notebook used to create
+                the feature group or used to insert data into it. When calling the `insert` method repeatedly
+                with small batches of data, this can slow down the writes. Use this option to turn off saving
+                code. Defaults to `True`.
+
+        # Returns
+            (`Job`, `ValidationReport`) A tuple with job information if the python engine is used and the validation report if validation is enabled.
+
+        # Raises
+            `hsfs.client.exceptions.RestAPIError`: e.g. failure to create the feature group, a dataframe schema that
+                does not match the existing feature group schema, etc.
+            `hsfs.client.exceptions.DataValidationException`: if data validation fails and the expectation
+                suite `validation_ingestion_policy` is set to `STRICT`. Data is NOT ingested.
+
+        """
         feature_dataframe = engine.get_instance().convert_to_default_dataframe(features)
 
         if write_options is None:
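With a strict suite attached, callers of `insert` should be prepared for the new failure mode documented above. A minimal sketch of guarding an insert, in the style of the docstring example (names are illustrative):

```python
from hsfs.client.exceptions import DataValidationException

# connect to the Feature Store and get the feature group
fs = ...
fg = fs.get_feature_group(name="external_sales_records", version=1)

# get the feature values, e.g. reading from CSV files in an S3 bucket
feature_values = ...

try:
    job, report = fg.insert(feature_values)
except DataValidationException as err:
    # The dataframe was NOT ingested; the exception message links to the
    # validation report in the Hopsworks UI.
    print(err)
```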
12 changes: 12 additions & 0 deletions python/hsfs/util.py
@@ -422,6 +422,18 @@ def run_with_loading_animation(message, func, *args, **kwargs):
     print(f"\rFinished: {message} ({(end-start):.2f}s) ", end="\n")
 
 
+def get_feature_group_url(feature_store_id: int, feature_group_id: int):
+    sub_path = (
+        "/p/"
+        + str(client.get_instance()._project_id)
+        + "/fs/"
+        + str(feature_store_id)
+        + "/fg/"
+        + str(feature_group_id)
+    )
+    return get_hostname_replaced_url(sub_path)
+
+
 class VersionWarning(Warning):
     pass
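For reference, the path the new helper assembles for project id 50, feature store id 99, and feature group id 10 (the same ids used by the unit test at the end of this diff) is `/p/50/fs/99/fg/10`, which `get_hostname_replaced_url` then turns into a full URL. A quick sketch of the equivalent string construction:

```python
# Illustrative ids, mirroring the test_get_feature_group_url unit test.
project_id, feature_store_id, feature_group_id = 50, 99, 10

sub_path = f"/p/{project_id}/fs/{feature_store_id}/fg/{feature_group_id}"
assert sub_path == "/p/50/fs/99/fg/10"
```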
65 changes: 18 additions & 47 deletions python/tests/core/test_feature_group_engine.py
@@ -201,6 +201,10 @@ def test_insert_ge_report(self, mocker):
             "hsfs.core.great_expectation_engine.GreatExpectationEngine"
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
+        mocker.patch(
+            "hsfs.util.get_feature_group_url",
+            return_value="url",
+        )
 
         fg_engine = feature_group_engine.FeatureGroupEngine(
             feature_store_id=feature_store_id
@@ -225,14 +229,15 @@ def test_insert_ge_report(self, mocker):
         mock_ge_engine.return_value.validate.return_value = vr
 
         # Act
-        fg_engine.insert(
-            feature_group=fg,
-            feature_dataframe=None,
-            overwrite=None,
-            operation=None,
-            storage=None,
-            write_options=None,
-        )
+        with pytest.raises(exceptions.DataValidationException):
+            fg_engine.insert(
+                feature_group=fg,
+                feature_dataframe=None,
+                overwrite=None,
+                operation=None,
+                storage=None,
+                write_options=None,
+            )
 
         # Assert
         assert mock_fg_api.return_value.delete_content.call_count == 0
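The `pytest.raises` assertion above relies on module-level imports that this hunk does not show; presumably something like:

```python
# Presumed imports for the updated test (not visible in this hunk).
import pytest

from hsfs.client import exceptions
```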
@@ -1056,7 +1061,7 @@ def test_save_feature_group_metadata(self, mocker):
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
         mock_print = mocker.patch("builtins.print")
@@ -1104,7 +1109,7 @@ def test_save_feature_group_metadata_features(self, mocker):
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
         mock_print = mocker.patch("builtins.print")
@@ -1153,7 +1158,7 @@ def test_save_feature_group_metadata_primary_partition_precombine(self, mocker):
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
         mock_print = mocker.patch("builtins.print")
@@ -1204,7 +1209,7 @@ def test_save_feature_group_metadata_primary_partition_precombine_event_error(
         )
 
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
 
@@ -1309,7 +1314,7 @@ def test_save_feature_group_metadata_write_options(self, mocker):
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
            return_value=feature_group_url,
         )
         mock_print = mocker.patch("builtins.print")
@@ -1346,37 +1351,3 @@ def test_save_feature_group_metadata_write_options(self, mocker):
         ] == "Feature Group created successfully, explore it at \n{}".format(
             feature_group_url
         )
-
-    def test_get_feature_group_url(self, mocker):
-        # Arrange
-        feature_store_id = 99
-
-        mocker.patch("hsfs.engine.get_type")
-        mock_client_get_instance = mocker.patch("hsfs.client.get_instance")
-        mock_util_get_hostname_replaced_url = mocker.patch(
-            "hsfs.util.get_hostname_replaced_url"
-        )
-
-        fg_engine = feature_group_engine.FeatureGroupEngine(
-            feature_store_id=feature_store_id
-        )
-
-        fg = feature_group.FeatureGroup(
-            name="test",
-            version=1,
-            featurestore_id=feature_store_id,
-            primary_key=[],
-            partition_key=[],
-            id=10,
-        )
-
-        mock_client_get_instance.return_value._project_id = 50
-
-        # Act
-        fg_engine._get_feature_group_url(feature_group=fg)
-
-        # Assert
-        assert mock_util_get_hostname_replaced_url.call_count == 1
-        assert (
-            mock_util_get_hostname_replaced_url.call_args[0][0] == "/p/50/fs/99/fg/10"
-        )
21 changes: 21 additions & 0 deletions python/tests/test_util.py
@@ -123,3 +123,24 @@ def test_get_job_url(self, mocker):
             ].path
             == "p/5/jobs/named/7/executions"
         )
+
+    def test_get_feature_group_url(self, mocker):
+        # Arrange
+        feature_store_id = 99
+        feature_group_id = 10
+        mock_client_get_instance = mocker.patch("hsfs.client.get_instance")
+        mock_util_get_hostname_replaced_url = mocker.patch(
+            "hsfs.util.get_hostname_replaced_url"
+        )
+        mock_client_get_instance.return_value._project_id = 50
+
+        # Act
+        util.get_feature_group_url(
+            feature_group_id=feature_group_id, feature_store_id=feature_store_id
+        )
+
+        # Assert
+        assert mock_util_get_hostname_replaced_url.call_count == 1
+        assert (
+            mock_util_get_hostname_replaced_url.call_args[0][0] == "/p/50/fs/99/fg/10"
+        )