Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pretty-print the features produced by buildFeatures #214

Merged
merged 10 commits into from
May 17, 2022
30 changes: 27 additions & 3 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Union
from pprint import pprint
from xmlrpc.client import Boolean
ahlag marked this conversation as resolved.
Show resolved Hide resolved

import redis
from azure.identity import DefaultAzureCredential
Expand Down Expand Up @@ -210,7 +212,7 @@ def register_features(self, from_context: bool = True):
else:
self.registry.register_features(self.local_workspace_dir, from_context=from_context)

def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_list: List[DerivedFeature] = []):
def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_list: List[DerivedFeature] = [], pprint_flag: bool = False):
"""Build features based on the current workspace. all actions that triggers a spark job will be based on the
result of this action.
"""
Expand All @@ -235,6 +237,13 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
self.registry.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir)
self.anchor_list = anchor_list
self.derived_feature_list = derived_feature_list

# Pretty print anchor_list and derived_feature_list
if pprint_flag:
if self.anchor_list:
pprint(self.anchor_list)
if self.derived_feature_list:
pprint(self.derived_feature_list)

ahlag marked this conversation as resolved.
Show resolved Hide resolved
def list_registered_features(self, project_name: str = None) -> List[str]:
"""List all the already registered features. If project_name is not provided or is None, it will return all
Expand Down Expand Up @@ -396,6 +405,7 @@ def get_offline_features(self,
output_path: str,
execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None,
udf_files = None,
pprint_flag: bool = False
):
"""
Get offline features for the observation dataset
Expand Down Expand Up @@ -434,6 +444,13 @@ def get_offline_features(self,
_FeatureRegistry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to get offline features")

# Pretty print anchor list and derived_feature_list
if pprint_flag:
if self.anchor_list:
pprint(self.anchor_list)
if self.derived_feature_list:
pprint(self.derived_feature_list)

write_to_file(content=config, full_file_name=config_file_path)
return self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files)
Expand Down Expand Up @@ -512,7 +529,7 @@ def wait_job_to_finish(self, timeout_sec: int = 300):
else:
raise RuntimeError('Spark job failed.')

def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None):
def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None, pprint_flag: bool = False):
"""Materialize feature data

Args:
Expand Down Expand Up @@ -541,6 +558,13 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
self._materialize_features_with_config(config_file_path, execution_configuratons, udf_files)
if os.path.exists(config_file_path):
os.remove(config_file_path)

# Pretty print anchor_list and derived_feature_list
if pprint_flag:
if self.anchor_list:
pprint(self.anchor_list)
if self.derived_feature_list:
pprint(self.derived_feature_list)

def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = None, udf_files=[]):
"""Materializes feature data based on the feature generation config. The feature
Expand Down Expand Up @@ -698,4 +722,4 @@ def _get_kafka_config_str(self):
config_str = """
KAFKA_SASL_JAAS_CONFIG: "{sasl}"
""".format(sasl=sasl)
return config_str
return config_str