[SPARK-41005][CONNECT][PYTHON][FOLLOW-UP] Fetch/send partitions in parallel for Arrow based collect #38613
Conversation
.../connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
Maybe we can create a thread pool (shared across collect invocations)?
Hm, I just noticed the review comment. I believe this matches our current implementation in PySpark. If we want to improve it, let's improve both paths together. I would prefer to match them and deduplicate the logic first.
+1 for matching the implementations.
Why use a thread pool if you have a thread sitting around?
Does it need to be synchronized?
Nope, it doesn't (because it's guided by the index). This approach actually comes from the initial ordered implementation of collect with Arrow (which was in production for a very long time): 82c18c2#diff-459628811d7786c705fbb2b7a381ecd2b88f183f44ab607d43b3d33ea48d390fR3282-R3318.
Really? I think the best case is also sending partitions one by one. Anyway, this PR looks good as it avoids the lock.

It collects all results first because of the synchronization.
The reason why I suggested using locks and letting the main thread write the results is exactly what this comment is trying to convey. You don't want these operations to happen inside the DAGScheduler thread: if you keep it blocked on something non-scheduling-related, you will stall all other scheduling. This is particularly bad in an environment where multiple users might be running code at the same time.
We have seen in higher-concurrency scenarios that this does become a problem: throughput plateaus because the DAGScheduler is busy doing the wrong things. I would like to avoid that.
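To make the concern concrete, here is a minimal Scala sketch (not the actual Spark Connect code) of a collect built on `SparkContext.runJob`: the result handler only stashes each partition by its index (which is also why no lock is needed), and the heavy Protobuf/gRPC work stays on the caller thread. `writeBatchToStream` is a hypothetical placeholder.

```scala
import org.apache.spark.sql.SparkSession

object ResultHandlerSketch {
  def collectViaRunJob(
      spark: SparkSession,
      writeBatchToStream: Array[Array[Byte]] => Unit): Unit = {
    val rdd = spark.range(0, 1000, 1, 8).rdd.map(_.toString.getBytes)
    // One slot per partition; each handler call writes only its own index.
    val partitions = new Array[Array[Array[Byte]]](rdd.getNumPartitions)

    spark.sparkContext.runJob(
      rdd,
      (it: Iterator[Array[Byte]]) => it.toArray,
      // The result handler is invoked as part of DAGScheduler event handling,
      // so it should only stash the result; doing serialization or network I/O
      // here would block all other scheduling.
      (partitionId: Int, partition: Array[Array[Byte]]) => {
        partitions(partitionId) = partition
      })

    // Heavy work (Protobuf encoding, gRPC writes) stays on the caller thread.
    partitions.foreach(writeBatchToStream)
  }
}
```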
Good point! We should write this down as a code comment. @zhengruifeng, can you help with it?
OK, let me add a comment for this.
Actually, let's just go with the #38614 approach, which is simpler. This approach can't easily deduplicate the code because of the ordering requirement anyway.
… batch in main thread

What changes were proposed in this pull request?
Document the reason of sending batch in main thread.

Why are the changes needed?
As per #38613 (comment).

Does this PR introduce any user-facing change?
No.

How was this patch tested?
No, doc-only.

Closes #38654 from zhengruifeng/connect_doc_collect.

Lead-authored-by: Ruifeng Zheng <ruifengz@apache.org>
Co-authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This PR is a follow-up of #38468 that proposes to remove the notify-wait approach and introduce a new way to collect partitions in parallel and send them in order.
Previously, it waited until all results were stored first, and then sent them one by one as Protobuf messages (therefore, notify-wait was in fact not needed).
In both the worst and best cases, we would always collect all partitions first and then send them partition by partition.
Now, it sends Protobuf messages in order as soon as the 0th partition is available (and sends the next one whenever it becomes available).
In the worst case, we still collect all partitions and then send them one by one. In the best case, each partition is sent as soon as it is collected.
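As an illustration of the intended flow, here is a minimal Scala sketch, not the actual SparkConnectStreamHandler implementation; `collectPartition`, `sendBatch`, and the polling loop are simplified placeholders for how partitions are produced and streamed.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{ExecutionContext, Future}

object OrderedCollectSketch {
  def collectAndSendInOrder(
      numPartitions: Int,
      collectPartition: Int => Array[Byte],
      sendBatch: Array[Byte] => Unit)(implicit ec: ExecutionContext): Unit = {
    // Completed batches are keyed by partition index; each partition writes
    // only its own key, so no explicit lock or notify-wait is needed.
    val results = new ConcurrentHashMap[Int, Array[Byte]]()

    // Collect all partitions in parallel.
    (0 until numPartitions).foreach { i =>
      Future { results.put(i, collectPartition(i)) }
    }

    // The caller thread streams batches strictly in partition order: it sends
    // partition 0 as soon as it is ready, then partition 1, and so on.
    // Worst case this degenerates to "collect everything, then send";
    // best case each partition is sent right after it is collected.
    var next = 0
    while (next < numPartitions) {
      val batch = results.remove(next)
      if (batch != null) {
        sendBatch(batch)
        next += 1
      } else {
        Thread.sleep(10) // a real implementation would block/signal instead of polling
      }
    }
  }
}
```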
Why are the changes needed?
For better performance, lower memory usage, and better readability and maintainability (by removing the synchronization).
Does this PR introduce any user-facing change?
No, this feature has not been released yet, and this is a performance-only fix.
How was this patch tested?
CI in this PR should test it out.