Pretty-print the features produced by buildFeatures #214

Merged · 10 commits · May 17, 2022
29 changes: 26 additions & 3 deletions feathr_project/feathr/client.py

@@ -5,6 +5,7 @@
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Union
from pprint import pprint

import redis
from azure.identity import DefaultAzureCredential
@@ -210,7 +211,7 @@ def register_features(self, from_context: bool = True):
else:
self.registry.register_features(self.local_workspace_dir, from_context=from_context)

def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_list: List[DerivedFeature] = []):
def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_list: List[DerivedFeature] = [], pprint_flag: bool = False):
"""Build features based on the current workspace. all actions that triggers a spark job will be based on the
result of this action.
"""
@@ -235,6 +236,13 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
self.registry.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir)
self.anchor_list = anchor_list
self.derived_feature_list = derived_feature_list

# Pretty print anchor list and derived_feature_list
if pprint_flag:
if self.anchor_list:
pprint(self.anchor_list)
if self.derived_feature_list:
pprint(self.derived_feature_list)

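For illustration, a minimal usage sketch of the new flag on build_features (the config path is the usual Feathr client config; request_anchor and fare_per_mile are hypothetical names for objects defined elsewhere):

```python
from feathr import FeathrClient

client = FeathrClient(config_path="feathr_config.yaml")

# request_anchor and fare_per_mile are assumed to be a FeatureAnchor and a
# DerivedFeature defined elsewhere. With pprint_flag=True, build_features()
# pretty-prints both lists after saving them to the feature config.
client.build_features(anchor_list=[request_anchor],
                      derived_feature_list=[fare_per_mile],
                      pprint_flag=True)
```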
def list_registered_features(self, project_name: str = None) -> List[str]:
"""List all the already registered features. If project_name is not provided or is None, it will return all
@@ -396,6 +404,7 @@ def get_offline_features(self,
output_path: str,
execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None,
udf_files = None,
pprint_flag: bool = False
):
"""
Get offline features for the observation dataset
@@ -434,6 +443,13 @@
_FeatureRegistry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to get offline features")

# Pretty print anchor list and derived_feature_list
if pprint_flag:
if self.anchor_list:
pprint(self.anchor_list)
if self.derived_feature_list:
pprint(self.derived_feature_list)

write_to_file(content=config, full_file_name=config_file_path)
return self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files)
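The same flag on get_offline_features, sketched under the assumption that an observation settings object and a feature query have already been constructed (settings, query, and the output path below are illustrative):

```python
# settings and query are assumed to be built beforehand; the flag makes the
# client pretty-print the built features before the Spark job is submitted.
client.get_offline_features(observation_settings=settings,
                            feature_query=query,
                            output_path="wasbs://container@account.blob.core.windows.net/output.avro",
                            pprint_flag=True)
```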
@@ -512,7 +528,7 @@ def wait_job_to_finish(self, timeout_sec: int = 300):
else:
raise RuntimeError('Spark job failed.')

def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None):
def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None, pprint_flag: bool = False):
"""Materialize feature data

Args:
@@ -541,6 +557,13 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
self._materialize_features_with_config(config_file_path, execution_configuratons, udf_files)
if os.path.exists(config_file_path):
os.remove(config_file_path)

# Pretty print anchor_list and derived_feature_list
if pprint_flag:
if self.anchor_list:
pprint(self.anchor_list)
if self.derived_feature_list:
pprint(self.derived_feature_list)
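And a sketch of the flag on materialize_features, e.g. with a Redis sink (the sink table and feature name are illustrative, borrowed from the NYC taxi test data used elsewhere in this PR):

```python
from feathr import MaterializationSettings, RedisSink

# Illustrative sink and settings; any materialization job accepts the flag.
redis_sink = RedisSink(table_name="nycTaxiDemoFeature")
settings = MaterializationSettings(name="nycTaxiMaterializationJob",
                                   sinks=[redis_sink],
                                   feature_names=["f_day_of_week"])
client.materialize_features(settings, pprint_flag=True)
```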

def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = None, udf_files=[]):
"""Materializes feature data based on the feature generation config. The feature
@@ -698,4 +721,4 @@ def _get_kafka_config_str(self):
config_str = """
KAFKA_SASL_JAAS_CONFIG: "{sasl}"
""".format(sasl=sasl)
return config_str
return config_str
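Note that the pretty-print block is now repeated verbatim in build_features, get_offline_features, and materialize_features; a hypothetical helper (not part of this PR) that would consolidate it:

```python
def _pprint_features(self) -> None:
    # Hypothetical refactor sketch: pretty-print whichever feature lists
    # have been built on the client.
    if self.anchor_list:
        pprint(self.anchor_list)
    if self.derived_feature_list:
        pprint(self.derived_feature_list)
```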
61 changes: 61 additions & 0 deletions feathr_project/test/test_feature_materialization.py

@@ -1,9 +1,24 @@
import os
from datetime import datetime, timedelta
from pathlib import Path

from feathr._materialization_utils import _to_materialization_config
from feathr import (BackfillTime, MaterializationSettings)
from feathr import RedisSink
from feathr.anchor import FeatureAnchor
from feathr.dtype import FLOAT, INT32, ValueType
from feathr.feature import Feature
from feathr.source import HdfsSource
from feathr.typed_key import TypedKey
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

from test_fixture import basic_test_setup

def add_new_fare_amount(df: DataFrame) -> DataFrame:
df = df.withColumn("fare_amount_new", col("fare_amount") + 8000000)

return df

def test_feature_materialization_config():
backfill_time = BackfillTime(start=datetime(2020, 5, 20), end=datetime(2020, 5,20), step=timedelta(days=1))
@@ -57,3 +72,49 @@ def test_feature_materialization_now_schedule():
assert expected.year == date.year
assert expected.month == date.month
assert expected.day == date.day

def test_feature_materialization_now_schedule():
"""Test back fill cutoff time without backfill."""
settings = MaterializationSettings("", [], [])
date = settings.get_backfill_cutoff_time()[0]
expected = datetime.now()
assert expected.year == date.year
assert expected.month == date.month
assert expected.day == date.day

def test_build_feature_pretty_print_flag():
"""
Test non-SWA feature gen with preprocessing
"""
test_workspace_dir = Path(__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))

batch_source = HdfsSource(name="nycTaxiBatchSource_add_new_fare_amount",
path="wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv",
preprocessing=add_new_fare_amount,
event_timestamp_column="lpep_dropoff_datetime",
timestamp_format="yyyy-MM-dd HH:mm:ss")

pickup_time_as_id = TypedKey(key_column="lpep_pickup_datetime",
key_column_type=ValueType.INT32,
description="location id in NYC",
full_name="nyc_taxi.location_id")

features = [
Feature(name="f_is_long_trip_distance",
key=pickup_time_as_id,
feature_type=FLOAT,
transform="fare_amount_new"),
Feature(name="f_day_of_week",
key=pickup_time_as_id,
feature_type=INT32,
transform="dayofweek(lpep_dropoff_datetime)"),
]

regular_anchor = FeatureAnchor(name="request_features_add_new_fare_amount",
source=batch_source,
features=features,
)

client.build_features(anchor_list=[regular_anchor], pprint_flag=True)
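The test above only exercises the pprint_flag code path. A hedged sketch of how the printed output could additionally be asserted with pytest's capsys fixture (hypothetical follow-up, reusing client and regular_anchor from the test above, and assuming the anchor's printed representation includes its name):

```python
def test_build_feature_pretty_print_output(capsys):
    # Hypothetical follow-up: capture stdout and check that the anchor
    # list was actually pretty-printed.
    client.build_features(anchor_list=[regular_anchor], pprint_flag=True)
    captured = capsys.readouterr()
    assert "request_features_add_new_fare_amount" in captured.out
```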