Skip to content

Commit

Permalink
Support 'enabled' configurations for offline stores (#545)
Browse files Browse the repository at this point in the history
For each offline store (S3, ADLS, WASB, JDBC, Snowflake), if its 'enabled' config variable is set to 'false' or is undefined, the other environment variables related to that store will not be read.
  • Loading branch information
enya-yx committed Aug 24, 2022
1 parent 3e61f3b commit 31414fe
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 16 deletions.
2 changes: 2 additions & 0 deletions docs/samples/fraud_detection_demo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
" required_environment_variables:\n",
" - 'REDIS_PASSWORD'\n",
"offline_store:\n",
"# Please set 'enabled' flags as true (false by default) if any of items under the same paths are expected to be visited\n",
" adls:\n",
" adls_enabled: true\n",
" wasb:\n",
Expand All @@ -225,6 +226,7 @@
" jdbc_database: 'feathrtestdb'\n",
" jdbc_table: 'feathrtesttable'\n",
" snowflake:\n",
" snowflake_enabled: true\n",
" url: \"dqllago-ol19457.snowflakecomputing.com\"\n",
" user: \"feathrintegration\"\n",
" role: \"ACCOUNTADMIN\"\n",
Expand Down
2 changes: 2 additions & 0 deletions docs/samples/product_recommendation_demo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@
" required_environment_variables:\n",
" - 'REDIS_PASSWORD'\n",
"offline_store:\n",
"# Please set 'enabled' flags as true (false by default) if any of items under the same paths are expected to be visited\n",
" adls:\n",
" adls_enabled: true\n",
" wasb:\n",
Expand All @@ -220,6 +221,7 @@
" jdbc_database: 'feathrtestdb'\n",
" jdbc_table: 'feathrtesttable'\n",
" snowflake:\n",
" snowflake_enabled: true\n",
" url: \"dqllago-ol19457.snowflakecomputing.com\"\n",
" user: \"feathrintegration\"\n",
" role: \"ACCOUNTADMIN\"\n",
Expand Down
56 changes: 40 additions & 16 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,24 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir
self.redis_ssl_enabled = self.envutils.get_environment_variable_with_default(
'online_store', 'redis', 'ssl_enabled')

# Offline store enabled configs; false by default
self.s3_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 's3', 's3_enabled')
self.adls_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 'adls', 'adls_enabled')
self.wasb_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 'wasb', 'wasb_enabled')
self.jdbc_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 'jdbc', 'jdbc_enabled')
self.snowflake_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 'snowflake', 'snowflake_enabled')
if not (self.s3_enabled or self.adls_enabled or self.wasb_enabled or self.jdbc_enabled or self.snowflake_enabled):
self.logger.warning("No offline storage enabled.")

# S3 configs
self.s3_endpoint = self.envutils.get_environment_variable_with_default(
'offline_store', 's3', 's3_endpoint')
if self.s3_enabled:
self.s3_endpoint = self.envutils.get_environment_variable_with_default(
'offline_store', 's3', 's3_endpoint')

# spark configs
self.output_num_parts = self.envutils.get_environment_variable_with_default(
Expand Down Expand Up @@ -502,25 +517,39 @@ def _get_offline_features_with_config(self, feature_join_conf_path='feature_join
python_files=cloud_udf_paths,
job_tags=job_tags,
main_class_name='com.linkedin.feathr.offline.job.FeatureJoinJob',
arguments=[
arguments= [
'--join-config', self.feathr_spark_launcher.upload_or_get_cloud_path(
feature_join_job_params.join_config_path),
'--input', feature_join_job_params.observation_path,
'--output', feature_join_job_params.job_output_path,
'--feature-config', self.feathr_spark_launcher.upload_or_get_cloud_path(
feature_join_job_params.feature_config),
'--num-parts', self.output_num_parts,
'--s3-config', self._get_s3_config_str(),
'--adls-config', self._get_adls_config_str(),
'--blob-config', self._get_blob_config_str(),
'--sql-config', self._get_sql_config_str(),
'--snowflake-config', self._get_snowflake_config_str()
],
'--num-parts', self.output_num_parts
]+self._get_offline_storage_arguments(),
reference_files_path=[],
configuration=execution_configurations,
properties=self._get_system_properties()
)

def _get_offline_storage_arguments(self):
arguments = []
if self.s3_enabled:
arguments.append('--s3-config')
arguments.append(self._get_s3_config_str())
if self.adls_enabled:
arguments.append('--adls-config')
arguments.append(self._get_adls_config_str())
if self.wasb_enabled:
arguments.append('--blob-config')
arguments.append(self._get_blob_config_str())
if self.jdbc_enabled:
arguments.append('--sql-config')
arguments.append(self._get_sql_config_str())
if self.snowflake_enabled:
arguments.append('--snowflake-config')
arguments.append(self._get_snowflake_config_str())
return arguments

def get_job_result_uri(self, block=True, timeout_sec=300) -> str:
"""Gets the job output URI
"""
Expand Down Expand Up @@ -617,12 +646,7 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur
'--feature-config', self.feathr_spark_launcher.upload_or_get_cloud_path(
generation_config.feature_config),
'--redis-config', self._getRedisConfigStr(),
'--s3-config', self._get_s3_config_str(),
'--adls-config', self._get_adls_config_str(),
'--blob-config', self._get_blob_config_str(),
'--sql-config', self._get_sql_config_str(),
'--snowflake-config', self._get_snowflake_config_str(),
] + optional_params
] + self._get_offline_storage_arguments()+optional_params
monitoring_config_str = self._get_monitoring_config_str()
if monitoring_config_str:
arguments.append('--monitoring-config')
Expand Down
1 change: 1 addition & 0 deletions feathr_project/test/test_user_workspace/feathr_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ offline_store:

# snowflake endpoint
snowflake:
snowflake_enabled: true
url: "dqllago-ol19457.snowflakecomputing.com"
user: "feathrintegration"
role: "ACCOUNTADMIN"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ offline_store:

# snowflake endpoint
snowflake:
snowflake_enabled: true
url: "dqllago-ol19457.snowflakecomputing.com"
user: "feathrintegration"
role: "ACCOUNTADMIN"
Expand Down

0 comments on commit 31414fe

Please sign in to comment.