From da8dbafcee753ebb2f7f3a205d0b378c240a3469 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 26 Mar 2018 11:18:34 -0700 Subject: [PATCH 1/6] Demonstrate tokenize udf --- python/pyspark/sql/functions.py | 2 +- python/pyspark/sql/tests.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index ad3e37c87262..637dbd5ff0be 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2208,7 +2208,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): 1. SCALAR A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`. - The returnType should be a primitive data type, e.g., :class:`DoubleType`. + The returnType should be a primitive data type, e.g., :class:`DoubleType` or arrays of a primitive data type. The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`. Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 967cc83166f3..255575b62012 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3806,6 +3806,7 @@ def test_timestamp_dst(self): not _have_pandas or not _have_pyarrow, _pandas_requirement_message or _pyarrow_requirement_message) class PandasUDFTests(ReusedSQLTestCase): + def test_pandas_udf_basic(self): from pyspark.rdd import PythonEvalType from pyspark.sql.functions import pandas_udf, PandasUDFType @@ -3966,6 +3967,15 @@ def random_udf(v): random_udf = random_udf.asNondeterministic() return random_udf + def test_pandas_udf_tokenize(self): + from pyspark.sql.functions import pandas_udf + tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')), + ArrayType(StringType())) + self.assertEqual(udf.returnType, ArrayType(StringType())) + df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"]) + result = df.select(tokenize("vals").alias("hi")) + self.assertEqual([], result.collect()) + def test_vectorized_udf_basic(self): from pyspark.sql.functions import pandas_udf, col, array df = self.spark.range(10).select( From 342d2228a5c68fd2c07bd8c1b518da6135ce1bf6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 26 Mar 2018 11:20:51 -0700 Subject: [PATCH 2/6] Long lines are bad, kthnx --- python/pyspark/sql/functions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 637dbd5ff0be..13d756802474 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2208,7 +2208,8 @@ def pandas_udf(f=None, returnType=None, functionType=None): 1. SCALAR A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`. - The returnType should be a primitive data type, e.g., :class:`DoubleType` or arrays of a primitive data type. + The returnType should be a primitive data type, e.g., :class:`DoubleType` or + arrays of a primitive data type (e.g. :class:`ArrayType`). The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`. 
Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and From 88b65c5847a9df01acef6b2857acdb4d3fac9019 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 30 Mar 2018 14:25:02 -0700 Subject: [PATCH 3/6] Update description to clarify non-supported types are only map/struct/nested arrays and add a test expected to fail for nested arrays, and fix test for tokenizing. --- python/pyspark/sql/functions.py | 4 ++-- python/pyspark/sql/tests.py | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 13d756802474..05aea8b8af29 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2208,10 +2208,10 @@ def pandas_udf(f=None, returnType=None, functionType=None): 1. SCALAR A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`. - The returnType should be a primitive data type, e.g., :class:`DoubleType` or - arrays of a primitive data type (e.g. :class:`ArrayType`). The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`. + :class:`MapType`, :class:`StructType`, and nested :class:`ArrayType` are currently not supported as output types. + Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and :meth:`pyspark.sql.DataFrame.select`. diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 255575b62012..6a70f047e84b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3971,10 +3971,18 @@ def test_pandas_udf_tokenize(self): from pyspark.sql.functions import pandas_udf tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')), ArrayType(StringType())) - self.assertEqual(udf.returnType, ArrayType(StringType())) + self.assertEqual(tokenize.returnType, ArrayType(StringType())) df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"]) result = df.select(tokenize("vals").alias("hi")) - self.assertEqual([], result.collect()) + self.assertEqual([Row(hi=[u'hi', u'boo']), Row(hi=[u'bye', u'boo'])], result.collect()) + + def test_pandas_udf_nested_arrays_does_not_work(self): + from pyspark.sql.functions import pandas_udf + tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]), + ArrayType(ArrayType(StringType()))) + result = df.select(tokenize("vals").alias("hi")) + # If we start supporting nested arrays we should update the documentation in functions.py + self.assertRaises(ArrowTypeError, result.collect()) def test_vectorized_udf_basic(self): from pyspark.sql.functions import pandas_udf, col, array From 091a76167726adfab934dc80357e79c702fb67a4 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 2 Apr 2018 11:08:19 -0700 Subject: [PATCH 4/6] Fix long line --- python/pyspark/sql/functions.py | 3 ++- python/pyspark/sql/tests.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 05aea8b8af29..009bf52cd15c 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2210,7 +2210,8 @@ def pandas_udf(f=None, returnType=None, functionType=None): A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`. The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`. - :class:`MapType`, :class:`StructType`, and nested :class:`ArrayType` are currently not supported as output types. 
+ :class:`MapType`, :class:`StructType`, and nested :class:`ArrayType` are currently not + supported as output types. Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and :meth:`pyspark.sql.DataFrame.select`. diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6a70f047e84b..3424a68b635b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3982,7 +3982,8 @@ def test_pandas_udf_nested_arrays_does_not_work(self): ArrayType(ArrayType(StringType()))) result = df.select(tokenize("vals").alias("hi")) # If we start supporting nested arrays we should update the documentation in functions.py - self.assertRaises(ArrowTypeError, result.collect()) + with QuietTest(self.sc): + self.assertRaises(ArrowTypeError, result.collect()) def test_vectorized_udf_basic(self): from pyspark.sql.functions import pandas_udf, col, array From 729abc9c2db3af61fa2c525d3838c10c2033172b Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 6 Apr 2018 11:41:49 -0700 Subject: [PATCH 5/6] Switch nested array test to a positive test --- python/pyspark/sql/tests.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3424a68b635b..b1894d14e6d6 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3976,14 +3976,14 @@ def test_pandas_udf_tokenize(self): result = df.select(tokenize("vals").alias("hi")) self.assertEqual([Row(hi=[u'hi', u'boo']), Row(hi=[u'bye', u'boo'])], result.collect()) - def test_pandas_udf_nested_arrays_does_not_work(self): + def test_pandas_udf_nested_arrays(self): from pyspark.sql.functions import pandas_udf tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]), ArrayType(ArrayType(StringType()))) + self.assertEqual(tokenize.returnType, ArrayType(StringType())) + df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"]) result = df.select(tokenize("vals").alias("hi")) - # If we start supporting nested arrays we should update the documentation in functions.py - with QuietTest(self.sc): - self.assertRaises(ArrowTypeError, result.collect()) + self.assertEqual([Row(hi=[[u'hi', u'boo']]), Row(hi=[[u'bye', u'boo']])], result.collect()) def test_vectorized_udf_basic(self): from pyspark.sql.functions import pandas_udf, col, array From 03798e038f3acb1a43067b3eebbdf52ea8639faf Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 13 Aug 2018 10:40:31 -0700 Subject: [PATCH 6/6] Clarify we can support nested array types and add a test demonstrating it. --- python/pyspark/sql/functions.py | 3 +-- python/pyspark/sql/tests.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 01c02799691d..ae34766ff0c1 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2720,8 +2720,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`. The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`. - :class:`MapType`, :class:`StructType`, and nested :class:`ArrayType` are currently not - supported as output types. + :class:`MapType`, :class:`StructType` are currently not supported as output types. Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and :meth:`pyspark.sql.DataFrame.select`. 
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6b9007b77611..a739181e1d82 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -4587,7 +4587,7 @@ def test_pandas_udf_nested_arrays(self): from pyspark.sql.functions import pandas_udf tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]), ArrayType(ArrayType(StringType()))) - self.assertEqual(tokenize.returnType, ArrayType(StringType())) + self.assertEqual(tokenize.returnType, ArrayType(ArrayType(StringType()))) df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"]) result = df.select(tokenize("vals").alias("hi")) self.assertEqual([Row(hi=[[u'hi', u'boo']]), Row(hi=[[u'bye', u'boo']])], result.collect())
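
Note (not part of the patch series): a minimal standalone sketch of the tokenize scalar
pandas_udf that these patches document and test, assuming Spark 2.3+ with pandas and
pyarrow installed. The SparkSession setup, the app name, and the "tokens" alias are
illustrative additions; the UDF body and the sample data mirror test_pandas_udf_tokenize
above.

    # Hedged sketch: a scalar pandas_udf whose returnType is ArrayType(StringType()),
    # i.e. an array of a primitive type, as the updated docstring allows.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import ArrayType, StringType

    spark = SparkSession.builder.appName("tokenize-example").getOrCreate()

    # One pandas.Series of strings in, one pandas.Series of lists of strings out;
    # the output Series must have the same length as the input Series.
    tokenize = pandas_udf(lambda s: s.apply(lambda text: text.split(' ')),
                          ArrayType(StringType()))

    df = spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"])
    df.select(tokenize("vals").alias("tokens")).show(truncate=False)
    # Expected rows: [hi, boo] and [bye, boo]

As the final two patches show, the same pattern extends to nested arrays (e.g.
ArrayType(ArrayType(StringType()))); per the revised docstring, only MapType and
StructType remain unsupported as scalar pandas_udf output types.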