6 changes: 6 additions & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -19,6 +19,12 @@
Upgrading PySpark
==================

Upgrading from PySpark 4.1 to 4.2
---------------------------------
* In Spark 4.2, columnar data exchange between PySpark and the JVM uses Apache Arrow by default. The configuration ``spark.sql.execution.arrow.pyspark.enabled`` now defaults to ``true``. To restore the legacy (non-Arrow) row-based data exchange, set ``spark.sql.execution.arrow.pyspark.enabled`` to ``false``.
* In Spark 4.2, regular Python UDFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDF.arrow.enabled`` now defaults to ``true``. To restore the legacy behavior for Python UDF execution, set ``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
* In Spark 4.2, regular Python UDTFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDTF.arrow.enabled`` now defaults to ``true``. To restore the legacy behavior for Python UDTF execution, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``. A combined example of restoring the legacy defaults is sketched below.
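
The following is a minimal sketch, assuming an already-running :class:`SparkSession` bound to a variable named ``spark``, of reverting all three Arrow-related defaults on an existing session:

.. code-block:: python

    # Assumes an active SparkSession named `spark`; revert the Arrow-related
    # defaults introduced in Spark 4.2 to their pre-4.2 values.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
    spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "false")
    spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "false")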

Upgrading from PySpark 4.0 to 4.1
---------------------------------

20 changes: 14 additions & 6 deletions python/docs/source/tutorial/sql/arrow_pandas.rst
@@ -60,8 +60,8 @@ Enabling for Conversion to/from Pandas

Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
using the call :meth:`DataFrame.toPandas` and when creating a Spark DataFrame from a Pandas DataFrame with
:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls,
the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` must be set to ``true``. This is enabled by default.

In addition, the optimizations enabled by ``spark.sql.execution.arrow.pyspark.enabled`` can automatically fall back
to the non-Arrow implementation if an error occurs before the actual computation within Spark.
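
A minimal sketch of both conversion paths, assuming an active ``SparkSession`` named ``spark`` and that pandas and PyArrow are installed:

.. code-block:: python

    import pandas as pd

    # Arrow-backed exchange is on by default; the explicit set below only makes
    # the dependency on the configuration visible.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    pdf = pd.DataFrame({"id": [1, 2, 3]})

    # pandas -> Spark: createDataFrame uses Arrow when the flag above is enabled
    sdf = spark.createDataFrame(pdf)

    # Spark -> pandas: toPandas also goes through Arrow
    result_pdf = sdf.toPandas()
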
@@ -368,18 +368,26 @@ Here's an example that demonstrates the usage of both a default, pickled Python
:lines: 298-316
:dedent: 4

Type coercion:
~~~~~~~~~~~~~~

Compared to the default, pickled Python UDFs, Arrow Python UDFs provide a more coherent type coercion mechanism. UDF
type coercion poses challenges when the Python instances returned by UDFs do not align with the user-specified
return type. The default, pickled Python UDFs' type coercion has certain limitations, such as relying on None as a
fallback for type mismatches, leading to potential ambiguity and data loss. Additionally, converting date, datetime,
and tuples to strings can yield ambiguous results. Arrow Python UDFs, on the other hand, leverage Arrow's
capabilities to standardize type coercion and address these issues effectively.

Type coercion differences are introduced by the following changes, illustrated by the sketch after this list:

* Since Spark 4.2, Arrow optimization is enabled by default for regular Python UDFs.
  The full type coercion difference is summarized in the tables `here <https://github.com/apache/spark/pull/41706>`__.
  To disable Arrow optimization, set ``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.

* Since Spark 4.1, the unnecessary conversion to pandas instances in Arrow-optimized Python UDFs is removed from the serializer
  when ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled`` is disabled.
  The behavior difference is summarized in the tables `here <https://github.com/apache/spark/pull/51225>`__.
  To restore the legacy behavior, set ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled`` to ``true``.
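
A hedged sketch of how the coercion difference can be observed side by side, assuming an active ``SparkSession`` named ``spark``; the UDF names and the date-to-string mismatch are illustrative, and the tables linked above remain the authoritative reference for the exact results:

.. code-block:: python

    import datetime

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # Both UDFs return a datetime.date while declaring StringType, so the
    # engine must coerce the result; only the coercion mechanism differs.
    @udf(returnType=StringType(), useArrow=True)
    def arrow_to_str(i):
        return datetime.date(1970, 1, 1)

    @udf(returnType=StringType(), useArrow=False)
    def pickled_to_str(i):
        return datetime.date(1970, 1, 1)

    df = spark.range(1)

    # The Arrow-optimized UDF is expected to yield an ISO-formatted date
    # string, while the legacy pickled UDF has historically produced a less
    # readable representation of the underlying Java object.
    df.select(arrow_to_str("id"), pickled_to_str("id")).show(truncate=False)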

Usage Notes
-----------
3 changes: 2 additions & 1 deletion python/docs/source/tutorial/sql/python_udtf.rst
@@ -429,7 +429,8 @@ Arrow Optimization
------------------

Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer
data between Java and Python processes. Beginning in Spark 4.2, Apache Arrow is enabled by default for Python UDTFs.
To disable Arrow optimization, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.

Arrow can improve performance when each input row generates a large result table from the UDTF.
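
A minimal sketch of a Python UDTF, assuming an active Spark session; the class and result schema are illustrative, and the ``useArrow`` argument is spelled out only to make the per-UDTF choice explicit (with the Spark 4.2 default it could be omitted, or set to ``False`` to opt out):

.. code-block:: python

    from pyspark.sql.functions import lit, udtf

    # A simple UDTF that expands each input range into one row per number.
    @udtf(returnType="num: int, squared: int", useArrow=True)
    class SquareNumbers:
        def eval(self, start: int, end: int):
            for num in range(start, end + 1):
                yield (num, num * num)

    SquareNumbers(lit(1), lit(3)).show()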

2 changes: 1 addition & 1 deletion python/docs/source/tutorial/sql/type_conversions.rst
@@ -57,7 +57,7 @@ are listed below:
- Default
* - spark.sql.execution.pythonUDF.arrow.enabled
- Enable PyArrow in PySpark. See more `here <arrow_pandas.rst>`_.
- True
* - spark.sql.pyspark.inferNestedDictAsStruct.enabled
- When enabled, nested dictionaries are inferred as StructType. Otherwise, they are inferred as MapType.
- False
56 changes: 29 additions & 27 deletions python/pyspark/sql/tests/test_unified_udf.py
@@ -351,39 +351,41 @@ def test_regular_python_udf(self):
import pandas as pd
import pyarrow as pa

with self.sql_conf({"spark.sql.execution.pythonUDF.arrow.enabled": False}):

    @udf(returnType=LongType())
    def f1(x):
        return x + 1

    @udf(returnType=LongType())
    def f2(x: int) -> int:
        return x + 1

    # Cannot infer a vectorized UDF type
    @udf(returnType=LongType())
    def f3(x: int) -> pd.Series:
        return x + 1

    # Cannot infer a vectorized UDF type
    @udf(returnType=LongType())
    def f4(x: int) -> pa.Array:
        return x + 1

    # useArrow is explicitly set to false
    @udf(returnType=LongType(), useArrow=False)
    def f5(x: pd.Series) -> pd.Series:
        return x + 1

    # useArrow is explicitly set to false
    @udf(returnType=LongType(), useArrow=False)
    def f6(x: pa.Array) -> pa.Array:
        return x + 1

    expected = self.spark.range(10).select((sf.col("id") + 1).alias("res")).collect()
    for f in [f1, f2, f3, f4, f5, f6]:
        self.assertEqual(f.evalType, PythonEvalType.SQL_BATCHED_UDF)
        result = self.spark.range(10).select(f("id").alias("res")).collect()
        self.assertEqual(result, expected)

def test_arrow_optimized_python_udf(self):
import pandas as pd
@@ -3917,7 +3917,7 @@ object SQLConf {
.doc("(Deprecated since Spark 3.0, please set 'spark.sql.execution.arrow.pyspark.enabled'.)")
.version("2.3.0")
.booleanConf
.createWithDefault(true)

val ARROW_PYSPARK_EXECUTION_ENABLED =
buildConf("spark.sql.execution.arrow.pyspark.enabled")
@@ -4263,7 +4263,7 @@ object SQLConf {
"can only be enabled when the given function takes at least one argument.")
.version("3.4.0")
.booleanConf
.createWithDefault(true)

val PYTHON_UDF_ARROW_CONCURRENCY_LEVEL =
buildConf("spark.sql.execution.pythonUDF.arrow.concurrency.level")
@@ -4299,7 +4299,7 @@ object SQLConf {
.doc("Enable Arrow optimization for Python UDTFs.")
.version("3.5.0")
.booleanConf
.createWithDefault(true)

val PYTHON_TABLE_UDF_LEGACY_PANDAS_CONVERSION_ENABLED =
buildConf("spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled")