Add job_tag to materialization job submission. #890

Merged

Changes from all commits

47 changes: 38 additions & 9 deletions feathr_project/feathr/client.py
@@ -21,7 +21,7 @@
from feathr.definition.monitoring_settings import MonitoringSettings
from feathr.definition.query_feature_list import FeatureQuery
from feathr.definition.settings import ObservationSettings
from feathr.definition.sink import Sink
from feathr.definition.sink import Sink, HdfsSink
from feathr.protobuf.featureValue_pb2 import FeatureValue
from feathr.spark_provider._databricks_submission import _FeathrDatabricksJobLauncher
from feathr.spark_provider._localspark_submission import _FeathrLocalSparkJobLauncher
@@ -191,7 +191,7 @@ def __init__(self, config_path:str = "./feathr_config.yaml", local_workspace_dir
else:
# no registry configured
logger.info("Feathr registry is not configured. Consider setting the Feathr registry component for richer feature store experience.")

logger.info(f"Feathr client {get_version()} initialized successfully.")

def _check_required_environment_variables_exist(self):
@@ -259,7 +259,7 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
# Pretty print anchor_list
if verbose and self.anchor_list:
FeaturePrinter.pretty_print_anchors(self.anchor_list)

def get_snowflake_path(self, database: str, schema: str, dbtable: str = None, query: str = None) -> str:
"""
Returns snowflake path given dataset location information.
@@ -518,7 +518,7 @@ def _get_offline_features_with_config(self,
observation_path=feathr_feature['observationPath'],
feature_config=os.path.join(self.local_workspace_dir, 'feature_conf/'),
job_output_path=output_path)
job_tags = {OUTPUT_PATH_TAG:feature_join_job_params.job_output_path}
job_tags = { OUTPUT_PATH_TAG: feature_join_job_params.job_output_path }
# set output format in job tags if it's set by user, so that it can be used to parse the job result in the helper function
if execution_configurations is not None and OUTPUT_FORMAT in execution_configurations:
job_tags[OUTPUT_FORMAT] = execution_configurations[OUTPUT_FORMAT]
@@ -679,11 +679,16 @@ def materialize_features(self, settings: MaterializationSettings, execution_conf
if feature.name == fn and not isinstance(feature.transform, WindowAggTransformation):
raise RuntimeError(f"Feature {fn} is not an aggregation feature. Currently Feathr only supports materializing aggregation features. If you want to materialize {fn}, please set allow_materialize_non_agg_feature to True.")

# Collect secrets from sinks
# Collect secrets from sinks. Also capture output_path if the sink is an offline sink (HdfsSink), for later use.
secrets = []
output_path = None
for sink in settings.sinks:
if hasattr(sink, "get_required_properties"):
secrets.extend(sink.get_required_properties())
if isinstance(sink, HdfsSink):
# Note: for now we only cache one output path from one of the HdfsSinks (if multiple sinks were passed).
output_path = sink.output_path

results = []
# produce materialization config
for end in settings.get_backfill_cutoff_time():
@@ -703,7 +708,13 @@

udf_files = _PreprocessingPyudfManager.prepare_pyspark_udf_files(settings.feature_names, self.local_workspace_dir)
# CLI will directly call this so the experience won't be broken
result = self._materialize_features_with_config(config_file_path, execution_configurations, udf_files, secrets)
result = self._materialize_features_with_config(
feature_gen_conf_path=config_file_path,
execution_configurations=execution_configurations,
udf_files=udf_files,
secrets=secrets,
output_path=output_path,
)
if os.path.exists(config_file_path) and self.spark_runtime != 'local':
os.remove(config_file_path)
results.append(result)
@@ -714,19 +725,37 @@

return results
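
For context, here is a minimal sketch of the caller-side flow this change targets: an offline HdfsSink whose output_path is now cached by materialize_features() and attached to the submitted job as a tag. The storage path and feature name are hypothetical, and the top-level imports assume the feathr package re-exports these classes the same way the e2e tests below do.

```python
from datetime import datetime, timedelta
from feathr import BackfillTime, HdfsSink, MaterializationSettings

# Hypothetical offline sink; with this change its output_path is picked up
# by materialize_features() and attached to the submitted job as a job tag.
offline_sink = HdfsSink(output_path="abfss://container@account.dfs.core.windows.net/materialize_offline_example")
backfill_time = BackfillTime(start=datetime(2020, 5, 20),
                             end=datetime(2020, 5, 20),
                             step=timedelta(days=1))
settings = MaterializationSettings("nycTaxiTable",
                                   sinks=[offline_sink],
                                   feature_names=["f_location_avg_fare"],  # hypothetical feature name
                                   backfill_time=backfill_time)

client.materialize_features(settings)        # `client` is an initialized FeathrClient
client.wait_job_to_finish(timeout_sec=500)
```
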

def _materialize_features_with_config(self, feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',execution_configurations: Dict[str,str] = {}, udf_files=[], secrets=[]):
def _materialize_features_with_config(
self,
feature_gen_conf_path: str = 'feature_gen_conf/feature_gen.conf',
execution_configurations: Dict[str,str] = {},
udf_files: List = [],
secrets: List = [],
output_path: str = None,
):
"""Materializes feature data based on the feature generation config. The feature
data will be materialized to the destination specified in the feature generation config.

Args:
feature_gen_conf_path: Relative path to the feature generation config you want to materialize.
feature_gen_conf_path: Relative path to the feature generation config you want to materialize.
execution_configurations: Spark job execution configurations.
udf_files: UDF files.
secrets: Secrets to access sinks.
output_path: The output path of the materialized features when using an offline sink.
"""
cloud_udf_paths = [self.feathr_spark_launcher.upload_or_get_cloud_path(udf_local_path) for udf_local_path in udf_files]

# Read all features conf
generation_config = FeatureGenerationJobParams(
generation_config_path=os.path.abspath(feature_gen_conf_path),
feature_config=os.path.join(self.local_workspace_dir, "feature_conf/"))

job_tags = { OUTPUT_PATH_TAG: output_path }
# set output format in job tags if it's set by user, so that it can be used to parse the job result in the helper function
if execution_configurations is not None and OUTPUT_FORMAT in execution_configurations:
job_tags[OUTPUT_FORMAT] = execution_configurations[OUTPUT_FORMAT]
else:
job_tags[OUTPUT_FORMAT] = "avro"
'''
- Job tags are job metadata; they are not passed to the actual Spark job (i.e. not visible to the Spark job). They are more of a platform-level concept that Feathr adds (currently job tags only contain the job output URL and the job output format). They are carried with the job and are visible to every Feathr client. Think of them as custom metadata for the job that would be awkward to put in the Spark job itself.
- Job arguments (sometimes called job parameters) are the command-line arguments passed into the actual Spark job, so they are usually highly specific to that job. In Feathr they are the input to the Scala Spark CLI. They are usually not Spark-specific (for example, if we want to specify the location of the feature files, or want to
@@ -752,14 +781,14 @@ def _materialize_features_with_config(self, feature_gen_conf_path: str = 'featur
job_name=self.project_name + '_feathr_feature_materialization_job',
main_jar_path=self._FEATHR_JOB_JAR_PATH,
python_files=cloud_udf_paths,
job_tags=job_tags,
main_class_name=GEN_CLASS_NAME,
arguments=arguments,
reference_files_path=[],
configuration=execution_configurations,
properties=self._collect_secrets(secrets)
)
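
To make the tags-vs-arguments distinction above concrete, here is a hedged sketch of what the job tags attached to an offline materialization might look like. The string values of the tag-key constants are assumptions; only the OUTPUT_PATH_TAG and OUTPUT_FORMAT names appear in this diff.

```python
# Sketch only: assumed string values for the tag-key constants used above.
OUTPUT_PATH_TAG = "output_path"   # assumption; the real constant is defined elsewhere in Feathr
OUTPUT_FORMAT = "output_format"   # assumption

# Job tags are platform-level metadata carried with the submitted job; they are
# never passed to (or seen by) the Spark job itself.
job_tags = {
    OUTPUT_PATH_TAG: "dbfs:/feathrazure_materialize_offline_example",  # hypothetical HdfsSink output_path
    OUTPUT_FORMAT: "avro",                                             # default when no output format is configured
}
```

Job arguments, by contrast, are built separately and passed to the Scala Spark job as the `arguments` list handed to `submit_feathr_job` above.
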


def wait_job_to_finish(self, timeout_sec: int = 300):
"""Waits for the job to finish in a blocking way unless it times out
"""
2 changes: 1 addition & 1 deletion feathr_project/feathr/spark_provider/_databricks_submission.py
@@ -291,7 +291,7 @@ def get_job_tags(self) -> Dict[str, str]:
result = RunsApi(self.api_client).get_run(self.res_job_id)

if "new_cluster" in result["cluster_spec"]:
custom_tags = result["cluster_spec"]["new_cluster"]["custom_tags"]
custom_tags = result["cluster_spec"]["new_cluster"].get("custom_tags")
return custom_tags
else:
# this is not a new cluster; it's an existing cluster.
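
A quick illustration of why the lookup above switches from indexing to `dict.get`: a new-cluster spec that carries no custom tags should yield `None` rather than raise. The cluster spec below is a made-up example.

```python
# Hypothetical run result for a new cluster that has no custom tags set.
cluster_spec = {"new_cluster": {"spark_version": "11.3.x-scala2.12", "num_workers": 2}}

tags = cluster_spec["new_cluster"].get("custom_tags")  # -> None instead of raising KeyError
print(tags)  # None
```
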
4 changes: 2 additions & 2 deletions feathr_project/feathr/utils/job_utils.py
@@ -68,24 +68,24 @@ def get_result_spark_df(
def get_result_df(
client: FeathrClient,
data_format: str = None,
format: str = None,
res_url: str = None,
local_cache_path: str = None,
spark: SparkSession = None,
format: str = None,
) -> Union[DataFrame, pd.DataFrame]:
"""Download the job result dataset from cloud as a Spark DataFrame or pandas DataFrame.

Args:
client: Feathr client
data_format: Format to read the downloaded files. Currently supports `parquet`, `delta`, `avro`, and `csv`.
Defaults to the client's job tags if they exist.
format: An alias for `data_format` (for backward compatibility).
res_url: Result URL to download files from. Note that this will not block the job so you need to make sure
the job is finished and the result URL contains actual data. Defaults to the client's job tags if they exist.
local_cache_path (optional): Specify the absolute download directory. If the user does not provide this,
the function will create a temporary directory.
spark (optional): Spark session. If provided, the function returns spark Dataframe.
Otherwise, it returns pd.DataFrame.
format: An alias for `data_format` (for backward compatibility).

Returns:
Either Spark or pandas DataFrame.
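
Putting the pieces together, a sketch of how `get_result_df` can be called once the materialization job has finished. With this PR the data format and result URL can usually be resolved from the job tags set at submission time, so the explicit arguments below are optional overrides; the result URL shown is hypothetical.

```python
from feathr.utils.job_utils import get_result_df

client.wait_job_to_finish(timeout_sec=500)  # `client` is the FeathrClient that submitted the job

# Rely on the job tags set at submission time for both the format and the result URL.
df = get_result_df(client)

# Or override explicitly; `format` remains a backward-compatible alias of `data_format`.
res_url = "abfss://container@account.dfs.core.windows.net/materialize_offline_example/df0/daily/2020/05/20"  # hypothetical
df = get_result_df(client, data_format="avro", res_url=res_url)
df = get_result_df(client, format="avro")
```
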
12 changes: 6 additions & 6 deletions feathr_project/test/test_azure_spark_e2e.py
@@ -37,7 +37,7 @@ def test_feathr_materialize_to_offline():

backfill_time = BackfillTime(start=datetime(
2020, 5, 20), end=datetime(2020, 5, 20), step=timedelta(days=1))

now = datetime.now()
if client.spark_runtime == 'databricks':
output_path = ''.join(['dbfs:/feathrazure_cijob_materialize_offline_','_', str(now.minute), '_', str(now.second), ""])
Expand All @@ -55,7 +55,7 @@ def test_feathr_materialize_to_offline():

# download result and just assert the returned result is not empty
# by default, it will write to a folder appended with date
res_df = get_result_df(client, "avro", output_path + "/df0/daily/2020/05/20")
res_df = get_result_df(client, data_format="avro", res_url=output_path + "/df0/daily/2020/05/20")
assert res_df.shape[0] > 0

def test_feathr_online_store_agg_features():
@@ -411,7 +411,7 @@ def test_feathr_materialize_with_time_partition_pattern():
output_path = 'dbfs:/timePartitionPattern_test'
else:
output_path = 'abfss://feathrazuretest3fs@feathrazuretest3storage.dfs.core.windows.net/timePartitionPattern_test'

offline_sink = HdfsSink(output_path=output_path)
settings = MaterializationSettings("nycTaxiTable",
sinks=[offline_sink],
Expand All @@ -426,7 +426,7 @@ def test_feathr_materialize_with_time_partition_pattern():
# by default, it will write to a folder appended with date
res_df = get_result_df(client_producer, "avro", output_path + "/df0/daily/2020/05/20")
assert res_df.shape[0] > 0

client_consumer: FeathrClient = time_partition_pattern_test_setup(os.path.join(test_workspace_dir, "feathr_config.yaml"), output_path+'/df0/daily')

backfill_time_tpp = BackfillTime(start=datetime(
@@ -451,8 +451,8 @@
# by default, it will write to a folder appended with date
res_df = get_result_df(client_consumer, "avro", output_path_tpp + "/df0/daily/2020/05/20")
assert res_df.shape[0] > 0


if __name__ == "__main__":
test_feathr_materialize_to_aerospike()
test_feathr_get_offline_features_to_sql()
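
To exercise the updated offline-materialization path in isolation, one option is to run only that test through pytest — a sketch; the e2e suite still requires the cloud credentials and feathr_config.yaml the tests expect.

```python
import pytest

# Run only the offline materialization e2e test from the file changed above.
pytest.main(["feathr_project/test/test_azure_spark_e2e.py",
             "-k", "test_feathr_materialize_to_offline"])
```
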