diff --git a/python/hsfs/client/exceptions.py b/python/hsfs/client/exceptions.py
index ee8907f0a0..d5f2bbf168 100644
--- a/python/hsfs/client/exceptions.py
+++ b/python/hsfs/client/exceptions.py
@@ -62,6 +62,13 @@ class FeatureStoreException(Exception):
     """Generic feature store exception"""
 
 
+class DataValidationException(FeatureStoreException):
+    """Raised when data validation fails and the validation ingestion policy is set to "STRICT"."""
+
+    def __init__(self, message):
+        super().__init__(message)
+
+
 class ExternalClientError(TypeError):
     """Raised when external client cannot be initialized due to missing arguments."""
 
diff --git a/python/hsfs/core/external_feature_group_engine.py b/python/hsfs/core/external_feature_group_engine.py
index 94db21fc6a..d44effc863 100644
--- a/python/hsfs/core/external_feature_group_engine.py
+++ b/python/hsfs/core/external_feature_group_engine.py
@@ -15,7 +15,7 @@
 
 from hsfs import engine, util
 from hsfs import feature_group as fg
-from hsfs.client.exceptions import FeatureStoreException
+from hsfs.client.exceptions import FeatureStoreException, DataValidationException
 from hsfs.core import feature_group_base_engine
 
 
@@ -80,7 +80,15 @@ def insert(
         )
 
         if ge_report is not None and ge_report.ingestion_result == "REJECTED":
