From 9b1ce07b885a42b833be9576011daf67e725bff6 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Fri, 4 Nov 2022 10:56:48 +0100 Subject: [PATCH 01/17] Correct type error message for mapInArrows, check iterator type --- .../tests/connect/test_parity_pandas_map.py | 4 +-- .../sql/tests/pandas/test_pandas_map.py | 22 ++++++++++----- python/pyspark/sql/tests/test_arrow_map.py | 24 +++++++++++++++++ python/pyspark/worker.py | 27 ++++++++++++++----- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_map.py b/python/pyspark/sql/tests/connect/test_parity_pandas_map.py index 539fd98266b28..f61a866ed9046 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_map.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_map.py @@ -22,8 +22,8 @@ class MapInPandasParityTests(MapInPandasTestsMixin, ReusedConnectTestCase): def test_empty_dataframes_with_less_columns(self): self.check_empty_dataframes_with_less_columns() - def test_other_than_dataframe(self): - self.check_other_than_dataframe() + def test_other_than_dataframe_iter(self): + self.check_other_than_dataframe_iter() if __name__ == "__main__": diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index 3d9a90bc81c40..0307747d1efcd 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -94,20 +94,30 @@ def func(iterator): actual = df.repartition(1).mapInPandas(func, "a long").collect() self.assertEqual(set((r.a for r in actual)), set(range(100))) - def test_other_than_dataframe(self): + def test_other_than_dataframe_iter(self): with QuietTest(self.sc): - self.check_other_than_dataframe() + self.check_other_than_dataframe_iter() - def check_other_than_dataframe(self): - def bad_iter(_): + def check_other_than_dataframe_iter(self): + def no_iter(_): + return 1 + + def bad_iter_elem(_): return iter([1]) with self.assertRaisesRegex( PythonException, - "Return type of the user-defined function should be Pandas.DataFrame, " + "Return type of the user-defined function should be iterator of Pandas.DataFrame, " "but is ", ): - self.spark.range(10, numPartitions=3).mapInPandas(bad_iter, "a int, b string").count() + (self.spark.range(10, numPartitions=3).mapInPandas(no_iter, "a int").count()) + + with self.assertRaisesRegex( + PythonException, + "Return type of the user-defined function should be iterator of Pandas.DataFrame, " + "but is iterator of ", + ): + (self.spark.range(10, numPartitions=3).mapInPandas(bad_iter_elem, "a int").count()) def test_empty_iterator(self): def empty_iter(_): diff --git a/python/pyspark/sql/tests/test_arrow_map.py b/python/pyspark/sql/tests/test_arrow_map.py index 050f2c3266507..5e84527ea06e2 100644 --- a/python/pyspark/sql/tests/test_arrow_map.py +++ b/python/pyspark/sql/tests/test_arrow_map.py @@ -18,6 +18,7 @@ import time import unittest +from pyspark.sql.utils import PythonException from pyspark.testing.sqlutils import ( ReusedSQLTestCase, have_pandas, @@ -25,6 +26,7 @@ pandas_requirement_message, pyarrow_requirement_message, ) +from pyspark.testing.utils import QuietTest if have_pyarrow: import pyarrow as pa @@ -88,6 +90,28 @@ def func(iterator): actual = df.repartition(1).mapInArrow(func, "a long").collect() self.assertEqual(set((r.a for r in actual)), set(range(100))) + def test_other_than_recordbatch_iter(self): + def not_iter(_): + return 1 + + def bad_iter_elem(_): + return iter([1]) + + with 
QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "Return type of the user-defined function should be iterator of PyArrow.RecordBatch, " + "but is ", + ): + (self.spark.range(10, numPartitions=3).mapInArrow(not_iter, "a int").count()) + + with self.assertRaisesRegex( + PythonException, + "Return type of the user-defined function should be iterator of PyArrow.RecordBatch, " + "but is iterator of ", + ): + (self.spark.range(10, numPartitions=3).mapInArrow(bad_iter_elem, "a int").count()) + def test_empty_iterator(self): def empty_iter(_): return iter([]) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 1d28e6add2eb7..47bd3c232db01 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -24,6 +24,7 @@ from inspect import currentframe, getframeinfo, getfullargspec import importlib import json +from typing import Iterator # 'resource' is a Unix specific module. has_resource_module = True @@ -133,20 +134,32 @@ def verify_result_length(result, length): ) -def wrap_batch_iter_udf(f, return_type): +def wrap_batch_iter_udf(f, return_type, is_arrow_iter=False): arrow_return_type = to_arrow_type(return_type) + iter_type_label = ( + "PyArrow.RecordBatch" + if is_arrow_iter + else ("Pandas.DataFrame" if type(return_type) == StructType else "Pandas.Series") + ) - def verify_result_type(result): - if not hasattr(result, "__len__"): - pd_type = "Pandas.DataFrame" if type(return_type) == StructType else "Pandas.Series" + def verify_result(result): + if not isinstance(result, Iterator): raise TypeError( "Return type of the user-defined function should be " - "{}, but is {}".format(pd_type, type(result)) + "iterator of {}, but is {}".format(iter_type_label, type(result)) ) return result + def verify_element(elem): + if not hasattr(elem, "__len__"): + raise TypeError( + "Return type of the user-defined function should be " + "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) + ) + return elem + return lambda *iterator: map( - lambda res: (res, arrow_return_type), map(verify_result_type, f(*iterator)) + lambda res: (res, arrow_return_type), map(verify_element, verify_result(f(*iterator))) ) @@ -426,7 +439,7 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index): elif eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF: return arg_offsets, wrap_batch_iter_udf(func, return_type) elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF: - return arg_offsets, wrap_batch_iter_udf(func, return_type) + return arg_offsets, wrap_batch_iter_udf(func, return_type, is_arrow_iter=True) elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF: argspec = getfullargspec(chained_func) # signature was lost when wrapping it return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf) From c4d4333e76281e1526b7c012aa8eb8cf75d6943f Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Tue, 7 Feb 2023 10:43:17 +0100 Subject: [PATCH 02/17] Improve error messages for applyInPandas --- .../sql/tests/pandas/test_pandas_map.py | 224 ++++++++++++++++-- python/pyspark/worker.py | 83 ++++--- 2 files changed, 252 insertions(+), 55 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index 0307747d1efcd..ede7f9c4f1dc2 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -42,15 +42,36 @@ cast(str, pandas_requirement_message or pyarrow_requirement_message), ) class 
MapInPandasTestsMixin: - def test_map_in_pandas(self): + @staticmethod + def identity_dataframes_iter(*columns: str): def func(iterator): for pdf in iterator: assert isinstance(pdf, pd.DataFrame) - assert pdf.columns == ["id"] + assert pdf.columns.tolist() == list(columns) yield pdf + return func + @staticmethod + def dataframes_wo_column_names_iter(*columns: str): + def func(iterator): + for pdf in iterator: + assert isinstance(pdf, pd.DataFrame) + assert pdf.columns.tolist() == list(columns) + yield pdf.rename(columns=list(pdf.columns).index) + return func + + @staticmethod + def dataframes_and_empty_dataframe_iter(*columns: str): + def func(iterator): + for pdf in iterator: + yield pdf + # after yielding all elements, also yield an empty dataframe with given columns + yield pd.DataFrame([], columns=list(columns)) + return func + + def test_map_in_pandas(self): df = self.spark.range(10, numPartitions=3) - actual = df.mapInPandas(func, "id long").collect() + actual = df.mapInPandas(self.identity_dataframes_iter("id"), "id long").collect() expected = df.collect() self.assertEqual(actual, expected) @@ -85,6 +106,18 @@ def func(iterator): expected = df.collect() self.assertEqual(actual, expected) + def test_no_column_names(self): + data = [(1, "foo"), (2, None), (3, "bar"), (4, "bar")] + df = self.spark.createDataFrame(data, "a int, b string") + + def func(iterator): + for pdf in iterator: + yield pdf.rename(columns=list(pdf.columns).index) + + actual = df.mapInPandas(func, df.schema).collect() + expected = df.collect() + self.assertEqual(actual, expected) + def test_different_output_length(self): def func(iterator): for _ in iterator: @@ -119,6 +152,145 @@ def bad_iter_elem(_): ): (self.spark.range(10, numPartitions=3).mapInPandas(bad_iter_elem, "a int").count()) + def test_dataframes_with_other_column_names(self): + def dataframes_with_other_column_names(iterator): + for pdf in iterator: + yield pdf.rename(columns={"id": "iid"}) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: id. Unexpected: iid.\n", + ): + ( + self.spark.range(10, numPartitions=3) + .withColumn("value", lit(0)) + .toDF("id", "value") + .mapInPandas(dataframes_with_other_column_names, "id int, value int") + .collect() + ) + + def test_dataframes_with_duplicate_column_names(self): + def dataframes_with_other_column_names(iterator): + for pdf in iterator: + yield pdf.rename(columns={"id2": "id"}) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: id2.\n", + ): + ( + self.spark.range(10, numPartitions=3) + .withColumn("id2", lit(0)) + .withColumn("value", lit(1)) + .toDF("id", "id2", "value") + .mapInPandas(dataframes_with_other_column_names, "id int, id2 long, value int") + .collect() + ) + + def test_dataframes_with_less_columns(self): + df = self.spark.range(10, numPartitions=3).withColumn("value", lit(0)) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. 
Missing: id2.\n", + ): + f = self.identity_dataframes_iter("id", "value") + ( + df.mapInPandas(f, "id int, id2 long, value int") + .collect() + ) + + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match " + "specified schema. Expected: 3 Actual: 2\n", + ): + f = self.dataframes_wo_column_names_iter("id", "value") + ( + df.mapInPandas(f, "id int, id2 long, value int") + .collect() + ) + + def test_dataframes_with_more_columns(self): + df = ( + self.spark.range(10, numPartitions=3) + .withColumn("id2", lit(0)) + .withColumn("value", lit(1)) + .toDF("id", "id2", "value") + ) + + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Unexpected: id2.\n", + ): + f = self.identity_dataframes_iter("id", "id2", "value") + ( + df.mapInPandas(f, "id int, value int") + .collect() + ) + + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match " + "specified schema. Expected: 2 Actual: 3\n", + ): + f = self.dataframes_wo_column_names_iter("id", "id2", "value") + ( + df.mapInPandas(f, "id int, value int") + .collect() + ) + + def test_dataframes_with_incompatible_types(self): + def func(iterator): + for pdf in iterator: + yield pdf.assign(id=pdf["id"].apply(str)) + + with QuietTest(self.sc): + for safely in [True, False]: + with self.subTest(convertToArrowArraySafely=safely), self.sql_conf( + {"spark.sql.execution.pandas.convertToArrowArraySafely": safely} + ): + # sometimes we see ValueErrors + with self.subTest(convert="string to double"): + expected = ( + r"ValueError: Exception thrown when converting pandas.Series \(object\) " + r"with name 'id' to Arrow Array \(double\)." + ) + if safely: + expected = expected + ( + " It can be caused by overflows or other " + "unsafe conversions warned by Arrow. Arrow safe type check " + "can be disabled by using SQL config " + "`spark.sql.execution.pandas.convertToArrowArraySafely`." 
+ ) + with self.assertRaisesRegex(PythonException, expected + "\n"): + ( + self.spark.range(10, numPartitions=3) + .mapInPandas(func, "id double") + .collect() + ) + + # sometimes we see TypeErrors + with self.subTest(convert="double to string"): + with self.assertRaisesRegex( + PythonException, + r"TypeError: Exception thrown when converting pandas.Series \(float64\) " + r"with name 'id' to Arrow Array \(string\).\n", + ): + ( + self.spark.range(10, numPartitions=3).select(col("id").cast("double")) + .mapInPandas(self.identity_dataframes_iter("id"), "id string") + .collect() + ) + def test_empty_iterator(self): def empty_iter(_): return iter([]) @@ -134,16 +306,10 @@ def empty_dataframes(_): self.assertEqual(mapped.count(), 0) def test_empty_dataframes_without_columns(self): - def empty_dataframes_wo_columns(iterator): - for pdf in iterator: - yield pdf - # after yielding all elements of the iterator, also yield one dataframe without columns - yield pd.DataFrame([]) - mapped = ( self.spark.range(10, numPartitions=3) .toDF("id") - .mapInPandas(empty_dataframes_wo_columns, "id int") + .mapInPandas(self.dataframes_and_empty_dataframe_iter(), "id int") ) self.assertEqual(mapped.count(), 10) @@ -152,16 +318,38 @@ def test_empty_dataframes_with_less_columns(self): self.check_empty_dataframes_with_less_columns() def check_empty_dataframes_with_less_columns(self): - def empty_dataframes_with_less_columns(iterator): + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: value.\n", + ): + f = self.dataframes_and_empty_dataframe_iter("id") + ( + self.spark.range(10, numPartitions=3) + .withColumn("value", lit(0)) + .toDF("id", "value") + .mapInPandas(f, "id int, value int") + .collect() + ) + + def test_empty_dataframes_with_other_columns(self): + def empty_dataframes_with_other_columns(iterator): for pdf in iterator: - yield pdf - # after yielding all elements of the iterator, also yield a dataframe with less columns - yield pd.DataFrame([(1,)], columns=["id"]) + yield pdf.rename(columns={"id": "iid"}) - with self.assertRaisesRegex(PythonException, "KeyError: 'value'"): - self.spark.range(10, numPartitions=3).withColumn("value", lit(0)).toDF( - "id", "value" - ).mapInPandas(empty_dataframes_with_less_columns, "id int, value int").collect() + with QuietTest(self.sc): + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: id. 
Unexpected: iid.\n", + ): + ( + self.spark.range(10, numPartitions=3) + .withColumn("value", lit(0)) + .toDF("id", "value") + .mapInPandas(empty_dataframes_with_other_columns, "id int, value int") + .collect() + ) def test_chain_map_partitions_in_pandas(self): def func(iterator): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 47bd3c232db01..0d7cdd685f18d 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -156,6 +156,8 @@ def verify_element(elem): "Return type of the user-defined function should be " "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) ) + if not is_arrow_iter: + verify_pandas_result(elem, return_type, True) return elem return lambda *iterator: map( @@ -166,45 +168,52 @@ def verify_element(elem): def verify_pandas_result(result, return_type, assign_cols_by_name): import pandas as pd - if not isinstance(result, pd.DataFrame): - raise TypeError( - "Return type of the user-defined function should be " - "pandas.DataFrame, but is {}".format(type(result)) - ) + if type(return_type) == StructType: + if not isinstance(result, pd.DataFrame): + raise TypeError( + "Return type of the user-defined function should be " + "pandas.DataFrame, but is {}".format(type(result)) + ) - # check the schema of the result only if it is not empty or has columns - if not result.empty or len(result.columns) != 0: - # if any column name of the result is a string - # the column names of the result have to match the return type - # see create_array in pyspark.sql.pandas.serializers.ArrowStreamPandasSerializer - field_names = set([field.name for field in return_type.fields]) - column_names = set(result.columns) - if ( - assign_cols_by_name - and any(isinstance(name, str) for name in result.columns) - and column_names != field_names - ): - missing = sorted(list(field_names.difference(column_names))) - missing = f" Missing: {', '.join(missing)}." if missing else "" - - extra = sorted(list(column_names.difference(field_names))) - extra = f" Unexpected: {', '.join(extra)}." if extra else "" + # check the schema of the result only if it is not empty or has columns + if not result.empty or len(result.columns) != 0: + # if any column name of the result is a string + # the column names of the result have to match the return type + # see create_array in pyspark.sql.pandas.serializers.ArrowStreamPandasSerializer + field_names = set([field.name for field in return_type.fields]) + column_names = set(result.columns) + if ( + assign_cols_by_name + and any(isinstance(name, str) for name in result.columns) + and column_names != field_names + ): + missing = sorted(list(field_names.difference(column_names))) + missing = f" Missing: {', '.join(missing)}." if missing else "" - raise PySparkRuntimeError( - error_class="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF", - message_parameters={ - "missing": missing, - "extra": extra, - }, - ) - # otherwise the number of columns of result have to match the return type - elif len(result.columns) != len(return_type): - raise PySparkRuntimeError( - error_class="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", - message_parameters={ - "expected": str(len(return_type)), - "actual": str(len(result.columns)), - }, + extra = sorted(list(column_names.difference(field_names))) + extra = f" Unexpected: {', '.join(extra)}." 
if extra else "" + + raise PySparkRuntimeError( + error_class="RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF", + message_parameters={ + "missing": missing, + "extra": extra, + }, + ) + # otherwise the number of columns of result have to match the return type + elif len(result.columns) != len(return_type): + raise PySparkRuntimeError( + error_class="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", + message_parameters={ + "expected": str(len(return_type)), + "actual": str(len(result.columns)), + }, + ) + else: + if not isinstance(result, pd.Series): + raise TypeError( + "Return type of the user-defined function should be " + "Pandas.Series, but is {}".format(type(result)) ) From 15a13798a4f4a57a0d93e51274e58cad0d373326 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 9 Feb 2023 09:18:22 +0100 Subject: [PATCH 03/17] Rename Pandas.DataFrame in strings and docstrings --- python/pyspark/pandas/frame.py | 2 +- python/pyspark/sql/pandas/serializers.py | 2 +- python/pyspark/sql/tests/pandas/test_pandas_map.py | 4 ++-- python/pyspark/sql/tests/test_arrow_map.py | 4 ++-- python/pyspark/worker.py | 8 ++++---- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 6f2c8389a4c12..d8a3f812c33ab 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -399,7 +399,7 @@ class DataFrame(Frame, Generic[T]): `compute.ops_on_diff_frames` should be turned on; 2, when `data` is a local dataset (Pandas DataFrame/numpy ndarray/list/etc), it will first collect the `index` to driver if necessary, and then apply - the `Pandas.DataFrame(...)` creation internally; + the `pandas.DataFrame(...)` creation internally; Examples -------- diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py index 4c095249957c0..e2bc9ef4cf1a3 100644 --- a/python/pyspark/sql/pandas/serializers.py +++ b/python/pyspark/sql/pandas/serializers.py @@ -155,7 +155,7 @@ def wrap_and_init_stream(): class ArrowStreamPandasSerializer(ArrowStreamSerializer): """ - Serializes Pandas.Series as Arrow data with Arrow streaming format. + Serializes pandas.Series as Arrow data with Arrow streaming format. 
Parameters ---------- diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index ede7f9c4f1dc2..b2c2611bd9cb9 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -140,14 +140,14 @@ def bad_iter_elem(_): with self.assertRaisesRegex( PythonException, - "Return type of the user-defined function should be iterator of Pandas.DataFrame, " + "Return type of the user-defined function should be iterator of pandas.DataFrame, " "but is ", ): (self.spark.range(10, numPartitions=3).mapInPandas(no_iter, "a int").count()) with self.assertRaisesRegex( PythonException, - "Return type of the user-defined function should be iterator of Pandas.DataFrame, " + "Return type of the user-defined function should be iterator of pandas.DataFrame, " "but is iterator of ", ): (self.spark.range(10, numPartitions=3).mapInPandas(bad_iter_elem, "a int").count()) diff --git a/python/pyspark/sql/tests/test_arrow_map.py b/python/pyspark/sql/tests/test_arrow_map.py index 5e84527ea06e2..4f1398e25eddc 100644 --- a/python/pyspark/sql/tests/test_arrow_map.py +++ b/python/pyspark/sql/tests/test_arrow_map.py @@ -100,14 +100,14 @@ def bad_iter_elem(_): with QuietTest(self.sc): with self.assertRaisesRegex( PythonException, - "Return type of the user-defined function should be iterator of PyArrow.RecordBatch, " + "Return type of the user-defined function should be iterator of pyarrow.RecordBatch, " "but is ", ): (self.spark.range(10, numPartitions=3).mapInArrow(not_iter, "a int").count()) with self.assertRaisesRegex( PythonException, - "Return type of the user-defined function should be iterator of PyArrow.RecordBatch, " + "Return type of the user-defined function should be iterator of pyarrow.RecordBatch, " "but is iterator of ", ): (self.spark.range(10, numPartitions=3).mapInArrow(bad_iter_elem, "a int").count()) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 0d7cdd685f18d..6abe87a63db4f 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -110,7 +110,7 @@ def wrap_scalar_pandas_udf(f, return_type): def verify_result_type(result): if not hasattr(result, "__len__"): - pd_type = "Pandas.DataFrame" if type(return_type) == StructType else "Pandas.Series" + pd_type = "pandas.DataFrame" if type(return_type) == StructType else "pandas.Series" raise TypeError( "Return type of the user-defined function should be " "{}, but is {}".format(pd_type, type(result)) @@ -137,9 +137,9 @@ def verify_result_length(result, length): def wrap_batch_iter_udf(f, return_type, is_arrow_iter=False): arrow_return_type = to_arrow_type(return_type) iter_type_label = ( - "PyArrow.RecordBatch" + "pyarrow.RecordBatch" if is_arrow_iter - else ("Pandas.DataFrame" if type(return_type) == StructType else "Pandas.Series") + else ("pandas.DataFrame" if type(return_type) == StructType else "pandas.Series") ) def verify_result(result): @@ -213,7 +213,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name): if not isinstance(result, pd.Series): raise TypeError( "Return type of the user-defined function should be " - "Pandas.Series, but is {}".format(type(result)) + "pandas.Series, but is {}".format(type(result)) ) From 65459edb5b99a5e5e4fc18976c065cf106552a32 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 9 Feb 2023 09:28:54 +0100 Subject: [PATCH 04/17] Remove redundant .toDF from pandas tests --- .../pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py | 2 -- 
python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py | 1 - python/pyspark/sql/tests/pandas/test_pandas_map.py | 6 ------ 3 files changed, 9 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py index 8def08323bec3..ecdd51e3ab9c6 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py @@ -56,7 +56,6 @@ class CogroupedApplyInPandasTestsMixin: def data1(self): return ( self.spark.range(10) - .toDF("id") .withColumn("ks", array([lit(i) for i in range(20, 30)])) .withColumn("k", explode(col("ks"))) .withColumn("v", col("k") * 10) @@ -67,7 +66,6 @@ def data1(self): def data2(self): return ( self.spark.range(10) - .toDF("id") .withColumn("ks", array([lit(i) for i in range(20, 30)])) .withColumn("k", explode(col("ks"))) .withColumn("v2", col("k") * 100) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index 84e61d42843ae..2efb1a3c988aa 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -79,7 +79,6 @@ class GroupedApplyInPandasTestsMixin: def data(self): return ( self.spark.range(10) - .toDF("id") .withColumn("vs", array([lit(i) for i in range(20, 30)])) .withColumn("v", explode(col("vs"))) .drop("vs") diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index b2c2611bd9cb9..aeac87f8428f6 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -166,7 +166,6 @@ def dataframes_with_other_column_names(iterator): ( self.spark.range(10, numPartitions=3) .withColumn("value", lit(0)) - .toDF("id", "value") .mapInPandas(dataframes_with_other_column_names, "id int, value int") .collect() ) @@ -186,7 +185,6 @@ def dataframes_with_other_column_names(iterator): self.spark.range(10, numPartitions=3) .withColumn("id2", lit(0)) .withColumn("value", lit(1)) - .toDF("id", "id2", "value") .mapInPandas(dataframes_with_other_column_names, "id int, id2 long, value int") .collect() ) @@ -222,7 +220,6 @@ def test_dataframes_with_more_columns(self): self.spark.range(10, numPartitions=3) .withColumn("id2", lit(0)) .withColumn("value", lit(1)) - .toDF("id", "id2", "value") ) with QuietTest(self.sc): @@ -308,7 +305,6 @@ def empty_dataframes(_): def test_empty_dataframes_without_columns(self): mapped = ( self.spark.range(10, numPartitions=3) - .toDF("id") .mapInPandas(self.dataframes_and_empty_dataframe_iter(), "id int") ) self.assertEqual(mapped.count(), 10) @@ -327,7 +323,6 @@ def check_empty_dataframes_with_less_columns(self): ( self.spark.range(10, numPartitions=3) .withColumn("value", lit(0)) - .toDF("id", "value") .mapInPandas(f, "id int, value int") .collect() ) @@ -346,7 +341,6 @@ def empty_dataframes_with_other_columns(iterator): ( self.spark.range(10, numPartitions=3) .withColumn("value", lit(0)) - .toDF("id", "value") .mapInPandas(empty_dataframes_with_other_columns, "id int, value int") .collect() ) From 6d817753ac81167ae8c9dfbe3008543cd0687b95 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 9 Feb 2023 10:31:44 +0100 Subject: [PATCH 05/17] DataFrame.mapInPandas allows for extra columns --- .../sql/tests/pandas/test_pandas_map.py | 42 ++++++++----------- python/pyspark/worker.py | 16 ++++--- 2 files changed, 27 
insertions(+), 31 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index aeac87f8428f6..aabdf442abc94 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -52,7 +52,7 @@ def func(iterator): return func @staticmethod - def dataframes_wo_column_names_iter(*columns: str): + def identity_dataframes_wo_column_names_iter(*columns: str): def func(iterator): for pdf in iterator: assert isinstance(pdf, pd.DataFrame) @@ -209,7 +209,7 @@ def test_dataframes_with_less_columns(self): "RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match " "specified schema. Expected: 3 Actual: 2\n", ): - f = self.dataframes_wo_column_names_iter("id", "value") + f = self.identity_dataframes_wo_column_names_iter("id", "value") ( df.mapInPandas(f, "id int, id2 long, value int") .collect() @@ -218,32 +218,17 @@ def test_dataframes_with_less_columns(self): def test_dataframes_with_more_columns(self): df = ( self.spark.range(10, numPartitions=3) - .withColumn("id2", lit(0)) - .withColumn("value", lit(1)) + .select("id", col("id").alias("value"), col("id").alias("extra")) ) + expected = df.select("id", "value").collect() - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " - "specified schema. Unexpected: id2.\n", - ): - f = self.identity_dataframes_iter("id", "id2", "value") - ( - df.mapInPandas(f, "id int, value int") - .collect() - ) + f = self.identity_dataframes_iter("id", "value", "extra") + actual = df.repartition(1).mapInPandas(f, "id long, value long").collect() + self.assertEqual(actual, expected) - with self.assertRaisesRegex( - PythonException, - "RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match " - "specified schema. 
Expected: 2 Actual: 3\n", - ): - f = self.dataframes_wo_column_names_iter("id", "id2", "value") - ( - df.mapInPandas(f, "id int, value int") - .collect() - ) + f = self.identity_dataframes_wo_column_names_iter("id", "value", "extra") + actual = df.repartition(1).mapInPandas(f, "id long, value long").collect() + self.assertEqual(actual, expected) def test_dataframes_with_incompatible_types(self): def func(iterator): @@ -327,6 +312,13 @@ def check_empty_dataframes_with_less_columns(self): .collect() ) + def test_empty_dataframes_with_more_columns(self): + mapped = ( + self.spark.range(10, numPartitions=3) + .mapInPandas(self.dataframes_and_empty_dataframe_iter("id", "extra"), "id int") + ) + self.assertEqual(mapped.count(), 10) + def test_empty_dataframes_with_other_columns(self): def empty_dataframes_with_other_columns(iterator): for pdf in iterator: diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 6abe87a63db4f..d26dc6281c339 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -157,7 +157,7 @@ def verify_element(elem): "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) ) if not is_arrow_iter: - verify_pandas_result(elem, return_type, True) + verify_pandas_result(elem, return_type, True, True) return elem return lambda *iterator: map( @@ -165,7 +165,7 @@ def verify_element(elem): ) -def verify_pandas_result(result, return_type, assign_cols_by_name): +def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_return_schema): import pandas as pd if type(return_type) == StructType: @@ -181,7 +181,11 @@ def verify_pandas_result(result, return_type, assign_cols_by_name): # the column names of the result have to match the return type # see create_array in pyspark.sql.pandas.serializers.ArrowStreamPandasSerializer field_names = set([field.name for field in return_type.fields]) - column_names = set(result.columns) + # only the first len(field_names) result columns are considered + # when truncating the return schema + result_columns = (result.columns[:len(field_names)] + if truncate_return_schema else result.columns) + column_names = set(result_columns) if ( assign_cols_by_name and any(isinstance(name, str) for name in result.columns) @@ -201,7 +205,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name): }, ) # otherwise the number of columns of result have to match the return type - elif len(result.columns) != len(return_type): + elif len(result_columns) != len(return_type): raise PySparkRuntimeError( error_class="RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF", message_parameters={ @@ -232,7 +236,7 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se key_series = left_key_series if not left_df.empty else right_key_series key = tuple(s[0] for s in key_series) result = f(key, left_df, right_df) - verify_pandas_result(result, return_type, _assign_cols_by_name) + verify_pandas_result(result, return_type, _assign_cols_by_name, False) return result @@ -250,7 +254,7 @@ def wrapped(key_series, value_series): elif len(argspec.args) == 2: key = tuple(s[0] for s in key_series) result = f(key, pd.concat(value_series, axis=1)) - verify_pandas_result(result, return_type, _assign_cols_by_name) + verify_pandas_result(result, return_type, _assign_cols_by_name, False) return result From 2fda259fb1d0a483d965c4490b851f31090b8fe6 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 9 Feb 2023 10:55:04 +0100 Subject: [PATCH 06/17] Reformatting Python --- .../sql/tests/pandas/test_pandas_map.py 
| 51 +++++++++---------- python/pyspark/worker.py | 5 +- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index aabdf442abc94..6d33ef6610ff6 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -49,6 +49,7 @@ def func(iterator): assert isinstance(pdf, pd.DataFrame) assert pdf.columns.tolist() == list(columns) yield pdf + return func @staticmethod @@ -58,6 +59,7 @@ def func(iterator): assert isinstance(pdf, pd.DataFrame) assert pdf.columns.tolist() == list(columns) yield pdf.rename(columns=list(pdf.columns).index) + return func @staticmethod @@ -67,6 +69,7 @@ def func(iterator): yield pdf # after yielding all elements, also yield an empty dataframe with given columns yield pd.DataFrame([], columns=list(columns)) + return func def test_map_in_pandas(self): @@ -183,10 +186,10 @@ def dataframes_with_other_column_names(iterator): ): ( self.spark.range(10, numPartitions=3) - .withColumn("id2", lit(0)) - .withColumn("value", lit(1)) - .mapInPandas(dataframes_with_other_column_names, "id int, id2 long, value int") - .collect() + .withColumn("id2", lit(0)) + .withColumn("value", lit(1)) + .mapInPandas(dataframes_with_other_column_names, "id int, id2 long, value int") + .collect() ) def test_dataframes_with_less_columns(self): @@ -199,10 +202,7 @@ def test_dataframes_with_less_columns(self): "specified schema. Missing: id2.\n", ): f = self.identity_dataframes_iter("id", "value") - ( - df.mapInPandas(f, "id int, id2 long, value int") - .collect() - ) + (df.mapInPandas(f, "id int, id2 long, value int").collect()) with self.assertRaisesRegex( PythonException, @@ -210,15 +210,11 @@ def test_dataframes_with_less_columns(self): "specified schema. 
Expected: 3 Actual: 2\n", ): f = self.identity_dataframes_wo_column_names_iter("id", "value") - ( - df.mapInPandas(f, "id int, id2 long, value int") - .collect() - ) + (df.mapInPandas(f, "id int, id2 long, value int").collect()) def test_dataframes_with_more_columns(self): - df = ( - self.spark.range(10, numPartitions=3) - .select("id", col("id").alias("value"), col("id").alias("extra")) + df = self.spark.range(10, numPartitions=3).select( + "id", col("id").alias("value"), col("id").alias("extra") ) expected = df.select("id", "value").collect() @@ -268,7 +264,8 @@ def func(iterator): r"with name 'id' to Arrow Array \(string\).\n", ): ( - self.spark.range(10, numPartitions=3).select(col("id").cast("double")) + self.spark.range(10, numPartitions=3) + .select(col("id").cast("double")) .mapInPandas(self.identity_dataframes_iter("id"), "id string") .collect() ) @@ -288,9 +285,8 @@ def empty_dataframes(_): self.assertEqual(mapped.count(), 0) def test_empty_dataframes_without_columns(self): - mapped = ( - self.spark.range(10, numPartitions=3) - .mapInPandas(self.dataframes_and_empty_dataframe_iter(), "id int") + mapped = self.spark.range(10, numPartitions=3).mapInPandas( + self.dataframes_and_empty_dataframe_iter(), "id int" ) self.assertEqual(mapped.count(), 10) @@ -313,9 +309,8 @@ def check_empty_dataframes_with_less_columns(self): ) def test_empty_dataframes_with_more_columns(self): - mapped = ( - self.spark.range(10, numPartitions=3) - .mapInPandas(self.dataframes_and_empty_dataframe_iter("id", "extra"), "id int") + mapped = self.spark.range(10, numPartitions=3).mapInPandas( + self.dataframes_and_empty_dataframe_iter("id", "extra"), "id int" ) self.assertEqual(mapped.count(), 10) @@ -326,15 +321,15 @@ def empty_dataframes_with_other_columns(iterator): with QuietTest(self.sc): with self.assertRaisesRegex( - PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: id. 
Unexpected: iid.\n", ): ( self.spark.range(10, numPartitions=3) - .withColumn("value", lit(0)) - .mapInPandas(empty_dataframes_with_other_columns, "id int, value int") - .collect() + .withColumn("value", lit(0)) + .mapInPandas(empty_dataframes_with_other_columns, "id int, value int") + .collect() ) def test_chain_map_partitions_in_pandas(self): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index d26dc6281c339..1c96143a1924e 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -183,8 +183,9 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu field_names = set([field.name for field in return_type.fields]) # only the first len(field_names) result columns are considered # when truncating the return schema - result_columns = (result.columns[:len(field_names)] - if truncate_return_schema else result.columns) + result_columns = ( + result.columns[: len(field_names)] if truncate_return_schema else result.columns + ) column_names = set(result_columns) if ( assign_cols_by_name From 8f290cb881a803db1a8f1b8ad5a8d472d9b42823 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 9 Feb 2023 11:15:38 +0100 Subject: [PATCH 07/17] Make mapInPandas work with iterables again --- python/pyspark/sql/tests/pandas/test_pandas_map.py | 7 +++++++ python/pyspark/worker.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index 6d33ef6610ff6..6c1915dae7326 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -73,11 +73,18 @@ def func(iterator): return func def test_map_in_pandas(self): + # test returning iterator of DataFrames df = self.spark.range(10, numPartitions=3) actual = df.mapInPandas(self.identity_dataframes_iter("id"), "id long").collect() expected = df.collect() self.assertEqual(actual, expected) + # test returning list of DataFrames + df = self.spark.range(10, numPartitions=3) + actual = df.mapInPandas(lambda it: [pdf for pdf in it], "id long").collect() + expected = df.collect() + self.assertEqual(actual, expected) + def test_multiple_columns(self): data = [(1, "foo"), (2, None), (3, "bar"), (4, "bar")] df = self.spark.createDataFrame(data, "a int, b string") diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 1c96143a1924e..c180279ddd04c 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -143,7 +143,7 @@ def wrap_batch_iter_udf(f, return_type, is_arrow_iter=False): ) def verify_result(result): - if not isinstance(result, Iterator): + if not isinstance(result, Iterator) and not hasattr(result, "__iter__"): raise TypeError( "Return type of the user-defined function should be " "iterator of {}, but is {}".format(iter_type_label, type(result)) From f14d8e601a0a541768898987497d2c61e6eb3366 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 9 Feb 2023 12:01:00 +0100 Subject: [PATCH 08/17] Fixing Python lints --- python/pyspark/sql/tests/pandas/test_pandas_map.py | 8 ++++---- python/pyspark/sql/tests/test_arrow_map.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index 6c1915dae7326..f9674cbbb5416 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -246,8 +246,8 @@ def func(iterator): # sometimes we see 
ValueErrors with self.subTest(convert="string to double"): expected = ( - r"ValueError: Exception thrown when converting pandas.Series \(object\) " - r"with name 'id' to Arrow Array \(double\)." + r"ValueError: Exception thrown when converting pandas.Series " + r"\(object\) with name 'id' to Arrow Array \(double\)." ) if safely: expected = expected + ( @@ -267,8 +267,8 @@ def func(iterator): with self.subTest(convert="double to string"): with self.assertRaisesRegex( PythonException, - r"TypeError: Exception thrown when converting pandas.Series \(float64\) " - r"with name 'id' to Arrow Array \(string\).\n", + r"TypeError: Exception thrown when converting pandas.Series " + r"\(float64\) with name 'id' to Arrow Array \(string\).\n", ): ( self.spark.range(10, numPartitions=3) diff --git a/python/pyspark/sql/tests/test_arrow_map.py b/python/pyspark/sql/tests/test_arrow_map.py index 4f1398e25eddc..546f35b807505 100644 --- a/python/pyspark/sql/tests/test_arrow_map.py +++ b/python/pyspark/sql/tests/test_arrow_map.py @@ -100,15 +100,15 @@ def bad_iter_elem(_): with QuietTest(self.sc): with self.assertRaisesRegex( PythonException, - "Return type of the user-defined function should be iterator of pyarrow.RecordBatch, " - "but is ", + "Return type of the user-defined function should be iterator " + "of pyarrow.RecordBatch, but is ", ): (self.spark.range(10, numPartitions=3).mapInArrow(not_iter, "a int").count()) with self.assertRaisesRegex( PythonException, - "Return type of the user-defined function should be iterator of pyarrow.RecordBatch, " - "but is iterator of ", + "Return type of the user-defined function should be iterator " + "of pyarrow.RecordBatch, but is iterator of ", ): (self.spark.range(10, numPartitions=3).mapInArrow(bad_iter_elem, "a int").count()) From b3caabb05f7b0d38a6d811bc1ad4ad4adb9b1191 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Thu, 9 Feb 2023 15:15:35 +0100 Subject: [PATCH 09/17] Assert actual element type, not __len__ attribute --- python/pyspark/worker.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index c180279ddd04c..ea10a2add482f 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -151,13 +151,25 @@ def verify_result(result): return result def verify_element(elem): - if not hasattr(elem, "__len__"): - raise TypeError( - "Return type of the user-defined function should be " - "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) - ) - if not is_arrow_iter: + if is_arrow_iter: + import pyarrow as pa + + if not isinstance(elem, pa.RecordBatch): + raise TypeError( + "Return type of the user-defined function should be " + "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) + ) + else: + import pandas as pd + + if not isinstance(elem, pd.DataFrame if type(return_type) == StructType else pd.Series): + raise TypeError( + "Return type of the user-defined function should be " + "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) + ) + verify_pandas_result(elem, return_type, True, True) + return elem return lambda *iterator: map( From d095274b7464e910f3ec11f026937cf067c0c7ca Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Tue, 28 Feb 2023 16:39:41 +0100 Subject: [PATCH 10/17] Remove QuietTest from MapInPandasParityTests, skip failing test --- .../tests/connect/test_parity_pandas_map.py | 23 ++- .../sql/tests/pandas/test_pandas_map.py | 193 ++++++++++-------- 2 files changed, 125 
insertions(+), 91 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_map.py b/python/pyspark/sql/tests/connect/test_parity_pandas_map.py index f61a866ed9046..6ff9b0cb33b28 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_map.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_map.py @@ -14,16 +14,35 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import unittest + + from pyspark.sql.tests.pandas.test_pandas_map import MapInPandasTestsMixin from pyspark.testing.connectutils import ReusedConnectTestCase class MapInPandasParityTests(MapInPandasTestsMixin, ReusedConnectTestCase): + def test_other_than_dataframe_iter(self): + self.check_other_than_dataframe_iter() + + def test_dataframes_with_other_column_names(self): + self.check_dataframes_with_other_column_names() + + def test_dataframes_with_duplicate_column_names(self): + self.check_dataframes_with_duplicate_column_names() + + def test_dataframes_with_less_columns(self): + self.check_dataframes_with_less_columns() + + @unittest.skip("Fails in Spark Connect, should enable.") + def test_dataframes_with_incompatible_types(self): + self.check_dataframes_with_incompatible_types() + def test_empty_dataframes_with_less_columns(self): self.check_empty_dataframes_with_less_columns() - def test_other_than_dataframe_iter(self): - self.check_other_than_dataframe_iter() + def test_empty_dataframes_with_other_columns(self): + self.check_empty_dataframes_with_other_columns() if __name__ == "__main__": diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index f9674cbbb5416..7849f688a1fd8 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -163,61 +163,70 @@ def bad_iter_elem(_): (self.spark.range(10, numPartitions=3).mapInPandas(bad_iter_elem, "a int").count()) def test_dataframes_with_other_column_names(self): + with QuietTest(self.sc): + self.check_dataframes_with_other_column_names() + + def check_dataframes_with_other_column_names(self): def dataframes_with_other_column_names(iterator): for pdf in iterator: yield pdf.rename(columns={"id": "iid"}) - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", - ): - ( - self.spark.range(10, numPartitions=3) - .withColumn("value", lit(0)) - .mapInPandas(dataframes_with_other_column_names, "id int, value int") - .collect() - ) + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: id. Unexpected: iid.\n", + ): + ( + self.spark.range(10, numPartitions=3) + .withColumn("value", lit(0)) + .mapInPandas(dataframes_with_other_column_names, "id int, value int") + .collect() + ) def test_dataframes_with_duplicate_column_names(self): + with QuietTest(self.sc): + self.check_dataframes_with_duplicate_column_names() + + def check_dataframes_with_duplicate_column_names(self): def dataframes_with_other_column_names(iterator): for pdf in iterator: yield pdf.rename(columns={"id2": "id"}) - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " - "specified schema. 
Missing: id2.\n", - ): - ( - self.spark.range(10, numPartitions=3) - .withColumn("id2", lit(0)) - .withColumn("value", lit(1)) - .mapInPandas(dataframes_with_other_column_names, "id int, id2 long, value int") - .collect() - ) + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: id2.\n", + ): + ( + self.spark.range(10, numPartitions=3) + .withColumn("id2", lit(0)) + .withColumn("value", lit(1)) + .mapInPandas(dataframes_with_other_column_names, "id int, id2 long, value int") + .collect() + ) def test_dataframes_with_less_columns(self): + with QuietTest(self.sc): + self.check_dataframes_with_less_columns() + + def check_dataframes_with_less_columns(self): df = self.spark.range(10, numPartitions=3).withColumn("value", lit(0)) - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id2.\n", - ): - f = self.identity_dataframes_iter("id", "value") - (df.mapInPandas(f, "id int, id2 long, value int").collect()) + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: id2.\n", + ): + f = self.identity_dataframes_iter("id", "value") + (df.mapInPandas(f, "id int, id2 long, value int").collect()) - with self.assertRaisesRegex( - PythonException, - "RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match " - "specified schema. Expected: 3 Actual: 2\n", - ): - f = self.identity_dataframes_wo_column_names_iter("id", "value") - (df.mapInPandas(f, "id int, id2 long, value int").collect()) + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match " + "specified schema. Expected: 3 Actual: 2\n", + ): + f = self.identity_dataframes_wo_column_names_iter("id", "value") + (df.mapInPandas(f, "id int, id2 long, value int").collect()) def test_dataframes_with_more_columns(self): df = self.spark.range(10, numPartitions=3).select( @@ -234,48 +243,51 @@ def test_dataframes_with_more_columns(self): self.assertEqual(actual, expected) def test_dataframes_with_incompatible_types(self): + with QuietTest(self.sc): + self.check_dataframes_with_incompatible_types() + + def check_dataframes_with_incompatible_types(self): def func(iterator): for pdf in iterator: yield pdf.assign(id=pdf["id"].apply(str)) - with QuietTest(self.sc): - for safely in [True, False]: - with self.subTest(convertToArrowArraySafely=safely), self.sql_conf( - {"spark.sql.execution.pandas.convertToArrowArraySafely": safely} - ): - # sometimes we see ValueErrors - with self.subTest(convert="string to double"): - expected = ( - r"ValueError: Exception thrown when converting pandas.Series " - r"\(object\) with name 'id' to Arrow Array \(double\)." + for safely in [True, False]: + with self.subTest(convertToArrowArraySafely=safely), self.sql_conf( + {"spark.sql.execution.pandas.convertToArrowArraySafely": safely} + ): + # sometimes we see ValueErrors + with self.subTest(convert="string to double"): + expected = ( + r"ValueError: Exception thrown when converting pandas.Series " + r"\(object\) with name 'id' to Arrow Array \(double\)." + ) + if safely: + expected = expected + ( + " It can be caused by overflows or other " + "unsafe conversions warned by Arrow. 
Arrow safe type check " + "can be disabled by using SQL config " + "`spark.sql.execution.pandas.convertToArrowArraySafely`." + ) + with self.assertRaisesRegex(PythonException, expected + "\n"): + ( + self.spark.range(10, numPartitions=3) + .mapInPandas(func, "id double") + .collect() + ) + + # sometimes we see TypeErrors + with self.subTest(convert="double to string"): + with self.assertRaisesRegex( + PythonException, + r"TypeError: Exception thrown when converting pandas.Series " + r"\(float64\) with name 'id' to Arrow Array \(string\).\n", + ): + ( + self.spark.range(10, numPartitions=3) + .select(col("id").cast("double")) + .mapInPandas(self.identity_dataframes_iter("id"), "id string") + .collect() ) - if safely: - expected = expected + ( - " It can be caused by overflows or other " - "unsafe conversions warned by Arrow. Arrow safe type check " - "can be disabled by using SQL config " - "`spark.sql.execution.pandas.convertToArrowArraySafely`." - ) - with self.assertRaisesRegex(PythonException, expected + "\n"): - ( - self.spark.range(10, numPartitions=3) - .mapInPandas(func, "id double") - .collect() - ) - - # sometimes we see TypeErrors - with self.subTest(convert="double to string"): - with self.assertRaisesRegex( - PythonException, - r"TypeError: Exception thrown when converting pandas.Series " - r"\(float64\) with name 'id' to Arrow Array \(string\).\n", - ): - ( - self.spark.range(10, numPartitions=3) - .select(col("id").cast("double")) - .mapInPandas(self.identity_dataframes_iter("id"), "id string") - .collect() - ) def test_empty_iterator(self): def empty_iter(_): @@ -322,22 +334,25 @@ def test_empty_dataframes_with_more_columns(self): self.assertEqual(mapped.count(), 10) def test_empty_dataframes_with_other_columns(self): + with QuietTest(self.sc): + self.check_empty_dataframes_with_other_columns() + + def check_empty_dataframes_with_other_columns(self): def empty_dataframes_with_other_columns(iterator): for pdf in iterator: yield pdf.rename(columns={"id": "iid"}) - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " - "specified schema. Missing: id. Unexpected: iid.\n", - ): - ( - self.spark.range(10, numPartitions=3) - .withColumn("value", lit(0)) - .mapInPandas(empty_dataframes_with_other_columns, "id int, value int") - .collect() - ) + with self.assertRaisesRegex( + PythonException, + "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "specified schema. Missing: id. 
Unexpected: iid.\n", + ): + ( + self.spark.range(10, numPartitions=3) + .withColumn("value", lit(0)) + .mapInPandas(empty_dataframes_with_other_columns, "id int, value int") + .collect() + ) def test_chain_map_partitions_in_pandas(self): def func(iterator): From b8b7da118bd07f6345f0f0c553855249414312b9 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Wed, 24 May 2023 14:50:00 +0200 Subject: [PATCH 11/17] Fix test_other_than_recordbatch_iter in ArrowMapParityTests --- .../tests/connect/test_parity_arrow_map.py | 3 +- python/pyspark/sql/tests/test_arrow_map.py | 31 ++++++++++--------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow_map.py b/python/pyspark/sql/tests/connect/test_parity_arrow_map.py index ed51d0d3d1996..868aeaeff7fe6 100644 --- a/python/pyspark/sql/tests/connect/test_parity_arrow_map.py +++ b/python/pyspark/sql/tests/connect/test_parity_arrow_map.py @@ -22,7 +22,8 @@ class ArrowMapParityTests(MapInArrowTestsMixin, ReusedConnectTestCase): - pass + def test_other_than_recordbatch_iter(self): + self.check_other_than_recordbatch_iter() if __name__ == "__main__": diff --git a/python/pyspark/sql/tests/test_arrow_map.py b/python/pyspark/sql/tests/test_arrow_map.py index 546f35b807505..325964bb71079 100644 --- a/python/pyspark/sql/tests/test_arrow_map.py +++ b/python/pyspark/sql/tests/test_arrow_map.py @@ -91,26 +91,29 @@ def func(iterator): self.assertEqual(set((r.a for r in actual)), set(range(100))) def test_other_than_recordbatch_iter(self): + with QuietTest(self.sc): + self.check_other_than_recordbatch_iter() + + def check_other_than_recordbatch_iter(self): def not_iter(_): return 1 def bad_iter_elem(_): return iter([1]) - with QuietTest(self.sc): - with self.assertRaisesRegex( - PythonException, - "Return type of the user-defined function should be iterator " - "of pyarrow.RecordBatch, but is ", - ): - (self.spark.range(10, numPartitions=3).mapInArrow(not_iter, "a int").count()) - - with self.assertRaisesRegex( - PythonException, - "Return type of the user-defined function should be iterator " - "of pyarrow.RecordBatch, but is iterator of ", - ): - (self.spark.range(10, numPartitions=3).mapInArrow(bad_iter_elem, "a int").count()) + with self.assertRaisesRegex( + PythonException, + "Return type of the user-defined function should be iterator " + "of pyarrow.RecordBatch, but is ", + ): + (self.spark.range(10, numPartitions=3).mapInArrow(not_iter, "a int").count()) + + with self.assertRaisesRegex( + PythonException, + "Return type of the user-defined function should be iterator " + "of pyarrow.RecordBatch, but is iterator of ", + ): + (self.spark.range(10, numPartitions=3).mapInArrow(bad_iter_elem, "a int").count()) def test_empty_iterator(self): def empty_iter(_): From 291fe79e812dc2569f9794315fef803fd7ddbbec Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 10 Jul 2023 10:26:22 +0200 Subject: [PATCH 12/17] Split wrap_batch_iter_udf into wrap_pandas_batch_iter_udf and wrap_arrow_batch_iter_udf --- python/pyspark/worker.py | 64 ++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index ea10a2add482f..f471ef495c325 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -134,13 +134,9 @@ def verify_result_length(result, length): ) -def wrap_batch_iter_udf(f, return_type, is_arrow_iter=False): +def wrap_pandas_batch_iter_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) - 
iter_type_label = ( - "pyarrow.RecordBatch" - if is_arrow_iter - else ("pandas.DataFrame" if type(return_type) == StructType else "pandas.Series") - ) + iter_type_label = "pandas.DataFrame" if type(return_type) == StructType else "pandas.Series" def verify_result(result): if not isinstance(result, Iterator) and not hasattr(result, "__iter__"): @@ -151,24 +147,15 @@ def verify_result(result): return result def verify_element(elem): - if is_arrow_iter: - import pyarrow as pa - - if not isinstance(elem, pa.RecordBatch): - raise TypeError( - "Return type of the user-defined function should be " - "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) - ) - else: - import pandas as pd + import pandas as pd - if not isinstance(elem, pd.DataFrame if type(return_type) == StructType else pd.Series): - raise TypeError( - "Return type of the user-defined function should be " - "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) - ) + if not isinstance(elem, pd.DataFrame if type(return_type) == StructType else pd.Series): + raise TypeError( + "Return type of the user-defined function should be " + "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) + ) - verify_pandas_result(elem, return_type, True, True) + verify_pandas_result(elem, return_type, True, True) return elem @@ -234,6 +221,33 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu ) +def wrap_arrow_batch_iter_udf(f, return_type): + arrow_return_type = to_arrow_type(return_type) + + def verify_result(result): + if not isinstance(result, Iterator) and not hasattr(result, "__iter__"): + raise TypeError( + "Return type of the user-defined function should be " + "iterator of pyarrow.RecordBatch, but is {}".format(type(result)) + ) + return result + + def verify_element(elem): + import pyarrow as pa + + if not isinstance(elem, pa.RecordBatch): + raise TypeError( + "Return type of the user-defined function should be " + "iterator of pyarrow.RecordBatch, but is iterator of {}".format(type(elem)) + ) + + return elem + + return lambda *iterator: map( + lambda res: (res, arrow_return_type), map(verify_element, verify_result(f(*iterator))) + ) + + def wrap_cogrouped_map_pandas_udf(f, return_type, argspec, runner_conf): _assign_cols_by_name = assign_cols_by_name(runner_conf) @@ -461,11 +475,11 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index): if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF, PythonEvalType.SQL_ARROW_BATCHED_UDF): return arg_offsets, wrap_scalar_pandas_udf(func, return_type) elif eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF: - return arg_offsets, wrap_batch_iter_udf(func, return_type) + return arg_offsets, wrap_pandas_batch_iter_udf(func, return_type) elif eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF: - return arg_offsets, wrap_batch_iter_udf(func, return_type) + return arg_offsets, wrap_pandas_batch_iter_udf(func, return_type) elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF: - return arg_offsets, wrap_batch_iter_udf(func, return_type, is_arrow_iter=True) + return arg_offsets, wrap_arrow_batch_iter_udf(func, return_type) elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF: argspec = getfullargspec(chained_func) # signature was lost when wrapping it return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec, runner_conf) From 696dc3fb3582cf643affacc9e08af077f9b163d1 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 10 Jul 2023 10:46:34 +0200 Subject: 
[PATCH 13/17] Really test with empty dataframe --- python/pyspark/sql/tests/pandas/test_pandas_map.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index 7849f688a1fd8..843edda14cc21 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -339,8 +339,8 @@ def test_empty_dataframes_with_other_columns(self): def check_empty_dataframes_with_other_columns(self): def empty_dataframes_with_other_columns(iterator): - for pdf in iterator: - yield pdf.rename(columns={"id": "iid"}) + for _ in iterator: + yield pd.DataFrame({"iid": [], "value": []}) with self.assertRaisesRegex( PythonException, From 962a943f3a107547a434c35f6d27df7ac6be025c Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 10 Jul 2023 11:32:53 +0200 Subject: [PATCH 14/17] Fix pandas map tests --- .../sql/tests/pandas/test_pandas_map.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index 843edda14cc21..6ea7bdcb19230 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -23,7 +23,7 @@ from pyspark.sql import Row from pyspark.sql.functions import col, encode, lit -from pyspark.errors import PythonException +from pyspark.errors import PythonException, PySparkRuntimeError from pyspark.testing.sqlutils import ( ReusedSQLTestCase, have_pandas, @@ -173,7 +173,8 @@ def dataframes_with_other_column_names(iterator): with self.assertRaisesRegex( PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " + "Column names of the returned pandas.DataFrame do not match " "specified schema. Missing: id. Unexpected: iid.\n", ): ( @@ -194,7 +195,8 @@ def dataframes_with_other_column_names(iterator): with self.assertRaisesRegex( PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " + "Column names of the returned pandas.DataFrame do not match " "specified schema. Missing: id2.\n", ): ( @@ -214,7 +216,8 @@ def check_dataframes_with_less_columns(self): with self.assertRaisesRegex( PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " + "Column names of the returned pandas.DataFrame do not match " "specified schema. Missing: id2.\n", ): f = self.identity_dataframes_iter("id", "value") @@ -222,7 +225,8 @@ def check_dataframes_with_less_columns(self): with self.assertRaisesRegex( PythonException, - "RuntimeError: Number of columns of the returned pandas.DataFrame doesn't match " + "PySparkRuntimeError: \\[RESULT_LENGTH_MISMATCH_FOR_PANDAS_UDF\\] " + "Number of columns of the returned pandas.DataFrame doesn't match " "specified schema. 
Expected: 3 Actual: 2\n", ): f = self.identity_dataframes_wo_column_names_iter("id", "value") @@ -316,7 +320,8 @@ def test_empty_dataframes_with_less_columns(self): def check_empty_dataframes_with_less_columns(self): with self.assertRaisesRegex( PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " + "Column names of the returned pandas.DataFrame do not match " "specified schema. Missing: value.\n", ): f = self.dataframes_and_empty_dataframe_iter("id") @@ -344,7 +349,8 @@ def empty_dataframes_with_other_columns(iterator): with self.assertRaisesRegex( PythonException, - "RuntimeError: Column names of the returned pandas.DataFrame do not match " + "PySparkRuntimeError: \\[RESULT_COLUMNS_MISMATCH_FOR_PANDAS_UDF\\] " + "Column names of the returned pandas.DataFrame do not match " "specified schema. Missing: id. Unexpected: iid.\n", ): ( From 579a47323cc41ae702160943c4322daa004430b9 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Mon, 10 Jul 2023 14:12:47 +0200 Subject: [PATCH 15/17] Use PySparkTypeError instead of TypeError --- python/pyspark/errors/error_classes.py | 5 ++ .../tests/pandas/test_pandas_cogrouped_map.py | 2 +- .../tests/pandas/test_pandas_grouped_map.py | 2 +- .../sql/tests/pandas/test_pandas_map.py | 6 +- python/pyspark/sql/tests/test_arrow_map.py | 4 +- python/pyspark/worker.py | 86 ++++++++++++------- 6 files changed, 67 insertions(+), 38 deletions(-) diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index e87d37c63e77b..fb8483e00f29c 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -643,6 +643,11 @@ "Expected values for ``, got ." ] }, + "UDF_RETURN_TYPE" : { + "message" : [ + "Return type of the user-defined function should be , but is ." + ] + }, "UNEXPECTED_RESPONSE_FROM_SERVER" : { "message" : [ "Unexpected response from iterator server." 
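
The UDF_RETURN_TYPE entry added above is the template consumed by the worker.py changes later in this patch; its placeholder tokens did not survive rendering here, but they are filled from the "expected" and "actual" message parameters. A minimal sketch (not part of the patch series) of how such an error is constructed and what it prints, assuming the standard PySpark error-class machinery:

from pyspark.errors import PySparkTypeError

try:
    # Demonstrative only: worker.py raises this when a mapInPandas function
    # returns something other than an iterator of pandas.DataFrame.
    raise PySparkTypeError(
        error_class="UDF_RETURN_TYPE",
        message_parameters={
            "expected": "iterator of pandas.DataFrame",
            "actual": "int",
        },
    )
except PySparkTypeError as e:
    # Prints roughly:
    #   [UDF_RETURN_TYPE] Return type of the user-defined function should be
    #   iterator of pandas.DataFrame, but is int.
    print(e)
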
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py index ecdd51e3ab9c6..b867156e71a5d 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py @@ -166,7 +166,7 @@ def check_apply_in_pandas_not_returning_pandas_dataframe(self): fn=lambda lft, rgt: lft.size + rgt.size, error_class=PythonException, error_message_regex="Return type of the user-defined function " - "should be pandas.DataFrame, but is ", + "should be pandas.DataFrame, but is int64.", ) def test_apply_in_pandas_returning_column_names(self): diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index 2efb1a3c988aa..9e70402e93b4a 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -287,7 +287,7 @@ def check_apply_in_pandas_not_returning_pandas_dataframe(self): with self.assertRaisesRegex( PythonException, "Return type of the user-defined function should be pandas.DataFrame, " - "but is ", + "but is tuple.", ): self._test_apply_in_pandas(lambda key, pdf: key) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index 6ea7bdcb19230..fb2f9214c5d8f 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -23,7 +23,7 @@ from pyspark.sql import Row from pyspark.sql.functions import col, encode, lit -from pyspark.errors import PythonException, PySparkRuntimeError +from pyspark.errors import PythonException from pyspark.testing.sqlutils import ( ReusedSQLTestCase, have_pandas, @@ -151,14 +151,14 @@ def bad_iter_elem(_): with self.assertRaisesRegex( PythonException, "Return type of the user-defined function should be iterator of pandas.DataFrame, " - "but is ", + "but is int.", ): (self.spark.range(10, numPartitions=3).mapInPandas(no_iter, "a int").count()) with self.assertRaisesRegex( PythonException, "Return type of the user-defined function should be iterator of pandas.DataFrame, " - "but is iterator of ", + "but is iterator of int.", ): (self.spark.range(10, numPartitions=3).mapInPandas(bad_iter_elem, "a int").count()) diff --git a/python/pyspark/sql/tests/test_arrow_map.py b/python/pyspark/sql/tests/test_arrow_map.py index 325964bb71079..15367743585e3 100644 --- a/python/pyspark/sql/tests/test_arrow_map.py +++ b/python/pyspark/sql/tests/test_arrow_map.py @@ -104,14 +104,14 @@ def bad_iter_elem(_): with self.assertRaisesRegex( PythonException, "Return type of the user-defined function should be iterator " - "of pyarrow.RecordBatch, but is ", + "of pyarrow.RecordBatch, but is int.", ): (self.spark.range(10, numPartitions=3).mapInArrow(not_iter, "a int").count()) with self.assertRaisesRegex( PythonException, "Return type of the user-defined function should be iterator " - "of pyarrow.RecordBatch, but is iterator of ", + "of pyarrow.RecordBatch, but is iterator of int.", ): (self.spark.range(10, numPartitions=3).mapInArrow(bad_iter_elem, "a int").count()) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index f471ef495c325..cef59032aa4b1 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -65,7 +65,7 @@ from pyspark.sql.types import StructType, _parse_datatype_json_string from pyspark.util import fail_on_stopiteration, try_simplify_traceback 
from pyspark import shuffle -from pyspark.errors import PySparkRuntimeError +from pyspark.errors import PySparkRuntimeError, PySparkTypeError pickleSer = CPickleSerializer() utf8_deserializer = UTF8Deserializer() @@ -111,9 +111,12 @@ def wrap_scalar_pandas_udf(f, return_type): def verify_result_type(result): if not hasattr(result, "__len__"): pd_type = "pandas.DataFrame" if type(return_type) == StructType else "pandas.Series" - raise TypeError( - "Return type of the user-defined function should be " - "{}, but is {}".format(pd_type, type(result)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={ + "expected": pd_type, + "actual": type(result).__name__, + }, ) return result @@ -140,9 +143,12 @@ def wrap_pandas_batch_iter_udf(f, return_type): def verify_result(result): if not isinstance(result, Iterator) and not hasattr(result, "__iter__"): - raise TypeError( - "Return type of the user-defined function should be " - "iterator of {}, but is {}".format(iter_type_label, type(result)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={ + "expected": "iterator of {}".format(iter_type_label), + "actual": type(result).__name__, + }, ) return result @@ -150,9 +156,12 @@ def verify_element(elem): import pandas as pd if not isinstance(elem, pd.DataFrame if type(return_type) == StructType else pd.Series): - raise TypeError( - "Return type of the user-defined function should be " - "iterator of {}, but is iterator of {}".format(iter_type_label, type(elem)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={ + "expected": "iterator of {}".format(iter_type_label), + "actual": "iterator of {}".format(type(elem).__name__), + }, ) verify_pandas_result(elem, return_type, True, True) @@ -169,9 +178,12 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu if type(return_type) == StructType: if not isinstance(result, pd.DataFrame): - raise TypeError( - "Return type of the user-defined function should be " - "pandas.DataFrame, but is {}".format(type(result)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={ + "expected": "pandas.DataFrame", + "actual": type(result).__name__, + }, ) # check the schema of the result only if it is not empty or has columns @@ -215,9 +227,9 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu ) else: if not isinstance(result, pd.Series): - raise TypeError( - "Return type of the user-defined function should be " - "pandas.Series, but is {}".format(type(result)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={"expected": "pandas.Series", "actual": type(result).__name__}, ) @@ -226,9 +238,12 @@ def wrap_arrow_batch_iter_udf(f, return_type): def verify_result(result): if not isinstance(result, Iterator) and not hasattr(result, "__iter__"): - raise TypeError( - "Return type of the user-defined function should be " - "iterator of pyarrow.RecordBatch, but is {}".format(type(result)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={ + "expected": "iterator of pyarrow.RecordBatch", + "actual": type(result).__name__, + }, ) return result @@ -236,9 +251,12 @@ def verify_element(elem): import pyarrow as pa if not isinstance(elem, pa.RecordBatch): - raise TypeError( - "Return type of the user-defined function should be " - "iterator of pyarrow.RecordBatch, but is iterator of {}".format(type(elem)) + raise PySparkTypeError( + 
error_class="UDF_RETURN_TYPE", + message_parameters={ + "expected": "iterator of pyarrow.RecordBatch", + "actual": "iterator of {}".format(type(elem).__name__), + }, ) return elem @@ -330,9 +348,12 @@ def wrapped(key_series, value_series_gen, state): def verify_element(result): if not isinstance(result, pd.DataFrame): - raise TypeError( - "The type of element in return iterator of the user-defined function " - "should be pandas.DataFrame, but is {}".format(type(result)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={ + "expected": "iterator of pandas.DataFrame", + "actual": "iterator of {}".format(type(result).__name__), + }, ) # the number of columns of result have to match the return type # but it is fine for result to have no columns at all if it is empty @@ -351,17 +372,20 @@ def verify_element(result): return result if isinstance(result_iter, pd.DataFrame): - raise TypeError( - "Return type of the user-defined function should be " - "iterable of pandas.DataFrame, but is {}".format(type(result_iter)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={ + "expected": "iterable of pandas.DataFrame", + "actual": type(result_iter).__name__, + }, ) try: iter(result_iter) except TypeError: - raise TypeError( - "Return type of the user-defined function should be " - "iterable, but is {}".format(type(result_iter)) + raise PySparkTypeError( + error_class="UDF_RETURN_TYPE", + message_parameters={"expected": "iterable", "actual": type(result_iter).__name__}, ) result_iter_with_validation = (verify_element(x) for x in result_iter) From 7328294ce3a319e8ae788379ece04809da91f5c2 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Tue, 11 Jul 2023 08:15:35 +0200 Subject: [PATCH 16/17] Fix lint --- python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index 9e70402e93b4a..742b3657f6e75 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -286,8 +286,7 @@ def test_apply_in_pandas_not_returning_pandas_dataframe(self): def check_apply_in_pandas_not_returning_pandas_dataframe(self): with self.assertRaisesRegex( PythonException, - "Return type of the user-defined function should be pandas.DataFrame, " - "but is tuple.", + "Return type of the user-defined function should be pandas.DataFrame, but is tuple.", ): self._test_apply_in_pandas(lambda key, pdf: key) From 6dd87eccca6c91c0a5333a2b390b86cb51fe141c Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Fri, 14 Jul 2023 08:16:15 +0200 Subject: [PATCH 17/17] Add trucate_return_schema to wrap_arrow_udtf --- python/pyspark/worker.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index c9bccc718da7d..8d07772b2148e 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -165,7 +165,9 @@ def verify_element(elem): }, ) - verify_pandas_result(elem, return_type, True, True) + verify_pandas_result( + elem, return_type, assign_cols_by_name=True, truncate_return_schema=True + ) return elem @@ -282,7 +284,9 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se key_series = left_key_series if not left_df.empty else right_key_series key = tuple(s[0] for s in key_series) result = f(key, left_df, right_df) 
- verify_pandas_result(result, return_type, _assign_cols_by_name, False) + verify_pandas_result( + result, return_type, _assign_cols_by_name, truncate_return_schema=False + ) return result @@ -300,7 +304,9 @@ def wrapped(key_series, value_series): elif len(argspec.args) == 2: key = tuple(s[0] for s in key_series) result = f(key, pd.concat(value_series, axis=1)) - verify_pandas_result(result, return_type, _assign_cols_by_name, False) + verify_pandas_result( + result, return_type, _assign_cols_by_name, truncate_return_schema=False + ) return result @@ -621,7 +627,9 @@ def verify_result(result): ) # Verify the type and the schema of the result. - verify_pandas_result(result, return_type, assign_cols_by_name=False) + verify_pandas_result( + result, return_type, assign_cols_by_name=False, truncate_return_schema=False + ) return result return lambda *a: map(lambda res: (res, arrow_return_type), map(verify_result, f(*a)))
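
End-to-end, the patch series changes what a user sees when a mapInPandas or mapInArrow function returns the wrong thing. Below is a minimal driver-side sketch (not part of the patches; the session setup and function names are illustrative) of the behaviour the new tests assert:

from pyspark.sql import SparkSession
from pyspark.errors import PythonException

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.range(10, numPartitions=3)

def identity_pandas_iter(batches):
    # Correct shape for mapInPandas: a generator of pandas.DataFrame.
    for pdf in batches:
        yield pdf

def identity_arrow_iter(batches):
    # Correct shape for mapInArrow: a generator of pyarrow.RecordBatch.
    for batch in batches:
        yield batch

def no_iter(batches):
    # Wrong shape: returns an int instead of an iterator.
    return 1

df.mapInPandas(identity_pandas_iter, "id long").collect()  # ok
df.mapInArrow(identity_arrow_iter, "id long").collect()    # ok

try:
    df.mapInPandas(no_iter, "id long").collect()
except PythonException as e:
    # The worker now reports:
    #   Return type of the user-defined function should be
    #   iterator of pandas.DataFrame, but is int.
    print(e)

Both identity functions run normally; the bad one fails on collect with a PythonException whose message contains the clearer "iterator of pandas.DataFrame, but is int." text introduced above.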