Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Monitoring #330

Merged
merged 15 commits into from
Jun 13, 2022
79 changes: 40 additions & 39 deletions feathr_project/feathr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .definition.source import *
from .definition.typed_key import *
from .definition.materialization_settings import *
from .definition.monitoring_settings import *
from .definition.sink import *
from .definition.query_feature_list import *
from .definition.lookup_feature import *
Expand All @@ -19,7 +20,7 @@
from .api.app.core.feathr_api_exception import *

# skipped class as they are internal methods:
# RepoDefinitions, HoconConvertible,
# RepoDefinitions, HoconConvertible,
# expose the modules so docs can build
# referencee: https://stackoverflow.com/questions/15115514/how-do-i-document-classes-without-the-module-name/31594545#31594545

Expand All @@ -30,45 +31,45 @@


__all__ = [
'FeatureJoinJobParams',
'FeatureGenerationJobParams',
'FeathrClient',
'DerivedFeature',
'FeatureAnchor',
'Feature',
'ValueType',
'WindowAggTransformation',
'TypedKey',
'DUMMYKEY',
'BackfillTime',
'MaterializationSettings',
'RedisSink',
'FeatureQuery',
'LookupFeature',
'Aggregation',
'get_result_df',
'AvroJsonSchema',
'Source',
'InputContext',
'HdfsSource',
'FeatureJoinJobParams',
'FeatureGenerationJobParams',
'FeathrClient',
'DerivedFeature',
'FeatureAnchor',
'Feature',
'ValueType',
'WindowAggTransformation',
'TypedKey',
'DUMMYKEY',
'BackfillTime',
'MaterializationSettings',
'MonitoringSettings',
'RedisSink',
'MonitoringSqlSink',
'FeatureQuery',
'LookupFeature',
'Aggregation',
'get_result_df',
'AvroJsonSchema',
'Source',
'InputContext',
'HdfsSource',
'KafkaConfig',
'KafKaSource',
'ValueType',
'BooleanFeatureType',
'Int32FeatureType',
'Int64FeatureType',
'FloatFeatureType',
'DoubleFeatureType',
'StringFeatureType',
'KafKaSource',
'ValueType',
'BooleanFeatureType',
'Int32FeatureType',
'Int64FeatureType',
'FloatFeatureType',
'DoubleFeatureType',
'StringFeatureType',
'BytesFeatureType',
'FloatVectorFeatureType',
'Int32VectorFeatureType',
'Int64VectorFeatureType',
'DoubleVectorFeatureType',
'FeatureNameValidationError',
'ObservationSettings',
'FeaturePrinter',
'FloatVectorFeatureType',
'Int32VectorFeatureType',
'Int64VectorFeatureType',
'DoubleVectorFeatureType',
'FeatureNameValidationError',
'ObservationSettings',
'FeaturePrinter',
'SparkExecutionConfiguration',
]


65 changes: 48 additions & 17 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from feathr.spark_provider.feathr_configurations import SparkExecutionConfiguration
from feathr.definition.feature_derivations import DerivedFeature
from feathr.definition.materialization_settings import MaterializationSettings
from feathr.definition.monitoring_settings import MonitoringSettings
from feathr.protobuf.featureValue_pb2 import FeatureValue
from feathr.definition.query_feature_list import FeatureQuery
from feathr.definition.settings import ObservationSettings
Expand Down Expand Up @@ -237,7 +238,7 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
self.registry.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir)
self.anchor_list = anchor_list
self.derived_feature_list = derived_feature_list

# Pretty print anchor_list
if verbose and self.anchor_list:
FeaturePrinter.pretty_print_anchors(self.anchor_list)
Expand Down Expand Up @@ -400,7 +401,7 @@ def get_offline_features(self,
observation_settings: ObservationSettings,
feature_query: Union[FeatureQuery, List[FeatureQuery]],
output_path: str,
execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None,
execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {},
udf_files = None,
verbose: bool = False
):
Expand Down Expand Up @@ -441,15 +442,15 @@ def get_offline_features(self,
_FeatureRegistry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir)
else:
raise RuntimeError("Please call FeathrClient.build_features() first in order to get offline features")

# Pretty print feature_query
if verbose and feature_query:
FeaturePrinter.pretty_print_feature_query(feature_query)

write_to_file(content=config, full_file_name=config_file_path)
return self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files)

def _get_offline_features_with_config(self, feature_join_conf_path='feature_join_conf/feature_join.conf', execution_configuratons: Dict[str,str] = None, udf_files=[]):
def _get_offline_features_with_config(self, feature_join_conf_path='feature_join_conf/feature_join.conf', execution_configuratons: Dict[str,str] = {}, udf_files=[]):
"""Joins the features to your offline observation dataset based on the join config.

Args:
Expand Down Expand Up @@ -523,7 +524,16 @@ def wait_job_to_finish(self, timeout_sec: int = 300):
else:
raise RuntimeError('Spark job failed.')

def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None, verbose: bool = False):
def monitor_features(self, settings: MonitoringSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False):
"""Create a offline job to generate statistics to monitor feature data