-            return None, ge_report
+            feature_group_url = util.get_feature_group_url(
+                feature_store_id=feature_group.feature_store_id,
+                feature_group_id=feature_group.id,
+            )
+            raise DataValidationException(
+                "Data validation failed while the validation ingestion policy was set to STRICT, "
+                + f"insertion to {feature_group.name} was aborted.\n"
+                + f"You can check a summary or download your report at {feature_group_url}"
+            )
 
         return (
             engine.get_instance().save_dataframe(
diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py
index 74eb84f272..abfa2078a2 100644
--- a/python/hsfs/core/feature_group_engine.py
+++ b/python/hsfs/core/feature_group_engine.py
@@ -14,7 +14,7 @@
 #
 import warnings
 
-from hsfs import engine, client, util
+from hsfs import engine, util
 from hsfs import feature_group as fg
 from hsfs.client import exceptions
 from hsfs.core import feature_group_base_engine, hudi_engine
@@ -107,7 +107,15 @@ def insert(
         )
 
         if ge_report is not None and ge_report.ingestion_result == "REJECTED":
-            return None, ge_report
+            feature_group_url = util.get_feature_group_url(
+                feature_store_id=feature_group.feature_store_id,
+                feature_group_id=feature_group.id,
+            )
+            raise exceptions.DataValidationException(
+                "Data validation failed while the validation ingestion policy was set to STRICT, "
+                + f"insertion to {feature_group.name} was aborted.\n"
+                + f"You can check a summary or download your report at {feature_group_url}"
+            )
 
         offline_write_options = write_options
         online_write_options = write_options
@@ -353,16 +361,8 @@ def save_feature_group_metadata(
         self._feature_group_api.save(feature_group)
         print(
             "Feature Group created successfully, explore it at \n"
-            + self._get_feature_group_url(feature_group)
-        )
-
-    def _get_feature_group_url(self, feature_group):
-        sub_path = (
-            "/p/"
-            + str(client.get_instance()._project_id)
-            + "/fs/"
-            + str(feature_group.feature_store_id)
-            + "/fg/"
-            + str(feature_group.id)
+            + util.get_feature_group_url(
+                feature_store_id=feature_group.feature_store_id,
+                feature_group_id=feature_group.id,
+            )
         )
-        return util.get_hostname_replaced_url(sub_path)
diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py
index c97c0d0094..54768b8ff8 100644
--- a/python/hsfs/feature_group.py
+++ b/python/hsfs/feature_group.py
@@ -2496,6 +2496,12 @@ def insert(
         # Returns
             (`Job`, `ValidationReport`) A tuple with job information if python engine is used
             and the validation report if validation is enabled.
+
+        # Raises
+            `hsfs.client.exceptions.RestAPIError`. e.g., failing to create the feature group, or the dataframe
+                schema does not match the existing feature group schema, etc.
+            `hsfs.client.exceptions.DataValidationException`. If data validation fails and the expectation
+                suite `validation_ingestion_policy` is set to `STRICT`. Data is NOT ingested.
         """
         if storage and self.stream:
             warnings.warn(
@@ -3392,6 +3398,69 @@ def insert(
         save_code: Optional[bool] = True,
         wait: bool = False,
     ) -> Tuple[Optional[Job], Optional[ValidationReport]]:
+        """Insert the dataframe feature values ONLY in the online feature store.
+
+        External Feature Groups contain metadata about feature data in an external storage system.
+        External storage systems are usually offline, meaning feature values cannot be retrieved in real-time.
+        In order to use the feature values for real-time use-cases, you can insert them
+        into the Hopsworks Online Feature Store via this method.
+
+        The Online Feature Store keeps a single entry per primary key value, meaning that providing a new value
+        for a given primary key will overwrite the existing value. No record of the previous value is kept.
+
+        !!! example
+            ```python
+            # connect to the Feature Store
+            fs = ...
+
+            # get the External Feature Group instance
+            fg = fs.get_feature_group(name="external_sales_records", version=1)
+
+            # get the feature values, e.g. reading from CSV files in an S3 bucket
+            feature_values = ...
+
+            # insert the feature values in the online feature store
+            fg.insert(feature_values)
+            ```
+
+        !!! note
+            Data Validation via Great Expectations is supported if you have attached an expectation suite to
+            your External Feature Group. However, as opposed to regular Feature Groups, this can lead to
+            discrepancies between the data in the external storage system and the online feature store.
+
+        # Arguments
+            features: DataFrame, RDD, Ndarray, list. Features to be saved.
+            write_options: Additional write options as key-value pairs, defaults to `{}`.
+                When using the `python` engine, write_options can contain the
+                following entries:
+                * key `kafka_producer_config` and value an object of type [properties](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html)
+                  used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider
+                  changing the [producer properties](https://docs.confluent.io/cloud/current/client-apps/optimizing/throughput.html#producer).
+                * key `internal_kafka` and value `True` or `False` in case you established
+                  connectivity from your Python environment to the internal advertised
+                  listeners of the Hopsworks Kafka Cluster. Defaults to `False` and
+                  will use external listeners when connecting from outside of Hopsworks.
+            validation_options: Additional validation options as key-value pairs, defaults to `{}`.
+                * key `run_validation` boolean value, set to `False` to skip validation temporarily on ingestion.
+                * key `save_report` boolean value, set to `False` to skip upload of the validation report to Hopsworks.
+                * key `ge_validate_kwargs` a dictionary containing kwargs for the validate method of Great Expectations.
+                * key `fetch_expectation_suite` a boolean value, by default `True`, to control whether the expectation
+                  suite of the feature group should be fetched before every insert.
+            save_code: When running HSFS on Hopsworks or Databricks, HSFS can save the code/notebook used to create
+                the feature group or used to insert data to it. When calling the `insert` method repeatedly
+                with small batches of data, this can slow down the writes. Use this option to turn off saving
+                code. Defaults to `True`.
+
+        # Returns
+            (`Job`, `ValidationReport`) A tuple with job information if python engine is used
+            and the validation report if validation is enabled.
+
+        # Raises
+            `hsfs.client.exceptions.RestAPIError`. e.g., failing to create the feature group, or the dataframe
+                schema does not match the existing feature group schema, etc.
+            `hsfs.client.exceptions.DataValidationException`. If data validation fails and the expectation
+                suite `validation_ingestion_policy` is set to `STRICT`. Data is NOT ingested.
+        """
         feature_dataframe = engine.get_instance().convert_to_default_dataframe(features)
 
         if write_options is None:
diff --git a/python/hsfs/util.py b/python/hsfs/util.py
index 1bb95a6921..8fbcecd3dd 100644
--- a/python/hsfs/util.py
+++ b/python/hsfs/util.py
@@ -422,6 +422,18 @@ def run_with_loading_animation(message, func, *args, **kwargs):
         print(f"\rFinished: {message} ({(end-start):.2f}s) ", end="\n")
 
 
+def get_feature_group_url(feature_store_id: int, feature_group_id: int):
+    sub_path = (
+        "/p/"
+        + str(client.get_instance()._project_id)
+        + "/fs/"
+        + str(feature_store_id)
+        + "/fg/"
+        + str(feature_group_id)
+    )
+    return get_hostname_replaced_url(sub_path)
+
+
 class VersionWarning(Warning):
     pass
 
diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py
index c01f8166fa..01233534d1 100644
--- a/python/tests/core/test_feature_group_engine.py
+++ b/python/tests/core/test_feature_group_engine.py
@@ -201,6 +201,10 @@ def test_insert_ge_report(self, mocker):
             "hsfs.core.great_expectation_engine.GreatExpectationEngine"
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
+        mocker.patch(
+            "hsfs.util.get_feature_group_url",
+            return_value="url",
+        )
 
         fg_engine = feature_group_engine.FeatureGroupEngine(
             feature_store_id=feature_store_id
@@ -225,14 +229,15 @@ def test_insert_ge_report(self, mocker):
         mock_ge_engine.return_value.validate.return_value = vr
 
         # Act
-        fg_engine.insert(
-            feature_group=fg,
-            feature_dataframe=None,
-            overwrite=None,
-            operation=None,
-            storage=None,
-            write_options=None,
-        )
+        with pytest.raises(exceptions.DataValidationException):
+            fg_engine.insert(
+                feature_group=fg,
+                feature_dataframe=None,
+                overwrite=None,
+                operation=None,
+                storage=None,
+                write_options=None,
+            )
 
         # Assert
         assert mock_fg_api.return_value.delete_content.call_count == 0
@@ -1056,7 +1061,7 @@ def test_save_feature_group_metadata(self, mocker):
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
         mock_print = mocker.patch("builtins.print")
@@ -1104,7 +1109,7 @@ def test_save_feature_group_metadata_features(self, mocker):
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
         mock_print = mocker.patch("builtins.print")
@@ -1153,7 +1158,7 @@ def test_save_feature_group_metadata_primary_partition_precombine(self, mocker):
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
         mock_print = mocker.patch("builtins.print")
@@ -1204,7 +1209,7 @@ def test_save_feature_group_metadata_primary_partition_precombine_event_error(
         )
 
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
 
@@ -1309,7 +1314,7 @@ def test_save_feature_group_metadata_write_options(self, mocker):
         )
         mock_fg_api = mocker.patch("hsfs.core.feature_group_api.FeatureGroupApi")
         mocker.patch(
-            "hsfs.core.feature_group_engine.FeatureGroupEngine._get_feature_group_url",
+            "hsfs.util.get_feature_group_url",
             return_value=feature_group_url,
         )
         mock_print = mocker.patch("builtins.print")
@@ -1346,37 +1351,3 @@ def test_save_feature_group_metadata_write_options(self, mocker):
         ] == "Feature Group created successfully, explore it at \n{}".format(
             feature_group_url
         )
-
-    def test_get_feature_group_url(self, mocker):
-        # Arrange
-        feature_store_id = 99
-
-        mocker.patch("hsfs.engine.get_type")
-        mock_client_get_instance = mocker.patch("hsfs.client.get_instance")
-        mock_util_get_hostname_replaced_url = mocker.patch(
-            "hsfs.util.get_hostname_replaced_url"
-        )
-
-        fg_engine = feature_group_engine.FeatureGroupEngine(
-            feature_store_id=feature_store_id
-        )
-
-        fg = feature_group.FeatureGroup(
-            name="test",
-            version=1,
-            featurestore_id=feature_store_id,
-            primary_key=[],
-            partition_key=[],
-            id=10,
-        )
-
-        mock_client_get_instance.return_value._project_id = 50
-
-        # Act
-        fg_engine._get_feature_group_url(feature_group=fg)
-
-        # Assert
-        assert mock_util_get_hostname_replaced_url.call_count == 1
-        assert (
-            mock_util_get_hostname_replaced_url.call_args[0][0] == "/p/50/fs/99/fg/10"
-        )
diff --git a/python/tests/test_util.py b/python/tests/test_util.py
index 94df8dd703..4d1465f8ae 100644
--- a/python/tests/test_util.py
+++ b/python/tests/test_util.py
@@ -123,3 +123,24 @@ def test_get_job_url(self, mocker):
             ].path
             == "p/5/jobs/named/7/executions"
         )
+
+    def test_get_feature_group_url(self, mocker):
+        # Arrange
+        feature_store_id = 99
+        feature_group_id = 10
+        mock_client_get_instance = mocker.patch("hsfs.client.get_instance")
+        mock_util_get_hostname_replaced_url = mocker.patch(
+            "hsfs.util.get_hostname_replaced_url"
+        )
+        mock_client_get_instance.return_value._project_id = 50
+
+        # Act
+        util.get_feature_group_url(
+            feature_group_id=feature_group_id, feature_store_id=feature_store_id
+        )
+
+        # Assert
+        assert mock_util_get_hostname_replaced_url.call_count == 1
+        assert (
+            mock_util_get_hostname_replaced_url.call_args[0][0] == "/p/50/fs/99/fg/10"
+        )