diff --git a/feathr_project/feathr/__init__.py b/feathr_project/feathr/__init__.py index b62529dd5..bfa4b0895 100644 --- a/feathr_project/feathr/__init__.py +++ b/feathr_project/feathr/__init__.py @@ -9,6 +9,7 @@ from .definition.source import * from .definition.typed_key import * from .definition.materialization_settings import * +from .definition.monitoring_settings import * from .definition.sink import * from .definition.query_feature_list import * from .definition.lookup_feature import * @@ -19,7 +20,7 @@ from .api.app.core.feathr_api_exception import * # skipped class as they are internal methods: -# RepoDefinitions, HoconConvertible, +# RepoDefinitions, HoconConvertible, # expose the modules so docs can build # referenceeļ¼š https://stackoverflow.com/questions/15115514/how-do-i-document-classes-without-the-module-name/31594545#31594545 @@ -30,45 +31,45 @@ __all__ = [ - 'FeatureJoinJobParams', - 'FeatureGenerationJobParams', - 'FeathrClient', - 'DerivedFeature', - 'FeatureAnchor', - 'Feature', - 'ValueType', - 'WindowAggTransformation', - 'TypedKey', - 'DUMMYKEY', - 'BackfillTime', - 'MaterializationSettings', - 'RedisSink', - 'FeatureQuery', - 'LookupFeature', - 'Aggregation', - 'get_result_df', - 'AvroJsonSchema', - 'Source', - 'InputContext', - 'HdfsSource', + 'FeatureJoinJobParams', + 'FeatureGenerationJobParams', + 'FeathrClient', + 'DerivedFeature', + 'FeatureAnchor', + 'Feature', + 'ValueType', + 'WindowAggTransformation', + 'TypedKey', + 'DUMMYKEY', + 'BackfillTime', + 'MaterializationSettings', + 'MonitoringSettings', + 'RedisSink', + 'MonitoringSqlSink', + 'FeatureQuery', + 'LookupFeature', + 'Aggregation', + 'get_result_df', + 'AvroJsonSchema', + 'Source', + 'InputContext', + 'HdfsSource', 'KafkaConfig', - 'KafKaSource', - 'ValueType', - 'BooleanFeatureType', - 'Int32FeatureType', - 'Int64FeatureType', - 'FloatFeatureType', - 'DoubleFeatureType', - 'StringFeatureType', + 'KafKaSource', + 'ValueType', + 'BooleanFeatureType', + 'Int32FeatureType', + 'Int64FeatureType', + 'FloatFeatureType', + 'DoubleFeatureType', + 'StringFeatureType', 'BytesFeatureType', - 'FloatVectorFeatureType', - 'Int32VectorFeatureType', - 'Int64VectorFeatureType', - 'DoubleVectorFeatureType', - 'FeatureNameValidationError', - 'ObservationSettings', - 'FeaturePrinter', + 'FloatVectorFeatureType', + 'Int32VectorFeatureType', + 'Int64VectorFeatureType', + 'DoubleVectorFeatureType', + 'FeatureNameValidationError', + 'ObservationSettings', + 'FeaturePrinter', 'SparkExecutionConfiguration', ] - - diff --git a/feathr_project/feathr/client.py b/feathr_project/feathr/client.py index 8314bdba2..0ebe3ffa0 100644 --- a/feathr_project/feathr/client.py +++ b/feathr_project/feathr/client.py @@ -22,6 +22,7 @@ from feathr.spark_provider.feathr_configurations import SparkExecutionConfiguration from feathr.definition.feature_derivations import DerivedFeature from feathr.definition.materialization_settings import MaterializationSettings +from feathr.definition.monitoring_settings import MonitoringSettings from feathr.protobuf.featureValue_pb2 import FeatureValue from feathr.definition.query_feature_list import FeatureQuery from feathr.definition.settings import ObservationSettings @@ -237,7 +238,7 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_ self.registry.save_to_feature_config_from_context(anchor_list, derived_feature_list, self.local_workspace_dir) self.anchor_list = anchor_list self.derived_feature_list = derived_feature_list - + # Pretty print anchor_list if verbose and 
self.anchor_list: FeaturePrinter.pretty_print_anchors(self.anchor_list) @@ -400,7 +401,7 @@ def get_offline_features(self, observation_settings: ObservationSettings, feature_query: Union[FeatureQuery, List[FeatureQuery]], output_path: str, - execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None, + execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, udf_files = None, verbose: bool = False ): @@ -441,7 +442,7 @@ def get_offline_features(self, _FeatureRegistry.save_to_feature_config_from_context(self.anchor_list, self.derived_feature_list, self.local_workspace_dir) else: raise RuntimeError("Please call FeathrClient.build_features() first in order to get offline features") - + # Pretty print feature_query if verbose and feature_query: FeaturePrinter.pretty_print_feature_query(feature_query) @@ -449,7 +450,7 @@ def get_offline_features(self, write_to_file(content=config, full_file_name=config_file_path) return self._get_offline_features_with_config(config_file_path, execution_configuratons, udf_files=udf_files) - def _get_offline_features_with_config(self, feature_join_conf_path='feature_join_conf/feature_join.conf', execution_configuratons: Dict[str,str] = None, udf_files=[]): + def _get_offline_features_with_config(self, feature_join_conf_path='feature_join_conf/feature_join.conf', execution_configuratons: Dict[str,str] = {}, udf_files=[]): """Joins the features to your offline observation dataset based on the join config. Args: @@ -523,7 +524,16 @@ def wait_job_to_finish(self, timeout_sec: int = 300): else: raise RuntimeError('Spark job failed.') - def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = None, verbose: bool = False): + def monitor_features(self, settings: MonitoringSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False): + """Create an offline job to generate statistics to monitor feature data + + Args: + settings: Feature monitoring settings + execution_configuratons: a dict that will be passed to the spark job when the job starts up, i.e. the "spark configurations". Note that not all of the configurations will be honored since some of the configurations are managed by the Spark platform, such as Databricks or Azure Synapse. Refer to the [spark documentation](https://spark.apache.org/docs/latest/configuration.html) for a complete list of spark configurations.
+ """ + self.materialize_features(settings, execution_configuratons, verbose) + + def materialize_features(self, settings: MaterializationSettings, execution_configuratons: Union[SparkExecutionConfiguration ,Dict[str,str]] = {}, verbose: bool = False): """Materialize feature data Args: @@ -551,12 +561,12 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf self._materialize_features_with_config(config_file_path, execution_configuratons, udf_files) if os.path.exists(config_file_path): os.remove(config_file_path) - + # Pretty print feature_names of materialized features if verbose and settings: FeaturePrinter.pretty_print_materialize_features(settings) - def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = None, udf_files=[]): + def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configuratons: Dict[str,str] = {}, udf_files=[]): """Materializes feature data based on the feature generation config. The feature data will be materialized to the destination specified in the feature generation config. @@ -578,24 +588,29 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur optional_params = [] if _EnvVaraibleUtil.get_environment_variable('KAFKA_SASL_JAAS_CONFIG'): optional_params = optional_params + ['--kafka-config', self._get_kafka_config_str()] - return self.feathr_spark_laucher.submit_feathr_job( - job_name=self.project_name + '_feathr_feature_materialization_job', - main_jar_path=self._FEATHR_JOB_JAR_PATH, - python_files=cloud_udf_paths, - main_class_name='com.linkedin.feathr.offline.job.FeatureGenJob', - arguments=[ + arguments = [ '--generation-config', self.feathr_spark_laucher.upload_or_get_cloud_path( - generation_config.generation_config_path), + generation_config.generation_config_path), # Local Config, comma seperated file names '--feature-config', self.feathr_spark_laucher.upload_or_get_cloud_path( - generation_config.feature_config), + generation_config.feature_config), '--redis-config', self._getRedisConfigStr(), '--s3-config', self._get_s3_config_str(), '--adls-config', self._get_adls_config_str(), '--blob-config', self._get_blob_config_str(), '--sql-config', self._get_sql_config_str(), - '--snowflake-config', self._get_snowflake_config_str() - ] + optional_params, + '--snowflake-config', self._get_snowflake_config_str(), + ] + optional_params + monitoring_config_str = self._get_monitoring_config_str() + if monitoring_config_str: + arguments.append('--monitoring-config') + arguments.append(monitoring_config_str) + return self.feathr_spark_laucher.submit_feathr_job( + job_name=self.project_name + '_feathr_feature_materialization_job', + main_jar_path=self._FEATHR_JOB_JAR_PATH, + python_files=cloud_udf_paths, + main_class_name='com.linkedin.feathr.offline.job.FeatureGenJob', + arguments=arguments, reference_files_path=[], configuration=execution_configuratons, ) @@ -688,6 +703,22 @@ def _get_sql_config_str(self): """.format(JDBC_TABLE=table, JDBC_USER=user, JDBC_PASSWORD=password, JDBC_DRIVER = driver, JDBC_AUTH_FLAG = auth_flag, JDBC_TOKEN = token) return config_str + def _get_monitoring_config_str(self): + """Construct monitoring-related config string.""" + url = self.envutils.get_environment_variable_with_default('monitoring', 'database', 'sql', 'url') + user = self.envutils.get_environment_variable_with_default('monitoring', 'database', 'sql', 'user') + 
password = self.envutils.get_environment_variable('MONITORING_DATABASE_SQL_PASSWORD') + if url: + # HOCON format will be parsed by the Feathr job + config_str = """ + MONITORING_DATABASE_SQL_URL: "{url}" + MONITORING_DATABASE_SQL_USER: {user} + MONITORING_DATABASE_SQL_PASSWORD: {password} + """.format(url=url, user=user, password=password) + return config_str + else: + return "" + def _get_snowflake_config_str(self): """Construct the Snowflake config string for jdbc. The url, user, role and other parameters can be set via yaml config. Password can be set via environment variables.""" diff --git a/feathr_project/feathr/definition/monitoring_settings.py b/feathr_project/feathr/definition/monitoring_settings.py new file mode 100644 index 000000000..ee39f84d5 --- /dev/null +++ b/feathr_project/feathr/definition/monitoring_settings.py @@ -0,0 +1,8 @@ +from feathr.definition.materialization_settings import MaterializationSettings + + +# it's completely the same as MaterializationSettings. But we renamed it to improve usability. +# In the future, we may want to rely on a separate system other than MaterializationSettings to generate stats. +class MonitoringSettings(MaterializationSettings): + """Settings about monitoring features. + """ diff --git a/feathr_project/feathr/definition/sink.py b/feathr_project/feathr/definition/sink.py index 54088c3af..73542fa3b 100644 --- a/feathr_project/feathr/definition/sink.py +++ b/feathr_project/feathr/definition/sink.py @@ -8,6 +8,27 @@ class Sink(HoconConvertible): """ pass +class MonitoringSqlSink(Sink): + """SQL-based sink that stores feature monitoring results. + + Attributes: + table_name: output table name + """ + def __init__(self, table_name: str) -> None: + self.table_name = table_name + + def to_feature_config(self) -> str: + """Produce the config used in feature monitoring""" + tm = Template(""" + { + name: MONITORING + params: { + table_name: "{{source.table_name}}" + } + } + """) + msg = tm.render(source=self) + return msg class RedisSink(Sink): """Redis-based sink use to store online feature data, can be used in batch job or streaming job. diff --git a/feathr_project/feathr/spark_provider/_abc.py b/feathr_project/feathr/spark_provider/_abc.py index b7ecc907d..998b9e88d 100--- a/feathr_project/feathr/spark_provider/_abc.py +++ b/feathr_project/feathr/spark_provider/_abc.py @@ -9,7 +9,7 @@ class SparkJobLauncher(ABC): @abstractmethod def upload_or_get_cloud_path(self, local_path_or_http_path: str): """upload a file from local path or an http path to the current work directory. Should support transferring file from an http path to cloud working storage, or upload directly from a local storage.
- + Args: local_path_or_http_path (str): local path or http path """ @@ -19,7 +19,7 @@ def upload_or_get_cloud_path(self, local_path_or_http_path: str): @abstractmethod def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: str, arguments: List[str], reference_files_path: List[str], job_tags: Dict[str, str] = None, - configuration: Dict[str, str] = None): + configuration: Dict[str, str] = {}): """ Submits the feathr job diff --git a/feathr_project/feathr/spark_provider/_databricks_submission.py b/feathr_project/feathr/spark_provider/_databricks_submission.py index b5368c4f3..4ef48cb48 100644 --- a/feathr_project/feathr/spark_provider/_databricks_submission.py +++ b/feathr_project/feathr/spark_provider/_databricks_submission.py @@ -111,12 +111,12 @@ def upload_local_file(self, local_path: str) -> str: file_name = os.path.basename(local_path) # returned paths for the uploaded file returned_path = os.path.join(self.databricks_work_dir, file_name) - # `local_path_or_http_path` will be either string or PathLib object, so normalize it to string + # `local_path_or_http_path` will be either string or PathLib object, so normalize it to string local_path = str(local_path) DbfsApi(self.api_client).cp(recursive=True, overwrite=True, src=local_path, dst=returned_path) return returned_path - def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: str, arguments: List[str], python_files: List[str], reference_files_path: List[str] = [], job_tags: Dict[str, str] = None, configuration: Dict[str, str] = None): + def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: str, arguments: List[str], python_files: List[str], reference_files_path: List[str] = [], job_tags: Dict[str, str] = None, configuration: Dict[str, str] = {}): """ submit the feathr job to databricks Refer to the databricks doc for more details on the meaning of the parameters: @@ -140,6 +140,10 @@ def submit_feathr_job(self, job_name: str, main_jar_path: str, main_class_name: submission_params['run_name'] = job_name if 'existing_cluster_id' not in submission_params: # if users don't specify existing_cluster_id + # Solving this issue: Handshake fails trying to connect from Azure Databricks to Azure PostgreSQL with SSL + # https://docs.microsoft.com/en-us/answers/questions/170730/handshake-fails-trying-to-connect-from-azure-datab.html + configuration['spark.executor.extraJavaOptions'] = '-Djava.security.properties=' + configuration['spark.driver.extraJavaOptions'] = '-Djava.security.properties=' submission_params['new_cluster']['spark_conf'] = configuration submission_params['new_cluster']['custom_tags'] = job_tags # the feathr main jar file is anyway needed regardless it's pyspark or scala spark @@ -231,7 +235,7 @@ def get_job_tags(self) -> Dict[str, str]: assert self.res_job_id is not None # For result structure, see https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs#--response-structure-6 result = RunsApi(self.api_client).get_run(self.res_job_id) - + if 'new_cluster' in result['cluster_spec']: custom_tags = result['cluster_spec']['new_cluster']['custom_tags'] return custom_tags @@ -239,7 +243,7 @@ def get_job_tags(self) -> Dict[str, str]: # this is not a new cluster; it's an existing cluster. logger.warning("Job tags are not available since you are using an existing Databricks cluster. 
Consider using 'new_cluster' in databricks configuration.") return None - + def download_result(self, result_path: str, local_folder: str): """ diff --git a/feathr_project/feathr/spark_provider/_synapse_submission.py b/feathr_project/feathr/spark_provider/_synapse_submission.py index 98d4d05fd..3bf681275 100644 --- a/feathr_project/feathr/spark_provider/_synapse_submission.py +++ b/feathr_project/feathr/spark_provider/_synapse_submission.py @@ -78,8 +78,8 @@ def download_result(self, result_path: str, local_folder: str): return self._datalake.download_file(result_path, local_folder) def submit_feathr_job(self, job_name: str, main_jar_path: str = None, main_class_name: str = None, arguments: List[str] = None, - python_files: List[str] = None, reference_files_path: List[str] = None, job_tags: Dict[str, str] = None, - configuration: Dict[str, str] = None): + python_files: List[str]= None, reference_files_path: List[str] = None, job_tags: Dict[str, str] = None, + configuration: Dict[str, str] = {}): """ Submits the feathr job Refer to the Apache Livy doc for more details on the meaning of the parameters: diff --git a/feathr_project/test/test_azure_feature_monitoring_e2e.py b/feathr_project/test/test_azure_feature_monitoring_e2e.py new file mode 100644 index 000000000..ed2cbebd2 --- /dev/null +++ b/feathr_project/test/test_azure_feature_monitoring_e2e.py @@ -0,0 +1,25 @@ +import os +from pathlib import Path + +from feathr import MonitoringSettings +from feathr import RedisSink, MonitoringSqlSink + +from test_fixture import (basic_test_setup, get_online_test_table_name) + + +def test_feature_monitoring(): + online_test_table = get_online_test_table_name("nycTaxiCITable") + test_workspace_dir = Path( + __file__).parent.resolve() / "test_user_workspace" + + client = basic_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml")) + + monitor_sink = MonitoringSqlSink(table_name=online_test_table) + settings = MonitoringSettings("monitoringSetting", + sinks=[monitor_sink], + feature_names=[ + "f_location_avg_fare", "f_location_max_fare"]) + client.monitor_features(settings) + # just assume the job is successful without validating the actual result in Redis. Might need to consolidate + # this part with the test_feathr_online_store test case + client.wait_job_to_finish(timeout_sec=900) diff --git a/feathr_project/test/test_user_workspace/feathr_config.yaml b/feathr_project/test/test_user_workspace/feathr_config.yaml index 9269b85cb..f92dcb492 100644 --- a/feathr_project/test/test_user_workspace/feathr_config.yaml +++ b/feathr_project/test/test_user_workspace/feathr_config.yaml @@ -84,11 +84,11 @@ spark_config: feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.5.0.jar" databricks: # workspace instance - workspace_instance_url: 'https://adb-5638037984879289.9.azuredatabricks.net/' + workspace_instance_url: 'https://adb-2474129336842816.16.azuredatabricks.net/' workspace_token_value: '' # config string including run time information, spark version, machine size, etc. 
# the config follows the format in the databricks documentation: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs - config_template: {'run_name':'','new_cluster':{'spark_version':'9.1.x-scala2.12','node_type_id':'Standard_F4s','num_workers':2,'spark_conf':{}},'libraries':[{'jar':''}],'spark_jar_task':{'main_class_name':'','parameters':['']}} + config_template: {"run_name":"FEATHR_FILL_IN","new_cluster":{"spark_version":"9.1.x-scala2.12","num_workers":2,"spark_conf":{"FEATHR_FILL_IN":"FEATHR_FILL_IN"},"instance_pool_id":"0403-214809-inlet434-pool-l9dj3kwz"},"libraries":[{"jar":"FEATHR_FILL_IN"}],"spark_jar_task":{"main_class_name":"FEATHR_FILL_IN","parameters":["FEATHR_FILL_IN"]}} # Feathr Job location. Support local paths, path start with http(s)://, and paths start with dbfs:/ work_dir: 'dbfs:/feathr_getting_started' # this is the default location so end users don't have to compile the runtime again. @@ -111,4 +111,10 @@ feature_registry: # delimiter indicates that how the project/workspace name, feature names etc. are delimited. By default it will be '__' # this is for global reference (mainly for feature sharing). For exmaple, when we setup a project called foo, and we have an anchor called 'taxi_driver' and the feature name is called 'f_daily_trips' # the feature will have a globally unique name called 'foo__taxi_driver__f_daily_trips' - delimiter: '__' \ No newline at end of file + delimiter: '__' + +monitoring: + database: + sql: + url: 'jdbc:postgresql://featuremonitoring.postgres.database.azure.com:5432/postgres' + user: "demo" diff --git a/feathr_project/test/test_utils/query_sql.py b/feathr_project/test/test_utils/query_sql.py new file mode 100644 index 000000000..8e14b8cda --- /dev/null +++ b/feathr_project/test/test_utils/query_sql.py @@ -0,0 +1,43 @@ +import psycopg2 +from feathr._envvariableutil import _EnvVaraibleUtil + +# script to query SQL database for debugging purpose + +def show_table(cursor, table_name): + cursor.execute("select * from " + table_name + ";") + print(cursor.fetchall()) + + q = """ + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_name = %s; + """ + + cur = conn.cursor() + cur.execute(q, (table_name,)) # (table_name,) passed as tuple + print(cur.fetchall()) + + +# Update connection string information +host = "featuremonitoring.postgres.database.azure.com" +dbname = "postgres" +user = "demo" +password = _EnvVaraibleUtil.get_environment_variable('SQL_TEST_PASSWORD') +sslmode = "require" + +# Construct connection string +conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode) +conn = psycopg2.connect(conn_string) +print("Connection established") + +cursor = conn.cursor() + +show_table(cursor, "f_int") +cursor.execute("select * from f_location_avg_fare;") +print(cursor.fetchall()) + + +# Clean up +conn.commit() +cursor.close() +conn.close() \ No newline at end of file diff --git a/src/main/scala/com/linkedin/feathr/offline/config/datasource/DataSourceConfigUtils.scala b/src/main/scala/com/linkedin/feathr/offline/config/datasource/DataSourceConfigUtils.scala index 0c8993e58..a97100e30 100644 --- a/src/main/scala/com/linkedin/feathr/offline/config/datasource/DataSourceConfigUtils.scala +++ b/src/main/scala/com/linkedin/feathr/offline/config/datasource/DataSourceConfigUtils.scala @@ -35,6 +35,7 @@ object DataSourceConfigUtils { blobConfigStr = cmdParser.extractOptionalValue("blob-config"), sqlConfigStr = 
cmdParser.extractOptionalValue("sql-config"), snowflakeConfigStr = cmdParser.extractOptionalValue("snowflake-config"), + monitoringConfigStr = cmdParser.extractOptionalValue("monitoring-config"), kafkaConfigStr = cmdParser.extractOptionalValue("kafka-config") ) } @@ -45,6 +46,7 @@ object DataSourceConfigUtils { BlobResourceInfoSetter.setup(ss, configs.blobConfig, resource) S3ResourceInfoSetter.setup(ss, configs.s3Config, resource) SnowflakeResourceInfoSetter.setup(ss, configs.snowflakeConfig, resource) + MonitoringResourceInfoSetter.setup(ss, configs.monitoringConfig, resource) KafkaResourceInfoSetter.setup(ss, configs.kafkaConfig, resource) } diff --git a/src/main/scala/com/linkedin/feathr/offline/config/datasource/DataSourceConfigs.scala b/src/main/scala/com/linkedin/feathr/offline/config/datasource/DataSourceConfigs.scala index a57e84da0..b8156d991 100644 --- a/src/main/scala/com/linkedin/feathr/offline/config/datasource/DataSourceConfigs.scala +++ b/src/main/scala/com/linkedin/feathr/offline/config/datasource/DataSourceConfigs.scala @@ -17,6 +17,7 @@ class DataSourceConfigs( val blobConfigStr: Option[String] = None, val sqlConfigStr: Option[String] = None, val snowflakeConfigStr: Option[String] = None, + val monitoringConfigStr: Option[String] = None, val kafkaConfigStr: Option[String] = None ) { val redisConfig: DataSourceConfig = parseConfigStr(redisConfigStr) @@ -25,6 +26,7 @@ class DataSourceConfigs( val blobConfig: DataSourceConfig = parseConfigStr(blobConfigStr) val sqlConfig: DataSourceConfig = parseConfigStr(sqlConfigStr) val snowflakeConfig: DataSourceConfig = parseConfigStr(snowflakeConfigStr) + val monitoringConfig: DataSourceConfig = parseConfigStr(monitoringConfigStr) val kafkaConfig: DataSourceConfig = parseConfigStr(kafkaConfigStr) def parseConfigStr(configStr: Option[String] = None): DataSourceConfig = { diff --git a/src/main/scala/com/linkedin/feathr/offline/config/datasource/MonitoringResourceInfoSetter.scala b/src/main/scala/com/linkedin/feathr/offline/config/datasource/MonitoringResourceInfoSetter.scala new file mode 100644 index 000000000..bdb36f3e6 --- /dev/null +++ b/src/main/scala/com/linkedin/feathr/offline/config/datasource/MonitoringResourceInfoSetter.scala @@ -0,0 +1,26 @@ +package com.linkedin.feathr.offline.config.datasource + +import org.apache.spark.sql.SparkSession + +private[feathr] class MonitoringResourceInfoSetter extends ResourceInfoSetter() { + override val params: List[String] = List() + + override def setupHadoopConfig(ss: SparkSession, context: Option[DataSourceConfig], resource: Option[Resource]): Unit = { + context.foreach(dataSourceConfig => { + ss.conf.set("monitoring_database_url", getAuthFromContext("MONITORING_DATABASE_SQL_URL", dataSourceConfig)) + ss.conf.set("monitoring_database_user", getAuthFromContext("MONITORING_DATABASE_SQL_USER", dataSourceConfig)) + ss.conf.set("monitoring_database_password", getAuthFromContext("MONITORING_DATABASE_SQL_PASSWORD", dataSourceConfig)) + }) + } + + override def getAuthFromConfig(str: String, resource: Resource): String = ??? 
+} + + +private[feathr] object MonitoringResourceInfoSetter{ + val monitoringSetter = new MonitoringResourceInfoSetter() + + def setup(ss: SparkSession, config: DataSourceConfig, resource: Resource): Unit ={ + monitoringSetter.setup(ss, config, resource) + } +} \ No newline at end of file diff --git a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/FeatureMonitoringProcessor.scala b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/FeatureMonitoringProcessor.scala new file mode 100644 index 000000000..47d08c544 --- /dev/null +++ b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/FeatureMonitoringProcessor.scala @@ -0,0 +1,34 @@ +package com.linkedin.feathr.offline.generation.outputProcessor + +import com.linkedin.feathr.common.Header +import com.linkedin.feathr.common.configObj.generation.OutputProcessorConfig +import com.linkedin.feathr.offline.generation.FeatureGenUtils +import com.linkedin.feathr.offline.generation.outputProcessor.PushToRedisOutputProcessor.TABLE_PARAM_CONFIG_NAME +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} + +/** + * feature generation output processor used to generate feature monitoring stats and pushed to sink + * @param config config object of output processor, built from the feature generation config + */ + +private[offline] class FeatureMonitoringProcessor(config: OutputProcessorConfig, endTimeOpt: Option[String] = None) extends WriteToHDFSOutputProcessor(config, endTimeOpt, dataLoaderHandlers=List()) { + /** + * process single dataframe, e.g, convert feature data schema + * + * @param ss spark session + * @param df feature dataframe + * @param header meta info of the input dataframe + * @param parentPath path to save feature data + * @return processed dataframe and header + */ + override def processSingle(ss: SparkSession, df: DataFrame, header: Header, parentPath: String): (DataFrame, Header) = { + val keyColumns = FeatureGenUtils.getKeyColumnsFromHeader(header) + + val tableName = config.getParams.getString(TABLE_PARAM_CONFIG_NAME) + val allFeatureCols = header.featureInfoMap.map(x => (x._2.columnName)).toSet + + FeatureMonitoringUtils.writeToRedis(ss, df, tableName, keyColumns, allFeatureCols, SaveMode.Overwrite) + (df, header) + } +} + diff --git a/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/FeatureMonitoringUtils.scala b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/FeatureMonitoringUtils.scala new file mode 100644 index 000000000..95675e8a4 --- /dev/null +++ b/src/main/scala/com/linkedin/feathr/offline/generation/outputProcessor/FeatureMonitoringUtils.scala @@ -0,0 +1,126 @@ +package com.linkedin.feathr.offline.generation.outputProcessor + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} + +object FeatureMonitoringUtils { + def writeToRedis(ss: SparkSession, df: DataFrame, tableName: String, keyColumns: Seq[String], allFeatureCols: Set[String], saveMode: SaveMode): Unit = { + df.show(10) + + val dfSchema = df.schema + dfSchema.indices.foreach(index => { + val field = dfSchema.fields(index) + val fieldName = field.name + if (allFeatureCols.contains(fieldName)) { + field.dataType match { + case DoubleType | FloatType | IntegerType | LongType => + val missing = df.filter(col(fieldName).isNull).count() + val total = df.count() +// +------------+------------+----------+----+---+---+---+--------+ +// |feature_name|feature_type| 
date|mean|avg|min|max|coverage| +// +------------+------------+----------+----+---+---+---+--------+ +// | f_int| integer|2022-06-09| 0.5|0.5| 0| 1| 1.0| +// +------------+------------+----------+----+---+---+---+--------+ +// +// +------------+------------+----------+------------------+------------------+-------------------+------------------+--------+ +// |feature_name|feature_type| date| mean| avg| min| max|coverage| +// +------------+------------+----------+------------------+------------------+-------------------+------------------+--------+ +// | f_double| double|2022-06-09|0.6061345296768118|0.6061345296768118|0.13751738103840128|0.9651418273038033| 1.0| +// +------------+------------+----------+------------------+------------------+-------------------+------------------+--------+ + val stats_df = df.select( + lit(fieldName).name("feature_name"), + lit(field.dataType.typeName).name("feature_type"), + current_date().name("date"), + mean(df(fieldName)).name("mean"), + avg(df(fieldName)).name("avg"), + min(df(fieldName)).name("min"), + max(df(fieldName)).name("max"), + lit((total - missing) * 1.0 / total).name("coverage") + ) + + stats_df.show() + writeToSql(ss, stats_df, fieldName, saveMode) + case StringType | BooleanType => + // Will add support for more stats as we have more user requirements + // The difficulty with term frequency is that it requires a different table other than the scalar stats. +// val frequencyDf = df +// .select( +// lit(fieldName).name("feature_name"), +// lit(field.dataType.typeName).name("feature_type"), +// current_date(), +// col(fieldName), +// ) +// .groupBy(fieldName) +// .count() +// .select( +// col("*"), +// lit(fieldName).name("feature_name"), +// lit(field.dataType.typeName).name("feature_type"), +// current_date() +// ) +// writeToSql(frequencyDf, fieldName + "_frequency") + + val missing = df.filter(col(fieldName).isNull).count() + val total = df.count() + // cardinality is defined as the number of elements in a set or other grouping, as a property of that grouping. + val cardinality = df.groupBy(fieldName).count().count() + +// +------------+------------+----------+-----+------+--------+-----------+ +// |feature_name|feature_type| date| min| max|coverage|cardinality| +// +------------+------------+----------+-----+------+--------+-----------+ +// | f_string| string|2022-06-09|apple|orange| 0.9| 3| +// +------------+------------+----------+-----+------+--------+-----------+ +// +------------+------------+----------+-----+----+--------+-----------+ +// |feature_name|feature_type| date| min| max|coverage|cardinality| +// +------------+------------+----------+-----+----+--------+-----------+ +// | f_boolean| boolean|2022-06-09|false|true| 1.0| 2| +// +------------+------------+----------+-----+----+--------+-----------+ + val stats_df = df.select( + lit(fieldName).name("feature_name"), + lit(field.dataType.typeName).name("feature_type"), + current_date().name("date"), + min(df(fieldName)).name("min"), + max(df(fieldName)).name("max"), + lit((total - missing) * 1.0 / total).name("coverage"), + lit(cardinality).name("cardinality") + ) + + writeToSql(ss, stats_df, fieldName, saveMode) + case _ => + (rowData: Any) => { + throw new RuntimeException(f"The data type(${field.dataType}) and data (${rowData}) is not supported in monitoring yet.") + } + } + } + }) + } + + /** + * Write the feature monitoring results(usually stats) to SQL database. 
+ */ + private def writeToSql(ss: SparkSession, stats_df: DataFrame, tableName: String, saveMode: SaveMode): Unit = { + if (!ss.sparkContext.isLocal) { + val url = ss.conf.get("monitoring_database_url") + val username = ss.conf.get("monitoring_database_user") + val password = ss.conf.get("monitoring_database_password") + + println("monitoring output:") + println("url: " + url) + println("username: " + username) + + stats_df.write + .format("jdbc") + .option("url", url) + .option("dbtable", tableName) + .option("user", username) + .option("password", password) + .option("ssl", true) + .option("sslmode", "require") + .mode(saveMode) + .save() + } else { + stats_df.show(10) + } + } +} diff --git a/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala b/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala index 394bd66b3..cb9dda05d 100644 --- a/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala +++ b/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala @@ -47,6 +47,7 @@ object FeatureGenJob { "blob-config" -> OptionParam("bc", "Authentication config for Azure Blob Storage (wasb)", "BLOB_CONFIG", ""), "sql-config" -> OptionParam("sqlc", "Authentication config for Azure SQL Database (jdbc)", "SQL_CONFIG", ""), "snowflake-config" -> OptionParam("sfc", "Authentication config for Snowflake Database (jdbc)", "SNOWFLAKE_CONFIG", ""), + "monitoring-config" -> OptionParam("mc", "Feature monitoring related configs", "MONITORING_CONFIG", ""), "kafka-config" -> OptionParam("kc", "Authentication config for Kafka", "KAFKA_CONFIG", "") ) val extraOptions = List(new CmdOption("LOCALMODE", "local-mode", false, "Run in local mode")) @@ -65,6 +66,8 @@ object FeatureGenJob { val dataSourceConfigs = DataSourceConfigUtils.getConfigs(cmdParser) val featureGenJobContext = new FeatureGenJobContext(workDir, paramsOverride, featureConfOverride) + println("dataSourceConfigs: ") + println(dataSourceConfigs) (applicationConfigPath, featureDefinitionsInput, featureGenJobContext, dataSourceConfigs) } @@ -208,7 +211,7 @@ object FeatureGenJob { val feathrClient = FeathrClient.builder(sparkSession) .addFeatureDef(featureConfig) - .addLocalOverrideDef(localFeatureConfigWithOverride) + .addLocalOverrideDef(localFeatureConfigWithOverride) .build() val allAnchoredFeatures = feathrClient.allAnchoredFeatures diff --git a/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenSpec.scala b/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenSpec.scala index f974c0aa1..caf6e0e28 100644 --- a/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenSpec.scala +++ b/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenSpec.scala @@ -5,7 +5,7 @@ import com.linkedin.feathr.common.configObj.configbuilder.{FeatureGenConfigBuild import com.linkedin.feathr.common.configObj.generation.{FeatureGenConfig, OfflineOperationalConfig, OutputProcessorConfig} import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrDataOutputException} import com.linkedin.feathr.common.{DateParam, DateTimeParam, DateTimeUtils, RichConfig} -import com.linkedin.feathr.offline.generation.outputProcessor.{PushToRedisOutputProcessor, WriteToHDFSOutputProcessor} +import com.linkedin.feathr.offline.generation.outputProcessor.{FeatureMonitoringProcessor, PushToRedisOutputProcessor, WriteToHDFSOutputProcessor} import com.linkedin.feathr.offline.util.{FeatureGenConstants, IncrementalAggUtils} import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler import 
com.linkedin.feathr.sparkcommon.OutputProcessor @@ -55,6 +55,10 @@ class FeatureGenSpec(private val featureGenConfig: FeatureGenConfig, dataLoaderH val params = config.getParams val decoratedConfig = OutputProcessorBuilder.build(config.getName, params) new PushToRedisOutputProcessor(decoratedConfig, None) + case FeatureGenConstants.MONITORING_OUTPUT_PROCESSOR_NAME => + val params = config.getParams + val decoratedConfig = OutputProcessorBuilder.build(config.getName, params) + new FeatureMonitoringProcessor(decoratedConfig, None) case _ => throw new FeathrDataOutputException(ErrorLabel.FEATHR_USER_ERROR, "Custom output processor is not yet supported.") } diff --git a/src/main/scala/com/linkedin/feathr/offline/util/FeatureGenUtils.scala b/src/main/scala/com/linkedin/feathr/offline/util/FeatureGenUtils.scala index 3ccbe57f7..26339a94f 100644 --- a/src/main/scala/com/linkedin/feathr/offline/util/FeatureGenUtils.scala +++ b/src/main/scala/com/linkedin/feathr/offline/util/FeatureGenUtils.scala @@ -16,6 +16,7 @@ import java.util.TimeZone private[offline] object FeatureGenConstants { val HDFS_OUTPUT_PROCESSOR_NAME = "HDFS" val REDIS_OUTPUT_PROCESSOR_NAME = "REDIS" + val MONITORING_OUTPUT_PROCESSOR_NAME = "MONITORING" val OUTPUT_TIME_PATH = "outputTimePath" val SAVE_SCHEMA_META = "saveSchemaMeta" val WORK_DIR = "workDir" diff --git a/src/test/resources/mockdata/feature_monitoring_mock_data/feature_monitoring_data.csv b/src/test/resources/mockdata/feature_monitoring_mock_data/feature_monitoring_data.csv new file mode 100644 index 000000000..faf4f804f --- /dev/null +++ b/src/test/resources/mockdata/feature_monitoring_mock_data/feature_monitoring_data.csv @@ -0,0 +1,11 @@ +user_id,value1,value2,value3,value4,value_string,value_boolean +1,1,2,3,4,apple,true +2,1,2,3,4,apple,false +3,1,2,3,4,,false +4,1,2,3,4,orange,false +5,1,2,3,4,orange,false +6,1,2,3,4,orange,false +7,1,2,3,4,orange,false +8,1,2,3,4,orange,false +9,1,2,3,4,orange,false +10,1,2,3,4,orange,true \ No newline at end of file diff --git a/src/test/scala/com/linkedin/feathr/offline/FeatureMonitoringIntegTest.scala b/src/test/scala/com/linkedin/feathr/offline/FeatureMonitoringIntegTest.scala new file mode 100644 index 000000000..d901665f2 --- /dev/null +++ b/src/test/scala/com/linkedin/feathr/offline/FeatureMonitoringIntegTest.scala @@ -0,0 +1,105 @@ +package com.linkedin.feathr.offline + +import org.testng.annotations.Test + +/** + * Integration tests to test feature monitoring APIs in feathr offline. 
+ */ +class FeatureMonitoringIntegTest extends FeathrIntegTest { + /** + * Test scalar features + */ + @Test(enabled = true) + def testFeatureGenWithApplicationConfig(): Unit = { + val applicationConfig = + s""" + | operational: { + | name: generateWithDefaultParams + | endTime: 2021-01-02 + | endTimeFormat: "yyyy-MM-dd" + | resolution: DAILY + | output:[ + | { + | name: MONITORING + | params: { + | table_name: "monitoringFeatures" + | } + | } + | ] + |} + |features: [f_string, f_int, f_null, f_double, f_null, f_boolean + |] + """.stripMargin + val featureDefConfig = + """ + |anchors: { + | anchor: { + | source: featureMonitoringSource + | key: user_id + | features: { + | f_string: { + | def: "value_string" + | type : { + | type: TENSOR + | tensorCategory: DENSE + | dimensionType: [] + | valType: STRING + | } + | } + | f_int: { + | def: "import java.util.Random; Random random = new Random(); random.nextInt(2)" + | type : { + | type: TENSOR + | tensorCategory: DENSE + | dimensionType: [] + | valType: INT + | } + | } + | f_null: { + | def: "null" + | type : { + | type: TENSOR + | tensorCategory: DENSE + | dimensionType: [] + | valType: DOUBLE + | } + | } + | f_double: { + | def: "import java.util.Random; Random random = new Random(); random.nextDouble()" + | type : { + | type: TENSOR + | tensorCategory: DENSE + | dimensionType: [] + | valType: DOUBLE + | } + | } + | f_boolean: { + | def: "Boolean.valueOf(value_boolean)" + | type : { + | type: TENSOR + | tensorCategory: DENSE + | dimensionType: [] + | valType: BOOLEAN + | } + | } + | } + | } + |} + | + |derivations: { + | f_derived: { + | definition: "f_double * f_double" + | type: NUMERIC + | } + |} + |sources: { + | featureMonitoringSource: { + | location: { path: "/feature_monitoring_mock_data/feature_monitoring_data.csv" } + | } + |} + |""".stripMargin + + val res = localFeatureGenerate(applicationConfig, featureDefConfig) + res.head._2.data.show(100) + } +}
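
Usage sketch for reviewers: the end-to-end flow this change enables, based on the new test_azure_feature_monitoring_e2e.py above. It assumes a feathr_config.yaml that already contains the new monitoring.database.sql section and that MONITORING_DATABASE_SQL_PASSWORD is set in the environment; the client construction, table name, and feature names mirror the existing test fixtures and are illustrative only, not part of this diff.

import os
from feathr import FeathrClient, MonitoringSettings, MonitoringSqlSink

# The password is picked up by _get_monitoring_config_str() and forwarded to the Spark job
# via the new --monitoring-config argument.
os.environ["MONITORING_DATABASE_SQL_PASSWORD"] = "<secret>"

# feathr_config.yaml must define monitoring.database.sql.url and monitoring.database.sql.user.
client = FeathrClient(config_path="feathr_config.yaml")

# MonitoringSqlSink renders an output-processor block of the form
# { name: MONITORING, params: { table_name: "..." } } in the feature generation config.
sink = MonitoringSqlSink(table_name="nycTaxiCITable")
settings = MonitoringSettings("monitoringSetting",
                              sinks=[sink],
                              feature_names=["f_location_avg_fare", "f_location_max_fare"])

# monitor_features() currently delegates to materialize_features(); the Spark job then computes
# per-feature statistics (mean/avg/min/max/coverage, plus cardinality for string and boolean
# features) in FeatureMonitoringProcessor and writes them to the monitoring database over JDBC.
client.monitor_features(settings)
client.wait_job_to_finish(timeout_sec=900)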
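
Verification sketch: reading the generated statistics back from the monitoring database. FeatureMonitoringUtils writes one JDBC table per feature, named after the feature column, with the columns produced by the stats DataFrame (feature_name, feature_type, date, mean, avg, min, max, coverage for numeric features). The host, database, and user below are the demo values from the test config and query_sql.py; the SQL_TEST_PASSWORD variable follows the same convention as that debug script and is only an assumption for this sketch.

import os
import psycopg2

conn = psycopg2.connect(
    host="featuremonitoring.postgres.database.azure.com",
    dbname="postgres",
    user="demo",
    password=os.environ["SQL_TEST_PASSWORD"],
    sslmode="require",
)
with conn, conn.cursor() as cur:
    # For numeric features the table holds:
    # feature_name, feature_type, date, mean, avg, min, max, coverage
    cur.execute("SELECT * FROM f_location_avg_fare;")
    for row in cur.fetchall():
        print(row)
conn.close()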