-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Python UDF should only compute the upstream once #8833
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If i understand this correctly, we are assuming the following in order for this to work:
- Each task gets their own copy of the deserialized closure, and thus their own copy of the queue.
- All closures are serialized together in one shot, rather than in multiple places (e.g. they are all done in the serializer, not in the ctor of the RDD)
- Java serializer does not serialize objects twice within the same stream, since it uses it to detect cycles. When they are deserialized, they still point to the same copy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes
|
Test build #42714 has finished for PR 8833 at commit
|
|
Test build #1775 has finished for PR 8833 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we pull this out in a constant? And also the same value in the Python, and put a comment on each saying that they have to equal? It's very dangerous if this value goes out of sync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two values don't need to be the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I was still thinking about my first attempt which involved a blocking queue.
|
lgtm! So this avoids deadlock by getting rid of the blocking queue (duh!) and then assumes the OS buffer will rate limit how much gets buffered on the writer side? Looking forward to getting this fix in. |
|
Based on some extended offline discussion / debate and code-review, we've decided to merge #8835 instead of this fix. The basic approaches in both patches are the same, except #8835 makes the scope + lifecycle of the queue a little clearer and will be easier to understand / maintain in the long term. I've reviewed that other patch pretty carefully and don't believe that it represents significantly more backporting risk than this patch. |
…ndling This patch refactors Python UDF handling: 1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner itself expects iterator as input/output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call, or in the future in an environment without RDDs. 2. Use PythonRunner in Spark SQL's BatchPythonEvaluation. 3. Updated BatchPythonEvaluation to only use its input once, rather than twice. This should fix Python UDF performance regression in Spark 1.5. There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small. This basically implements the approach in #8833, but with some code moving around so the correctness doesn't depend on the inner workings of Spark serialization and task execution. Author: Reynold Xin <rxin@databricks.com> Closes #8835 from rxin/python-iter-refactor.
…ndling This patch refactors Python UDF handling: 1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner itself expects iterator as input/output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call, or in the future in an environment without RDDs. 2. Use PythonRunner in Spark SQL's BatchPythonEvaluation. 3. Updated BatchPythonEvaluation to only use its input once, rather than twice. This should fix Python UDF performance regression in Spark 1.5. There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small. This basically implements the approach in #8833, but with some code moving around so the correctness doesn't depend on the inner workings of Spark serialization and task execution. Author: Reynold Xin <rxin@databricks.com> Closes #8835 from rxin/python-iter-refactor. (cherry picked from commit a96ba40) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
…ndling This patch refactors Python UDF handling: 1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner itself expects iterator as input/output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call, or in the future in an environment without RDDs. 2. Use PythonRunner in Spark SQL's BatchPythonEvaluation. 3. Updated BatchPythonEvaluation to only use its input once, rather than twice. This should fix Python UDF performance regression in Spark 1.5. There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small. This basically implements the approach in apache/spark#8833, but with some code moving around so the correctness doesn't depend on the inner workings of Spark serialization and task execution. Author: Reynold Xin <rxin@databricks.com> Closes #8835 from rxin/python-iter-refactor.
This PR changes to buffer the rows from upstream into a Queue, then zip them with result from Python UDF, to avoid the double computation of upstream.
Thanks the idea from @rxin to simplify the buffer greatly!
cc @marmbrus
Closes #8662