[SPARK-54316][CORE][PYTHON][SQL] Consolidate GroupPandasIterUDFSerializer with GroupPandasUDFSerializer#53043
Conversation
373426a to
f8f7201
Compare
f8f7201 to
0cc9432
Compare
| batches_gen, | ||
| arrow_type, | ||
| ) in iterator: # tuple constructed in wrap_grouped_*_pandas_udf | ||
| # yields df for single UDF or [(df1, type1), (df2, type2), ...] for multiple UDFs |
There was a problem hiding this comment.
is this ser dedicated for SQL_GROUPED_MAP_PANDAS_UDF and SQL_GROUPED_MAP_PANDAS_ITER_UDF?
I think they don't support multiple UDFs?
There was a problem hiding this comment.
Ok I mistakenly thought they should support multiple UDFs, thus the implementation became more complex. I have removed this assumption and simplified the code.
python/pyspark/worker.py
Outdated
| return f(keys, value_series_gen) | ||
|
|
||
| elif eval_type in ( | ||
| PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, |
There was a problem hiding this comment.
can we exclude changes for SQL_GROUPED_AGG_PANDAS_UDF and SQL_WINDOW_AGG_PANDAS_UDF to make the PR more cleaner?
we can do it in a separate PR
GroupPandasIterUDFSerializer with GroupPandasUDFSerializer
GroupPandasIterUDFSerializer with GroupPandasUDFSerializerGroupPandasIterUDFSerializer with GroupPandasUDFSerializer
python/pyspark/worker.py
Outdated
|
|
||
| elif eval_type in ( | ||
| PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, | ||
| PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF, |
There was a problem hiding this comment.
why we need a new mapper for SQL_GROUPED_AGG_PANDAS_UDF and SQL_WINDOW_AGG_PANDAS_UDF?
There was a problem hiding this comment.
the return type of GroupPandasUDFSerializer is an iterator now. The mapper for SQL_GROUPED_AGG_PANDAS_UDF and SQL_WINDOW_AGG_PANDAS_UDF are expecting a list (so it can use a[0] to access the column). The iterator returned from GroupPandasUDFSerializer has to be converted to a list in their mapper.
There was a problem hiding this comment.
Now as we merged #53239, we don't need to change mappers for SQL_GROUPED_AGG_PANDAS_UDF and SQL_WINDOW_AGG_PANDAS_UDF
…OUPED_MAP_PANDAS_UDF and SQL_GROUPED_MAP_PANDAS_ITER_UDF
…OUPED_MAP_PANDAS_UDF and SQL_GROUPED_MAP_PANDAS_ITER_UDF
…er-serializer # Conflicts: # python/pyspark/sql/pandas/serializers.py # python/pyspark/worker.py
GroupPandasIterUDFSerializer with GroupPandasUDFSerializerGroupPandasIterUDFSerializer with GroupPandasUDFSerializer
|
merged to master |
What changes were proposed in this pull request?
This PR consolidates
GroupPandasUDFSerializerto support bothSQL_GROUPED_MAP_PANDAS_UDFandSQL_GROUPED_MAP_PANDAS_ITER_UDF, aligning with the design pattern used byGroupArrowUDFSerializer.Why are the changes needed?
When
Iterator[pandas.DataFrame]API was added togroupBy().applyInPandas()in SPARK-53614 (#52716), a newGroupPandasIterUDFSerializerclass was created. However, this class is nearly identical toGroupPandasUDFSerializer, differing only in whether batches are processed lazily (iterator mode) or all at once (regular mode).Does this PR introduce any user-facing change?
No, this is an internal refactoring that maintains backward compatibility. The API behavior remains the same from the user's perspective.
How was this patch tested?
Existing test cases.
Was this patch authored or co-authored using generative AI tooling?
Co-Generated-by: Cursor with Claude 4.5 Sonnet