Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3122,6 +3122,185 @@ def test_filtered_frame(self):
self.assertTrue(pdf.empty)


@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
class VectorizedUDFTests(ReusedPySparkTestCase):

@classmethod
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.spark = SparkSession(cls.sc)

@classmethod
def tearDownClass(cls):
ReusedPySparkTestCase.tearDownClass()
cls.spark.stop()

def test_vectorized_udf_basic(self):
from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
col('id').alias('long'),
col('id').cast('float').alias('float'),
col('id').cast('double').alias('double'),
col('id').cast('boolean').alias('bool'))
f = lambda x: x
str_f = pandas_udf(f, StringType())
int_f = pandas_udf(f, IntegerType())
long_f = pandas_udf(f, LongType())
float_f = pandas_udf(f, FloatType())
double_f = pandas_udf(f, DoubleType())
bool_f = pandas_udf(f, BooleanType())
res = df.select(str_f(col('str')), int_f(col('int')),
long_f(col('long')), float_f(col('float')),
double_f(col('double')), bool_f(col('bool')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_boolean(self):
from pyspark.sql.functions import pandas_udf, col
data = [(True,), (True,), (None,), (False,)]
schema = StructType().add("bool", BooleanType())
df = self.spark.createDataFrame(data, schema)
bool_f = pandas_udf(lambda x: x, BooleanType())
res = df.select(bool_f(col('bool')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_byte(self):
from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("byte", ByteType())
df = self.spark.createDataFrame(data, schema)
byte_f = pandas_udf(lambda x: x, ByteType())
res = df.select(byte_f(col('byte')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_short(self):
from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("short", ShortType())
df = self.spark.createDataFrame(data, schema)
short_f = pandas_udf(lambda x: x, ShortType())
res = df.select(short_f(col('short')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_int(self):
from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("int", IntegerType())
df = self.spark.createDataFrame(data, schema)
int_f = pandas_udf(lambda x: x, IntegerType())
res = df.select(int_f(col('int')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_long(self):
from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("long", LongType())
df = self.spark.createDataFrame(data, schema)
long_f = pandas_udf(lambda x: x, LongType())
res = df.select(long_f(col('long')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_float(self):
from pyspark.sql.functions import pandas_udf, col
data = [(3.0,), (5.0,), (-1.0,), (None,)]
schema = StructType().add("float", FloatType())
df = self.spark.createDataFrame(data, schema)
float_f = pandas_udf(lambda x: x, FloatType())
res = df.select(float_f(col('float')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_double(self):
from pyspark.sql.functions import pandas_udf, col
data = [(3.0,), (5.0,), (-1.0,), (None,)]
schema = StructType().add("double", DoubleType())
df = self.spark.createDataFrame(data, schema)
double_f = pandas_udf(lambda x: x, DoubleType())
res = df.select(double_f(col('double')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_null_string(self):
from pyspark.sql.functions import pandas_udf, col
data = [("foo",), (None,), ("bar",), ("bar",)]
schema = StructType().add("str", StringType())
df = self.spark.createDataFrame(data, schema)
str_f = pandas_udf(lambda x: x, StringType())
res = df.select(str_f(col('str')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_zero_parameter(self):
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = self.spark.range(100000)
f0 = pandas_udf(lambda **kwargs: pd.Series(1).repeat(kwargs['size']), LongType())
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed that you use **kwargs way for size hint.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give it a try and we can see how it turns out, then dscuss

res = df.select(f0())
self.assertEquals(df.select(lit(1)).collect(), res.collect())

def test_vectorized_udf_datatype_string(self):
from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
col('id').alias('long'),
col('id').cast('float').alias('float'),
col('id').cast('double').alias('double'),
col('id').cast('boolean').alias('bool'))
f = lambda x: x
str_f = pandas_udf(f, 'string')
int_f = pandas_udf(f, 'integer')
long_f = pandas_udf(f, 'long')
float_f = pandas_udf(f, 'float')
double_f = pandas_udf(f, 'double')
bool_f = pandas_udf(f, 'boolean')
res = df.select(str_f(col('str')), int_f(col('int')),
long_f(col('long')), float_f(col('float')),
double_f(col('double')), bool_f(col('bool')))
self.assertEquals(df.collect(), res.collect())

def test_vectorized_udf_complex(self):
from pyspark.sql.functions import pandas_udf, col, expr
df = self.spark.range(10).select(
col('id').cast('int').alias('a'),
col('id').cast('int').alias('b'),
col('id').cast('double').alias('c'))
add = pandas_udf(lambda x, y: x + y, IntegerType())
power2 = pandas_udf(lambda x: 2 ** x, IntegerType())
mul = pandas_udf(lambda x, y: x * y, DoubleType())
res = df.select(add(col('a'), col('b')), power2(col('a')), mul(col('b'), col('c')))
expected = df.select(expr('a + b'), expr('power(2, a)'), expr('b * c'))
self.assertEquals(expected.collect(), res.collect())

def test_vectorized_udf_exception(self):
from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, 'division( or modulo)? by zero'):
df.select(raise_exception(col('id'))).collect()

def test_vectorized_udf_invalid_length(self):
from pyspark.sql.functions import pandas_udf, col
import pandas as pd
df = self.spark.range(10)
raise_exception = pandas_udf(lambda: pd.Series(1), LongType())
with QuietTest(self.sc):
with self.assertRaisesRegexp(
Exception,
'The length of returned value should be the same as input value'):
df.select(raise_exception()).collect()

def test_vectorized_udf_mix_udf(self):
from pyspark.sql.functions import pandas_udf, udf, col
df = self.spark.range(10)
row_by_row_udf = udf(lambda x: x, LongType())
pd_udf = pandas_udf(lambda x: x, LongType())
with QuietTest(self.sc):
with self.assertRaisesRegexp(
Exception,
'Can not mix vectorized and non-vectorized UDFs'):
df.select(row_by_row_udf(col('id')), pd_udf(col('id'))).collect()


if __name__ == "__main__":
from pyspark.sql.tests import *
if xmlrunner:
Expand Down