ENH: Support groupby.ewm operations #37878

Merged
merged 43 commits into from
Nov 18, 2020
Commits
730571a
Add class to enable groupby ewm
Nov 9, 2020
5dd1557
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 9, 2020
2b54756
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 10, 2020
af1d0cf
Add stock function for numba groupby ewma
Nov 11, 2020
335d3ab
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 11, 2020
b479dd7
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 14, 2020
4423905
Finish jitted function for groupby_ewma
Nov 14, 2020
332b750
Add EWMA indexer, and adjust generated function
Nov 14, 2020
1880f2d
push a dummy function
Nov 14, 2020
2b181c8
make cache in rolling reuseable by groupby ewma
Nov 14, 2020
c6854c6
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 14, 2020
78718da
Add initial tests, fix caching function
Nov 14, 2020
722f7d3
Add the appropriate method on groupby
Nov 14, 2020
3e2d7dd
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 15, 2020
2070143
Fix conftest and unused params
Nov 15, 2020
2af5304
Make ewm reuse BaseWindow._apply
Nov 15, 2020
868d736
Remove redefined EWM._apply
Nov 15, 2020
da838b3
Cant use super because of np.ndarray copies
Nov 15, 2020
0bf6a80
Fix bugs in groupby ewma, add kwargs for groupby ewma
Nov 15, 2020
cc0800b
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 15, 2020
19dfd3e
Remove breakpoint
Nov 15, 2020
ecbbe76
Add more direct import
Nov 15, 2020
2219df6
Add docstring
Nov 15, 2020
0feb852
Add corr and tests
Nov 15, 2020
7de126f
Format tests and add whatsnew
Nov 15, 2020
e3822ba
clarify whatsnew
Nov 15, 2020
d03dfdd
Change groupby support
Nov 15, 2020
fdc4d66
Rename grouper -> groupby
Nov 15, 2020
27955a1
isort
Nov 15, 2020
08ca227
Make agg docs more consistent
Nov 15, 2020
e862d4f
lint
Nov 15, 2020
33b81ba
Deprivatize helper method
Nov 15, 2020
a7af894
Change com for typing, asv bench
Nov 15, 2020
3e5a216
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 16, 2020
d9e77d2
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 16, 2020
20f0908
Use numba_cache_key instead of 2 variables
Nov 16, 2020
0009dfa
Mypy
Nov 16, 2020
208e2d8
Remove copy paste error
Nov 16, 2020
45e1ba4
Fix groupby tests for new ewm method
Nov 16, 2020
fc02c52
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 16, 2020
a0f0017
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 17, 2020
3baa13e
Add tests comparing to groupby.apply
Nov 17, 2020
f3800f3
Merge remote-tracking branch 'upstream/master' into feature/groupby_ewma
Nov 17, 2020
13 changes: 13 additions & 0 deletions asv_bench/benchmarks/rolling.py
@@ -225,4 +225,17 @@ def time_rolling_offset(self, method):
         getattr(self.groupby_roll_offset, method)()


+class GroupbyEWM:
+
+    params = ["cython", "numba"]
+    param_names = ["engine"]
+
+    def setup(self, engine):
+        df = pd.DataFrame({"A": range(50), "B": range(50)})
+        self.gb_ewm = df.groupby("A").ewm(com=1.0)
+
+    def time_groupby_mean(self, engine):
+        self.gb_ewm.mean(engine=engine)
+
+
 from .pandas_vb_common import setup  # noqa: F401 isort:skip
2 changes: 1 addition & 1 deletion doc/source/user_guide/window.rst
@@ -43,7 +43,7 @@ Concept Method Returned Object
 Rolling window                ``rolling``       ``Rolling``                 Yes                         Yes
 Weighted window               ``rolling``       ``Window``                  No                          No
 Expanding window              ``expanding``     ``Expanding``               No                          Yes
-Exponentially Weighted window ``ewm``           ``ExponentialMovingWindow`` No                          No
+Exponentially Weighted window ``ewm``           ``ExponentialMovingWindow`` No                          Yes (as of version 1.2)
 ============================= ================= =========================== =========================== ========================

 As noted above, some operations support specifying a window based on a time offset:
17 changes: 17 additions & 0 deletions doc/source/whatsnew/v1.2.0.rst
@@ -204,6 +204,23 @@ example where the index name is preserved:
 The same is true for :class:`MultiIndex`, but the logic is applied separately on a
 level-by-level basis.

+.. _whatsnew_120.groupby_ewm:
+
+Groupby supports EWM operations directly
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+:class:`DataFrameGroupBy` now supports exponentially weighted window operations directly (:issue:`16037`).
+
+.. ipython:: python
+
+    df = pd.DataFrame({'A': ['a', 'b', 'a', 'b'], 'B': range(4)})
+    df
+    df.groupby('A').ewm(com=1.0).mean()
+
+Additionally, ``mean`` supports execution via `Numba <https://numba.pydata.org/>`__ with
+the ``engine`` and ``engine_kwargs`` arguments. Numba must be installed as an optional dependency
+to use this feature.
+
 .. _whatsnew_120.enhancements.other:

 Other enhancements
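For reference, the per-group numbers in the whatsnew example can be reproduced by hand. The sketch below is plain Python and assumes the default `adjust=True` weighting with `alpha = 1 / (1 + com)`; `ewm_mean` is a hypothetical helper written for this note, not a pandas function, and it does no NaN handling.

```python
from collections import defaultdict


def ewm_mean(values, com):
    """Adjusted exponentially weighted mean of a list of floats (no NaN handling)."""
    alpha = 1.0 / (1.0 + com)
    out = []
    numer = 0.0  # running weighted sum of observations
    denom = 0.0  # running sum of weights
    for x in values:
        numer = x + (1.0 - alpha) * numer
        denom = 1.0 + (1.0 - alpha) * denom
        out.append(numer / denom)
    return out


# Same data as the whatsnew example: column A is the key, column B the values.
keys = ["a", "b", "a", "b"]
vals = [0.0, 1.0, 2.0, 3.0]

groups = defaultdict(list)
for k, v in zip(keys, vals):
    groups[k].append(v)

# Each group is smoothed independently of the others.
result = {k: ewm_mean(v, com=1.0) for k, v in groups.items()}
print(result)
```

Group `a` holds the values 0 and 2, group `b` holds 1 and 3, so the second value of each group is a weighted mean of only that group's two observations.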
21 changes: 14 additions & 7 deletions pandas/_libs/window/aggregations.pyx
@@ -1496,15 +1496,17 @@ def roll_weighted_var(float64_t[:] values, float64_t[:] weights,
 # ----------------------------------------------------------------------
 # Exponentially weighted moving average

-def ewma_time(const float64_t[:] vals, int minp, ndarray[int64_t] times,
-              int64_t halflife):
+def ewma_time(const float64_t[:] vals, int64_t[:] start, int64_t[:] end,
+              int minp, ndarray[int64_t] times, int64_t halflife):
     """
     Compute exponentially-weighted moving average using halflife and time
     distances.

     Parameters
     ----------
     vals : ndarray[float_64]
+    start: ndarray[int_64]
+    end: ndarray[int_64]
     minp : int
     times : ndarray[int64]
     halflife : int64
@@ -1552,17 +1554,20 @@ def ewma_time(const float64_t[:] vals, int minp, ndarray[int64_t] times,
     return output


-def ewma(float64_t[:] vals, float64_t com, bint adjust, bint ignore_na, int minp):
+def ewma(float64_t[:] vals, int64_t[:] start, int64_t[:] end, int minp,
+         float64_t com, bint adjust, bint ignore_na):
     """
     Compute exponentially-weighted moving average using center-of-mass.

     Parameters
     ----------
     vals : ndarray (float64 type)
+    start: ndarray (int64 type)
+    end: ndarray (int64 type)
+    minp : int
     com : float64
     adjust : int
     ignore_na : bool
-    minp : int

     Returns
     -------
@@ -1620,19 +1625,21 @@ def ewma(float64_t[:] vals, float64_t com, bint adjust, bint ignore_na, int minp
 # Exponentially weighted moving covariance


-def ewmcov(float64_t[:] input_x, float64_t[:] input_y,
-           float64_t com, bint adjust, bint ignore_na, int minp, bint bias):
+def ewmcov(float64_t[:] input_x, int64_t[:] start, int64_t[:] end, int minp,
+           float64_t[:] input_y, float64_t com, bint adjust, bint ignore_na, bint bias):
     """
     Compute exponentially-weighted moving variance using center-of-mass.

     Parameters
     ----------
     input_x : ndarray (float64 type)
+    start: ndarray (int64 type)
+    end: ndarray (int64 type)
+    minp : int
     input_y : ndarray (float64 type)
     com : float64
     adjust : int
     ignore_na : bool
-    minp : int
     bias : int

     Returns
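The new `start`/`end` arguments allow one kernel call to process several independent segments of `vals`. A rough pure-Python sketch of that idea follows. It assumes each `[start[i], end[i])` pair delimits one contiguous group, resets the running state at every segment, and leaves out NaN handling (`ignore_na`). `ewma_segments` is an illustrative name and is not the Cython function from this diff.

```python
import math


def ewma_segments(vals, start, end, minp, com, adjust=True):
    """EWMA computed independently on each half-open segment [start[i], end[i])."""
    alpha = 1.0 / (1.0 + com)
    old_wt_factor = 1.0 - alpha
    new_wt = 1.0 if adjust else alpha
    out = [math.nan] * len(vals)
    for s, e in zip(start, end):
        weighted_avg = None  # state is reset for every segment
        old_wt = 1.0
        for i in range(s, e):
            x = vals[i]
            if weighted_avg is None:
                # First observation of the segment seeds the average.
                weighted_avg = x
            else:
                old_wt *= old_wt_factor
                weighted_avg = (old_wt * weighted_avg + new_wt * x) / (old_wt + new_wt)
                old_wt = old_wt + new_wt if adjust else 1.0
            # Only emit a value once the segment has at least minp observations.
            if i - s + 1 >= minp:
                out[i] = weighted_avg
    return out


# Two groups stored back to back: rows 0-1 belong to one group, rows 2-3 to another.
out = ewma_segments([0.0, 2.0, 1.0, 3.0], start=[0, 2], end=[2, 4], minp=1, com=1.0)
print(out)
```

Because the state is reset per segment, the value at row 2 depends only on the second group's data, which is the behaviour a groupby EWM needs.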
1 change: 1 addition & 0 deletions pandas/core/groupby/base.py
@@ -192,6 +192,7 @@ def _gotitem(self, key, ndim, subset=None):
         "describe",
         "dtypes",
         "expanding",
+        "ewm",
         "filter",
         "get_group",
         "groups",
10 changes: 10 additions & 0 deletions pandas/core/groupby/groupby.py
@@ -1859,6 +1859,16 @@ def expanding(self, *args, **kwargs):

         return ExpandingGroupby(self, *args, **kwargs)

+    @Substitution(name="groupby")
+    @Appender(_common_see_also)
+    def ewm(self, *args, **kwargs):
+        """
+        Return an ewm grouper, providing ewm functionality per group.
+        """
+        from pandas.core.window import ExponentialMovingWindowGroupby
+
+        return ExponentialMovingWindowGroupby(self, *args, **kwargs)
+
     def _fill(self, direction, limit=None):
         """
         Shared function for `pad` and `backfill` to call Cython method.
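A quick way to sanity-check the new method is to compare it with running `ewm` on each group separately, in the spirit of the "Add tests comparing to groupby.apply" commit. The snippet below is a sketch rather than the PR's test code. It assumes pandas 1.2 or later and selects column `B` explicitly so the result is a Series indexed by group key and original row label.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)})

# New in this PR: call ewm directly on the groupby object.
grouped = df.groupby("A")["B"].ewm(com=1.0).mean()

# Reference: run ewm on each group's values separately.
for key, sub in df.groupby("A")["B"]:
    expected = sub.ewm(com=1.0).mean()
    # grouped is indexed by (group key, original index); .loc[key] drops the first level.
    assert np.allclose(grouped.loc[key].to_numpy(), expected.to_numpy())

print(grouped)
```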
5 changes: 4 additions & 1 deletion pandas/core/window/__init__.py
@@ -1,3 +1,6 @@
-from pandas.core.window.ewm import ExponentialMovingWindow  # noqa:F401
+from pandas.core.window.ewm import (  # noqa:F401
+    ExponentialMovingWindow,
+    ExponentialMovingWindowGroupby,
+)
 from pandas.core.window.expanding import Expanding, ExpandingGroupby  # noqa:F401
 from pandas.core.window.rolling import Rolling, RollingGroupby, Window  # noqa:F401
156 changes: 122 additions & 34 deletions pandas/core/window/ewm.py
@@ -14,8 +14,20 @@
 from pandas.core.dtypes.common import is_datetime64_ns_dtype

 import pandas.core.common as common
-from pandas.core.window.common import _doc_template, _shared_docs, zsqrt
-from pandas.core.window.rolling import BaseWindow, flex_binary_moment
+from pandas.core.util.numba_ import maybe_use_numba
+from pandas.core.window.common import (
+    _doc_template,
+    _shared_docs,
+    flex_binary_moment,
+    zsqrt,
+)
+from pandas.core.window.indexers import (
+    BaseIndexer,
+    ExponentialMovingWindowIndexer,
+    GroupbyIndexer,
+)
+from pandas.core.window.numba_ import generate_numba_groupby_ewma_func
+from pandas.core.window.rolling import BaseWindow, BaseWindowGroupby, dispatch

 if TYPE_CHECKING:
     from pandas import Series
@@ -219,14 +231,16 @@ def __init__(
         ignore_na: bool = False,
         axis: int = 0,
         times: Optional[Union[str, np.ndarray, FrameOrSeries]] = None,
+        **kwargs,
     ):
-        self.com: Optional[float]
         self.obj = obj
         self.min_periods = max(int(min_periods), 1)
         self.adjust = adjust
         self.ignore_na = ignore_na
         self.axis = axis
         self.on = None
+        self.center = False
+        self.closed = None
         if times is not None:
             if isinstance(times, str):
                 times = self._selected_obj[times]
@@ -245,7 +259,7 @@ def __init__(
             if common.count_not_none(com, span, alpha) > 0:
                 self.com = get_center_of_mass(com, span, None, alpha)
             else:
-                self.com = None
+                self.com = 0.0
         else:
             if halflife is not None and isinstance(halflife, (str, datetime.timedelta)):
                 raise ValueError(
@@ -260,6 +274,12 @@ def __init__(
     def _constructor(self):
         return ExponentialMovingWindow

+    def _get_window_indexer(self) -> BaseIndexer:
+        """
+        Return an indexer class that will compute the window start and end bounds
+        """
+        return ExponentialMovingWindowIndexer()
+
     _agg_see_also_doc = dedent(
         """
     See Also
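The ungrouped EWM path needs a single window over all rows, while the groupby variant needs one window per group. The following sketch shows how such bounds could be derived from group positions shaped like `groupby.indices`. Both helper names are hypothetical, and this is not the pandas `ExponentialMovingWindowIndexer` or `GroupbyIndexer` implementation, whose code is not part of the hunks shown here.

```python
def single_window_bounds(num_values):
    """One window spanning every row, for the ungrouped case."""
    return [0], [num_values]


def grouped_window_bounds(indices):
    """
    Given {group key: row positions}, return the row order that makes each
    group contiguous, plus one [start, end) pair per group in that order.
    """
    order, start, end = [], [], []
    for key in sorted(indices):
        start.append(len(order))
        order.extend(indices[key])
        end.append(len(order))
    return order, start, end


indices = {"a": [0, 2], "b": [1, 3]}  # same layout as DataFrameGroupBy.indices
order, start, end = grouped_window_bounds(indices)
print(order, start, end)  # [0, 2, 1, 3] [0, 2] [2, 4]
```

Reordering the values with `order` and passing `start`/`end` to a segment-aware kernel gives each group its own independent pass.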
@@ -299,27 +319,6 @@ def aggregate(self, func, *args, **kwargs):

     agg = aggregate

-    def _apply(self, func):
-        """
-        Rolling statistical measure using supplied function. Designed to be
-        used with passed-in Cython array-based functions.
-
-        Parameters
-        ----------
-        func : str/callable to apply
-
-        Returns
-        -------
-        y : same type as input argument
-        """
-
-        def homogeneous_func(values: np.ndarray):
-            if values.size == 0:
-                return values.copy()
-            return np.apply_along_axis(func, self.axis, values)
-
-        return self._apply_blockwise(homogeneous_func)
-
     @Substitution(name="ewm", func_name="mean")
     @Appender(_doc_template)
     def mean(self, *args, **kwargs):
@@ -336,7 +335,6 @@ def mean(self, *args, **kwargs):
             window_func = self._get_roll_func("ewma_time")
             window_func = partial(
                 window_func,
-                minp=self.min_periods,
                 times=self.times,
                 halflife=self.halflife,
             )
@@ -347,7 +345,6 @@ def mean(self, *args, **kwargs):
                 com=self.com,
                 adjust=self.adjust,
                 ignore_na=self.ignore_na,
-                minp=self.min_periods,
             )
         return self._apply(window_func)

@@ -371,13 +368,19 @@ def var(self, bias: bool = False, *args, **kwargs):
         Exponential weighted moving variance.
         """
         nv.validate_window_func("var", args, kwargs)
+        window_func = self._get_roll_func("ewmcov")
+        window_func = partial(
+            window_func,
+            com=self.com,
+            adjust=self.adjust,
+            ignore_na=self.ignore_na,
+            bias=bias,
+        )

-        def f(arg):
-            return window_aggregations.ewmcov(
-                arg, arg, self.com, self.adjust, self.ignore_na, self.min_periods, bias
-            )
+        def var_func(values, begin, end, min_periods):
+            return window_func(values, begin, end, min_periods, values)

-        return self._apply(f)
+        return self._apply(var_func)

     @Substitution(name="ewm", func_name="cov")
     @Appender(_doc_template)
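The `var_func` wrapper exists because the covariance kernel takes its second input (`input_y`) after the bounds and `minp`, so the same array has to be passed twice at call time. A small stand-alone sketch of that pattern follows. `cov_kernel` is a stand-in that only records its arguments, not the real `ewmcov`, and the four-argument calling convention is inferred from the `var_func` signature in the diff.

```python
from functools import partial


def cov_kernel(x, start, end, minp, y, com, adjust, ignore_na, bias):
    """Stand-in with the same argument order as the new ewmcov signature."""
    return {"x": x, "y": y, "bounds": (start, end), "minp": minp, "com": com, "bias": bias}


# Keyword arguments are bound up front, as in var().
window_func = partial(cov_kernel, com=1.0, adjust=True, ignore_na=False, bias=False)


def var_func(values, begin, end, min_periods):
    # The caller supplies four positional arguments; the fifth (y) is values again,
    # because a variance is the covariance of a series with itself.
    return window_func(values, begin, end, min_periods, values)


call = var_func([1.0, 2.0], [0], [2], 1)
print(call["x"] is call["y"])  # True
```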
@@ -419,11 +422,13 @@ def _get_cov(X, Y):
             Y = self._shallow_copy(Y)
             cov = window_aggregations.ewmcov(
                 X._prep_values(),
+                np.array([0], dtype=np.int64),
+                np.array([0], dtype=np.int64),
+                self.min_periods,
                 Y._prep_values(),
                 self.com,
                 self.adjust,
                 self.ignore_na,
-                self.min_periods,
                 bias,
             )
             return wrap_result(X, cov)
@@ -470,7 +475,15 @@ def _get_corr(X, Y):

             def _cov(x, y):
                 return window_aggregations.ewmcov(
-                    x, y, self.com, self.adjust, self.ignore_na, self.min_periods, 1
+                    x,
+                    np.array([0], dtype=np.int64),
+                    np.array([0], dtype=np.int64),
+                    self.min_periods,
+                    y,
+                    self.com,
+                    self.adjust,
+                    self.ignore_na,
+                    1,
                 )

             x_values = X._prep_values()
@@ -485,3 +498,78 @@ def _cov(x, y):
         return flex_binary_moment(
             self._selected_obj, other._selected_obj, _get_corr, pairwise=bool(pairwise)
         )
+
+
+class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow):
+    """
+    Provide an exponential moving window groupby implementation.
+    """
+
+    def _get_window_indexer(self) -> GroupbyIndexer:
+        """
+        Return an indexer class that will compute the window start and end bounds
+
+        Returns
+        -------
+        GroupbyIndexer
+        """
+        window_indexer = GroupbyIndexer(
+            groupby_indicies=self._groupby.indices,
+            window_indexer=ExponentialMovingWindowIndexer,
+        )
+        return window_indexer
+
+    var = dispatch("var", bias=False)
+    std = dispatch("std", bias=False)
+    cov = dispatch("cov", other=None, pairwise=None, bias=False)
+    corr = dispatch("corr", other=None, pairwise=None)
+
+    def mean(self, engine=None, engine_kwargs=None):
+        """
+        Parameters
+        ----------
+        engine : str, default None
+            * ``'cython'`` : Runs mean through C-extensions from cython.
+            * ``'numba'`` : Runs mean through JIT compiled code from numba.
+              Only available when ``raw`` is set to ``True``.
+            * ``None`` : Defaults to ``'cython'`` or globally setting
+              ``compute.use_numba``
+
+              .. versionadded:: 1.2.0
+
+        engine_kwargs : dict, default None
+            * For ``'cython'`` engine, there are no accepted ``engine_kwargs``
+            * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil``
+              and ``parallel`` dictionary keys. The values must either be ``True`` or
+              ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is
+              ``{'nopython': True, 'nogil': False, 'parallel': False}``.
+
+              .. versionadded:: 1.2.0
+
+        Returns
+        -------
+        Series or DataFrame
+            Return type is determined by the caller.
+        """
+        if maybe_use_numba(engine):
+            groupby_ewma_func = generate_numba_groupby_ewma_func(
+                engine_kwargs,
+                self.com,
+                self.adjust,
+                self.ignore_na,
+            )
+            return self._apply(
+                groupby_ewma_func,
+                numba_cache_key=(lambda x: x, "groupby_ewma"),
+            )
+        elif engine in ("cython", None):
+            if engine_kwargs is not None:
+                raise ValueError("cython engine does not accept engine_kwargs")
+
+            def f(x):
+                x = self._shallow_copy(x, groupby=self._groupby)
+                return x.mean()
+
+            return self._groupby.apply(f)
+        else:
+            raise ValueError("engine must be either 'numba' or 'cython'")
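The branch order in `mean` can be summarised as a small decision function. The sketch below is a simplified model only: `resolve_engine` is a hypothetical name, and the `use_numba_globally` flag stands in for the `compute.use_numba` option, on the assumption that `maybe_use_numba` returns true for `engine="numba"` or for `engine=None` with that option enabled.

```python
def resolve_engine(engine=None, engine_kwargs=None, use_numba_globally=False):
    """Return 'numba' or 'cython', mirroring the branch order in mean()."""
    # Assumed behaviour of maybe_use_numba: explicit request, or None plus global opt-in.
    if engine == "numba" or (engine is None and use_numba_globally):
        return "numba"
    if engine in ("cython", None):
        if engine_kwargs is not None:
            raise ValueError("cython engine does not accept engine_kwargs")
        return "cython"
    raise ValueError("engine must be either 'numba' or 'cython'")


print(resolve_engine())                                # cython
print(resolve_engine("numba", {"nopython": True}))     # numba
print(resolve_engine(None, use_numba_globally=True))   # numba
```

Note that `engine_kwargs` is only rejected on the cython path, and any other engine string falls through to the final `ValueError`.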