diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst
index c6cf69dadc930..fbd63539b380c 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -19,6 +19,12 @@ Upgrading PySpark
 ==================
 
+Upgrading from PySpark 4.1 to 4.2
+---------------------------------
+* In Spark 4.2, columnar data exchange between PySpark and the JVM uses Apache Arrow by default. The configuration ``spark.sql.execution.arrow.pyspark.enabled`` now defaults to ``true``. To restore the legacy (non-Arrow) row-based data exchange, set ``spark.sql.execution.arrow.pyspark.enabled`` to ``false``.
+* In Spark 4.2, regular Python UDFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDF.arrow.enabled`` now defaults to ``true``. To restore the legacy behavior for Python UDF execution, set ``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
+* In Spark 4.2, regular Python UDTFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDTF.arrow.enabled`` now defaults to ``true``. To restore the legacy behavior for Python UDTF execution, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.
+
 Upgrading from PySpark 4.0 to 4.1
 ---------------------------------
 
diff --git a/python/docs/source/tutorial/sql/arrow_pandas.rst b/python/docs/source/tutorial/sql/arrow_pandas.rst
index 3bef50874d7ff..386fe83b48218 100644
--- a/python/docs/source/tutorial/sql/arrow_pandas.rst
+++ b/python/docs/source/tutorial/sql/arrow_pandas.rst
@@ -60,8 +60,8 @@ Enabling for Conversion to/from Pandas
 
 Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
 using the call :meth:`DataFrame.toPandas` and when creating a Spark DataFrame from a Pandas DataFrame with
-:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls, users need to first set
-the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` to ``true``. This is disabled by default.
+:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls,
+the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` must be set to ``true``, which is the default.
 In addition, optimizations enabled by ``spark.sql.execution.arrow.pyspark.enabled`` could fallback automatically
 to non-Arrow optimization implementation if an error occurs before the actual computation within Spark.
 
@@ -368,6 +368,9 @@ Here's an example that demonstrates the usage of both a default, pickled Python
     :lines: 298-316
     :dedent: 4
 
+Type coercion
+~~~~~~~~~~~~~
+
 Compared to the default, pickled Python UDFs, Arrow Python UDFs provide a more coherent type coercion mechanism.
 UDF type coercion poses challenges when the Python instances returned by UDFs do not align with the user-specified
 return type. The default, pickled Python UDFs' type coercion has certain limitations, such as relying on None as a
@@ -375,11 +378,17 @@ fallback for type mismatches, leading to potential ambiguity and data loss. Addi
 and tuples to strings can yield ambiguous results. Arrow Python UDFs, on the other hand, leverage Arrow's
 capabilities to standardize type coercion and address these issues effectively.
 
-A note on Arrow Python UDF type coercion: In Spark 4.1, unnecessary conversion to pandas instances is removed in the serializer
-when ``spark.sql.execution.pythonUDF.arrow.enabled`` is enabled. As a result, the type coercion changes
-when the produced output has a schema different from the specified schema. To restore the previous behavior,
-enable ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled``.
+Type coercion differences are introduced by the following changes:
+
+* Since Spark 4.2, Arrow optimization is enabled by default for regular Python UDFs.
+  The resulting type coercion differences are summarized in the tables `here `__.
+  To disable Arrow optimization, set ``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
+
+* Since Spark 4.1, unnecessary conversion to pandas instances in Arrow-optimized Python UDFs is removed from the serializer
+  when ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled`` is disabled.
+  The behavior difference is summarized in the tables `here `__.
+  To restore the legacy behavior, set ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled`` to ``true``.
 
 Usage Notes
 -----------
diff --git a/python/docs/source/tutorial/sql/python_udtf.rst b/python/docs/source/tutorial/sql/python_udtf.rst
index 26142e42e3633..e2d240a6f9de1 100644
--- a/python/docs/source/tutorial/sql/python_udtf.rst
+++ b/python/docs/source/tutorial/sql/python_udtf.rst
@@ -429,7 +429,8 @@ Arrow Optimization
 ------------------
 
 Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer
-data between Java and Python processes. Apache Arrow is disabled by default for Python UDTFs.
+data between Java and Python processes. Beginning in Spark 4.2, Apache Arrow is enabled by default for Python UDTFs.
+To disable Arrow optimization, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.
 
 Arrow can improve performance when each input row generates a large result table from the UDTF.
 