Args:
settings: Feature monitoring settings
execution_configuratons: a dict that will be passed to spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configuration will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations.
"""
self.materialize_features(settings, execution_configuratons, verbose)

def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False):
"""Materialize feature data

Args:
Expand Down Expand Up @@ -551,12 +561,12 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
self._materialize_features_with_config(config_file_path, execution_configuratons, udf_files)
if os.path.exists(config_file_path):
os.remove(config_file_path)

# Pretty print feature_names of materialized features
if verbose and settings:
FeaturePrinter.pretty_print_materialize_features(settings)

def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = None, udf_files=[]):
def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = {}, udf_files=[]):
"""Materializes feature data based on the feature generation config. The feature
data will be materialized to the destination specified in the feature generation config.

Expand All @@ -578,24 +588,29 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur
optional_params = []
if _EnvVaraibleUtil.get_environment_variable('KAFKA_SASL_JAAS_CONFIG'):
optional_params = optional_params + ['--kafka-config', self._get_kafka_config_str()]
return self.feathr_spark_laucher.submit_feathr_job(
job_name=self.project_name + '_feathr_feature_materialization_job',
main_jar_path=self._FEATHR_JOB_JAR_PATH,
python_files=cloud_udf_paths,
main_class_name='com.linkedin.feathr.offline.job.FeatureGenJob',
arguments=[
arguments = [
'--generation-config', self.feathr_spark_laucher.upload_or_get_cloud_path(
generation_config.generation_config_path),
generation_config.generation_config_path),
# Local Config, comma seperated file names
'--feature-config', self.feathr_spark_laucher.upload_or_get_cloud_path(
generation_config.feature_config),
generation_config.feature_config),
'--redis-config', self._getRedisConfigStr(),
'--s3-config', self._get_s3_config_str(),
'--adls-config', self._get_adls_config_str(),
'--blob-config', self._get_blob_config_str(),
'--sql-config', self._get_sql_config_str(),
'--snowflake-config', self._get_snowflake_config_str()
] + optional_params,
'--snowflake-config', self._get_snowflake_config_str(),
] + optional_params
monitoring_config_str = self._get_monitoring_config_str()
if monitoring_config_str:
arguments.append('--monitoring-config')
arguments.append(monitoring_config_str)
return self.feathr_spark_laucher.submit_feathr_job(
job_name=self.project_name + '_feathr_feature_materialization_job',
main_jar_path=self._FEATHR_JOB_JAR_PATH,
python_files=cloud_udf_paths,
main_class_name='com.linkedin.feathr.offline.job.FeatureGenJob',
arguments=arguments,
reference_files_path=[],
configuration=execution_configuratons,
)
Expand Down Expand Up @@ -688,6 +703,22 @@ def _get_sql_config_str(self):
""".format(JDBC_TABLE=table, JDBC_USER=user, JDBC_PASSWORD=password, JDBC_DRIVER = driver, JDBC_AUTH_FLAG = auth_flag, JDBC_TOKEN = token)
return config_str

def _get_monitoring_config_str(self):
"""Construct monitoring-related config string."""
url = self.envutils.get_environment_variable_with_default('monitoring', 'database', 'sql', 'url')
user = self.envutils.get_environment_variable_with_default('monitoring', 'database', 'sql', 'user')
password = self.envutils.get_environment_variable('MONITORING_DATABASE_SQL_PASSWORD')
if url:
# HOCCON format will be parsed by the Feathr job
config_str = """
MONITORING_DATABASE_SQL_URL: "{url}"
MONITORING_DATABASE_SQL_USER: {user}
MONITORING_DATABASE_SQL_PASSWORD: {password}
""".format(url=url, user=user, password=password)
return config_str
else:
""

def _get_snowflake_config_str(self):
"""Construct the Snowflake config string for jdbc. The url, user, role and other parameters can be set via
yaml config. Password can be set via environment variables."""
Expand Down
8 changes: 8 additions & 0 deletions feathr_project/feathr/definition/monitoring_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from feathr.definition.materialization_settings import MaterializationSettings


# it's completely the same as MaterializationSettings. But we renamed it to improve usability.
# In the future, we may want to rely a separate system other than MaterializationSettings to generate stats.
class MonitoringSettings(MaterializationSettings):
"""Settings about monitoring features.
"""
21 changes: 21 additions & 0 deletions feathr_project/feathr/definition/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,27 @@ class Sink(HoconConvertible):
"""
pass

class MonitoringSqlSink(Sink):
"""SQL-based sink that stores feature monitoring results.

