Add IterableDataset.from_spark #5770
Conversation
(force-pushed from cd77191 to e182947)
Hi again @lhoestq this is ready for review! Not sure I have permission to add people to the reviewers list...
Cool ! I think you can define `SparkExamplesIterable` here:

```python
        return generate_fn


class SparkExamplesIterable(_BaseExamplesIterable):
```
love it !
Thanks for reviewing! I moved the streaming behavior to IterableDataset.from_spark
Cool thanks ! I added one last comment.
Also, if you want, feel free to enrich the documentation at use_with_spark.mdx, but we can also do it in a subsequent PR, e.g. add a dedicated new section for it and explain the advantage compared to Dataset.from_spark.
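For example, the docs could show something like this minimal sketch (assumes a running SparkSession; `IterableDataset.from_spark` is the method added by this PR):

```python
from pyspark.sql import SparkSession
from datasets import IterableDataset

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("cat", 0), ("dog", 1)], ["text", "label"])

# Unlike Dataset.from_spark, which materializes the DataFrame to the cache
# directory before loading it, this streams examples lazily from the Spark
# partitions as they are consumed.
ids = IterableDataset.from_spark(df)
for example in ids:
    print(example)  # {'text': 'cat', 'label': 0} ...
```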
(force-pushed from bc84cd7 to cf2402c)
Thanks Quentin! I'll flesh out the docs in a follow-up PR
LGTM
Friendly ping @lhoestq
Awesome ! I just added comments to fix a few things.
Also, `partition_order` doesn't seem to have any impact right now, can you check if something is missing ? Feel free to add tests for shuffle_data_sources and shard_data_sources to make sure they work as expected ;)
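For example, something along these lines (a hedged sketch only; the constructor and the exact signatures of the `_BaseExamplesIterable` methods are assumed here, not confirmed in this thread):

```python
import numpy as np

def test_shuffle_and_shard_data_sources(df):
    ex_iterable = SparkExamplesIterable(df)  # constructor args assumed

    # Shuffling should change the partition visit order for a given seed
    # while still yielding the same overall set of example keys.
    shuffled = ex_iterable.shuffle_data_sources(np.random.default_rng(42))
    assert sorted(key for key, _ in shuffled) == sorted(key for key, _ in ex_iterable)

    # Sharding across two workers should partition the keys without overlap.
    shards = [ex_iterable.shard_data_sources(worker_id, 2) for worker_id in range(2)]
    shard_keys = [key for shard in shards for key, _ in shard]
    assert sorted(shard_keys) == sorted(key for key, _ in ex_iterable)
```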
```python
):
    def generate_fn():
        row_id = 0
        # toLocalIterator(True) enables prefetchPartitions, fetching the next
        # partition while the current one is being consumed.
        for row in df.rdd.toLocalIterator(True):
```
Do you know if there's a way to iterate on pyarrow tables ? Or on pandas dataframes ? I expect it would also make it much faster to query the rows by batch.
Also asking because I'm implementing #5821 and it will be helpful :)
Hmm depending on your use case, maybe pyarrow.Table's to_reader (https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_reader) ? Either that or to_batches, both are zero-copy. For pandas, there's iterrows (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.iterrows.html#pandas-dataframe-iterrows), but I think this makes a copy.
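A quick illustration of those two zero-copy options on a plain in-memory `pyarrow.Table` (not a Spark DataFrame):

```python
import pyarrow as pa

table = pa.table({"id": [1, 2, 3, 4], "text": ["a", "b", "c", "d"]})

# Option 1: a RecordBatchReader over the table, in chunks of up to 2 rows.
for batch in table.to_reader(max_chunksize=2):
    print(batch.num_rows)  # 2, then 2

# Option 2: iterate the record batches directly.
for batch in table.to_batches(max_chunksize=2):
    print(batch.to_pydict())
```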
Yea I mean is it possible to get a Table or pandas DataFrame iterator from a spark DataFrame ?
I don't know of a great way beside using toPandas on the df, which will collect the entire thing to the driver.
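To illustrate the trade-off (assuming `df` is an existing Spark DataFrame):

```python
# Collects every row onto the driver at once; risks OOM on large DataFrames.
pdf = df.toPandas()

# Pulls one partition at a time to the driver, keeping memory bounded.
for row in df.toLocalIterator():
    ...  # row is a pyspark.sql.Row
```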
Ok thanks for the info :)
Have you had a chance to test the speed of `toLocalIterator` ? Ideally it should be fast enough to be able to train models efficiently 🤞
It's about twice as fast as the non-streaming version.
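A hypothetical harness for that comparison (illustrative only; `df` is assumed to be a Spark DataFrame, and the actual benchmark run for this PR is not shown in the thread):

```python
import time
from datasets import Dataset, IterableDataset

def time_full_pass(dataset):
    start = time.perf_counter()
    for _ in dataset:
        pass
    return time.perf_counter() - start

streaming_s = time_full_pass(IterableDataset.from_spark(df))  # streams rows
materialized_s = time_full_pass(Dataset.from_spark(df))       # writes to disk first
print(f"streaming: {streaming_s:.1f}s vs. materialized: {materialized_s:.1f}s")
```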
Awesome ! This should be the preferred way that is showcased in the documentation then 😎
Thanks @lhoestq ! I fixed the partition order thing and added more unit tests.
LGTM, good job implementing this !
Benchmark results (PyArrow==8.0.0, plus updated runs) were posted for: benchmark_array_xd.json, benchmark_getitem_100B.json, benchmark_indices_mapping.json, benchmark_iterating.json, benchmark_map_filter.json
Follow-up from #5701
Related issue: #5678