[SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) #22305
Conversation
The current state is a minimum working version - I copied some code from
Test build #95551 has finished for PR 22305 at commit
Test build #95730 has finished for PR 22305 at commit
Test build #96048 has finished for PR 22305 at commit
Test build #96083 has finished for PR 22305 at commit
retest this please
Test build #96084 has finished for PR 22305 at commit
Test build #96150 has finished for PR 22305 at commit
278abbf to d947317 (force-push)
This bit is new
This is changed to have `process == null` for the Python UDF case
Test build #96156 has finished for PR 22305 at commit
Test build #96158 has finished for PR 22305 at commit
Test build #96155 has finished for PR 22305 at commit
Test build #96184 has finished for PR 22305 at commit
Test build #96201 has finished for PR 22305 at commit
Test build #96260 has finished for PR 22305 at commit
bb05ee0 to c1771a2 (force-push)
Test build #96372 has finished for PR 22305 at commit
Test build #96373 has finished for PR 22305 at commit
Test build #96374 has finished for PR 22305 at commit
cc @HyukjinKwon @ueshin @BryanCutler @felixcheung This PR is ready for review. I have updated the description, so hopefully it is easier to review. Please let me know if you need any clarification or if there is anything I can help with during the review. Thanks! Li
Test build #96376 has finished for PR 22305 at commit
Gentle ping @cloud-fan @gatorsmile @HyukjinKwon @ueshin
Hey folks, any thoughts on this PR?
I think there is a typo in your example in the description; I think you didn't mean to have
Also, I think exploring returning numpy arrays would be good, but let's discuss that elsewhere, and I would remove it from your description if it isn't supported as part of this.
@BryanCutler Yes, that was a typo :) Thanks! I am also +1 on supporting numpy data structures in addition to Pandas. Happy to discuss here or separately.
BryanCutler
left a comment
@icexelloss, I just have a few initial questions about the python worker. I'll have to look at the rest in more detail later.
python/pyspark/worker.py
Outdated
This is currently not used, right?
Yes, this is not used. I left it here just to show the difference for now.
Let's get rid of it then. Looks like we're going to make it a separate one.
This is removed
python/pyspark/worker.py
Outdated
Why can't you use `s.iloc[...]` here, and would that be any better?
I forgot the reason I chose `take` over `iloc`. I will do a bit more investigation and report back.
We can also use `s[begin:end]`?
Good catch, `s[begin:end]` is actually faster.
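For reference, a minimal sketch (not from the PR) of how the three options could be compared; it assumes a Series with the default integer index, so `s[begin:end]` slices by position:

```python
# Hypothetical micro-benchmark of the three window-extraction options discussed above.
import timeit
import pandas as pd

s = pd.Series(range(1_000_000))
begin, end = 1_000, 2_000

print(timeit.timeit(lambda: s.take(list(range(begin, end))), number=10_000))  # Series.take
print(timeit.timeit(lambda: s.iloc[begin:end], number=10_000))                # positional indexer
print(timeit.timeit(lambda: s[begin:end], number=10_000))                     # plain slice
```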
So you have a Seq now to support a mix of unbounded/bounded eval types? Is there any other way to do this, like maybe deciphering it from the given bounds?
There might be other ways to do this. I like extending the single eval type to a seq of eval types because it seems pretty simple and flexible to me.
The only eval types that can be used together are unbounded and bounded windows, right? I only worry that this might lead to more complications with the other types that cannot be sent to the worker together.
I see your point. I can see this being used for other things too, for example, numpy-variant vectorized UDFs, or window transform UDFs for unbounded windows (an n -> n mapping for an unbounded window, such as rank). I chose this approach because of its flexibility.
For this particular case, it is possible to distinguish between bounded/unbounded, for example, by sending something in the arg offsets, but that would be using arg offsets for something else...
I don't see that the additional complexity this adds is worth it for now, but I'm curious what others think.
If I understand correctly, the python worker just takes an index range for bounded windows and uses the entire range for unbounded ones. It does not really care about anything else. So couldn't you just send an index that encompasses the entire range for unbounded? Then you would only need to define SQL_WINDOW_AGG_PANDAS_UDF in the worker and use all the same code for both, which would simplify things quite a bit.
> So couldn't you just send an index that encompasses the entire range for unbounded

This is actually what I first did. However, I think this would require sending more data than necessary for the unbounded case. In the worst case it would be 3x the number of columns (begin_index, end_index, data) compared to just one column (data).
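To make the trade-off concrete, here is a rough sketch (hypothetical names, not the PR's actual code) of the bounded path on the worker side: each window UDF argument arrives with begin/end index columns, and the function is applied to one positional slice per output row, so the unbounded case would only ever need the data column.

```python
# Sketch: evaluate a grouped-agg UDF over explicit per-row window bounds.
import pandas as pd

def wrap_bounded_window_udf(f):
    def evaluate(begin: pd.Series, end: pd.Series, values: pd.Series) -> pd.Series:
        # One output row per input row: slice the partition by its bounds.
        return pd.Series([f(values[b:e]) for b, e in zip(begin, end)])
    return evaluate

# rowsBetween(-2, 2) over v = [0, 2, 4, 6, 8], encoded as explicit bounds:
v = pd.Series([0.0, 2.0, 4.0, 6.0, 8.0])
begin = pd.Series([0, 0, 0, 1, 2])
end = pd.Series([3, 4, 5, 5, 5])
print(wrap_bounded_window_udf(lambda s: s.mean())(begin, end, v))
# 0    2.0
# 1    3.0
# 2    4.0
# 3    5.0
# 4    6.0
# dtype: float64
```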
Could you just send a flag to indicate whether it has bounds or not? Or add something to runner_conf?
Yeah, setting evalTypes to a Seq doesn't look like a great idea. I agree with https://github.com/apache/spark/pull/22305/files#r223774544.
If that's going to require another necessary change, then please go ahead. At least two committers here understood why it's introduced here.
I have reverted the evalType change and used a runner config instead.
retest this please
Test build #100043 has finished for PR 22305 at commit
BryanCutler
left a comment
Looking pretty good @icexelloss! I just had some minor things, and a little concern about using the udf index to reference the bound type.
python/pyspark/sql/functions.py
Outdated
```
.. note:: For performance reasons, the input series to window functions are not copied.
          Therefore, changing the value of the input series is not allowed and will
          result incorrect results. For the same reason, users should also not rely
```
should be "result in incorrect"
or "cause incorrect results" might sound better
Sounds good!
```python
@property
def pandas_agg_count_udf(self):
    from pyspark.sql.functions import pandas_udf, PandasUDFType
```
This import and all the others should be moved to the top; it's repeated many times. It could be done as a follow-up, though.
SGTM. Opened https://jira.apache.org/jira/browse/SPARK-26364
python/pyspark/worker.py
Outdated
```diff
-def wrap_window_agg_pandas_udf(f, return_type):
+def wrap_window_agg_pandas_udf(f, return_type, runner_conf, udf_index):
+    window_bound_types_str = runner_conf.get('pandas_window_bound_types')
+    window_bound_type = [t.strip() for t in window_bound_types_str.split(',')][udf_index]
```
Thanks for the change, I think using the runner_conf is cleaner than before. As an extra precaution, you might also apply `.lower()`.
Sounds good!
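For illustration, a sketch of the lookup with the suggested normalization applied; the conf key follows the diff above, and 'bounded'/'unbounded' are assumed here to be the legal values:

```python
def parse_window_bound_type(runner_conf: dict, udf_index: int) -> str:
    # Read and normalize the bound type for one UDF from the runner conf.
    types_str = runner_conf.get('pandas_window_bound_types', '')
    bound_type = [t.strip().lower() for t in types_str.split(',')][udf_index]
    assert bound_type in ('bounded', 'unbounded'), bound_type
    return bound_type

print(parse_window_bound_type({'pandas_window_bound_types': 'Bounded, UNBOUNDED'}, 1))
# unbounded
```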
```scala
/**
 * This class calculates and outputs windowed aggregates over the rows in a single partition.
 *
 * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute
```
doesn't not -> does not?
Nice catch. Fixed.
```diff
 for i in range(num_udfs):
-    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
+    arg_offsets, udf = read_single_udf(
+        pickleSer, infile, eval_type, runner_conf, udf_index=i)
```
So right now, since only window udfs can be sent together, the mapping of udf_index -> window_bound_type should hold, right? It is a little fragile though, and could cause problems if that ever changed in the future. Instead of indexing the udfs, what do you think about popping window_bound_type values from the runner_conf list?
If I understand you correctly, do you mean that we maintain a mutable list of the remaining window_bound_type values and pass that to the read_single_udf function instead of using the udf_index?
Yeah, basically this:

```python
window_eval_type_str, remaining_type_str = runner_conf['pandas_window_bound_types'].split(',', 1)
runner_conf['pandas_window_bound_types'] = remaining_type_str
window_eval_type = window_eval_type_str.strip().lower()
```

I'm not crazy about changing the conf in place, but it wouldn't rely on any particular udf indexing then. Maybe it would make more sense to check the eval type before calling read_single_udf, process the conf, and then send the window_eval_type as an optional param to read_single_udf?
This isn't a big deal though, so don't block merging if the rest is ready. It can be improved upon later if needed.
@BryanCutler I don't know which way is better... if we pass window_eval_type to read_single_udf, then the other non-window code paths need to handle that as well.
Yeah, I agree we can revisit this later if needed.
Test build #100100 has started for PR 22305 at commit
retest this please
Test build #100168 has finished for PR 22305 at commit
HyukjinKwon
left a comment
LGTM otherwise as well.
@BryanCutler @HyukjinKwon Sorry, I am out of town now, but I will take one more look at the recent comments on Monday. Thank you both.
Test build #100258 has finished for PR 22305 at commit
Test build #100259 has finished for PR 22305 at commit
Merged to master! Thank you for bearing with my swarming nitpicks, @icexelloss! Post-hoc review is completely welcome. Please leave some comments; I can help make follow-ups as well.
Oh! BTW, let's not forget about documenting this in the docs sometime before the 3.0 release.
Thank you @HyukjinKwon @BryanCutler @ueshin for the thorough review!
Closes apache#22305 from icexelloss/SPARK-24561-bounded-window-udf.

Authored-by: Li Jin <ice.xelloss@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Are these changes already present in a release? Or will we have to wait for release 3.0?
Yes, please try the Spark 3.0 preview.
What changes were proposed in this pull request?
This PR implements a new feature - window aggregation Pandas UDF for bounded window.
Doc:
https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit#heading=h.c87w44wcj3wj
Example:
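```
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

df = spark.range(0, 10, 2).toDF('v')
w1 = Window.partitionBy().orderBy('v').rangeBetween(-2, 2)
w2 = Window.partitionBy().orderBy('v').rowsBetween(-2, 2)

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
    return v.mean()

df.withColumn('v_mean', avg(df['v']).over(w1)).show()
# +---+------+
# |  v|v_mean|
# +---+------+
# |  0|   1.0|
# |  2|   2.0|
# |  4|   4.0|
# |  6|   6.0|
# |  8|   7.0|
# +---+------+

df.withColumn('v_mean', avg(df['v']).over(w2)).show()
# +---+------+
# |  v|v_mean|
# +---+------+
# |  0|   2.0|
# |  2|   3.0|
# |  4|   4.0|
# |  6|   5.0|
# |  8|   6.0|
# +---+------+
```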
High level changes:
This PR modifies the existing WindowInPandasExec physical node to deal with bounded (growing, shrinking and sliding) windows.

* `WindowInPandasExec` now shares the same base class as `WindowExec` and shares utility functions. See `WindowExecBase`.
* `WindowFunctionFrame` now has two new functions, `currentLowerBound` and `currentUpperBound`, which return the lower and upper window bound for the current output row. It is also modified to allow `AggregateProcessor == null`. A null aggregate processor is used for `WindowInPandasExec`, where we don't have an aggregator and only use the lower and upper bound functions from `WindowFunctionFrame`.
* The biggest change is in `WindowInPandasExec`, which is modified to take `currentLowerBound` and `currentUpperBound` and write those values together with the input data to the python process for rolling window aggregation. See `WindowInPandasExec` for more details.

Discussion
In benchmarking, I found the numpy variant of the rolling window UDF is much faster than the pandas version:
Spark SQL window function: 20s
Pandas variant: ~80s
Numpy variant: 10s
Numpy variant with numba: 4s
Allowing a numpy variant of the vectorized UDFs is something I want to discuss because of the performance improvement, but it doesn't have to be in this PR.
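As a rough illustration of where the gap comes from (this sketch is not part of the PR; the bounds encoding is illustrative): a numpy implementation can vectorize the aggregation across all rows of the partition, e.g. with prefix sums, while the pandas flavor pays per-row slicing and aggregation overhead.

```python
import numpy as np
import pandas as pd

def rolling_mean_pandas(values: pd.Series, begin, end) -> pd.Series:
    # Pandas flavor: one Series slice and one .mean() call per output row.
    return pd.Series([values[b:e].mean() for b, e in zip(begin, end)])

def rolling_mean_numpy(values: np.ndarray, begin: np.ndarray, end: np.ndarray) -> np.ndarray:
    # Numpy flavor: prefix sums make each window mean O(1), fully vectorized.
    prefix = np.concatenate(([0.0], np.cumsum(values)))
    return (prefix[end] - prefix[begin]) / (end - begin)

v = np.arange(0.0, 10.0, 2.0)              # [0, 2, 4, 6, 8]
begin = np.array([0, 0, 0, 1, 2])          # rowsBetween(-2, 2) lower bounds
end = np.array([3, 4, 5, 5, 5])            # rowsBetween(-2, 2) upper bounds (exclusive)
print(rolling_mean_numpy(v, begin, end))   # [2. 3. 4. 5. 6.]
```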
How was this patch tested?
New tests