Pretty-print the features produced by buildFeatures #214

Merged
merged 10 commits on May 17, 2022
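
This PR adds an optional verbose flag to FeathrClient.build_features(), get_offline_features(), and materialize_features(), backed by a new FeaturePrinter helper in feathr_project/utils.py that pretty-prints the anchors, the feature query, or the feature names being materialized. A minimal usage sketch of the new flag (the client construction and feature definition here are illustrative and assume a configured feathr_config.yaml; they are not part of this PR):

from feathr.client import FeathrClient
from feathr.anchor import FeatureAnchor
from feathr.dtype import FLOAT
from feathr.feature import Feature
from feathr import INPUT_CONTEXT

# Assumes a valid Feathr workspace configuration on disk.
client = FeathrClient(config_path="feathr_config.yaml")

features = [Feature(name="trip_distance", feature_type=FLOAT)]
anchor = FeatureAnchor(name="request_features",
                       source=INPUT_CONTEXT,
                       features=features)

# verbose=True makes build_features() pretty-print each anchor and its features.
client.build_features(anchor_list=[anchor], verbose=True)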
24 changes: 18 additions & 6 deletions feathr_project/feathr/client.py
@@ -4,7 +4,8 @@
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Union
from typing import Dict, List, Union
from utils import FeaturePrinter
from feathr.feature import FeatureBase

import redis
@@ -22,12 +23,11 @@
from feathr.constants import *
from feathr.feathr_configurations import SparkExecutionConfiguration
from feathr.feature_derivations import DerivedFeature
from feathr.anchor import FeatureAnchor
from feathr.materialization_settings import MaterializationSettings
from feathr.protobuf.featureValue_pb2 import FeatureValue
from feathr.query_feature_list import FeatureQuery
from feathr.settings import ObservationSettings
from feathr.feature_derivations import DerivedFeature
from feathr.anchor import FeatureAnchor
from feathr.feathr_configurations import SparkExecutionConfiguration


@@ -211,7 +211,7 @@ def register_features(self, from_context: bool = True):
else:
self.registry.register_features(self.local_workspace_dir, from_context=from_context)

def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_list: List[DerivedFeature] = []):
def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_list: List[DerivedFeature] = [], verbose: bool = False):
"""Build features based on the current workspace. all actions that triggers a spark job will be based on the
result of this action.
"""
@@ -236,6 +236,10 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
self.registry.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir)
self.anchor_list = anchor_list
self.derived_feature_list = derived_feature_list

# Pretty print anchor_list
if verbose and self.anchor_list:
FeaturePrinter.pretty_print_anchors(self.anchor_list)

def list_registered_features(self, project_name: str = None) -> List[str]:
"""List all the already registered features. If project_name is not provided or is None, it will return all
@@ -397,6 +401,7 @@ def get_offline_features(self,
output_path: str,
execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None,
udf_files = None,
verbose: bool = False
):
"""
Get offline features for the observation dataset
@@ -435,6 +440,10 @@
_FeatureRegistry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to get offline features")

# Pretty print feature_query
if verbose and feature_query:
FeaturePrinter.pretty_print_feature_query(feature_query)

write_to_file(content=config, full_file_name=config_file_path)
return self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files)
@@ -513,15 +522,14 @@ def wait_job_to_finish(self, timeout_sec: int = 300):
else:
raise RuntimeError('Spark job failed.')

def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None):
def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None, verbose: bool = False):
"""Materialize feature data

Args:
settings: Feature materialization settings
execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations.
"""
# produce materialization config

for end in settings.get_backfill_cutoff_time():
settings.backfill_time.end = end
config = _to_materialization_config(settings)
@@ -542,6 +550,10 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
self._materialize_features_with_config(config_file_path, execution_configuratons, udf_files)
if os.path.exists(config_file_path):
os.remove(config_file_path)

# Pretty print feature_names of materialized features
if verbose and settings:
FeaturePrinter.pretty_print_materialize_features(settings)

def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = None, udf_files=[]):
"""Materializes feature data based on the feature generation config. The feature
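Given the FeaturePrinter methods added at the bottom of this PR, the three verbose=True code paths print output along these lines (a sketch derived from the format strings and the test fixtures below, not captured from a real run):

# build_features:        "request_features is the anchor of ['trip_distance', 'f_is_long_trip_distance', 'f_day_of_week']"
# get_offline_features:  Features in feature_query: ['f_location_avg_fare']
# materialize_features:  Materialization features in settings: ['f_location_avg_fare', 'f_location_max_fare']
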
90 changes: 88 additions & 2 deletions feathr_project/test/test_feature_materialization.py
@@ -1,9 +1,18 @@
import os
from datetime import datetime, timedelta
from pathlib import Path

from feathr._materialization_utils import _to_materialization_config
from feathr import (BackfillTime, MaterializationSettings)
from feathr import (BackfillTime, MaterializationSettings, FeatureQuery,
ObservationSettings, SparkExecutionConfiguration)
from feathr import RedisSink

from feathr.anchor import FeatureAnchor
from feathr.dtype import BOOLEAN, FLOAT, FLOAT_VECTOR, INT32, ValueType
from feathr.feature import Feature
from feathr.typed_key import TypedKey
from feathr import INPUT_CONTEXT
from test_fixture import basic_test_setup
from test_fixture import get_online_test_table_name

def test_feature_materialization_config():
backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5,20), step=timedelta(days=1))
@@ -57,3 +66,80 @@ def test_feature_materialization_now_schedule():
assert expected.year == date.year
assert expected.month == date.month
assert expected.day == date.day

def test_build_feature_verbose():
"""
Test verbose for pretty printing features
"""
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))

# An anchor feature
features = [
Feature(name="trip_distance", feature_type=FLOAT),
Feature(name="f_is_long_trip_distance",
feature_type=BOOLEAN,
transform="cast_float(trip_distance)>30"),
Feature(name="f_day_of_week",
feature_type=INT32,
transform="dayofweek(lpep_dropoff_datetime)")
]

anchor = FeatureAnchor(name="request_features",
source=INPUT_CONTEXT,
features=features)

# Check pretty print
client.build_features(anchor_list=[anchor], verbose=True)

def test_get_offline_features_verbose():
"""
Test verbose for pretty printing feature query
"""

test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))

location_id = TypedKey(key_column="DOLocationID",
key_column_type=ValueType.INT32)

feature_query = FeatureQuery(feature_list=["f_location_avg_fare"], key=location_id)

settings = ObservationSettings(
observation_path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04",
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss"
)

now = datetime.now()

# set output folder based on different runtime
if client.spark_runtime == 'databricks':
output_path = ''.join(['dbfs:/feathrazure_cijob','_', str(now.minute), '_', str(now.second), ".parquet"])
else:
output_path = ''.join(['abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/demo_data/output','_', str(now.minute), '_', str(now.second), ".parquet"])

# Check pretty print
client.get_offline_features(
observation_settings=settings,
feature_query=feature_query,
output_path=output_path,
execution_configuratons=SparkExecutionConfiguration({"spark.feathr.inputFormat": "parquet", "spark.feathr.outputFormat": "parquet"}),
verbose=True
)

def test_materialize_features_verbose():
online_test_table = get_online_test_table_name("nycTaxiCITable")
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))
backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))
redisSink = RedisSink(table_name=online_test_table)
settings = MaterializationSettings("nycTaxiTable",
sinks=[redisSink],
feature_names=[
"f_location_avg_fare", "f_location_max_fare"],
backfill_time=backfill_time)
client.materialize_features(settings, verbose=True)
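
These tests exercise live resources (wasbs:// observation data, dbfs:/ or abfss:// output paths, a Redis sink), so they presumably run in the project's CI environment; locally one would need the same credentials in place, then select the new tests with pytest, e.g.:

pytest feathr_project/test/test_feature_materialization.py -k verbose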
49 changes: 49 additions & 0 deletions feathr_project/utils.py
@@ -0,0 +1,49 @@
from pprint import pprint
from typing import Union, List

from feathr.anchor import FeatureAnchor
from feathr.query_feature_list import FeatureQuery
from feathr.materialization_settings import MaterializationSettings

class FeaturePrinter:
"""The class for pretty-printing features"""

@staticmethod
def pretty_print_anchors(anchor_list: List[FeatureAnchor]) -> None:
"""Pretty print features

Args:
feature_list: FeatureAnchor
"""

if all(isinstance(anchor, FeatureAnchor) for anchor in anchor_list):
for anchor in anchor_list:
pprint("%s is the achor of %s" % \
(anchor.name, [feature.name for feature in anchor.features]))
else:
raise TypeError("anchor_list must be FeatureAnchor or List[FeatureAnchor]")

@staticmethod
def pretty_print_feature_query(feature_query: FeatureQuery) -> None:
"""Pretty print feature query

Args:
feature_query: the FeatureQuery to print
"""

if isinstance(feature_query, FeatureQuery):
print("Features in feature_query: %s" % feature_query.feature_list)
else:
raise TypeError("feature_query must be FeatureQuery")

@staticmethod
def pretty_print_materialize_features(settings: MaterializationSettings) -> None:
"""Pretty print feature query

Args:
feature_query: feature query
"""
if isinstance(settings, MaterializationSettings):
print("Materialization features in settings: %s" % settings.feature_names)
else:
raise TypeError("settings must be MaterializationSettings")