Attributes:
table_name: output table name
"""
def __init__(self, table_name: str) -> None:
self.table_name = table_name

def to_feature_config(self) -> str:
"""Produce the config used in feature monitoring"""
tm = Template("""
{
name: MONITORING
params: {
table_name: "{{source.table_name}}"
}
}
""")
msg = tm.render(source=self)
return msg

class RedisSink(Sink):
"""Redis-based sink use to store online feature data, can be used in batch job or streaming job.
Expand Down
4 changes: 2 additions & 2 deletions feathr_project/feathr/spark_provider/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class SparkJobLauncher(ABC):
@abstractmethod
def upload_or_get_cloud_path(self, local_path_or_http_path: str):
"""upload a file from local path or an http path to the current work directory. Should support transferring file from an http path to cloud working storage, or upload directly from a local storage.

Args:
local_path_or_http_path (str): local path or http path
"""
Expand All @@ -19,7 +19,7 @@ def upload_or_get_cloud_path(self, local_path_or_http_path: str):
@abstractmethod
def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: str, arguments: List[str],
reference_files_path: List[str], job_tags: Dict[str, str] = None,
configuration: Dict[str, str] = None):
configuration: Dict[str, str] = {}):
"""
Submits the feathr job

Expand Down
12 changes: 8 additions & 4 deletions feathr_project/feathr/spark_provider/_databricks_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ def upload_local_file(self, local_path: str) -> str:
file_name = os.path.basename(local_path)
# returned paths for the uploaded file
returned_path = os.path.join(self.databricks_work_dir, file_name)
# `local_path_or_http_path` will be either string or PathLib object, so normalize it to string
# `local_path_or_http_path` will be either string or PathLib object, so normalize it to string
local_path = str(local_path)
DbfsApi(self.api_client).cp(recursive=True, overwrite=True, src=local_path, dst=returned_path)
return returned_path

def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: str, arguments: List[str], python_files: List[str], reference_files_path: List[str] = [], job_tags: Dict[str, str] = None, configuration: Dict[str, str] = None):
def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: str, arguments: List[str], python_files: List[str], reference_files_path: List[str] = [], job_tags: Dict[str, str] = None, configuration: Dict[str, str] = {}):
"""
submit the feathr job to databricks
Refer to the databricks doc for more details on the meaning of the parameters:
Expand All @@ -140,6 +140,10 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name:
submission_params['run_name'] = job_name
if 'existing_cluster_id' not in submission_params:
# if users don't specify existing_cluster_id
# Solving this issue: Handshake fails trying to connect from Azure Databricks to Azure PostgreSQL with SSL
# https://docs.microsoft.com/en-us/answers/questions/170730/handshake-fails-trying-to-connect-from-azure-datab.html
configuration['spark.executor.extraJavaOptions'] = '-Djava.security.properties='
configuration['spark.driver.extraJavaOptions'] = '-Djava.security.properties='
submission_params['new_cluster']['spark_conf'] = configuration
submission_params['new_cluster']['custom_tags'] = job_tags
# the feathr main jar file is anyway needed regardless it's pyspark or scala spark
Expand Down Expand Up @@ -227,15 +231,15 @@ def get_job_tags(self) -> Dict[str, str]:
assert self.res_job_id is not None
# For result structure, see https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--response-structure-6
result = RunsApi(self.api_client).get_run(self.res_job_id)

if 'new_cluster' in result['cluster_spec']:
custom_tags = result['cluster_spec']['new_cluster']['custom_tags']
return custom_tags
else:
# this is not a new cluster; it's an existing cluster.
logger.warning("Job tags are not available since you are using an existing Databricks cluster. Consider using 'new_cluster' in databricks configuration.")
return None


def download_result(self, result_path: str, local_folder: str):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def download_result(self, result_path: str, local_folder: str):

def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_class_name: str = None, arguments: List[str] = None,
python_files: List[str]= None, reference_files_path: List[str] = None, job_tags: Dict[str, str] = None,
configuration: Dict[str, str] = None):
configuration: Dict[str, str] = {}):
"""
Submits the feathr job
Refer to the Apache Livy doc for more details on the meaning of the parameters:
Expand Down
25 changes: 25 additions & 0 deletions feathr_project/test/test_azure_feature_monitoring_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import os
from pathlib import Path

from feathr import MonitoringSettings
from feathr import RedisSink, MonitoringSqlSink

from test_fixture import (basic_test_setup, get_online_test_table_name)


def test_feature_monitoring():
online_test_table = get_online_test_table_name("nycTaxiCITable")
test_workspace_dir = Path(
__file__).parent.resolve() / "test_user_workspace"

client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"))

monitor_sink = MonitoringSqlSink(table_name=online_test_table)
settings = MonitoringSettings("monitoringSetting",
sinks=[monitor_sink],
feature_names=[
"f_location_avg_fare", "f_location_max_fare"])
client.monitor_features(settings)
# just assume the job is successful without validating the actual result in Redis. Might need to consolidate
# this part with the test_feathr_online_store test case
client.wait_job_to_finish(timeout_sec=900)
Loading