diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py index 226b225b47591..79a33c437ea5c 100644 --- a/asv_bench/benchmarks/rolling.py +++ b/asv_bench/benchmarks/rolling.py @@ -225,4 +225,17 @@ def time_rolling_offset(self, method): getattr(self.groupby_roll_offset, method)() +class GroupbyEWM: + + params = ["cython", "numba"] + param_names = ["engine"] + + def setup(self, engine): + df = pd.DataFrame({"A": range(50), "B": range(50)}) + self.gb_ewm = df.groupby("A").ewm(com=1.0) + + def time_groupby_mean(self, engine): + self.gb_ewm.mean(engine=engine) + + from .pandas_vb_common import setup # noqa: F401 isort:skip diff --git a/doc/source/user_guide/window.rst b/doc/source/user_guide/window.rst index 47ef1e9c8c4d7..05f8be091fa25 100644 --- a/doc/source/user_guide/window.rst +++ b/doc/source/user_guide/window.rst @@ -43,7 +43,7 @@ Concept Method Returned Object Rolling window ``rolling`` ``Rolling`` Yes Yes Weighted window ``rolling`` ``Window`` No No Expanding window ``expanding`` ``Expanding`` No Yes -Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No No +Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No Yes (as of version 1.2) ============================= ================= =========================== =========================== ======================== As noted above, some operations support specifying a window based on a time offset: diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 62da3c0c5cddc..15b5485b26402 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -204,6 +204,23 @@ example where the index name is preserved: The same is true for :class:`MultiIndex`, but the logic is applied separately on a level-by-level basis. +.. _whatsnew_120.groupby_ewm: + +Groupby supports EWM operations directly +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +:class:`DataFrameGroupBy` now supports exponentially weighted window operations directly (:issue:`16037`). + +.. ipython:: python + + df = pd.DataFrame({'A': ['a', 'b', 'a', 'b'], 'B': range(4)}) + df + df.groupby('A').ewm(com=1.0).mean() + +Additionally ``mean`` supports execution via `Numba `__ with +the ``engine`` and ``engine_kwargs`` arguments. Numba must be installed as an optional dependency +to use this feature. + .. _whatsnew_120.enhancements.other: Other enhancements diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index c1b5adab654cb..54a09a6d2ede7 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1496,8 +1496,8 @@ def roll_weighted_var(float64_t[:] values, float64_t[:] weights, # ---------------------------------------------------------------------- # Exponentially weighted moving average -def ewma_time(const float64_t[:] vals, int minp, ndarray[int64_t] times, - int64_t halflife): +def ewma_time(const float64_t[:] vals, int64_t[:] start, int64_t[:] end, + int minp, ndarray[int64_t] times, int64_t halflife): """ Compute exponentially-weighted moving average using halflife and time distances. @@ -1505,6 +1505,8 @@ def ewma_time(const float64_t[:] vals, int minp, ndarray[int64_t] times, Parameters ---------- vals : ndarray[float_64] + start: ndarray[int_64] + end: ndarray[int_64] minp : int times : ndarray[int64] halflife : int64 @@ -1552,17 +1554,20 @@ def ewma_time(const float64_t[:] vals, int minp, ndarray[int64_t] times, return output -def ewma(float64_t[:] vals, float64_t com, bint adjust, bint ignore_na, int minp): +def ewma(float64_t[:] vals, int64_t[:] start, int64_t[:] end, int minp, + float64_t com, bint adjust, bint ignore_na): """ Compute exponentially-weighted moving average using center-of-mass. Parameters ---------- vals : ndarray (float64 type) + start: ndarray (int64 type) + end: ndarray (int64 type) + minp : int com : float64 adjust : int ignore_na : bool - minp : int Returns ------- @@ -1620,19 +1625,21 @@ def ewma(float64_t[:] vals, float64_t com, bint adjust, bint ignore_na, int minp # Exponentially weighted moving covariance -def ewmcov(float64_t[:] input_x, float64_t[:] input_y, - float64_t com, bint adjust, bint ignore_na, int minp, bint bias): +def ewmcov(float64_t[:] input_x, int64_t[:] start, int64_t[:] end, int minp, + float64_t[:] input_y, float64_t com, bint adjust, bint ignore_na, bint bias): """ Compute exponentially-weighted moving variance using center-of-mass. Parameters ---------- input_x : ndarray (float64 type) + start: ndarray (int64 type) + end: ndarray (int64 type) + minp : int input_y : ndarray (float64 type) com : float64 adjust : int ignore_na : bool - minp : int bias : int Returns diff --git a/pandas/core/groupby/base.py b/pandas/core/groupby/base.py index f205226c03a53..7dc0db35bf8fe 100644 --- a/pandas/core/groupby/base.py +++ b/pandas/core/groupby/base.py @@ -192,6 +192,7 @@ def _gotitem(self, key, ndim, subset=None): "describe", "dtypes", "expanding", + "ewm", "filter", "get_group", "groups", diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index ad8f212aa20ea..6e26e9a43bb2a 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -1859,6 +1859,16 @@ def expanding(self, *args, **kwargs): return ExpandingGroupby(self, *args, **kwargs) + @Substitution(name="groupby") + @Appender(_common_see_also) + def ewm(self, *args, **kwargs): + """ + Return an ewm grouper, providing ewm functionality per group. + """ + from pandas.core.window import ExponentialMovingWindowGroupby + + return ExponentialMovingWindowGroupby(self, *args, **kwargs) + def _fill(self, direction, limit=None): """ Shared function for `pad` and `backfill` to call Cython method. diff --git a/pandas/core/window/__init__.py b/pandas/core/window/__init__.py index 304c61ac0e489..b3d0820fee4da 100644 --- a/pandas/core/window/__init__.py +++ b/pandas/core/window/__init__.py @@ -1,3 +1,6 @@ -from pandas.core.window.ewm import ExponentialMovingWindow # noqa:F401 +from pandas.core.window.ewm import ( # noqa:F401 + ExponentialMovingWindow, + ExponentialMovingWindowGroupby, +) from pandas.core.window.expanding import Expanding, ExpandingGroupby # noqa:F401 from pandas.core.window.rolling import Rolling, RollingGroupby, Window # noqa:F401 diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index b601bacec35f1..f8237a436f436 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -14,8 +14,20 @@ from pandas.core.dtypes.common import is_datetime64_ns_dtype import pandas.core.common as common -from pandas.core.window.common import _doc_template, _shared_docs, zsqrt -from pandas.core.window.rolling import BaseWindow, flex_binary_moment +from pandas.core.util.numba_ import maybe_use_numba +from pandas.core.window.common import ( + _doc_template, + _shared_docs, + flex_binary_moment, + zsqrt, +) +from pandas.core.window.indexers import ( + BaseIndexer, + ExponentialMovingWindowIndexer, + GroupbyIndexer, +) +from pandas.core.window.numba_ import generate_numba_groupby_ewma_func +from pandas.core.window.rolling import BaseWindow, BaseWindowGroupby, dispatch if TYPE_CHECKING: from pandas import Series @@ -219,14 +231,16 @@ def __init__( ignore_na: bool = False, axis: int = 0, times: Optional[Union[str, np.ndarray, FrameOrSeries]] = None, + **kwargs, ): - self.com: Optional[float] self.obj = obj self.min_periods = max(int(min_periods), 1) self.adjust = adjust self.ignore_na = ignore_na self.axis = axis self.on = None + self.center = False + self.closed = None if times is not None: if isinstance(times, str): times = self._selected_obj[times] @@ -245,7 +259,7 @@ def __init__( if common.count_not_none(com, span, alpha) > 0: self.com = get_center_of_mass(com, span, None, alpha) else: - self.com = None + self.com = 0.0 else: if halflife is not None and isinstance(halflife, (str, datetime.timedelta)): raise ValueError( @@ -260,6 +274,12 @@ def __init__( def _constructor(self): return ExponentialMovingWindow + def _get_window_indexer(self) -> BaseIndexer: + """ + Return an indexer class that will compute the window start and end bounds + """ + return ExponentialMovingWindowIndexer() + _agg_see_also_doc = dedent( """ See Also @@ -299,27 +319,6 @@ def aggregate(self, func, *args, **kwargs): agg = aggregate - def _apply(self, func): - """ - Rolling statistical measure using supplied function. Designed to be - used with passed-in Cython array-based functions. - - Parameters - ---------- - func : str/callable to apply - - Returns - ------- - y : same type as input argument - """ - - def homogeneous_func(values: np.ndarray): - if values.size == 0: - return values.copy() - return np.apply_along_axis(func, self.axis, values) - - return self._apply_blockwise(homogeneous_func) - @Substitution(name="ewm", func_name="mean") @Appender(_doc_template) def mean(self, *args, **kwargs): @@ -336,7 +335,6 @@ def mean(self, *args, **kwargs): window_func = self._get_roll_func("ewma_time") window_func = partial( window_func, - minp=self.min_periods, times=self.times, halflife=self.halflife, ) @@ -347,7 +345,6 @@ def mean(self, *args, **kwargs): com=self.com, adjust=self.adjust, ignore_na=self.ignore_na, - minp=self.min_periods, ) return self._apply(window_func) @@ -371,13 +368,19 @@ def var(self, bias: bool = False, *args, **kwargs): Exponential weighted moving variance. """ nv.validate_window_func("var", args, kwargs) + window_func = self._get_roll_func("ewmcov") + window_func = partial( + window_func, + com=self.com, + adjust=self.adjust, + ignore_na=self.ignore_na, + bias=bias, + ) - def f(arg): - return window_aggregations.ewmcov( - arg, arg, self.com, self.adjust, self.ignore_na, self.min_periods, bias - ) + def var_func(values, begin, end, min_periods): + return window_func(values, begin, end, min_periods, values) - return self._apply(f) + return self._apply(var_func) @Substitution(name="ewm", func_name="cov") @Appender(_doc_template) @@ -419,11 +422,13 @@ def _get_cov(X, Y): Y = self._shallow_copy(Y) cov = window_aggregations.ewmcov( X._prep_values(), + np.array([0], dtype=np.int64), + np.array([0], dtype=np.int64), + self.min_periods, Y._prep_values(), self.com, self.adjust, self.ignore_na, - self.min_periods, bias, ) return wrap_result(X, cov) @@ -470,7 +475,15 @@ def _get_corr(X, Y): def _cov(x, y): return window_aggregations.ewmcov( - x, y, self.com, self.adjust, self.ignore_na, self.min_periods, 1 + x, + np.array([0], dtype=np.int64), + np.array([0], dtype=np.int64), + self.min_periods, + y, + self.com, + self.adjust, + self.ignore_na, + 1, ) x_values = X._prep_values() @@ -485,3 +498,78 @@ def _cov(x, y): return flex_binary_moment( self._selected_obj, other._selected_obj, _get_corr, pairwise=bool(pairwise) ) + + +class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow): + """ + Provide an exponential moving window groupby implementation. + """ + + def _get_window_indexer(self) -> GroupbyIndexer: + """ + Return an indexer class that will compute the window start and end bounds + + Returns + ------- + GroupbyIndexer + """ + window_indexer = GroupbyIndexer( + groupby_indicies=self._groupby.indices, + window_indexer=ExponentialMovingWindowIndexer, + ) + return window_indexer + + var = dispatch("var", bias=False) + std = dispatch("std", bias=False) + cov = dispatch("cov", other=None, pairwise=None, bias=False) + corr = dispatch("corr", other=None, pairwise=None) + + def mean(self, engine=None, engine_kwargs=None): + """ + Parameters + ---------- + engine : str, default None + * ``'cython'`` : Runs mean through C-extensions from cython. + * ``'numba'`` : Runs mean through JIT compiled code from numba. + Only available when ``raw`` is set to ``True``. + * ``None`` : Defaults to ``'cython'`` or globally setting + ``compute.use_numba`` + + .. versionadded:: 1.2.0 + + engine_kwargs : dict, default None + * For ``'cython'`` engine, there are no accepted ``engine_kwargs`` + * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil`` + and ``parallel`` dictionary keys. The values must either be ``True`` or + ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is + ``{'nopython': True, 'nogil': False, 'parallel': False}``. + + .. versionadded:: 1.2.0 + + Returns + ------- + Series or DataFrame + Return type is determined by the caller. + """ + if maybe_use_numba(engine): + groupby_ewma_func = generate_numba_groupby_ewma_func( + engine_kwargs, + self.com, + self.adjust, + self.ignore_na, + ) + return self._apply( + groupby_ewma_func, + numba_cache_key=(lambda x: x, "groupby_ewma"), + ) + elif engine in ("cython", None): + if engine_kwargs is not None: + raise ValueError("cython engine does not accept engine_kwargs") + + def f(x): + x = self._shallow_copy(x, groupby=self._groupby) + return x.mean() + + return self._groupby.apply(f) + else: + raise ValueError("engine must be either 'numba' or 'cython'") diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index a8229257bb7bb..a3b9695d777d9 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -344,3 +344,18 @@ def get_window_bounds( start = np.concatenate(start_arrays) end = np.concatenate(end_arrays) return start, end + + +class ExponentialMovingWindowIndexer(BaseIndexer): + """Calculate ewm window bounds (the entire window)""" + + @Appender(get_window_bounds_doc) + def get_window_bounds( + self, + num_values: int = 0, + min_periods: Optional[int] = None, + center: Optional[bool] = None, + closed: Optional[str] = None, + ) -> Tuple[np.ndarray, np.ndarray]: + + return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64) diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index c4858b6e5a4ab..274586e1745b5 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -72,3 +72,92 @@ def roll_apply( return result return roll_apply + + +def generate_numba_groupby_ewma_func( + engine_kwargs: Optional[Dict[str, bool]], + com: float, + adjust: bool, + ignore_na: bool, +): + """ + Generate a numba jitted groupby ewma function specified by values + from engine_kwargs. + + Parameters + ---------- + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + com : float + adjust : bool + ignore_na : bool + + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs) + + cache_key = (lambda x: x, "groupby_ewma") + if cache_key in NUMBA_FUNC_CACHE: + return NUMBA_FUNC_CACHE[cache_key] + + numba = import_optional_dependency("numba") + if parallel: + loop_range = numba.prange + else: + loop_range = range + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def groupby_ewma( + values: np.ndarray, + begin: np.ndarray, + end: np.ndarray, + minimum_periods: int, + ) -> np.ndarray: + result = np.empty(len(values)) + alpha = 1.0 / (1.0 + com) + for i in loop_range(len(begin)): + start = begin[i] + stop = end[i] + window = values[start:stop] + sub_result = np.empty(len(window)) + + old_wt_factor = 1.0 - alpha + new_wt = 1.0 if adjust else alpha + + weighted_avg = window[0] + nobs = int(not np.isnan(weighted_avg)) + sub_result[0] = weighted_avg if nobs >= minimum_periods else np.nan + old_wt = 1.0 + + for j in range(1, len(window)): + cur = window[j] + is_observation = not np.isnan(cur) + nobs += is_observation + if not np.isnan(weighted_avg): + + if is_observation or not ignore_na: + + old_wt *= old_wt_factor + if is_observation: + + # avoid numerical errors on constant series + if weighted_avg != cur: + weighted_avg = ( + (old_wt * weighted_avg) + (new_wt * cur) + ) / (old_wt + new_wt) + if adjust: + old_wt += new_wt + else: + old_wt = 1.0 + elif is_observation: + weighted_avg = cur + + sub_result[j] = weighted_avg if nobs >= minimum_periods else np.nan + + result[start:stop] = sub_result + + return result + + return groupby_ewma diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index f65452cb2f17f..e74ae5311125e 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -405,7 +405,7 @@ def _apply( self, func: Callable[..., Any], name: Optional[str] = None, - use_numba_cache: bool = False, + numba_cache_key: Optional[Tuple[Callable, str]] = None, **kwargs, ): """ @@ -417,9 +417,8 @@ def _apply( ---------- func : callable function to apply name : str, - use_numba_cache : bool - whether to cache a numba compiled function. Only available for numba - enabled methods (so far only apply) + numba_cache_key : tuple + caching key to be used to store a compiled numba func **kwargs additional arguments for rolling function and window function @@ -456,8 +455,8 @@ def calc(x): result = calc(values) result = np.asarray(result) - if use_numba_cache: - NUMBA_FUNC_CACHE[(kwargs["original_func"], "rolling_apply")] = func + if numba_cache_key is not None: + NUMBA_FUNC_CACHE[numba_cache_key] = func return result @@ -715,7 +714,7 @@ def aggregate(self, func, *args, **kwargs): ) -def _dispatch(name: str, *args, **kwargs): +def dispatch(name: str, *args, **kwargs): """ Dispatch to groupby apply. """ @@ -746,20 +745,20 @@ def __init__(self, obj, *args, **kwargs): self._groupby.grouper.mutated = True super().__init__(obj, *args, **kwargs) - corr = _dispatch("corr", other=None, pairwise=None) - cov = _dispatch("cov", other=None, pairwise=None) + corr = dispatch("corr", other=None, pairwise=None) + cov = dispatch("cov", other=None, pairwise=None) def _apply( self, func: Callable[..., Any], name: Optional[str] = None, - use_numba_cache: bool = False, + numba_cache_key: Optional[Tuple[Callable, str]] = None, **kwargs, ) -> FrameOrSeries: result = super()._apply( func, name, - use_numba_cache, + numba_cache_key, **kwargs, ) # Reconstruct the resulting MultiIndex from tuples @@ -1038,7 +1037,7 @@ def _apply( self, func: Callable[[np.ndarray, int, int], np.ndarray], name: Optional[str] = None, - use_numba_cache: bool = False, + numba_cache_key: Optional[Tuple[Callable, str]] = None, **kwargs, ): """ @@ -1050,9 +1049,8 @@ def _apply( ---------- func : callable function to apply name : str, - use_numba_cache : bool - whether to cache a numba compiled function. Only available for numba - enabled methods (so far only apply) + use_numba_cache : tuple + unused **kwargs additional arguments for scipy windows if necessary @@ -1292,10 +1290,12 @@ def apply( if not is_bool(raw): raise ValueError("raw parameter must be `True` or `False`") + numba_cache_key = None if maybe_use_numba(engine): if raw is False: raise ValueError("raw must be `True` when using the numba engine") apply_func = generate_numba_apply_func(args, kwargs, func, engine_kwargs) + numba_cache_key = (func, "rolling_apply") elif engine in ("cython", None): if engine_kwargs is not None: raise ValueError("cython engine does not accept engine_kwargs") @@ -1305,10 +1305,7 @@ def apply( return self._apply( apply_func, - use_numba_cache=maybe_use_numba(engine), - original_func=func, - args=args, - kwargs=kwargs, + numba_cache_key=numba_cache_key, ) def _generate_cython_apply_func( diff --git a/pandas/tests/groupby/test_allowlist.py b/pandas/tests/groupby/test_allowlist.py index 4a735fc7bb686..34729c771eac9 100644 --- a/pandas/tests/groupby/test_allowlist.py +++ b/pandas/tests/groupby/test_allowlist.py @@ -329,6 +329,7 @@ def test_tab_completion(mframe): "expanding", "pipe", "sample", + "ewm", } assert results == expected diff --git a/pandas/tests/window/conftest.py b/pandas/tests/window/conftest.py index 1780925202593..a803ce716eb05 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -74,6 +74,18 @@ def nopython(request): return request.param +@pytest.fixture(params=[True, False]) +def adjust(request): + """adjust keyword argument for ewm""" + return request.param + + +@pytest.fixture(params=[True, False]) +def ignore_na(request): + """ignore_na keyword argument for ewm""" + return request.param + + @pytest.fixture( params=[ pytest.param( diff --git a/pandas/tests/window/test_grouper.py b/pandas/tests/window/test_groupby.py similarity index 91% rename from pandas/tests/window/test_grouper.py rename to pandas/tests/window/test_groupby.py index 65906df819054..c4de112bd6dc0 100644 --- a/pandas/tests/window/test_grouper.py +++ b/pandas/tests/window/test_groupby.py @@ -631,3 +631,60 @@ def test_groupby_rolling_index_level_and_column_label(self): ), ) tm.assert_frame_equal(result, expected) + + +class TestEWM: + @pytest.mark.parametrize( + "method, expected_data", + [ + ["mean", [0.0, 0.6666666666666666, 1.4285714285714286, 2.2666666666666666]], + ["std", [np.nan, 0.707107, 0.963624, 1.177164]], + ["var", [np.nan, 0.5, 0.9285714285714286, 1.3857142857142857]], + ], + ) + def test_methods(self, method, expected_data): + # GH 16037 + df = DataFrame({"A": ["a"] * 4, "B": range(4)}) + result = getattr(df.groupby("A").ewm(com=1.0), method)() + expected = DataFrame( + {"B": expected_data}, + index=MultiIndex.from_tuples( + [ + ("a", 0), + ("a", 1), + ("a", 2), + ("a", 3), + ], + names=["A", None], + ), + ) + tm.assert_frame_equal(result, expected) + + expected = df.groupby("A").apply(lambda x: getattr(x.ewm(com=1.0), method)()) + # There may be a bug in the above statement; not returning the correct index + tm.assert_frame_equal(result.reset_index(drop=True), expected) + + @pytest.mark.parametrize( + "method, expected_data", + [["corr", [np.nan, 1.0, 1.0, 1]], ["cov", [np.nan, 0.5, 0.928571, 1.385714]]], + ) + def test_pairwise_methods(self, method, expected_data): + # GH 16037 + df = DataFrame({"A": ["a"] * 4, "B": range(4)}) + result = getattr(df.groupby("A").ewm(com=1.0), method)() + expected = DataFrame( + {"B": expected_data}, + index=MultiIndex.from_tuples( + [ + ("a", 0, "B"), + ("a", 1, "B"), + ("a", 2, "B"), + ("a", 3, "B"), + ], + names=["A", None, None], + ), + ) + tm.assert_frame_equal(result, expected) + + expected = df.groupby("A").apply(lambda x: getattr(x.ewm(com=1.0), method)()) + tm.assert_frame_equal(result, expected) diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 35bdb972a7bc0..3dd09bc4b752a 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -3,7 +3,7 @@ import pandas.util._test_decorators as td -from pandas import Series, option_context +from pandas import DataFrame, Series, option_context import pandas._testing as tm from pandas.core.util.numba_ import NUMBA_FUNC_CACHE @@ -11,7 +11,7 @@ @td.skip_if_no("numba", "0.46.0") @pytest.mark.filterwarnings("ignore:\\nThe keyword argument") # Filter warnings when parallel=True and the function can't be parallelized by Numba -class TestApply: +class TestRollingApply: @pytest.mark.parametrize("jit", [True, False]) def test_numba_vs_cython(self, jit, nogil, parallel, nopython, center): def f(x, *args): @@ -77,6 +77,31 @@ def func_2(x): tm.assert_series_equal(result, expected) +@td.skip_if_no("numba", "0.46.0") +class TestGroupbyEWMMean: + def test_invalid_engine(self): + df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) + with pytest.raises(ValueError, match="engine must be either"): + df.groupby("A").ewm(com=1.0).mean(engine="foo") + + def test_invalid_engine_kwargs(self): + df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) + with pytest.raises(ValueError, match="cython engine does not"): + df.groupby("A").ewm(com=1.0).mean( + engine="cython", engine_kwargs={"nopython": True} + ) + + def test_cython_vs_numba(self, nogil, parallel, nopython, ignore_na, adjust): + df = DataFrame({"A": ["a", "b", "a", "b"], "B": range(4)}) + gb_ewm = df.groupby("A").ewm(com=1.0, adjust=adjust, ignore_na=ignore_na) + + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + result = gb_ewm.mean(engine="numba", engine_kwargs=engine_kwargs) + expected = gb_ewm.mean(engine="cython") + + tm.assert_frame_equal(result, expected) + + @td.skip_if_no("numba", "0.46.0") def test_use_global_config(): def f(x):