[BUG] mapInPandas doesn't invoke udf on empty partitions #9480

Closed
eordentlich opened this issue Oct 19, 2023 · 2 comments · Fixed by #9557
Assignees
Labels
bug Something isn't working

Comments

@eordentlich
Contributor

eordentlich commented Oct 19, 2023

Describe the bug
With the spark-rapids plugin enabled, mapInPandas doesn't invoke the udf on an empty partition and just returns. This can be problematic when the task is part of a barrier stage and results in deadlock/hang. The non-empty tasks reach the barrier while the empty ones complete without ever executing the barrier code in the udf.

I think this is the empty-partition-skipping logic: https://github.com/NVIDIA/spark-rapids/blob/branch-23.12/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala#L111-L114

Steps/Code to reproduce bug
spark-rapids plugin jar, Spark 3.4.1 (probably similar on any 3.x)

$ pyspark --master local[1] --conf spark.plugins=com.nvidia.spark.SQLPlugin --conf spark.rapids.memory.gpu.pooling.enabled=false --conf spark.rapids.sql.batchSizeBytes=512m --conf spark.rapids.sql.concurrentGpuTasks=2 --jars ./rapids-4-spark_2.12-23.08.1.jar
>>> import pandas as pd
>>> from pyspark.sql.functions import lit,sum
>>> df = spark.range(10).withColumn("const",lit(1))
>>> df.repartition(2,"const").mapInPandas(lambda data: [pd.DataFrame([1])], schema="result:integer").select(sum("result")).collect()
[Row(sum(result)=1)]

Running the same commands and code but without --conf spark.plugins=com.nvidia.spark.SQLPlugin, i.e. baseline Spark, returns
[Row(sum(result)=2)]

Note that df.repartition(2,"const") has two partitions with one of them being empty.
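The difference in sums can be sketched without Spark. The snippet below (plain pandas; `map_in_pandas` and `skip_empty` are illustrative names, not plugin internals) mimics the repro: the UDF yields one row per invocation, so skipping it on the empty partition drops one row from the total.

```python
# A minimal sketch of why the plugin returns sum=1 while baseline Spark
# returns sum=2: the UDF emits one row per invocation, and the plugin's
# empty-partition skip suppresses one invocation.
import pandas as pd

def udf(partition_batches):
    # mirrors the repro's: lambda data: [pd.DataFrame([1])]
    return [pd.DataFrame({"result": [1]})]

def map_in_pandas(partitions, skip_empty):
    out = []
    for batches in partitions:
        if skip_empty and not batches:
            continue                # plugin behavior: empty partition skipped
        out.extend(udf(batches))    # baseline Spark: UDF runs regardless
    if not out:
        return pd.DataFrame({"result": []})
    return pd.concat(out, ignore_index=True)

# two partitions, one non-empty and one empty, as after repartition(2, "const")
partitions = [[pd.DataFrame({"id": range(10)})], []]
print(map_in_pandas(partitions, skip_empty=True)["result"].sum())   # 1 (plugin)
print(map_in_pandas(partitions, skip_empty=False)["result"].sum())  # 2 (baseline)
```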

Expected behavior
This is a bit of a corner case, but the behavior should match baseline Spark and execute the udf even on empty partitions.

Environment details (please complete the following information)
Ubuntu 22.04.2, Spark 3.4.1, python 3.9

Additional context
I think this is the root cause of hangs observed here on a toy example: NVIDIA/spark-rapids-ml#453

Maybe other Python operators have a similar issue and should match Spark's behavior with respect to empty partitions.

@eordentlich added the "? - Needs Triage" and "bug" labels on Oct 19, 2023
@firestarman
Collaborator

firestarman commented Oct 20, 2023

Thanks for catching this. I will take a look. This may be related to #1343.
It looks like we need to figure out another fix for the IPC error.

@mattahrens removed the "? - Needs Triage" label on Oct 24, 2023
firestarman added a commit that referenced this issue Oct 30, 2023
fixes #9480

This PR adds support for invoking the Map Pandas UDF on empty partitions to align with Spark's behavior.

So far I don't see that any other types of Pandas UDF are called for empty partitions.

The test is copied from the example in the linked issue.

---------

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator
> Maybe other Python operators have a similar issue and should match Spark's behavior with respect to empty partitions.

Hi @eordentlich, so far I don't see that any other types of Pandas UDF are called on empty partitions. If you find one, feel free to file an issue.
