Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support 'enabled' configurations for offline stores #545

Merged
merged 7 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/samples/fraud_detection_demo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
" required_environment_variables:\n",
" - 'REDIS_PASSWORD'\n",
"offline_store:\n",
"# Please set 'enabled' flags as true (false by default) if any of items under the same paths are expected to be visited\n",
" adls:\n",
" adls_enabled: true\n",
" wasb:\n",
Expand All @@ -225,6 +226,7 @@
" jdbc_database: 'feathrtestdb'\n",
" jdbc_table: 'feathrtesttable'\n",
" snowflake:\n",
" snowflake_enabled: true\n",
" url: \"dqllago-ol19457.snowflakecomputing.com\"\n",
" user: \"feathrintegration\"\n",
" role: \"ACCOUNTADMIN\"\n",
Expand Down
2 changes: 2 additions & 0 deletions docs/samples/product_recommendation_demo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@
" required_environment_variables:\n",
" - 'REDIS_PASSWORD'\n",
"offline_store:\n",
"# Please set 'enabled' flags as true (false by default) if any of items under the same paths are expected to be visited\n",
" adls:\n",
" adls_enabled: true\n",
" wasb:\n",
Expand All @@ -220,6 +221,7 @@
" jdbc_database: 'feathrtestdb'\n",
" jdbc_table: 'feathrtesttable'\n",
" snowflake:\n",
" snowflake_enabled: true\n",
" url: \"dqllago-ol19457.snowflakecomputing.com\"\n",
" user: \"feathrintegration\"\n",
" role: \"ACCOUNTADMIN\"\n",
Expand Down
56 changes: 40 additions & 16 deletions feathr_project/feathr/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,24 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir
self.redis_ssl_enabled = self.envutils.get_environment_variable_with_default(
'online_store', 'redis', 'ssl_enabled')

# Offline store enabled configs; false by default
self.s3_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 's3', 's3_enabled')
self.adls_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 'adls', 'adls_enabled')
self.wasb_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 'wasb', 'wasb_enabled')
self.jdbc_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 'jdbc', 'jdbc_enabled')
self.snowflake_enabled = self.envutils.get_environment_variable_with_default(
'offline_store', 'snowflake', 'snowflake_enabled')
if not (self.s3_enabled or self.adls_enabled or self.wasb_enabled or self.jdbc_enabled or self.snowflake_enabled):
self.logger.warning("No offline storage enabled.")

# S3 configs
self.s3_endpoint = self.envutils.get_environment_variable_with_default(
'offline_store', 's3', 's3_endpoint')
if self.s3_enabled:
self.s3_endpoint = self.envutils.get_environment_variable_with_default(
'offline_store', 's3', 's3_endpoint')

# spark configs
self.output_num_parts = self.envutils.get_environment_variable_with_default(
Expand Down Expand Up @@ -502,25 +517,39 @@ def _get_offline_features_with_config(self, feature_join_conf_path='feature_join
python_files=cloud_udf_paths,
job_tags=job_tags,
main_class_name='com.linkedin.feathr.offline.job.FeatureJoinJob',
arguments=[
arguments= [
'--join-config', self.feathr_spark_launcher.upload_or_get_cloud_path(
feature_join_job_params.join_config_path),
'--input', feature_join_job_params.observation_path,
'--output', feature_join_job_params.job_output_path,
'--feature-config', self.feathr_spark_launcher.upload_or_get_cloud_path(
feature_join_job_params.feature_config),
'--num-parts', self.output_num_parts,
'--s3-config', self._get_s3_config_str(),
'--adls-config', self._get_adls_config_str(),
'--blob-config', self._get_blob_config_str(),
'--sql-config', self._get_sql_config_str(),
'--snowflake-config', self._get_snowflake_config_str()
],
'--num-parts', self.output_num_parts
]+self._get_offline_storage_arguments(),
reference_files_path=[],
configuration=execution_configurations,
properties=self._get_system_properties()
)

def _get_offline_storage_arguments(self):
arguments = []
if self.s3_enabled:
arguments.append('--s3-config')
arguments.append(self._get_s3_config_str())
if self.adls_enabled:
arguments.append('--adls-config')
arguments.append(self._get_adls_config_str())
if self.wasb_enabled:
arguments.append('--blob-config')
arguments.append(self._get_blob_config_str())
if self.jdbc_enabled:
arguments.append('--sql-config')
arguments.append(self._get_sql_config_str())
if self.snowflake_enabled:
arguments.append('--snowflake-config')
arguments.append(self._get_snowflake_config_str())
return arguments

def get_job_result_uri(self, block=True, timeout_sec=300) -> str:
"""Gets the job output URI
"""
Expand Down Expand Up @@ -617,12 +646,7 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur
'--feature-config', self.feathr_spark_launcher.upload_or_get_cloud_path(
generation_config.feature_config),
'--redis-config', self._getRedisConfigStr(),
'--s3-config', self._get_s3_config_str(),
'--adls-config', self._get_adls_config_str(),
'--blob-config', self._get_blob_config_str(),
'--sql-config', self._get_sql_config_str(),
'--snowflake-config', self._get_snowflake_config_str(),
] + optional_params
] + self._get_offline_storage_arguments()+optional_params
monitoring_config_str = self._get_monitoring_config_str()
if monitoring_config_str:
arguments.append('--monitoring-config')
Expand Down
1 change: 1 addition & 0 deletions feathr_project/test/test_user_workspace/feathr_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ offline_store:

# snowflake endpoint
snowflake:
snowflake_enabled: true
url: "dqllago-ol19457.snowflakecomputing.com"
user: "feathrintegration"
role: "ACCOUNTADMIN"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ offline_store:

# snowflake endpoint
snowflake:
snowflake_enabled: true
url: "dqllago-ol19457.snowflakecomputing.com"
user: "feathrintegration"
role: "ACCOUNTADMIN"
Expand Down