Closed
Changes from all commits
28 commits
431194d
wip
icexelloss Aug 29, 2018
1fbc2a2
Remove empty line
icexelloss Aug 30, 2018
2240032
Initial commit (WIP)
icexelloss Aug 31, 2018
8916284
Fix case for unbounded window
icexelloss Aug 31, 2018
db6f77a
Use numpy variants of pandas UDF
icexelloss Sep 5, 2018
9ae976c
Implement multiple frames and expressions
icexelloss Sep 13, 2018
d31a133
Refactor window logic
icexelloss Sep 14, 2018
9b731f5
Fix python style
icexelloss Sep 14, 2018
f982bf2
Fix memory leak by closing HybridRowQueue
icexelloss Sep 17, 2018
893cbc2
Add rules to split bounded/unbounded windows into separate nodes
icexelloss Sep 17, 2018
9de9513
Revert "Add rules to split bounded/unbounded windows into separate no…
icexelloss Sep 19, 2018
d2be73b
Merge bounded and unbounded window to a single physical node
icexelloss Sep 20, 2018
12d3ae1
Rename wrap_window_agg_pandas_udf to wrap_unbounded_window_agg_pandas…
icexelloss Sep 20, 2018
8c68727
Minor clean up
icexelloss Sep 20, 2018
6bfedd9
Remove println
icexelloss Sep 20, 2018
cc4c647
Address comments
icexelloss Nov 9, 2018
3ec05ab
Remove whitespace
icexelloss Nov 19, 2018
05fac37
Address comments
icexelloss Nov 19, 2018
64db3b0
Fix documentation. Tweak the implementation of wrap_bounded_window_ag…
icexelloss Nov 20, 2018
4df657a
Change eval type back to single Int; Use runner config for bounded/un…
icexelloss Nov 20, 2018
903cbed
Revert white space changes
icexelloss Nov 20, 2018
96b44df
Address latest comments
icexelloss Dec 6, 2018
2d24d5e
Revert changes to WindowInPandasExec and WindowExec w.r.t grouping ke…
icexelloss Dec 7, 2018
5d3bbd6
Address comments
icexelloss Dec 11, 2018
c2d574f
Remove private[package] modifier
icexelloss Dec 12, 2018
0408c26
Address comments
icexelloss Dec 13, 2018
04873bd
address comments
icexelloss Dec 17, 2018
03702d4
Small fix
icexelloss Dec 17, 2018
21 changes: 12 additions & 9 deletions python/pyspark/sql/functions.py
@@ -2982,8 +2982,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
| 2| 6.0|
+---+-----------+

This example shows using grouped aggregated UDFs as window functions. Note that only
unbounded window frame is supported at the moment:
This example shows using grouped aggregated UDFs as window functions.

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> from pyspark.sql import Window
@@ -2993,20 +2992,24 @@ def pandas_udf(f=None, returnType=None, functionType=None):
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP
... def mean_udf(v):
... return v.mean()
>>> w = Window \\
... .partitionBy('id') \\
... .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
>>> w = (Window.partitionBy('id')
... .orderBy('v')
... .rowsBetween(-1, 0))
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() # doctest: +SKIP
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+

.. note:: For performance reasons, the input series to window functions are not copied.
Therefore, mutating the input series is not allowed and will cause incorrect results.
For the same reason, users should also not rely on the index of the input series.

.. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window`

.. note:: The user-defined functions are considered deterministic by default. Due to
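A minimal editorial sketch of the ".. note::" above: because the window input series are not copied, a UDF that needs to transform its input should work on a copy of the series. The UDF name and transformation below are hypothetical and not part of this diff.

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Hypothetical grouped-agg UDF: copy the input before transforming it,
# since mutating the passed-in series is not allowed.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def clipped_mean(v):
    clipped = v.copy()           # work on a copy of the window's values
    clipped[clipped < 0] = 0.0   # example transformation applied to the copy only
    return clipped.mean()
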
157 changes: 140 additions & 17 deletions python/pyspark/sql/tests/test_pandas_udf_window.py
@@ -47,6 +47,15 @@ def pandas_scalar_time_two(self):
from pyspark.sql.functions import pandas_udf
return pandas_udf(lambda v: v * 2, 'double')

@property
def pandas_agg_count_udf(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType
Member: this import and all the others should be moved to the top; it's repeated many times. It could be done as a follow-up, though.

@pandas_udf('long', PandasUDFType.GROUPED_AGG)
def count(v):
return len(v)
return count
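Referring to the reviewer's comment above, a small hypothetical sketch of that follow-up: hoist the repeated import to module level so each helper can use it directly. The class name and shape below are illustrative only, not part of this diff.

# Hypothetical follow-up: import once at the top of the test module
from pyspark.sql.functions import pandas_udf, PandasUDFType

class WindowPandasUDFTests:  # illustrative shell; the real tests derive from the SQL test base
    @property
    def pandas_agg_count_udf(self):
        @pandas_udf('long', PandasUDFType.GROUPED_AGG)
        def count(v):
            return len(v)
        return count
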

@property
def pandas_agg_mean_udf(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -77,7 +86,7 @@ def min(v):
@property
def unbounded_window(self):
return Window.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing).orderBy('v')

@property
def ordered_window(self):
@@ -87,6 +96,32 @@ def ordered_window(self):
def unpartitioned_window(self):
return Window.partitionBy()

@property
def sliding_row_window(self):
return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)

@property
def sliding_range_window(self):
return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4)

@property
def growing_row_window(self):
return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)

@property
def growing_range_window(self):
return Window.partitionBy('id').orderBy('v') \
.rangeBetween(Window.unboundedPreceding, 4)

@property
def shrinking_row_window(self):
return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing)

@property
def shrinking_range_window(self):
return Window.partitionBy('id').orderBy('v') \
.rangeBetween(-3, Window.unboundedFollowing)

def test_simple(self):
from pyspark.sql.functions import mean

@@ -111,12 +146,12 @@ def test_multiple_udfs(self):
w = self.unbounded_window

result1 = df.withColumn('mean_v', self.pandas_agg_mean_udf(df['v']).over(w)) \
.withColumn('max_v', self.pandas_agg_max_udf(df['v']).over(w)) \
.withColumn('min_w', self.pandas_agg_min_udf(df['w']).over(w))
.withColumn('max_v', self.pandas_agg_max_udf(df['v']).over(w)) \
.withColumn('min_w', self.pandas_agg_min_udf(df['w']).over(w))

expected1 = df.withColumn('mean_v', mean(df['v']).over(w)) \
.withColumn('max_v', max(df['v']).over(w)) \
.withColumn('min_w', min(df['w']).over(w))
.withColumn('max_v', max(df['v']).over(w)) \
.withColumn('min_w', min(df['w']).over(w))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

@@ -204,16 +239,16 @@ def test_mixed_sql_and_udf(self):

# Test chaining sql aggregate function and udf
result3 = df.withColumn('max_v', max_udf(df['v']).over(w)) \
.withColumn('min_v', min(df['v']).over(w)) \
.withColumn('v_diff', col('max_v') - col('min_v')) \
.drop('max_v', 'min_v')
.withColumn('min_v', min(df['v']).over(w)) \
.withColumn('v_diff', col('max_v') - col('min_v')) \
.drop('max_v', 'min_v')
expected3 = expected1

# Test mixing sql window function and udf
result4 = df.withColumn('max_v', max_udf(df['v']).over(w)) \
.withColumn('rank', rank().over(ow))
.withColumn('rank', rank().over(ow))
expected4 = df.withColumn('max_v', max(df['v']).over(w)) \
.withColumn('rank', rank().over(ow))
.withColumn('rank', rank().over(ow))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
@@ -235,8 +270,6 @@ def test_invalid_args(self):

df = self.data
w = self.unbounded_window
ow = self.ordered_window
mean_udf = self.pandas_agg_mean_udf

with QuietTest(self.sc):
with self.assertRaisesRegexp(
@@ -245,11 +278,101 @@
foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP)
df.withColumn('v2', foo_udf(df['v']).over(w))

with QuietTest(self.sc):
with self.assertRaisesRegexp(
AnalysisException,
'.*Only unbounded window frame is supported.*'):
df.withColumn('mean_v', mean_udf(df['v']).over(ow))
def test_bounded_simple(self):
from pyspark.sql.functions import mean, max, min, count

df = self.data
w1 = self.sliding_row_window
w2 = self.shrinking_range_window

plus_one = self.python_plus_one
count_udf = self.pandas_agg_count_udf
mean_udf = self.pandas_agg_mean_udf
max_udf = self.pandas_agg_max_udf
min_udf = self.pandas_agg_min_udf

result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1)) \
.withColumn('count_v', count_udf(df['v']).over(w2)) \
.withColumn('max_v', max_udf(df['v']).over(w2)) \
.withColumn('min_v', min_udf(df['v']).over(w1))

expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1)) \
.withColumn('count_v', count(df['v']).over(w2)) \
.withColumn('max_v', max(df['v']).over(w2)) \
.withColumn('min_v', min(df['v']).over(w1))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

def test_growing_window(self):
from pyspark.sql.functions import mean

df = self.data
w1 = self.growing_row_window
w2 = self.growing_range_window

mean_udf = self.pandas_agg_mean_udf

result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
.withColumn('m2', mean_udf(df['v']).over(w2))

expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
.withColumn('m2', mean(df['v']).over(w2))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

def test_sliding_window(self):
from pyspark.sql.functions import mean

df = self.data
w1 = self.sliding_row_window
w2 = self.sliding_range_window

mean_udf = self.pandas_agg_mean_udf

result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
.withColumn('m2', mean_udf(df['v']).over(w2))

expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
.withColumn('m2', mean(df['v']).over(w2))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

def test_shrinking_window(self):
from pyspark.sql.functions import mean

df = self.data
w1 = self.shrinking_row_window
w2 = self.shrinking_range_window

mean_udf = self.pandas_agg_mean_udf

result1 = df.withColumn('m1', mean_udf(df['v']).over(w1)) \
.withColumn('m2', mean_udf(df['v']).over(w2))

expected1 = df.withColumn('m1', mean(df['v']).over(w1)) \
.withColumn('m2', mean(df['v']).over(w2))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

def test_bounded_mixed(self):
from pyspark.sql.functions import mean, max

df = self.data
w1 = self.sliding_row_window
w2 = self.unbounded_window

mean_udf = self.pandas_agg_mean_udf
max_udf = self.pandas_agg_max_udf

result1 = df.withColumn('mean_v', mean_udf(df['v']).over(w1)) \
.withColumn('max_v', max_udf(df['v']).over(w2)) \
.withColumn('mean_unbounded_v', mean_udf(df['v']).over(w1))

expected1 = df.withColumn('mean_v', mean(df['v']).over(w1)) \
.withColumn('max_v', max(df['v']).over(w2)) \
.withColumn('mean_unbounded_v', mean(df['v']).over(w1))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())


if __name__ == "__main__":
57 changes: 52 additions & 5 deletions python/pyspark/worker.py
@@ -145,7 +145,18 @@ def wrapped(*series):
return lambda *a: (wrapped(*a), arrow_return_type)


def wrap_window_agg_pandas_udf(f, return_type):
def wrap_window_agg_pandas_udf(f, return_type, runner_conf, udf_index):
window_bound_types_str = runner_conf.get('pandas_window_bound_types')
window_bound_type = [t.strip().lower() for t in window_bound_types_str.split(',')][udf_index]
if window_bound_type == 'bounded':
return wrap_bounded_window_agg_pandas_udf(f, return_type)
elif window_bound_type == 'unbounded':
return wrap_unbounded_window_agg_pandas_udf(f, return_type)
else:
raise RuntimeError("Invalid window bound type: {} ".format(window_bound_type))
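To make the dispatch above concrete, a small illustration of how the runner conf drives it. The conf value below is made up; in the real path the JVM populates 'pandas_window_bound_types' with one entry per window UDF in the plan node.

# Hypothetical conf for a node carrying two window UDFs:
runner_conf = {'pandas_window_bound_types': 'bounded, unbounded'}
bound_types = [t.strip().lower()
               for t in runner_conf.get('pandas_window_bound_types').split(',')]
# udf_index 0 -> 'bounded'   -> wrap_bounded_window_agg_pandas_udf
# udf_index 1 -> 'unbounded' -> wrap_unbounded_window_agg_pandas_udf
print(bound_types)  # ['bounded', 'unbounded']
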


def wrap_unbounded_window_agg_pandas_udf(f, return_type):
# This is similar to grouped_agg_pandas_udf, the only difference
# is that window_agg_pandas_udf needs to repeat the return value
# to match window length, where grouped_agg_pandas_udf just returns
@@ -160,7 +171,41 @@ def wrapped(*series):
return lambda *a: (wrapped(*a), arrow_return_type)


def read_single_udf(pickleSer, infile, eval_type, runner_conf):
def wrap_bounded_window_agg_pandas_udf(f, return_type):
arrow_return_type = to_arrow_type(return_type)

def wrapped(begin_index, end_index, *series):
import pandas as pd
result = []

# Index operations are faster on np.ndarray,
# so we turn the index series into np arrays
# here for performance.
begin_array = begin_index.values
end_array = end_index.values

for i in range(len(begin_array)):
# Note: Creating a slice from a series for each window is
# actually pretty expensive. However, there
# is no easy way to reduce the cost here.
# Note: s.iloc[i : j] is about 30% faster than s[i: j], with
# the caveat that the created slices share the same
# memory with s. Therefore, users are not allowed to
# change the values of the input series inside the window
# function. It is rare that a user needs to modify the
# input series in the window function, so this
# is a reasonable restriction.
# Note: Calling reset_index on the slices will increase the cost
# of creating slices by about 100%. Therefore, for performance
# reasons we don't do it here.
series_slices = [s.iloc[begin_array[i]: end_array[i]] for s in series]
result.append(f(*series_slices))
return pd.Series(result)

return lambda *a: (wrapped(*a), arrow_return_type)
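A rough editorial illustration of how the wrapper above is driven: the begin/end index columns (computed on the JVM side) arrive as two extra series, and each output row is the UDF applied to the corresponding slice. The concrete values below are made up for a rowsBetween(-1, 0) frame over four rows, and the sketch assumes a PySpark environment where pyspark.worker is importable.

import pandas as pd
from pyspark.sql.types import DoubleType
from pyspark.worker import wrap_bounded_window_agg_pandas_udf

# Hypothetical bounds: row i sees positions [begin[i], end[i]) of the partition.
begin_index = pd.Series([0, 0, 1, 2])
end_index = pd.Series([1, 2, 3, 4])
v = pd.Series([1.0, 2.0, 3.0, 4.0])

wrapped = wrap_bounded_window_agg_pandas_udf(lambda s: s.mean(), DoubleType())
result, arrow_type = wrapped(begin_index, end_index, v)
print(result.tolist())  # [1.0, 1.5, 2.5, 3.5]
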


def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
num_arg = read_int(infile)
arg_offsets = [read_int(infile) for i in range(num_arg)]
row_func = None
@@ -184,7 +229,7 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf):
elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF:
return arg_offsets, wrap_window_agg_pandas_udf(func, return_type)
return arg_offsets, wrap_window_agg_pandas_udf(func, return_type, runner_conf, udf_index)
elif eval_type == PythonEvalType.SQL_BATCHED_UDF:
return arg_offsets, wrap_udf(func, return_type)
else:
@@ -226,7 +271,8 @@ def read_udfs(pickleSer, infile, eval_type):

# See FlatMapGroupsInPandasExec for how arg_offsets are used to
# distinguish between grouping attributes and data attributes
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
arg_offsets, udf = read_single_udf(
pickleSer, infile, eval_type, runner_conf, udf_index=0)
udfs['f'] = udf
split_offset = arg_offsets[0] + 1
arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]]
@@ -238,7 +284,8 @@ def read_udfs(pickleSer, infile, eval_type):
# In the special case of a single UDF this will return a single result rather
# than a tuple of results; this is the format that the JVM side expects.
for i in range(num_udfs):
arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
arg_offsets, udf = read_single_udf(
pickleSer, infile, eval_type, runner_conf, udf_index=i)
Member: So right now, since only window UDFs can be sent together, the mapping of udf_index -> window_bound_type should hold, right? It is a little fragile, though, and could cause problems if that ever changed in the future. Instead of indexing the UDFs, what do you think about popping the window_bound_type values from the runner_conf list?

Contributor Author: If I understand you correctly, do you mean that we maintain a mutable list of the remaining window_bound_type values and pass that to the read_single_udf function instead of using the udf_index?

Member: Yeah, basically this:

window_eval_type_str, remaining_type_str = runner_conf['pandas_window_bound_types'].split(',', 1)
runner_conf['pandas_window_bound_types'] = remaining_type_str
window_eval_type = window_eval_type_str.strip().lower()

I'm not crazy about changing the conf in place, but it wouldn't rely on any particular UDF indexing then. Maybe it would make more sense to check the eval type before calling read_single_udf, process the conf, and then send the window_eval_type as an optional param to read_single_udf?

Member (@BryanCutler, Dec 14, 2018): This isn't a big deal though, so don't block merging if the rest is ready. It can be improved upon later if needed.

Contributor Author: @BryanCutler I don't know which way is better... if we pass window_eval_type to read_single_udf, then the other non-window code paths need to handle that as well. Yeah, I agree we can revisit this later if needed.

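For context, a self-contained editorial sketch of the "pop from the conf" alternative discussed in this thread; the names are illustrative and this is not what the PR merged.

# Illustrative helper: consume one bound type per UDF from the conf string,
# so no udf_index is needed. Not part of this change.
def pop_window_bound_type(runner_conf):
    remaining = runner_conf.get('pandas_window_bound_types', '')
    head, _, rest = remaining.partition(',')
    runner_conf['pandas_window_bound_types'] = rest
    return head.strip().lower()

conf = {'pandas_window_bound_types': 'bounded, unbounded'}
print(pop_window_bound_type(conf))  # 'bounded'
print(pop_window_bound_type(conf))  # 'unbounded'
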
udfs['f%d' % i] = udf
args = ["a[%d]" % o for o in arg_offsets]
call_udf.append("f%d(%s)" % (i, ", ".join(args)))
@@ -134,11 +134,6 @@ trait CheckAnalysis extends PredicateHelper {
failAnalysis("An offset window function can only be evaluated in an ordered " +
s"row-based window frame with a single offset: $w")

case _ @ WindowExpression(_: PythonUDF,
WindowSpecDefinition(_, _, frame: SpecifiedWindowFrame))
if !frame.isUnbounded =>
failAnalysis("Only unbounded window frame is supported with Pandas UDFs.")

case w @ WindowExpression(e, s) =>
// Only allow window functions with an aggregate expression or an offset window
// function or a Pandas window UDF.