diff --git a/python/docs/source/tutorial/sql/type_conversions.rst b/python/docs/source/tutorial/sql/type_conversions.rst
index 2a88731ce8197..625a68340f201 100644
--- a/python/docs/source/tutorial/sql/type_conversions.rst
+++ b/python/docs/source/tutorial/sql/type_conversions.rst
@@ -57,7 +57,7 @@ are listed below:
       - Default
     * - spark.sql.execution.pythonUDF.arrow.enabled
       - Enable PyArrow in PySpark. See more `here `_.
-      - False
+      - True
     * - spark.sql.pyspark.inferNestedDictAsStruct.enabled
       - When enabled, nested dictionaries are inferred as StructType. Otherwise, they are inferred as MapType.
       - False
diff --git a/python/pyspark/sql/tests/test_unified_udf.py b/python/pyspark/sql/tests/test_unified_udf.py
index 2d3446bd0b5ba..4b89d92111c88 100644
--- a/python/pyspark/sql/tests/test_unified_udf.py
+++ b/python/pyspark/sql/tests/test_unified_udf.py
@@ -351,39 +351,41 @@ def test_regular_python_udf(self):
         import pandas as pd
         import pyarrow as pa
 
-        @udf(returnType=LongType())
-        def f1(x):
-            return x + 1
+        with self.sql_conf({"spark.sql.execution.pythonUDF.arrow.enabled": False}):
 
-        @udf(returnType=LongType())
-        def f2(x: int) -> int:
-            return x + 1
+            @udf(returnType=LongType())
+            def f1(x):
+                return x + 1
 
-        # Cannot infer a vectorized UDF type
-        @udf(returnType=LongType())
-        def f3(x: int) -> pd.Series:
-            return x + 1
+            @udf(returnType=LongType())
+            def f2(x: int) -> int:
+                return x + 1
 
-        # Cannot infer a vectorized UDF type
-        @udf(returnType=LongType())
-        def f4(x: int) -> pa.Array:
-            return x + 1
+            # Cannot infer a vectorized UDF type
+            @udf(returnType=LongType())
+            def f3(x: int) -> pd.Series:
+                return x + 1
 
-        # useArrow is explicitly set to false
-        @udf(returnType=LongType(), useArrow=False)
-        def f5(x: pd.Series) -> pd.Series:
-            return x + 1
+            # Cannot infer a vectorized UDF type
+            @udf(returnType=LongType())
+            def f4(x: int) -> pa.Array:
+                return x + 1
 
-        # useArrow is explicitly set to false
-        @udf(returnType=LongType(), useArrow=False)
-        def f6(x: pa.Array) -> pa.Array:
-            return x + 1
+            # useArrow is explicitly set to false
+            @udf(returnType=LongType(), useArrow=False)
+            def f5(x: pd.Series) -> pd.Series:
+                return x + 1
 
-        expected = self.spark.range(10).select((sf.col("id") + 1).alias("res")).collect()
-        for f in [f1, f2, f3, f4, f5, f6]:
-            self.assertEqual(f.evalType, PythonEvalType.SQL_BATCHED_UDF)
-            result = self.spark.range(10).select(f("id").alias("res")).collect()
-            self.assertEqual(result, expected)
+            # useArrow is explicitly set to false
+            @udf(returnType=LongType(), useArrow=False)
+            def f6(x: pa.Array) -> pa.Array:
+                return x + 1
+
+            expected = self.spark.range(10).select((sf.col("id") + 1).alias("res")).collect()
+            for f in [f1, f2, f3, f4, f5, f6]:
+                self.assertEqual(f.evalType, PythonEvalType.SQL_BATCHED_UDF)
+                result = self.spark.range(10).select(f("id").alias("res")).collect()
+                self.assertEqual(result, expected)
 
     def test_arrow_optimized_python_udf(self):
         import pandas as pd
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d6fa8b21b21a8..c331f1724854b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3917,7 +3917,7 @@ object SQLConf {
     .doc("(Deprecated since Spark 3.0, please set 'spark.sql.execution.arrow.pyspark.enabled'.)")
     .version("2.3.0")
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)
 
   val ARROW_PYSPARK_EXECUTION_ENABLED =
     buildConf("spark.sql.execution.arrow.pyspark.enabled")
@@ -4263,7 +4263,7 @@ object SQLConf {
         "can only be enabled when the given function takes at least one argument.")
       .version("3.4.0")
      .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val PYTHON_UDF_ARROW_CONCURRENCY_LEVEL =
     buildConf("spark.sql.execution.pythonUDF.arrow.concurrency.level")
@@ -4299,7 +4299,7 @@ object SQLConf {
       .doc("Enable Arrow optimization for Python UDTFs.")
       .version("3.5.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val PYTHON_TABLE_UDF_LEGACY_PANDAS_CONVERSION_ENABLED =
     buildConf("spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled")