diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index ac76f6b5a..301dfbc4b 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -3,7 +3,8 @@ import logging import os import tempfile -from typing import Dict, List, Union +import json +from typing import Dict, List, Union, Tuple from azure.identity import DefaultAzureCredential from feathr.definition.transformation import WindowAggTransformation @@ -22,6 +23,7 @@ from feathr.definition.query_feature_list import FeatureQuery from feathr.definition.settings import ObservationSettings from feathr.definition.sink import Sink, HdfsSink +from feathr.definition.typed_key import TypedKey from feathr.protobuf.featureValue_pb2 import FeatureValue from feathr.spark_provider._databricks_submission import _FeathrDatabricksJobLauncher from feathr.spark_provider._localspark_submission import _FeathrLocalSparkJobLauncher @@ -38,7 +40,7 @@ from loguru import logger from feathr.definition.config_helper import FeathrConfigHelper from pyhocon import ConfigFactory -from feathr.registry._feathr_registry_client import _FeatureRegistry +from feathr.registry._feathr_registry_client import _FeatureRegistry, feature_to_def, derived_feature_to_def from feathr.registry._feature_registry_purview import _PurviewRegistry from feathr.version import get_version class FeathrClient(object): @@ -946,19 +948,32 @@ def _collect_secrets(self, additional_secrets=[]): prop_and_value[prop] = self.envutils.get_environment_variable_with_default(prop) return prop_and_value - def get_features_from_registry(self, project_name: str) -> Dict[str, FeatureBase]: + def get_features_from_registry(self, project_name: str, return_keys: bool = False, verbose: bool = False) -> Union[Dict[str, FeatureBase], Tuple[Dict[str, FeatureBase], Dict[str, Union[TypedKey, List[TypedKey]]]]]: """ Get feature from registry by project name. The features got from registry are automatically built. """ registry_anchor_list, registry_derived_feature_list = self.registry.get_features_from_registry(project_name) self.build_features(registry_anchor_list, registry_derived_feature_list) feature_dict = {} + key_dict = {} # add those features into a dict for easier lookup + if verbose and registry_anchor_list: + logger.info("Get anchor features from registry: ") for anchor in registry_anchor_list: for feature in anchor.features: feature_dict[feature.name] = feature + key_dict[feature.name] = feature.key + if verbose: + logger.info(json.dumps(feature_to_def(feature), indent=2)) + if verbose and registry_derived_feature_list: + logger.info("Get derived features from registry: ") for feature in registry_derived_feature_list: feature_dict[feature.name] = feature + key_dict[feature.name] = feature.key + if verbose: + logger.info(json.dumps(derived_feature_to_def(feature), indent=2)) + if return_keys: + return feature_dict, key_dict return feature_dict def _reshape_config_str(self, config_str:str): diff --git a/feathr_project/test/test_feature_registry.py b/feathr_project/test/test_feature_registry.py index 9fe66322a..681b443bf 100644 --- a/feathr_project/test/test_feature_registry.py +++ b/feathr_project/test/test_feature_registry.py @@ -78,8 +78,8 @@ def test_feathr_register_features_partially(self): client: FeathrClient = registry_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) client.register_features() time.sleep(30) - full_registration = client.get_features_from_registry(client.project_name) - + full_registration, keys = client.get_features_from_registry(client.project_name, return_keys = True, verbose = True) + assert len(keys['f_location_avg_fare']) == 2 now = datetime.now() os.environ["project_config__project_name"] = ''.join(['feathr_ci_registry','_', str(now.minute), '_', str(now.second), '_', str(now.microsecond)]) diff --git a/feathr_project/test/test_fixture.py b/feathr_project/test/test_fixture.py index 8c53b7acf..edd0fcb60 100644 --- a/feathr_project/test/test_fixture.py +++ b/feathr_project/test/test_fixture.py @@ -246,8 +246,12 @@ def add_new_dropoff_and_fare_amount_column(df: DataFrame): key_column_type=ValueType.INT32, description="location id in NYC", full_name="nyc_taxi.location_id") + pu_location_id = TypedKey(key_column="PULocationID", + key_column_type=ValueType.INT32, + full_name="nyc_taxi.pu_location_id" + ) agg_features = [Feature(name="f_location_avg_fare", - key=location_id, + key=[location_id,pu_location_id], feature_type=FLOAT, transform=WindowAggTransformation(agg_expr="cast_float(fare_amount)", agg_func="AVG",