
Preserve original Dask partitions by default in Dataset.to_parquet #254

Merged
merged 4 commits into NVIDIA-Merlin:main from preserve-partitions on Mar 27, 2023

Conversation

rjzamora
Contributor

Modifies the default behavior of Dataset.to_parquet to be more consistent with dask.dataframe.DataFrame.to_parquet, in the sense that each output file will correspond to a distinct Dask partition.

This may cause breakage for some workflows that are expecting a different number of output files (one that is related to the number of worker processes). With that in mind, I feel pretty strongly that we need this change.

@rnyak - I'm hoping this will begin to clear up the confusion around what users need to do to control/preserve partition sizes. In general, I'd advise against anyone using the out_files_per_proc argument unless their primary goal is effective shuffling. The historical reason that we have not preserved Dask partitions is that we have prioritized shuffling over deterministic partition sizes. However, when out_files_per_proc is not specified, I think we can assume that partitioning can/should be much simpler. Does this make sense to you?
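For illustration, here is a minimal sketch of the new default behavior, assuming the merlin.io.Dataset API; the input path and partition count are placeholders:

```python
from merlin.io import Dataset

# Read a parquet dataset; suppose this produces 8 Dask partitions.
ds = Dataset("input_data/*.parquet", engine="parquet")

# With this change, the default is one output file per Dask partition,
# so we would expect 8 files here (previously the count depended on the
# number of worker processes).
ds.to_parquet("output_data/")
```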

@rjzamora rjzamora added the breaking Breaking change label Mar 23, 2023
@rnyak
Contributor

rnyak commented Mar 24, 2023

@rjzamora thanks for this PR. I have 2 questions:

This may cause breakage for some workflows that are expecting a different number of output files (one that is related to the number of worker processes). With that in mind, I feel pretty strongly that we need this change.

Currently, when we run multi-GPU training, we expect that each parquet file has a number of partitions that is divisible by the number of GPUs. If we do multi-GPU training on 2 GPUs, then we expect the parquet files to have 2, 4, 6, 8, ... partitions. If they don't, we do .repartition() under the hood in TF4Rec. With your changes, are we still able to do that? Are we able to repartition a single parquet file?
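(For context, a hedged sketch of the kind of repartitioning described above; the exact TF4Rec internals may differ, and the file path and GPU count are placeholders.)

```python
from merlin.io import Dataset

n_gpus = 2  # placeholder: number of GPUs used for training

# Read an exported parquet file and repartition its Dask collection so the
# partition count becomes a multiple of the GPU count (sketch only).
ds = Dataset("exported/part_0.parquet", engine="parquet")
ddf = ds.to_ddf()
if ddf.npartitions % n_gpus != 0:
    # Round up to the next multiple of n_gpus.
    ddf = ddf.repartition(npartitions=n_gpus * (ddf.npartitions // n_gpus + 1))
ds = Dataset(ddf)
```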

In general, I'd advise against anyone using the out_files_per_proc argument unless their primary goal is effective shuffling.

Well, we might recommend out_files_per_proc when users have OOM issues during model training with exported parquet files. Since NVT exports one single parquet file with a single GPU, this single parquet file might be very large. In that case, setting `out_files_per_proc` and exporting multiple parquet files from the NVT workflow would help in model training, would it not?

@rjzamora
Contributor Author

Currently, when we run multi-GPU training, we expect that each parquet file has a number of partitions that is divisible by the number of GPUs. If we do multi-GPU training on 2 GPUs, then we expect the parquet files to have 2, 4, 6, 8, ... partitions. If they don't, we do .repartition() under the hood in TF4Rec. With your changes, are we still able to do that? Are we able to repartition a single parquet file?

This question is tricky to answer because the behavior depends on the options used in both the original to_parquet call and the final Dataset() call before data-loading (that is, you are asking more about the training component of a full workflow than you are about the to_parquet phase). With or without this PR, the number of partitions at training time will usually have more to do with the options used in the Dataset() call than with the original to_parquet call.

With that said, you are correct that the number of files written by the to_parquet call is still important, because that will correspond to the minimum number of partitions you can get at read time (unless out_files_per_proc=True is passed, which I've never seen anyone use in Merlin yet).

There are already multiple ways to specify the number of files you want written out in Dataset.to_parquet. The easiest and recommended way is to pass output_files=n, where n is the exact number of files you want written. However, if you just want the number of files to be a multiple of the number of workers, you can instead set out_files_per_proc to that multiple. This PR does not add or remove either of these two options; it merely changes the default output_files value to be the number of Dask partitions whenever out_files_per_proc is not specified.
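A hedged sketch of those two write-side options (paths and counts are placeholders):

```python
from merlin.io import Dataset

ds = Dataset("input_data/*.parquet", engine="parquet")

# Ask for an exact number of output files (recommended approach).
ds.to_parquet("out_exact/", output_files=8)

# Or ask for a fixed number of files per worker process.
ds.to_parquet("out_per_proc/", out_files_per_proc=2)
```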

No matter the number of files written out by to_parquet, the number of partitions you get at read time will typically be larger than that number unless the part_size (or part_mem_frac) setting is larger than the in-memory size of each file. (Side Note: If out_files_per_proc=True, the number of partitions can actually be less than the number of files).
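A hedged sketch of how the read-side part_size setting drives the partition count (values are placeholders):

```python
from merlin.io import Dataset

# Read the written files back with an explicit partition size. The number of
# partitions is typically >= the number of files, unless part_size exceeds
# the in-memory size of each file.
ds = Dataset("out_exact/*.parquet", engine="parquet", part_size="128MB")
print(ds.to_ddf().npartitions)
```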

Altogether, there is currently no argument on the read side that has the explicit purpose of specifying the desired number of partitions, and the changes in this PR do not affect this limitation in any way. With that said, if the user wants a perfect file-to-partition mapping at read time, we could certainly tweak the Dataset API to make it possible to explicitly request this.

Well, we might recommend out_files_per_proc when users have OOM issues during model training with exported parquet files. Since NVT exports one single parquet file with a single GPU, this single parquet file might be very large. In that case, setting `out_files_per_proc` and exporting multiple parquet files from the NVT workflow would help in model training, would it not?

This PR makes it so that Merlin/NVT will not export one large parquet file unless the user specifically sets out_files_per_proc=1 (or the original data was read in using a single Dask partition). The new default is to write a distinct file for each Dask partition, unless the user specifies an output_files or out_files_per_proc argument to change this. My recommendation still stands that output_files should be preferred over out_files_per_proc, unless the goal is to produce a well-shuffled dataset (since one needs out_files_per_proc>1 for the workers to shuffle data from each input partition into multiple output files).
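For the shuffling use case, a hedged sketch, assuming merlin.io exposes the Shuffle enum; names and values are illustrative:

```python
from merlin.io import Dataset, Shuffle

ds = Dataset("input_data/*.parquet", engine="parquet")

# out_files_per_proc > 1 lets each worker scatter rows from every input
# partition across several output files, which is what produces a
# well-shuffled dataset on disk.
ds.to_parquet("shuffled/", shuffle=Shuffle.PER_WORKER, out_files_per_proc=4)
```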

@karlhigley karlhigley added this to the Merlin 23.04 milestone Mar 24, 2023
@karlhigley karlhigley merged commit b7cf860 into NVIDIA-Merlin:main Mar 27, 2023
@rjzamora rjzamora deleted the preserve-partitions branch March 27, 2023 13:56