[Task]: Spark runner flatMap output should not be required to fit in the memory #23852

Closed
JozoVilcek opened this issue Oct 26, 2022 · 4 comments


@JozoVilcek
Contributor

JozoVilcek commented Oct 26, 2022

What needs to happen?

Currently, on the Spark runner, if a single processElement call produces multiple output elements, they all need to fit in memory [1]. This is problematic, e.g., for ParquetIO, which uses a DoFn instead of Source<>-based reads and lets the reader push all elements to the output from inside the DoFn. Something similar happens with JdbcIO, as discussed in [2].

The goal is to overcome this constraint and allow a DoFn to produce large output on the Spark runner.

[1] https://github.com/apache/beam/blob/v2.39.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java#L125

[2] https://www.mail-archive.com/dev@beam.apache.org/msg16806.html
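
To make the constraint concrete, here is a minimal sketch in plain Java contrasting the two strategies: collecting every output of one call into a list before the caller sees any of it, versus handing outputs over through a bounded queue so that at most `capacity` elements are pending at a time. This is not Beam or Spark code and is not claimed to be how the runner implements either behaviour; `FlatMapOutputSketch`, `Emitter`, `buffered` and `bounded` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Illustrative only: not Beam or Spark code. All names here are hypothetical. */
public class FlatMapOutputSketch {

  /** Stand-in for a DoFn whose single call pushes many elements to its output. */
  interface Emitter {
    void emitAll(Consumer<Integer> out);
  }

  /** Eager strategy: every output is held in memory before the caller sees any. */
  static List<Integer> buffered(Emitter fn) {
    List<Integer> all = new ArrayList<>();
    fn.emitAll(all::add);
    return all;
  }

  /** Bounded strategy: the producer thread blocks once `capacity` outputs are pending. */
  static Iterator<Integer> bounded(Emitter fn, int capacity) {
    final Object end = new Object(); // sentinel marking the end of the output
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
    Thread producer = new Thread(() -> {
      try {
        fn.emitAll(v -> put(queue, v));
      } finally {
        // Simplification: a failure inside fn is not propagated to the consumer.
        put(queue, end);
      }
    });
    producer.setDaemon(true);
    producer.start();

    return new Iterator<Integer>() {
      private Object pending; // element taken from the queue but not yet returned
      private boolean done;

      @Override
      public boolean hasNext() {
        if (done) {
          return false;
        }
        if (pending == null) {
          pending = take(queue);
          if (pending == end) {
            done = true;
            pending = null;
            return false;
          }
        }
        return true;
      }

      @Override
      public Integer next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        Integer value = (Integer) pending;
        pending = null;
        return value;
      }
    };
  }

  private static void put(BlockingQueue<Object> queue, Object value) {
    try {
      queue.put(value);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  private static Object take(BlockingQueue<Object> queue) {
    try {
      return queue.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // One "call" that emits a million elements; only about 1024 are pending at once.
    Emitter fn = out -> {
      for (int i = 0; i < 1_000_000; i++) {
        out.accept(i);
      }
    };
    long sum = 0;
    Iterator<Integer> it = bounded(fn, 1024);
    while (it.hasNext()) {
      sum += it.next();
    }
    System.out.println(sum);
  }
}
```

In the buffered variant, memory use grows with the number of elements a single call emits. In the bounded variant it is capped by the queue capacity, at the cost of an extra thread and blocking handoffs.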

Issue Priority

Priority: 2

Issue Component

Component: runner-spark

@JozoVilcek
Contributor Author

.take-issue

@JozoVilcek
Contributor Author

E-mail thread for collecting feedback on the initial WIP implementation:
https://www.mail-archive.com/dev@beam.apache.org/msg27521.html

JozoVilcek pushed a commit to JozoVilcek/beam that referenced this issue Dec 29, 2022
JozoVilcek pushed a commit to JozoVilcek/beam that referenced this issue Dec 30, 2022
JozoVilcek pushed a commit to JozoVilcek/beam that referenced this issue Jan 31, 2023
@mosche mosche closed this as completed in 01aa470 Feb 2, 2023
@github-actions github-actions bot added this to the 2.46.0 Release milestone Feb 2, 2023
@JozoVilcek
Contributor Author

This was fixed for SparkRunner by adding an option to enable it via an experiment. @mosche I wonder if it makes sense to make the necessary changes for the structured streaming or portable runner as well. What do you think?

@mosche
Member

mosche commented Feb 2, 2023

@JozoVilcek It looks like SDFs on portable pipelines are expanded using a different mechanism, though I haven't ever looked deeply into it, to be honest.
In any case it makes sense to open a similar issue for the structured streaming runner 👍
