
Add Dataset.from_spark #5701

Merged: 31 commits merged into huggingface:main from the from_spark branch, Apr 26, 2023

Conversation

@maddiedawson (Contributor) commented Apr 3, 2023

Adds a static method, Dataset.from_spark, to create datasets from Spark DataFrames.

This approach spares users the need to materialize their DataFrame first. A common use case: a user loads their dataset into a DataFrame, uses Spark to transform some of the columns, and then wants to train on the result.

Related issue: #5678
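
A minimal usage sketch of the new API (illustrative only; it assumes a local SparkSession and reuses the example schema quoted later in this thread):

# Minimal usage sketch; assumes pyspark and a datasets build that includes this PR.
from pyspark.sql import SparkSession
from datasets import Dataset

spark = SparkSession.builder.master("local[*]").getOrCreate()

data = [("1", 1, 1.0), ("2", 2, 2.0), ("3", 3, 3.0)]
df = spark.createDataFrame(data, "col_1: string, col_2: int, col_3: float")

# Apply a Spark transformation lazily; nothing is materialized on the driver yet.
df = df.withColumn("col_3", df.col_3 * 2)

# Convert to a Hugging Face Dataset for training.
dataset = Dataset.from_spark(df)
print(dataset)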

@HuggingFaceDocBuilderDev commented Apr 4, 2023

The documentation is not available anymore as the PR was closed or merged.

Review comments (outdated/resolved) on: src/datasets/arrow_dataset.py, src/datasets/packaged_modules/spark/spark.py
@maddiedawson requested a review from lu-wang-dl (April 6, 2023 17:54)
@maddiedawson changed the title from "[WIP] Add Dataset.from_spark" to "Add Dataset.from_spark" (Apr 6, 2023)
@maddiedawson marked this pull request as draft (April 6, 2023 23:49)
@maddiedawson marked this pull request as ready for review (April 6, 2023 23:50)
@maddiedawson (Contributor, Author)

@mariosasko Would you or another HF datasets maintainer be able to review this, please?

@albertvillanova (Member) left a comment:

Thanks for the work done, @maddiedawson.

Please note there was a previous PR adding this functionality.

However, the addition of that conversion was rejected by @lhoestq; see #5401 (comment):

"Thinking more about it I don't really see how those two methods help in practice, since one can already do datasets <-> pandas <-> spark and those two methods don't add value over this."

@lhoestq (Member) commented Apr 7, 2023

Amazing! Great job @maddiedawson

Do you know if it's possible to also support writing to Parquet using the HF ParquetWriter when file_format="parquet"?

Parquet is often used when people want to stream the data to train models, which suits big datasets. Arrow, on the other hand, is generally used for local memory mapping with random access.
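
For illustration only, here is one way a per-task Parquet shard could be written from an iterator of Arrow record batches (a plain pyarrow sketch; it is not the HF ParquetWriter mentioned above):

import pyarrow as pa
import pyarrow.parquet as pq

def write_parquet_shard(batches, path):
    # `batches` is an iterator of pyarrow.RecordBatch produced by one Spark task.
    writer = None
    try:
        for batch in batches:
            if writer is None:
                writer = pq.ParquetWriter(path, batch.schema)
            writer.write_table(pa.Table.from_batches([batch]))
    finally:
        if writer is not None:
            writer.close()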

(Quoting the comment above:) "Please note there was a previous PR adding this functionality."

Am I right to say that it uses the Spark workers to prepare the Arrow files? If so, this should make the data preparation fast and it won't fill up the executor's memory as in the previously proposed PR.

@maddiedawson (Contributor, Author)

Thanks for taking a look! Unlike the previous PR's approach, this implementation takes advantage of Spark mapping to distribute file writing over multiple tasks. (Also it doesn't load the entire dataset into memory :) )
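
A rough sketch of that idea (illustrative only, not the PR's actual code; it assumes PySpark >= 3.3 for DataFrame.mapInArrow and an output directory that every worker can write to):

import os
import uuid
import pyarrow as pa
import pyarrow.ipc as ipc

def write_arrow_shards(df, out_dir):
    # Each Spark task writes its own Arrow shard; the driver only collects the
    # shard paths, so the full DataFrame is never materialized in one process.
    def write_partition(batches):
        first = next(batches, None)
        if first is None:
            return
        os.makedirs(out_dir, exist_ok=True)
        path = os.path.join(out_dir, f"shard-{uuid.uuid4().hex}.arrow")
        with ipc.new_file(path, first.schema) as writer:
            writer.write_batch(first)
            for batch in batches:
                writer.write_batch(batch)
        yield pa.RecordBatch.from_pydict({"path": [path]})

    return [row.path for row in df.mapInArrow(write_partition, "path: string").collect()]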

Supporting Parquet here sgtm; I'll modify the PR.

I also updated the PR description with a common Spark-HF use case that we want to improve.

@maddiedawson (Contributor, Author)

Hey @albertvillanova @lhoestq, would one of you be able to re-review please? Thank you!

@lhoestq (Member) left a comment:

Good job with this, I left a few comments :)

Review comments (outdated/resolved) on: src/datasets/arrow_dataset.py, src/datasets/io/spark.py, tests/test_arrow_dataset.py
("3", 3, 3.0),
]
df = spark.createDataFrame(data, "col_1: string, col_2: int, col_3: float")
dataset = Dataset.from_spark(df)
@lhoestq (Member) commented on this snippet:

Could you add a test to make sure that calling from_spark twice in a row with two different DataFrames results in two different Dataset objects?

The DatasetBuilder uses a cache_dir with subdirectories named after the builder ("spark") and the arguments used to generate the dataset. Here, since the **kwargs passed to DatasetBuilder.__init__ won't change between two from_spark calls, it may reuse the same cache and return the same dataset.

If this is the case, we can add a df_id parameter to the SparkConfig, which can be set to the Python id of the Spark DataFrame, for example (or any other identifier of the Spark DataFrame).
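
For illustration, such a df_id could be carried in the builder config so it participates in the cache-directory hashing (a sketch with assumed names, not the PR's actual SparkConfig):

from dataclasses import dataclass
import datasets

@dataclass
class SparkConfig(datasets.BuilderConfig):
    # Any identifier of the source Spark DataFrame. Because the builder's cache
    # subdirectory is derived from the config's arguments, two from_spark() calls
    # with different DataFrames would then hash to different cache locations.
    df_id: str = ""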

@maddiedawson (Contributor, Author) replied:

I modified the DatasetBuilder to cache based on the DataFrame's semanticHash and added a test for it. This isn't perfect; e.g., if a user generates a dataset from the query "SELECT * FROM T", adds rows to the table, and then regenerates the dataset, the semantic hash for the DataFrame will be the same, so the user will get stale data. However, they can pass the force_download=True option to from_spark to force re-downloading. Wdyt?
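
A sketch of that cache-key idea (it assumes PySpark >= 3.1, where DataFrame.semanticHash() is available; the helper name is illustrative):

def dataframe_cache_key(df) -> str:
    # semanticHash() is derived from the DataFrame's logical plan, so two
    # semantically equal queries share a key; that is exactly why
    # "SELECT * FROM T" keeps hitting the same cache entry even after T
    # gains new rows (the staleness caveat above).
    return f"{df.semanticHash() & 0xFFFFFFFF:08x}"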

@lhoestq (Member) replied:

Looks good! (I was off the last few days, sorry for the late reply)

A reviewer (Member) asked:

Just one question: would someone be able to reuse the cached dataset if they call .from_spark with the same DataFrame but in a different session?

@maddiedawson (Contributor, Author) replied:

Unfortunately no; the same df will generate a different semantic hash in different Spark sessions.

@maddiedawson (Contributor, Author)

@lhoestq this is ready for another pass! Thanks so much 🙏

@maddiedawson (Contributor, Author)

Friendly ping @lhoestq; also cc @polinaeterna, who may be able to help take a look?

@lhoestq (Member) left a comment:

Looks good to me :) cc @albertvillanova as well

I added my final comments/questions:

Review comments on: src/datasets/arrow_dataset.py, src/datasets/packaged_modules/__init__.py, setup.py (mostly resolved)
("3", 3, 3.0),
]
df = spark.createDataFrame(data, "col_1: string, col_2: int, col_3: float")
dataset = Dataset.from_spark(df)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one question: could someone be able to reuse the cached dataset if they call .from_spark using the same DataFrame but on different sessions ?

@maddiedawson requested a review from lhoestq (April 19, 2023 18:42)
@lhoestq (Member) commented Apr 20, 2023

Merging main into this branch should fix the CI

@maddiedawson (Contributor, Author)

Just rebased @lhoestq

@lhoestq (Member) left a comment:

Awesome! There's just a test that needs to be fixed before merging.

I'd also be interested in having someone else review it and then we're good to merge :)

@maddiedawson (Contributor, Author)

Thanks @lhoestq! Is there a way for me to trigger the GitHub workflow myself to triage the test failure? I'm not able to repro the test failures locally.

@maddiedawson (Contributor, Author)

There were two test issues in the workflow that I wasn't able to reproduce locally:

  • Python 3.7: createDataFrame fails due to a pickling error. I modified the tests to write to and read from JSON files instead.
  • Python 3.10: A worker crashes for unknown reasons. I modified the Spark setup to explicitly specify local mode in case it was trying to do something else; let's see if that fixes the issue.

@maddiedawson (Contributor, Author)

Also, one more question @lhoestq: when is the next datasets release? We're hoping this can make it in.

@lhoestq (Member) commented Apr 21, 2023

I just re-ran the CI.
I think we can do a release right after this PR is merged ;)

@maddiedawson (Contributor, Author)

The remaining CI issues have been addressed! They were:

  • dill==0.3.1.1 is incompatible with cloudpickle, which Spark uses. The min-dependency tests use this dill version, and those were failing. I added a skip-test annotation to skip Spark tests when using this dill version. This shouldn't be a production issue, since users on that version of dill can't really do anything with Spark anyway.
  • One of the Spark APIs used in this feature (mapInArrow) is incompatible with Windows. I filed a Spark ticket for the team to investigate. For the tests, I added another annotation to skip Spark tests on Windows (a sketch of both skip markers is below). In the next PR (adding streaming mode), we should be able to support Windows, since that won't use mapInArrow.
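
A sketch of the kind of skip markers described above (the marker names and the exact version bound are assumptions, not the PR's code):

import sys
import dill
import pytest
from packaging import version

# The failing pin reported above was dill==0.3.1.1 (used by the min-dependency CI job).
require_recent_dill = pytest.mark.skipif(
    version.parse(dill.__version__) < version.parse("0.3.2"),
    reason="old dill releases conflict with cloudpickle, which Spark relies on",
)
require_not_windows = pytest.mark.skipif(
    sys.platform == "win32",
    reason="DataFrame.mapInArrow is not usable on Windows in these tests",
)

@require_recent_dill
@require_not_windows
def test_from_spark():
    ...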

I ran the CI on my forked branch: maddiedawson#2. Everything passes except one instance of tests/test_metric_common.py::LocalMetricTest::test_load_metric_frugalscore; it looks like a flake.

@lhoestq, granted that the CI passes here, is this OK to merge and release? We'd like to put out a blog post tomorrow to broadcast this to Spark users!

@maddiedawson (Contributor, Author)

Thanks @lhoestq! Could you help take a look at the error, please? It seems unrelated...

FAILED tests/test_arrow_dataset.py::BaseDatasetTest::test_map_multiprocessing_on_disk - NotADirectoryError: [WinError 267] The directory name is invalid: 'C:\Users\RUNNER~1\AppData\Local\Temp\tmptfnrdj4x\cache-5c5687cf5629c97a_00000_of_00002.arrow'
===== 1 failed, 2152 passed, 23 skipped, 20 warnings in 461.68s (0:07:41) =====

@maddiedawson (Contributor, Author)

The blog is live, by the way: https://www.databricks.com/blog/contributing-spark-loader-for-hugging-face-datasets. Hopefully there can be a release today?

@lhoestq merged commit ea251c7 into huggingface:main (Apr 26, 2023)
@github-actions

Posted automated benchmark results (new vs. old timings for benchmark_array_xd, benchmark_getitem_100B, benchmark_indices_mapping, benchmark_iterating, and benchmark_map_filter) under PyArrow==8.0.0 and PyArrow==latest.

@maddiedawson deleted the from_spark branch (April 26, 2023 17:11)
@lhoestq mentioned this pull request (Apr 26, 2023)
@yanzia12138

Python 3.9.2
I got a _pickle.PicklingError when using Dataset.from_spark.

The dataset was loaded from a Spark DataFrame on a multi-node Spark cluster:
df = spark.read.parquet(args.input_data).repartition(50)
ds = Dataset.from_spark(df, keep_in_memory=True,
                        cache_dir="/pnc-data/data/nuplan/t5_spark/cache_data")
ds.save_to_disk(args.output_data)

Error:
_pickle.PicklingError: Could not serialize object: RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
23/06/16 21:17:20 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)

@lhoestq (Member) commented Jun 16, 2023

Hi @yanzia12138! Could you open a new issue, please, and share the full stack trace? That will help to figure out what happened exactly.
