Add Window Functions for use with function builder (#808)
* Add window function as template for others and function builder

* Adding docstrings

* Change last_value to use function builder instead of explicitly passing values

* Allow any value for lead function default value and add unit test

* Add lead window function and unit tests

* Temporarily commenting out deprecated functions in documentation so builder will pass

* Expose row_number window function

* Add rank window function

* Add percent rank and dense rank

* Add cume_dist

* Add ntile window function

* Add comment to update when upstream merges

* Window frame required calling inner value

* Add unit test for avg as window function

* Working on documentation for window functions

* Add pyo build config file to git ignore since this is user specific

* Add examples to docstring

* Optionally add window function parameters during function call

* Update sort and order_by to apply automatic ordering if any other expression is given

* Update unit tests to be cleaner and use default sort on expressions

* Ignore vscode folder specific settings

* Window frames should only apply to aggregate functions used as window functions. Also pass in scalar pyarrow values so we can set a range other than a uint

* Remove deprecated warning until we actually have a way to use all functions without calling window()

* Built in window functions do not have any impact by setting null_treatment so remove from user facing

* Update user documentation on how to pass parameters for different window functions and what their impacts are

* Make first_value and last_value identical in the interface
timsaucer committed Sep 2, 2024
1 parent 003eea8 commit 90f5b5b
Showing 12 changed files with 1,059 additions and 128 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -4,6 +4,7 @@ target
/docs/temp
/docs/build
.DS_Store
.vscode

# Byte-compiled / optimized / DLL files
__pycache__/
@@ -31,3 +32,6 @@ apache-rat-*.jar
CHANGELOG.md.bak

docs/mdbook/book

.pyo3_build_config

2 changes: 2 additions & 0 deletions docs/source/user-guide/common-operations/aggregations.rst
@@ -15,6 +15,8 @@
.. specific language governing permissions and limitations
.. under the License.
.. _aggregation:

Aggregation
============

187 changes: 156 additions & 31 deletions docs/source/user-guide/common-operations/windows.rst
@@ -15,13 +15,16 @@
.. specific language governing permissions and limitations
.. under the License.
.. _window_functions:

Window Functions
================

In this section you will learn about window functions. A window function utilizes values from one or multiple rows to
produce a result for each individual row, unlike an aggregate function that provides a single value for multiple rows.
In this section you will learn about window functions. A window function utilizes values from one or
multiple rows to produce a result for each individual row, unlike an aggregate function that
provides a single value for multiple rows.

The functionality of window functions in DataFusion is supported by the dedicated :py:func:`~datafusion.functions.window` function.
The window functions are available in the :py:mod:`~datafusion.functions` module.

We'll use the pokemon dataset (from Ritchie Vink) in the following examples.

@@ -40,54 +43,176 @@ We'll use the pokemon dataset (from Ritchie Vink) in the following examples.
ctx = SessionContext()
df = ctx.read_csv("pokemon.csv")
Here is an example that shows how to compare each pokemon's attack power with the average attack power in its ``"Type 1"``
Here is an example that shows how you can compare each pokemon's speed to the speed of the
previous row in the DataFrame.

.. ipython:: python
df.select(
col('"Name"'),
col('"Attack"'),
f.alias(
f.window("avg", [col('"Attack"')], partition_by=[col('"Type 1"')]),
"Average Attack",
)
col('"Speed"'),
f.lag(col('"Speed"')).alias("Previous Speed")
)
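To make the result of ``lag`` easier to picture, here is a pure-Python sketch of the same idea. It does not call DataFusion. The helper name ``lag``, its default offset of 1, its null default, and the sample speeds are all assumptions made for illustration.

```python
from typing import Optional, Sequence


def lag(values: Sequence[int], offset: int = 1, default: Optional[int] = None) -> list:
    """Return, for each row, the value `offset` rows earlier, or `default` if none exists."""
    return [
        values[i - offset] if i - offset >= 0 else default
        for i in range(len(values))
    ]


speeds = [45, 60, 80, 65]  # made-up "Speed" column, in DataFrame order
previous_speed = lag(speeds)
print(previous_speed)  # [None, 45, 60, 80]
```

The first row has no predecessor, so it receives the default value. Every other row receives the speed of the row before it.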
You can also control the order in which rows are processed by window functions by providing
Setting Parameters
------------------


Ordering
^^^^^^^^

You can control the order in which rows are processed by window functions by providing
a list of ``order_by`` functions for the ``order_by`` parameter.

.. ipython:: python
df.select(
col('"Name"'),
col('"Attack"'),
f.alias(
f.window(
"rank",
[],
partition_by=[col('"Type 1"')],
order_by=[f.order_by(col('"Attack"'))],
),
"rank",
),
col('"Type 1"'),
f.rank(
partition_by=[col('"Type 1"')],
order_by=[col('"Attack"').sort(ascending=True)],
).alias("rank"),
).sort(col('"Type 1"'), col('"Attack"'))
Partitions
^^^^^^^^^^

A window function can take a list of ``partition_by`` columns similar to an
:ref:`Aggregation Function<aggregation>`. This will cause the window values to be evaluated
independently for each of the partitions. In the example above, we found the rank of each
Pokemon within its ``Type 1`` partition. We can see the first couple of rows of each partition
if we do the following:

.. ipython:: python
df.select(
col('"Name"'),
col('"Attack"'),
col('"Type 1"'),
f.rank(
partition_by=[col('"Type 1"')],
order_by=[col('"Attack"').sort(ascending=True)],
).alias("rank"),
).filter(col("rank") < lit(3)).sort(col('"Type 1"'), col("rank"))
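The combination of ``partition_by`` and ``order_by`` can be sketched in plain Python as follows. This is an illustration of the semantics only, not DataFusion code. The helper ``rank_by_partition`` and the sample rows are hypothetical, and the attack values are made up so that a tie appears.

```python
from collections import defaultdict

# Made-up sample rows: (name, type_1, attack). The values are illustrative only.
rows = [
    ("Caterpie", "Bug", 30),
    ("Weedle", "Bug", 35),
    ("Metapod", "Bug", 20),
    ("Kakuna", "Bug", 20),
    ("Charmander", "Fire", 52),
    ("Vulpix", "Fire", 41),
]


def rank_by_partition(rows):
    """Map each name to its rank by ascending attack within its type_1 partition."""
    partitions = defaultdict(list)
    for name, type_1, attack in rows:
        partitions[type_1].append((name, attack))

    ranks = {}
    for members in partitions.values():
        members.sort(key=lambda member: member[1])
        for position, (name, attack) in enumerate(members, start=1):
            previous = members[position - 2] if position > 1 else None
            if previous is not None and previous[1] == attack:
                ranks[name] = ranks[previous[0]]  # tied rows share a rank
            else:
                ranks[name] = position  # a gap follows any tie
    return ranks


ranks = rank_by_partition(rows)
top_two = sorted(name for name, rank in ranks.items() if rank < 3)
print(top_two)  # ['Charmander', 'Kakuna', 'Metapod', 'Vulpix']
```

Each partition is ranked independently, so both ``Bug`` and ``Fire`` have a row with rank 1. The two tied ``Bug`` rows share rank 1, and the next ``Bug`` row receives rank 3.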
Window Frame
^^^^^^^^^^^^

When using aggregate functions, the Window Frame defines the rows over which the function operates.
If you do not specify a Window Frame, the default frame is chosen according to the following
criteria.

* If an ``order_by`` clause is set, the default window frame is defined as the rows between
unbounded preceding and the current row.
* If an ``order_by`` is not set, the default frame is defined as the rows between unbounded
preceding and unbounded following (the entire partition).
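The effect of the two default frames can be sketched in plain Python using a sum. This is not DataFusion code. The sample values are made up, and the sketch assumes there are no ties in the ordering column, which keeps the "up to the current row" case unambiguous.

```python
from itertools import accumulate

speeds = [45, 60, 80]  # made-up values, already sorted in order_by order

# With an order_by: the frame runs from the start of the partition to the
# current row, so a sum behaves like a running total.
with_order_by = list(accumulate(speeds))

# Without an order_by: the frame is the entire partition, so every row
# sees the same total.
without_order_by = [sum(speeds)] * len(speeds)

print(with_order_by)     # [45, 105, 185]
print(without_order_by)  # [185, 185, 185]
```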

Window Frames are defined by three parameters: unit type, starting bound, and ending bound.

The unit types available are:

* Rows: The starting and ending boundaries are defined by the number of rows relative to the
current row.
* Range: When using Range, the ``order_by`` clause must have exactly one term. The boundaries
are defined by how close the rows are to the value of the expression in the ``order_by``
parameter.
* Groups: A "group" is the set of all rows that have equivalent values for all terms in the
``order_by`` clause.
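The difference between the three unit types is easiest to see on a column that contains a tie. The following plain-Python sketch sums the values in a frame that starts a given distance before the current row and ends at the current row. The three helper functions and the sample values are hypothetical and are not part of DataFusion.

```python
values = [10, 20, 20, 40]  # made-up order_by column, already sorted


def rows_frame_sum(values, preceding):
    """Rows: count back `preceding` physical rows from the current row."""
    return [sum(values[max(0, i - preceding): i + 1]) for i in range(len(values))]


def range_frame_sum(values, preceding):
    """Range: include rows whose value lies within `preceding` below the current value."""
    return [
        sum(v for v in values if current - preceding <= v <= current)
        for current in values
    ]


def groups_frame_sum(values, preceding):
    """Groups: count back `preceding` groups of equal values from the current group."""
    group_of = {v: g for g, v in enumerate(sorted(set(values)))}
    return [
        sum(
            v for v in values
            if group_of[current] - preceding <= group_of[v] <= group_of[current]
        )
        for current in values
    ]


print(rows_frame_sum(values, 1))    # [10, 30, 40, 60]
print(range_frame_sum(values, 10))  # [10, 50, 50, 40]
print(groups_frame_sum(values, 1))  # [10, 50, 50, 80]
```

With Rows, the two tied rows get different results because they occupy different positions. With Range and Groups, tied rows always see the same frame and therefore the same result.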

In this example we perform a "rolling average" of the speed of the current Pokemon and the
two preceding rows.

.. ipython:: python
from datafusion.expr import WindowFrame
df.select(
col('"Name"'),
col('"Speed"'),
f.window("avg",
[col('"Speed"')],
order_by=[col('"Speed"')],
window_frame=WindowFrame("rows", 2, 0)
).alias("Rolling Average")
)
Null Treatment
^^^^^^^^^^^^^^

When using aggregate functions as window functions, it is often useful to specify how null values
should be treated. In order to do this you need to use the builder function. In future releases
we expect this to be simplified in the interface.

One common usage for handling nulls is the case where you want to find the last value up to the
current row. In the following example we demonstrate how setting the null treatment to ignore
nulls will fill in with the value of the most recent non-null row. To do this, we also will set
the window frame so that we only process up to the current row.

In this example, we filter down to one specific type of Pokemon that does have some entries in
its ``Type 2`` column that are null.

.. ipython:: python
from datafusion.common import NullTreatment
df.filter(col('"Type 1"') == lit("Bug")).select(
'"Name"',
'"Type 2"',
f.window("last_value", [col('"Type 2"')])
.window_frame(WindowFrame("rows", None, 0))
.order_by(col('"Speed"'))
.null_treatment(NullTreatment.IGNORE_NULLS)
.build()
.alias("last_wo_null"),
f.window("last_value", [col('"Type 2"')])
.window_frame(WindowFrame("rows", None, 0))
.order_by(col('"Speed"'))
.null_treatment(NullTreatment.RESPECT_NULLS)
.build()
.alias("last_with_null")
)
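The two null treatments can be sketched in plain Python for a frame that ends at the current row. This illustrates the semantics only and does not use DataFusion. The helper ``last_value`` and the sample column are made up for this sketch.

```python
from typing import Optional, Sequence


def last_value(values: Sequence[Optional[str]], ignore_nulls: bool) -> list:
    """Last value in the frame from the first row up to the current row."""
    result = []
    last_non_null = None
    for value in values:
        if value is not None:
            last_non_null = value
        # Respecting nulls returns the current row's value, even when it is null.
        # Ignoring nulls returns the most recent non-null value seen so far.
        result.append(last_non_null if ignore_nulls else value)
    return result


type_2 = ["Poison", None, None, "Flying", None]  # made-up "Type 2" column
print(last_value(type_2, ignore_nulls=True))
# ['Poison', 'Poison', 'Poison', 'Flying', 'Flying']
print(last_value(type_2, ignore_nulls=False))
# ['Poison', None, None, 'Flying', None]
```

Ignoring nulls acts as a forward fill. Respecting nulls simply reproduces the current row, because the current row is always the last row of this frame.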
Aggregate Functions
-------------------

You can use any :ref:`Aggregation Function<aggregation>` as a window function. Currently
aggregate functions must use the deprecated
:py:func:`datafusion.functions.window` API but this should be resolved in
DataFusion 42.0 (`Issue Link <https://github.com/apache/datafusion-python/issues/833>`_). Here
is an example that shows how to compare each pokemon's attack power with the average attack
power in its ``"Type 1"`` group using the :py:func:`datafusion.functions.avg` function.

.. ipython:: python
:okwarning:
df.select(
col('"Name"'),
col('"Attack"'),
col('"Type 1"'),
f.window("avg", [col('"Attack"')])
.partition_by(col('"Type 1"'))
.build()
.alias("Average Attack"),
)
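The contrast between an aggregate and the same function used as a window function can be sketched in plain Python. This is not DataFusion code, and the sample rows and attack values are made up for illustration.

```python
from collections import defaultdict
from statistics import mean

# Made-up sample rows: (name, type_1, attack). The values are illustrative only.
rows = [
    ("Caterpie", "Bug", 30),
    ("Weedle", "Bug", 35),
    ("Charmander", "Fire", 52),
    ("Vulpix", "Fire", 41),
]

attacks_by_type = defaultdict(list)
for _, type_1, attack in rows:
    attacks_by_type[type_1].append(attack)

# Aggregate: one output value per group.
aggregated = {type_1: mean(attacks) for type_1, attacks in attacks_by_type.items()}

# Window: every input row is kept and gains its partition's average.
windowed = [
    (name, attack, type_1, aggregated[type_1]) for name, type_1, attack in rows
]

print(aggregated)   # {'Bug': 32.5, 'Fire': 46.5}
print(windowed[0])  # ('Caterpie', 30, 'Bug', 32.5)
```

The aggregate collapses four rows into two values, while the window version still returns four rows.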
Available Functions
-------------------

The possible window functions are:

1. Rank Functions
- rank
- dense_rank
- row_number
- ntile
- :py:func:`datafusion.functions.rank`
- :py:func:`datafusion.functions.dense_rank`
- :py:func:`datafusion.functions.ntile`
- :py:func:`datafusion.functions.row_number`

2. Analytical Functions
- cume_dist
- percent_rank
- lag
- lead
- first_value
- last_value
- nth_value
- :py:func:`datafusion.functions.cume_dist`
- :py:func:`datafusion.functions.percent_rank`
- :py:func:`datafusion.functions.lag`
- :py:func:`datafusion.functions.lead`

3. Aggregate Functions
- All aggregate functions can be used as window functions.
- All :ref:`Aggregation Functions<aggregation>` can be used as window functions.
7 changes: 3 additions & 4 deletions python/datafusion/dataframe.py
@@ -123,11 +123,10 @@ def select(self, *exprs: Expr | str) -> DataFrame:
df = df.select("a", col("b"), col("a").alias("alternate_a"))
"""
exprs = [
arg.expr if isinstance(arg, Expr) else Expr.column(arg).expr
for arg in exprs
exprs_internal = [
Expr.column(arg).expr if isinstance(arg, str) else arg.expr for arg in exprs
]
return DataFrame(self.df.select(*exprs))
return DataFrame(self.df.select(*exprs_internal))

def filter(self, *predicates: Expr) -> DataFrame:
"""Return a DataFrame for which ``predicate`` evaluates to ``True``.