From 8b582d360d70198f0a48987c5df11fc7b3789aff Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Sun, 26 Jun 2022 21:58:19 +0530 Subject: [PATCH 1/4] added local_folder param to get_result_df (#394) --- feathr_project/feathr/utils/job_utils.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index 4ac1a7a88..01b0f9ff6 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -7,7 +7,7 @@ import tempfile -def get_result_df(client: FeathrClient, format: str = None, res_url: str = None) -> pd.DataFrame: +def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, local_folder: str = None) -> pd.DataFrame: """Download the job result dataset from cloud as a Pandas dataframe. format: format override, could be "parquet", "delta", etc. @@ -15,20 +15,26 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None) """ res_url: str = res_url or client.get_job_result_uri(block=True, timeout_sec=1200) format: str = format or client.get_job_tags().get(OUTPUT_FORMAT, "") - tmp_dir = tempfile.TemporaryDirectory() - client.feathr_spark_laucher.download_result(result_path=res_url, local_folder=tmp_dir.name) + # if local_folder params is not provided then create a temporary folder + if local_folder is not None: + local_dir_path = os.getcwd() + local_folder + else: + tmp_dir = tempfile.TemporaryDirectory() + local_dir_path = tmp_dir.name + + client.feathr_spark_laucher.download_result(result_path=res_url, local_folder=local_dir_path) dataframe_list = [] # by default the result are in avro format if format: # helper function for only parquet and avro if format.casefold()=="parquet": - files = glob.glob(os.path.join(tmp_dir.name, '*.parquet')) + files = glob.glob(os.path.join(local_dir_path, '*.parquet')) from pyarrow.parquet import ParquetDataset ds = ParquetDataset(files) result_df = ds.read().to_pandas() elif format.casefold()=="delta": from deltalake import DeltaTable - delta = DeltaTable(tmp_dir.name) + delta = DeltaTable(local_dir_path) if not client.spark_runtime == 'azure_synapse': # don't detect for synapse result with Delta as there's a problem with underlying system # Issues are trached here: https://github.com/delta-io/delta-rs/issues/582 @@ -38,14 +44,15 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None) result_df = pd.DataFrame() elif format.casefold()=="avro": import pandavro as pdx - for file in glob.glob(os.path.join(tmp_dir.name, '*.avro')): + for file in glob.glob(os.path.join(local_dir_path, '*.avro')): dataframe_list.append(pdx.read_avro(file)) result_df = pd.concat(dataframe_list, axis=0) else: # by default use avro import pandavro as pdx - for file in glob.glob(os.path.join(tmp_dir.name, '*.avro')): + for file in glob.glob(os.path.join(local_dir_path, '*.avro')): dataframe_list.append(pdx.read_avro(file)) result_df = pd.concat(dataframe_list, axis=0) - tmp_dir.cleanup() + if tmp_dir is not None: + tmp_dir.cleanup() return result_df \ No newline at end of file From b38a46648a60f9093259ca7d302f045b44e29e81 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 01:39:32 +0530 Subject: [PATCH 2/4] replaced temp_dir with local_folder to fix undefined variable error --- feathr_project/feathr/utils/job_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index 01b0f9ff6..bbb7e72e2 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -53,6 +53,6 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, for file in glob.glob(os.path.join(local_dir_path, '*.avro')): dataframe_list.append(pdx.read_avro(file)) result_df = pd.concat(dataframe_list, axis=0) - if tmp_dir is not None: + if local_folder is None: tmp_dir.cleanup() return result_df \ No newline at end of file From 64c253e1e656b0b8b378cc28bf6467de1d9747d6 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 04:57:34 +0000 Subject: [PATCH 3/4] added docstring and changed local_folder to absolute path --- feathr_project/feathr/utils/job_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index bbb7e72e2..e42c915b0 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -12,12 +12,13 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, format: format override, could be "parquet", "delta", etc. res_url: output URL to download files. Note that this will not block the job so you need to make sure the job is finished and result URL contains actual data. + local_folder: optional parameter to specify the absolute download path. if the user does not provide this, function will create a temporary directory and delete it after reading the dataframe. """ res_url: str = res_url or client.get_job_result_uri(block=True, timeout_sec=1200) format: str = format or client.get_job_tags().get(OUTPUT_FORMAT, "") # if local_folder params is not provided then create a temporary folder if local_folder is not None: - local_dir_path = os.getcwd() + local_folder + local_dir_path = local_folder else: tmp_dir = tempfile.TemporaryDirectory() local_dir_path = tmp_dir.name From cfe2fb37d1c93692a16351c2fd8d947cc2f69f54 Mon Sep 17 00:00:00 2001 From: Shivam Sanju Date: Mon, 27 Jun 2022 09:19:49 +0000 Subject: [PATCH 4/4] changed laucher too launcher --- feathr_project/feathr/utils/job_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feathr_project/feathr/utils/job_utils.py b/feathr_project/feathr/utils/job_utils.py index e42c915b0..32e0b8e5a 100644 --- a/feathr_project/feathr/utils/job_utils.py +++ b/feathr_project/feathr/utils/job_utils.py @@ -23,7 +23,7 @@ def get_result_df(client: FeathrClient, format: str = None, res_url: str = None, tmp_dir = tempfile.TemporaryDirectory() local_dir_path = tmp_dir.name - client.feathr_spark_laucher.download_result(result_path=res_url, local_folder=local_dir_path) + client.feathr_spark_launcher.download_result(result_path=res_url, local_folder=local_dir_path) dataframe_list = [] # by default the result are in avro format if format: