Skip to content

Commit

Permalink
[HOPSWORKS-3323] Fix TDS creation in PySpark client and add explicit…
Browse files Browse the repository at this point in the history
… caching (logicalclocks#784)

* fix create training dataset in pyspark, added caching

* fixed stylecheck

* fixed stylecheck

* revert caching for non-split dfs

(cherry picked from commit 2c6864d)
  • Loading branch information
tdoehmen authored and kennethmhc committed Sep 22, 2022
1 parent 1695657 commit d878bd1
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ def write_training_dataset(
split_dataset = dict(
[(split[0], split_dataset[i]) for i, split in enumerate(splits)]
)
for key in split_dataset:
if training_dataset.coalesce:
split_dataset[key] = split_dataset[key].coalesce(1)

split_dataset[key] = split_dataset[key].cache()

transformation_function_engine.TransformationFunctionEngine.populate_builtin_transformation_functions(
training_dataset, feature_view_obj, split_dataset
)
Expand Down Expand Up @@ -481,6 +487,8 @@ def _write_training_dataset_single(
save_mode
).save(path)

feature_dataframe.unpersist()

def read(self, storage_connector, data_format, read_options, location):
if isinstance(location, str):
if data_format.lower() in ["delta", "parquet", "hudi", "orc", "bigquery"]:
Expand Down

0 comments on commit d878bd1

Please sign in to comment.