From ad858dcbf0e78bb892ba37d7c2c640a8d863c707 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 28 Oct 2024 14:43:43 +0800 Subject: [PATCH 01/15] reuse config --- python/pyspark/sql/pandas/conversion.py | 3 ++- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 172a4fc4b234..42979af3de71 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -785,8 +785,9 @@ def _create_from_arrow_table( if not isinstance(schema, StructType): schema = from_arrow_schema(table.schema, prefer_timestamp_ntz=prefer_timestamp_ntz) + safecheck = self._jconf.arrowSafeTypeConversion() table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast( - to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True) + to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), safe=safecheck ) # Chunk the Arrow Table into RecordBatches diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5218a683a8fa..fd283bd8a2da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3353,9 +3353,10 @@ object SQLConf { buildConf("spark.sql.execution.pandas.convertToArrowArraySafely") .internal() .doc("When true, Arrow will perform safe type conversion when converting " + - "Pandas.Series to Arrow array during serialization. Arrow will raise errors " + - "when detecting unsafe type conversion like overflow. When false, disabling Arrow's type " + - "check and do type conversions anyway. This config only works for Arrow 0.11.0+.") + "Pandas.Series to Arrow array during serialization, and when casting Arrow tables to " + + "create DataFrames. Arrow will raise errors when detecting unsafe type conversion like " + + "overflow. When false, disabling Arrow's type check and do type conversions anyway. 
This " + + "config only works for Arrow 0.11.0+.") .version("3.0.0") .booleanConf .createWithDefault(false) From 738aba33992f7607f727e01eae418eae2b80a97b Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 28 Oct 2024 14:57:26 +0800 Subject: [PATCH 02/15] new config --- python/pyspark/sql/pandas/conversion.py | 2 +- .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 42979af3de71..780c751a9f15 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -785,7 +785,7 @@ def _create_from_arrow_table( if not isinstance(schema, StructType): schema = from_arrow_schema(table.schema, prefer_timestamp_ntz=prefer_timestamp_ntz) - safecheck = self._jconf.arrowSafeTypeConversion() + safecheck = self._jconf.arrowSafeTypeCasting() table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast( to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), safe=safecheck ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fd283bd8a2da..f57626737b22 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3353,11 +3353,21 @@ object SQLConf { buildConf("spark.sql.execution.pandas.convertToArrowArraySafely") .internal() .doc("When true, Arrow will perform safe type conversion when converting " + - "Pandas.Series to Arrow array during serialization, and when casting Arrow tables to " + + "Pandas.Series to Arrow array during serialization. Arrow will raise errors " + + "when detecting unsafe type conversion like overflow. When false, disabling Arrow's type " + + "check and do type conversions anyway. This config only works for Arrow 0.11.0+.") + .version("3.0.0") + .booleanConf + .createWithDefault(false) + + val ARROW_SAFE_TYPE_CASTING = + buildConf("spark.sql.execution.castArrowTableSafely") + .internal() + .doc("When true, Arrow will perform safe type conversion when casting Arrow tables to " + "create DataFrames. Arrow will raise errors when detecting unsafe type conversion like " + "overflow. When false, disabling Arrow's type check and do type conversions anyway. 
This " + "config only works for Arrow 0.11.0+.") - .version("3.0.0") + .version("4.0.0") .booleanConf .createWithDefault(false) @@ -6011,6 +6021,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def arrowSafeTypeConversion: Boolean = getConf(SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION) + def arrowSafeTypeCasting: Boolean = getConf(SQLConf.ARROW_SAFE_TYPE_CASTING) + def pysparkWorkerPythonExecutable: Option[String] = getConf(SQLConf.PYSPARK_WORKER_PYTHON_EXECUTABLE) From 6e04074ebab0b27179fa5990662aaf133e6ebbea Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 28 Oct 2024 17:08:31 +0800 Subject: [PATCH 03/15] test --- python/pyspark/sql/tests/test_arrow.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index b71bdb1eece2..7d7be8f36218 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -532,6 +532,26 @@ def test_createDataFrame_arrow_pandas(self): df_pandas = self.spark.createDataFrame(pdf) self.assertEqual(df_arrow.collect(), df_pandas.collect()) + def test_createDataFrame_arrow_safe_cast(self): + data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]} + table = pa.table(data) + schema = StructType( + [StructField("id", IntegerType(), True), StructField("value", IntegerType(), True)] + ) + expected = [ + Row(id=1, value=1215752192), + Row(id=2, value=-1863462912), + Row(id=3, value=-647710720), + ] + + with self.sql_conf({"spark.sql.execution.castArrowTableSafely": False}): + df = self.spark.createDataFrame(table, schema=schema) + self.assertEqual(df.collect(), expected) + + with self.sql_conf({"spark.sql.execution.castArrowTableSafely": True}): + with self.assertRaises(Exception): + self.spark.createDataFrame(table, schema=schema) + def _createDataFrame_toggle(self, data, schema=None): with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}): df_no_arrow = self.spark.createDataFrame(data, schema=schema) From 4be8b042f2ff374db0f995324cabcc6b36ae9a5b Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 31 Oct 2024 15:01:05 +0800 Subject: [PATCH 04/15] connect parity --- python/pyspark/sql/connect/session.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 83b0496a8427..8c7b9fb71232 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -611,9 +611,11 @@ def createDataFrame( if not isinstance(schema, StructType): schema = from_arrow_schema(data.schema, prefer_timestamp_ntz=prefer_timestamp_ntz) + safe_cast = self._client.get_configs("spark.sql.execution.castArrowTableSafely") == "true" + _table = ( _check_arrow_table_timestamps_localize(data, schema, True, timezone) - .cast(to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True)) + .cast(to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), safe=safe_cast) .rename_columns(schema.names) ) From 73e154653359bb7bcd521e6bad730f47c32c37ca Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 11 Nov 2024 21:01:13 +0800 Subject: [PATCH 05/15] fmt --- python/pyspark/sql/connect/session.py | 9 +++++++-- python/pyspark/sql/pandas/conversion.py | 10 +++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 8c7b9fb71232..23ac8fed1c87 100644 --- 
a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -611,11 +611,16 @@ def createDataFrame( if not isinstance(schema, StructType): schema = from_arrow_schema(data.schema, prefer_timestamp_ntz=prefer_timestamp_ntz) - safe_cast = self._client.get_configs("spark.sql.execution.castArrowTableSafely") == "true" + safe_cast = ( + self._client.get_configs("spark.sql.execution.castArrowTableSafely") == "true" + ) _table = ( _check_arrow_table_timestamps_localize(data, schema, True, timezone) - .cast(to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), safe=safe_cast) + .cast( + to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), + safe=safe_cast, + ) .rename_columns(schema.names) ) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 780c751a9f15..574d52e6af10 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -392,7 +392,7 @@ def createDataFrame( # type: ignore[misc] if schema is None: schema = data.schema.names - return self._create_from_arrow_table(data, schema, timezone) + return self._create_from_arrow_table(data, schema, timezone, verifySchema=verifySchema) # `data` is a PandasDataFrameLike object from pyspark.sql.pandas.utils import require_minimum_pandas_version @@ -745,7 +745,11 @@ def create_iter_server(): return df def _create_from_arrow_table( - self, table: "pa.Table", schema: Union[StructType, List[str]], timezone: str + self, + table: "pa.Table", + schema: Union[StructType, List[str]], + timezone: str, + verifySchema: bool, ) -> "DataFrame": """ Create a DataFrame from a given pyarrow.Table by slicing it into partitions then @@ -785,7 +789,7 @@ def _create_from_arrow_table( if not isinstance(schema, StructType): schema = from_arrow_schema(table.schema, prefer_timestamp_ntz=prefer_timestamp_ntz) - safecheck = self._jconf.arrowSafeTypeCasting() + safecheck = verifySchema or self._jconf.arrowSafeTypeCasting() table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast( to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), safe=safecheck ) From e08b1f90dedeb6eca3f08f6116c3c4133bbdcb82 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 12 Nov 2024 19:07:17 +0800 Subject: [PATCH 06/15] - conf; classic --- python/pyspark/sql/pandas/conversion.py | 35 ++++++++++++------ python/pyspark/sql/session.py | 37 ++++++++++++------- .../apache/spark/sql/internal/SQLConf.scala | 13 ------- 3 files changed, 48 insertions(+), 37 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 574d52e6af10..456e4065056a 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -27,6 +27,7 @@ ) from warnings import warn +from pyspark._globals import _NoValue, _NoValueType from pyspark.errors.exceptions.captured import unwrap_spark_exception from pyspark.loose_version import LooseVersion from pyspark.util import _load_from_socket @@ -352,7 +353,7 @@ def createDataFrame( self, data: "PandasDataFrameLike", schema: Union[StructType, str], - verifySchema: bool = ..., + verifySchema: Union[_NoValueType, bool] = ..., ) -> "DataFrame": ... @@ -361,7 +362,7 @@ def createDataFrame( self, data: "pa.Table", schema: Union[StructType, str], - verifySchema: bool = ..., + verifySchema: Union[_NoValueType, bool] = ..., ) -> "DataFrame": ... 
@@ -370,7 +371,7 @@ def createDataFrame( # type: ignore[misc] data: Union["PandasDataFrameLike", "pa.Table"], schema: Optional[Union[StructType, List[str]]] = None, samplingRatio: Optional[float] = None, - verifySchema: bool = True, + verifySchema: Union[_NoValueType, bool] = _NoValue, ) -> "DataFrame": from pyspark.sql import SparkSession @@ -392,7 +393,7 @@ def createDataFrame( # type: ignore[misc] if schema is None: schema = data.schema.names - return self._create_from_arrow_table(data, schema, timezone, verifySchema=verifySchema) + return self._create_from_arrow_table(data, schema, timezone, verifySchema) # `data` is a PandasDataFrameLike object from pyspark.sql.pandas.utils import require_minimum_pandas_version @@ -405,7 +406,7 @@ def createDataFrame( # type: ignore[misc] if self._jconf.arrowPySparkEnabled() and len(data) > 0: try: - return self._create_from_pandas_with_arrow(data, schema, timezone) + return self._create_from_pandas_with_arrow(data, schema, timezone, verifySchema) except Exception as e: if self._jconf.arrowPySparkFallbackEnabled(): msg = ( @@ -624,7 +625,11 @@ def _get_numpy_record_dtype(self, rec: "np.recarray") -> Optional["np.dtype"]: return np.dtype(record_type_list) if has_rec_fix else None def _create_from_pandas_with_arrow( - self, pdf: "PandasDataFrameLike", schema: Union[StructType, List[str]], timezone: str + self, + pdf: "PandasDataFrameLike", + schema: Union[StructType, List[str]], + timezone: str, + verifySchema: Union[_NoValueType, bool], ) -> "DataFrame": """ Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting @@ -657,6 +662,10 @@ def _create_from_pandas_with_arrow( ) import pyarrow as pa + if verifySchema is _NoValue: + # (With Arrow optimization) createDataFrame with `pandas.DataFrame` + verifySchema = False + infer_pandas_dict_as_map = ( str(self.conf.get("spark.sql.execution.pandas.inferPandasDictAsMap")).lower() == "true" ) @@ -725,8 +734,8 @@ def _create_from_pandas_with_arrow( jsparkSession = self._jsparkSession - safecheck = self._jconf.arrowSafeTypeConversion() - ser = ArrowStreamPandasSerializer(timezone, safecheck) + # safecheck = self._jconf.arrowSafeTypeConversion() + ser = ArrowStreamPandasSerializer(timezone, verifySchema) @no_type_check def reader_func(temp_filename): @@ -749,7 +758,7 @@ def _create_from_arrow_table( table: "pa.Table", schema: Union[StructType, List[str]], timezone: str, - verifySchema: bool, + verifySchema: Union[_NoValueType, bool], ) -> "DataFrame": """ Create a DataFrame from a given pyarrow.Table by slicing it into partitions then @@ -771,6 +780,10 @@ def _create_from_arrow_table( require_minimum_pyarrow_version() + if verifySchema is _NoValue: + # createDataFrame with `pyarrow.Table` + verifySchema = False + prefer_timestamp_ntz = is_timestamp_ntz_preferred() # Create the Spark schema from list of names passed in with Arrow types @@ -789,9 +802,9 @@ def _create_from_arrow_table( if not isinstance(schema, StructType): schema = from_arrow_schema(table.schema, prefer_timestamp_ntz=prefer_timestamp_ntz) - safecheck = verifySchema or self._jconf.arrowSafeTypeCasting() table = _check_arrow_table_timestamps_localize(table, schema, True, timezone).cast( - to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), safe=safecheck + to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), + safe=verifySchema, ) # Chunk the Arrow Table into RecordBatches diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 
4979ce712673..8a3b706a88dc 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -38,6 +38,7 @@ TYPE_CHECKING, ) +from pyspark._globals import _NoValue, _NoValueType from pyspark.conf import SparkConf from pyspark.util import is_remote_only from pyspark.sql.conf import RuntimeConfig @@ -1265,7 +1266,7 @@ def createDataFrame( data: Iterable["RowLike"], schema: Union[StructType, str], *, - verifySchema: bool = ..., + verifySchema: Union[_NoValueType, bool] = ..., ) -> DataFrame: ... @@ -1275,7 +1276,7 @@ def createDataFrame( data: "RDD[RowLike]", schema: Union[StructType, str], *, - verifySchema: bool = ..., + verifySchema: Union[_NoValueType, bool] = ..., ) -> DataFrame: ... @@ -1284,7 +1285,7 @@ def createDataFrame( self, data: "RDD[AtomicValue]", schema: Union[AtomicType, str], - verifySchema: bool = ..., + verifySchema: Union[_NoValueType, bool] = ..., ) -> DataFrame: ... @@ -1293,7 +1294,7 @@ def createDataFrame( self, data: Iterable["AtomicValue"], schema: Union[AtomicType, str], - verifySchema: bool = ..., + verifySchema: Union[_NoValueType, bool] = ..., ) -> DataFrame: ... @@ -1312,7 +1313,7 @@ def createDataFrame( self, data: "PandasDataFrameLike", schema: Union[StructType, str], - verifySchema: bool = ..., + verifySchema: Union[_NoValueType, bool] = ..., ) -> DataFrame: ... @@ -1321,7 +1322,7 @@ def createDataFrame( self, data: "pa.Table", schema: Union[StructType, str], - verifySchema: bool = ..., + verifySchema: Union[_NoValueType, bool] = ..., ) -> DataFrame: ... @@ -1330,7 +1331,7 @@ def createDataFrame( # type: ignore[misc] data: Union["RDD[Any]", Iterable[Any], "PandasDataFrameLike", "ArrayLike", "pa.Table"], schema: Optional[Union[AtomicType, StructType, str]] = None, samplingRatio: Optional[float] = None, - verifySchema: bool = True, + verifySchema: Union[_NoValueType, bool] = _NoValue, ) -> DataFrame: """ Creates a :class:`DataFrame` from an :class:`RDD`, a list, a :class:`pandas.DataFrame`, @@ -1374,13 +1375,18 @@ def createDataFrame( # type: ignore[misc] if ``samplingRatio`` is ``None``. This option is effective only when the input is :class:`RDD`. verifySchema : bool, optional - verify data types of every row against schema. Enabled by default. - When the input is :class:`pyarrow.Table` or when the input class is - :class:`pandas.DataFrame` and `spark.sql.execution.arrow.pyspark.enabled` is enabled, - this option is not effective. It follows Arrow type coercion. This option is not - supported with Spark Connect. + verify data types of every row against schema. + If not provided, + - createDataFrame with :class:`pyarrow.Table`, verifySchema = False + - (With Arrow optimization) createDataFrame with :class:`pandas.DataFrame`, verifySchema = False + - (Without Arrow optimization) createDataFrame with :class:`pandas.DataFrame`, verifySchema = True + - createDataFrame with regular Python instances, verifySchema = True + Arrow optimization is enabled/disabled via `spark.sql.execution.arrow.pyspark.enabled`. .. versionadded:: 2.1.0 + .. versionchanged:: 4.0.0 + Adjusts default value to pyspark._NoValue. 
+ Returns ------- @@ -1578,8 +1584,13 @@ def _create_dataframe( data: Union["RDD[Any]", Iterable[Any]], schema: Optional[Union[DataType, List[str]]], samplingRatio: Optional[float], - verifySchema: bool, + verifySchema: Union[_NoValueType, bool], ) -> DataFrame: + if verifySchema is _NoValue: + # createDataFrame with regular Python instances + # or (without Arrow optimization) createDataFrame with Pandas DataFrame + verifySchema = True + if isinstance(schema, StructType): verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f57626737b22..5218a683a8fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3360,17 +3360,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val ARROW_SAFE_TYPE_CASTING = - buildConf("spark.sql.execution.castArrowTableSafely") - .internal() - .doc("When true, Arrow will perform safe type conversion when casting Arrow tables to " + - "create DataFrames. Arrow will raise errors when detecting unsafe type conversion like " + - "overflow. When false, disabling Arrow's type check and do type conversions anyway. This " + - "config only works for Arrow 0.11.0+.") - .version("4.0.0") - .booleanConf - .createWithDefault(false) - val PYSPARK_WORKER_PYTHON_EXECUTABLE = buildConf("spark.sql.execution.pyspark.python") .internal() @@ -6021,8 +6010,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def arrowSafeTypeConversion: Boolean = getConf(SQLConf.PANDAS_ARROW_SAFE_TYPE_CONVERSION) - def arrowSafeTypeCasting: Boolean = getConf(SQLConf.ARROW_SAFE_TYPE_CASTING) - def pysparkWorkerPythonExecutable: Option[String] = getConf(SQLConf.PYSPARK_WORKER_PYTHON_EXECUTABLE) From e93a26a30f12666c52d7fe7c58ea2a15fff95934 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 12 Nov 2024 19:12:33 +0800 Subject: [PATCH 07/15] unuse conf --- python/pyspark/sql/pandas/conversion.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index 456e4065056a..e00c113e5e27 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -734,7 +734,6 @@ def _create_from_pandas_with_arrow( jsparkSession = self._jsparkSession - # safecheck = self._jconf.arrowSafeTypeConversion() ser = ArrowStreamPandasSerializer(timezone, verifySchema) @no_type_check From 71bb3cf48b15aeaf678a929ef48b653546b5b27b Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Tue, 12 Nov 2024 19:27:04 +0800 Subject: [PATCH 08/15] test from arrow table --- python/pyspark/sql/tests/test_arrow.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 7d7be8f36218..bcea1d9ab6a2 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -544,13 +544,11 @@ def test_createDataFrame_arrow_safe_cast(self): Row(id=3, value=-647710720), ] - with self.sql_conf({"spark.sql.execution.castArrowTableSafely": False}): - df = self.spark.createDataFrame(table, schema=schema) - self.assertEqual(df.collect(), expected) + df = self.spark.createDataFrame(table, schema=schema) + self.assertEqual(df.collect(), expected) - with 
self.sql_conf({"spark.sql.execution.castArrowTableSafely": True}): - with self.assertRaises(Exception): - self.spark.createDataFrame(table, schema=schema) + with self.assertRaises(Exception): + self.spark.createDataFrame(table, schema=schema, verifySchema=True) def _createDataFrame_toggle(self, data, schema=None): with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}): From 77faf89495aca76fdeb7e4f9a94671e63bb8c767 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 13 Nov 2024 11:17:01 +0800 Subject: [PATCH 09/15] fmt --- python/pyspark/sql/session.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 8a3b706a88dc..d015106614dc 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -1376,11 +1376,11 @@ def createDataFrame( # type: ignore[misc] :class:`RDD`. verifySchema : bool, optional verify data types of every row against schema. - If not provided, - - createDataFrame with :class:`pyarrow.Table`, verifySchema = False - - (With Arrow optimization) createDataFrame with :class:`pandas.DataFrame`, verifySchema = False - - (Without Arrow optimization) createDataFrame with :class:`pandas.DataFrame`, verifySchema = True - - createDataFrame with regular Python instances, verifySchema = True + If not provided, createDataFrame with + - pyarrow.Table, verifySchema=False + - pandas.DataFrame with Arrow optimization, verifySchema=False + - pandas.DataFrame without Arrow optimization, verifySchema=True + - regular Python instances, verifySchema=True Arrow optimization is enabled/disabled via `spark.sql.execution.arrow.pyspark.enabled`. .. versionadded:: 2.1.0 From 95319377883dee3fbd51385e9a27103802f60019 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 13 Nov 2024 11:18:41 +0800 Subject: [PATCH 10/15] restore connect --- python/pyspark/sql/connect/session.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 23ac8fed1c87..83b0496a8427 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -611,16 +611,9 @@ def createDataFrame( if not isinstance(schema, StructType): schema = from_arrow_schema(data.schema, prefer_timestamp_ntz=prefer_timestamp_ntz) - safe_cast = ( - self._client.get_configs("spark.sql.execution.castArrowTableSafely") == "true" - ) - _table = ( _check_arrow_table_timestamps_localize(data, schema, True, timezone) - .cast( - to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True), - safe=safe_cast, - ) + .cast(to_arrow_schema(schema, error_on_duplicated_field_names_in_struct=True)) .rename_columns(schema.names) ) From 95ba8acfbb3f02e0b618eef9f6a6d1e334d25cdb Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 13 Nov 2024 11:20:43 +0800 Subject: [PATCH 11/15] test --- python/pyspark/sql/tests/connect/test_parity_arrow.py | 4 ++++ python/pyspark/sql/tests/test_arrow.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py index 885b3001b1db..d47a367a5460 100644 --- a/python/pyspark/sql/tests/connect/test_parity_arrow.py +++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py @@ -137,6 +137,10 @@ def test_toPandas_udt(self): def test_create_dataframe_namedtuples(self): self.check_create_dataframe_namedtuples(True) + @unittest.skip("Spark Connect does not 
support verifySchema.") + def test_createDataFrame_verifySchema(self): + super().test_createDataFrame_verifySchema() + if __name__ == "__main__": from pyspark.sql.tests.connect.test_parity_arrow import * # noqa: F401 diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index bcea1d9ab6a2..43dff108d9a2 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -532,7 +532,7 @@ def test_createDataFrame_arrow_pandas(self): df_pandas = self.spark.createDataFrame(pdf) self.assertEqual(df_arrow.collect(), df_pandas.collect()) - def test_createDataFrame_arrow_safe_cast(self): + def test_createDataFrame_verifySchema(self): data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]} table = pa.table(data) schema = StructType( From 3bb5f1327246c00d7011bafead2fdc133d2e340c Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 13 Nov 2024 14:33:28 +0800 Subject: [PATCH 12/15] mypy data test --- python/pyspark/sql/tests/typing/test_session.yml | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/tests/typing/test_session.yml b/python/pyspark/sql/tests/typing/test_session.yml index d6eee82a7678..98587458efe8 100644 --- a/python/pyspark/sql/tests/typing/test_session.yml +++ b/python/pyspark/sql/tests/typing/test_session.yml @@ -17,6 +17,7 @@ - case: createDataFrameStructsValid main: | + from pyspark._globals import _NoValueType from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType @@ -78,14 +79,14 @@ main:18: note: Possible overload variants: main:18: note: def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[list[str], tuple[str, ...]] = ..., samplingRatio: Optional[float] = ...) -> DataFrame main:18: note: def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[list[str], tuple[str, ...]] = ..., samplingRatio: Optional[float] = ...) -> DataFrame - main:18: note: def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[StructType, str], *, verifySchema: bool = ...) -> DataFrame - main:18: note: def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[StructType, str], *, verifySchema: bool = ...) -> DataFrame - main:18: note: def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: RDD[AtomicValue], schema: Union[AtomicType, str], verifySchema: bool = ...) -> DataFrame - main:18: note: def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: Iterable[AtomicValue], schema: Union[AtomicType, str], verifySchema: bool = ...) -> DataFrame + main:18: note: def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: Iterable[RowLike], schema: Union[StructType, str], *, verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame + main:18: note: def [RowLike in (list[Any], tuple[Any, ...], Row)] createDataFrame(self, data: RDD[RowLike], schema: Union[StructType, str], *, verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame + main:18: note: def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: RDD[AtomicValue], schema: Union[AtomicType, str], verifySchema: Union[_NoValueType, bool] = ...) 
-> DataFrame + main:18: note: def [AtomicValue in (datetime, date, Decimal, bool, str, int, float)] createDataFrame(self, data: Iterable[AtomicValue], schema: Union[AtomicType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame main:18: note: def createDataFrame(self, data: DataFrame, samplingRatio: Optional[float] = ...) -> DataFrame main:18: note: def createDataFrame(self, data: Any, samplingRatio: Optional[float] = ...) -> DataFrame - main:18: note: def createDataFrame(self, data: DataFrame, schema: Union[StructType, str], verifySchema: bool = ...) -> DataFrame - main:18: note: def createDataFrame(self, data: Any, schema: Union[StructType, str], verifySchema: bool = ...) -> DataFrame + main:18: note: def createDataFrame(self, data: DataFrame, schema: Union[StructType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame + main:18: note: def createDataFrame(self, data: Any, schema: Union[StructType, str], verifySchema: Union[_NoValueType, bool] = ...) -> DataFrame - case: createDataFrameFromEmptyRdd main: | From fa1a99c8e714b36ae0cb8b6ff3866cdced6c6b8f Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 13 Nov 2024 15:08:28 +0800 Subject: [PATCH 13/15] complete tests --- python/pyspark/sql/tests/test_arrow.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 43dff108d9a2..b8ea7b12cf2b 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -534,7 +534,7 @@ def test_createDataFrame_arrow_pandas(self): def test_createDataFrame_verifySchema(self): data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]} - table = pa.table(data) + # data.value should fail schema validation when verifySchema is True schema = StructType( [StructField("id", IntegerType(), True), StructField("value", IntegerType(), True)] ) @@ -543,13 +543,29 @@ def test_createDataFrame_verifySchema(self): Row(id=2, value=-1863462912), Row(id=3, value=-647710720), ] - + # Arrow table + table = pa.table(data) df = self.spark.createDataFrame(table, schema=schema) self.assertEqual(df.collect(), expected) with self.assertRaises(Exception): self.spark.createDataFrame(table, schema=schema, verifySchema=True) + # pandas DataFrame with Arrow optimization + pdf = pd.DataFrame(data) + df = self.spark.createDataFrame(pdf, schema=schema) # verifySchema defaults to False + self.assertEqual(df.collect(), expected) + with self.assertRaises(Exception): + df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True) + + # pandas DataFrame without Arrow optimization + with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}): + pdf = pd.DataFrame(data) + with self.assertRaises(Exception): + df = self.spark.createDataFrame(pdf, schema=schema) # verifySchema defaults to True + df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=False) + self.assertEqual(df.collect(), expected) + def _createDataFrame_toggle(self, data, schema=None): with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": False}): df_no_arrow = self.spark.createDataFrame(data, schema=schema) From b707d2a65f68119f057d0399ce0cfa0adb45f8e1 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 13 Nov 2024 15:39:07 +0800 Subject: [PATCH 14/15] rmv versionchanged --- python/pyspark/sql/session.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 
d015106614dc..8a5ef720f050 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -1384,9 +1384,6 @@ def createDataFrame( # type: ignore[misc] Arrow optimization is enabled/disabled via `spark.sql.execution.arrow.pyspark.enabled`. .. versionadded:: 2.1.0 - .. versionchanged:: 4.0.0 - Adjusts default value to pyspark._NoValue. - Returns ------- From 3f49f193190fb2d018b7cd10ee5dd2696d81c287 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 14 Nov 2024 14:08:35 +0800 Subject: [PATCH 15/15] defaults to conf --- python/pyspark/sql/pandas/conversion.py | 2 +- python/pyspark/sql/session.py | 3 ++- python/pyspark/sql/tests/test_arrow.py | 7 ++++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py index e00c113e5e27..0c612bf4eae3 100644 --- a/python/pyspark/sql/pandas/conversion.py +++ b/python/pyspark/sql/pandas/conversion.py @@ -664,7 +664,7 @@ def _create_from_pandas_with_arrow( if verifySchema is _NoValue: # (With Arrow optimization) createDataFrame with `pandas.DataFrame` - verifySchema = False + verifySchema = self._jconf.arrowSafeTypeConversion() infer_pandas_dict_as_map = ( str(self.conf.get("spark.sql.execution.pandas.inferPandasDictAsMap")).lower() == "true" diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 8a5ef720f050..ef8750b6e72d 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -1378,7 +1378,8 @@ def createDataFrame( # type: ignore[misc] verify data types of every row against schema. If not provided, createDataFrame with - pyarrow.Table, verifySchema=False - - pandas.DataFrame with Arrow optimization, verifySchema=False + - pandas.DataFrame with Arrow optimization, verifySchema defaults to + `spark.sql.execution.pandas.convertToArrowArraySafely` - pandas.DataFrame without Arrow optimization, verifySchema=True - regular Python instances, verifySchema=True Arrow optimization is enabled/disabled via `spark.sql.execution.arrow.pyspark.enabled`. diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index b8ea7b12cf2b..19d0db989431 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -553,8 +553,13 @@ def test_createDataFrame_verifySchema(self): # pandas DataFrame with Arrow optimization pdf = pd.DataFrame(data) - df = self.spark.createDataFrame(pdf, schema=schema) # verifySchema defaults to False + df = self.spark.createDataFrame(pdf, schema=schema) + # verifySchema defaults to `spark.sql.execution.pandas.convertToArrowArraySafely`, + # which is false by default self.assertEqual(df.collect(), expected) + with self.assertRaises(Exception): + with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": True}): + df = self.spark.createDataFrame(pdf, schema=schema) with self.assertRaises(Exception): df = self.spark.createDataFrame(pdf, schema=schema, verifySchema=True)
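
Usage sketch (illustrative, not part of the series): the end state of these patches is that `verifySchema` on `SparkSession.createDataFrame` also governs Arrow's safe-cast behaviour for `pyarrow.Table` input and Arrow-optimized `pandas.DataFrame` input. The snippet below replays the scenario from the test added in PATCH 13/15 and revised in PATCH 15/15; it assumes a PySpark build with this series applied, and the column names, overflow values, and expected wrapped integers are taken directly from that test.

    import pyarrow as pa
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType

    spark = SparkSession.builder.getOrCreate()

    # Values deliberately too large for a 32-bit IntegerType column.
    data = {"id": [1, 2, 3], "value": [100000000000, 200000000000, 300000000000]}
    schema = StructType(
        [StructField("id", IntegerType(), True), StructField("value", IntegerType(), True)]
    )

    # pyarrow.Table input: verifySchema defaults to False, so the Arrow cast is unsafe
    # and the oversized values wrap around (1215752192, -1863462912, -647710720).
    spark.createDataFrame(pa.table(data), schema=schema).show()

    # Opting in to verification makes Arrow raise instead of silently overflowing.
    try:
        spark.createDataFrame(pa.table(data), schema=schema, verifySchema=True)
    except Exception as e:
        print("rejected unsafe cast:", type(e).__name__)

    # pandas.DataFrame input with Arrow optimization enabled: the default follows
    # spark.sql.execution.pandas.convertToArrowArraySafely (false by default),
    # and an explicit verifySchema=True again raises on overflow.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
    pdf = pd.DataFrame(data)
    spark.createDataFrame(pdf, schema=schema).show()
    try:
        spark.createDataFrame(pdf, schema=schema, verifySchema=True)
    except Exception as e:
        print("rejected unsafe conversion:", type(e).__name__)

Without Arrow optimization (`spark.sql.execution.arrow.pyspark.enabled` set to false) the pandas path keeps the old default of `verifySchema=True`, which is what the final test in PATCH 15/15 asserts. Note also that the intermediate `spark.sql.execution.castArrowTableSafely` config introduced in PATCH 02/15 is removed again in PATCH 06/15, so the end state exposes the Arrow-table behaviour only through the `verifySchema` parameter, with the existing `convertToArrowArraySafely` conf supplying the default for the Arrow-optimized pandas path.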