diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a2162f2e66b36..b8ad3a96cae3d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -157,6 +157,7 @@ jobs: pytest pandas/tests/reductions/ --array-manager pytest pandas/tests/generic/test_generic.py --array-manager pytest pandas/tests/arithmetic/ --array-manager + pytest pandas/tests/groupby/aggregate/ --array-manager pytest pandas/tests/reshape/merge --array-manager # indexing subset (temporary since other tests don't pass yet) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 4df1d036e5321..45914519c0a94 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -41,6 +41,7 @@ ArrayLike, FrameOrSeries, FrameOrSeriesUnion, + Manager, ) from pandas.util._decorators import ( Appender, @@ -107,7 +108,10 @@ all_indexes_same, ) import pandas.core.indexes.base as ibase -from pandas.core.internals import BlockManager +from pandas.core.internals import ( + ArrayManager, + BlockManager, +) from pandas.core.series import Series from pandas.core.util.numba_ import maybe_use_numba @@ -1074,20 +1078,22 @@ def _iterate_slices(self) -> Iterable[Series]: def _cython_agg_general( self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 ) -> DataFrame: - agg_mgr = self._cython_agg_blocks( + agg_mgr = self._cython_agg_manager( how, alt=alt, numeric_only=numeric_only, min_count=min_count ) return self._wrap_agged_manager(agg_mgr) - def _cython_agg_blocks( + def _cython_agg_manager( self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1 - ) -> BlockManager: + ) -> Manager: - data: BlockManager = self._get_data_to_aggregate() + data: Manager = self._get_data_to_aggregate() if numeric_only: data = data.get_numeric_data(copy=False) + using_array_manager = isinstance(data, ArrayManager) + def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike: # see if we can cast the values to the desired dtype # this may not be the original dtype @@ -1101,7 +1107,11 @@ def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike: result = type(values)._from_sequence(result.ravel(), dtype=values.dtype) # Note this will have result.dtype == dtype from above - elif isinstance(result, np.ndarray) and result.ndim == 1: + elif ( + not using_array_manager + and isinstance(result, np.ndarray) + and result.ndim == 1 + ): # We went through a SeriesGroupByPath and need to reshape # GH#32223 includes case with IntegerArray values result = result.reshape(1, -1) @@ -1153,11 +1163,11 @@ def py_fallback(bvalues: ArrayLike) -> ArrayLike: result = mgr.blocks[0].values return result - def blk_func(bvalues: ArrayLike) -> ArrayLike: + def array_func(values: ArrayLike) -> ArrayLike: try: result = self.grouper._cython_operation( - "aggregate", bvalues, how, axis=1, min_count=min_count + "aggregate", values, how, axis=1, min_count=min_count ) except NotImplementedError: # generally if we have numeric_only=False @@ -1170,14 +1180,14 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike: assert how == "ohlc" raise - result = py_fallback(bvalues) + result = py_fallback(values) - return cast_agg_result(result, bvalues, how) + return cast_agg_result(result, values, how) # TypeError -> we may have an exception in trying to aggregate # continue and exclude the block # NotImplementedError -> "ohlc" with wrong dtype - new_mgr = data.grouped_reduce(blk_func, ignore_failures=True) + new_mgr = data.grouped_reduce(array_func, ignore_failures=True) if not len(new_mgr): raise DataError("No numeric types to aggregate") @@ -1670,7 +1680,7 @@ def _wrap_frame_output(self, result, obj: DataFrame) -> DataFrame: else: return self.obj._constructor(result, index=obj.index, columns=result_index) - def _get_data_to_aggregate(self) -> BlockManager: + def _get_data_to_aggregate(self) -> Manager: obj = self._obj_with_exclusions if self.axis == 1: return obj.T._mgr @@ -1755,17 +1765,17 @@ def _wrap_transformed_output( return result - def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame: + def _wrap_agged_manager(self, mgr: Manager) -> DataFrame: if not self.as_index: index = np.arange(mgr.shape[1]) - mgr.axes[1] = ibase.Index(index) + mgr.set_axis(1, ibase.Index(index), verify_integrity=False) result = self.obj._constructor(mgr) self._insert_inaxis_grouper_inplace(result) result = result._consolidate() else: index = self.grouper.result_index - mgr.axes[1] = index + mgr.set_axis(1, index, verify_integrity=False) result = self.obj._constructor(mgr) if self.axis == 1: diff --git a/pandas/core/internals/array_manager.py b/pandas/core/internals/array_manager.py index 97d1303824cd4..e0447378c4542 100644 --- a/pandas/core/internals/array_manager.py +++ b/pandas/core/internals/array_manager.py @@ -150,18 +150,20 @@ def _normalize_axis(axis): axis = 1 if axis == 0 else 0 return axis - # TODO can be shared - def set_axis(self, axis: int, new_labels: Index) -> None: + def set_axis( + self, axis: int, new_labels: Index, verify_integrity: bool = True + ) -> None: # Caller is responsible for ensuring we have an Index object. axis = self._normalize_axis(axis) - old_len = len(self._axes[axis]) - new_len = len(new_labels) + if verify_integrity: + old_len = len(self._axes[axis]) + new_len = len(new_labels) - if new_len != old_len: - raise ValueError( - f"Length mismatch: Expected axis has {old_len} elements, new " - f"values have {new_len} elements" - ) + if new_len != old_len: + raise ValueError( + f"Length mismatch: Expected axis has {old_len} elements, new " + f"values have {new_len} elements" + ) self._axes[axis] = new_labels @@ -254,6 +256,30 @@ def reduce( new_mgr = type(self)(result_arrays, [index, columns]) return new_mgr, indexer + def grouped_reduce(self: T, func: Callable, ignore_failures: bool = False) -> T: + """ + Apply grouped reduction function columnwise, returning a new ArrayManager. + + Parameters + ---------- + func : grouped reduction function + ignore_failures : bool, default False + Whether to drop columns where func raises TypeError. + + Returns + ------- + ArrayManager + """ + # TODO ignore_failures + result_arrays = [func(arr) for arr in self.arrays] + + if len(result_arrays) == 0: + index = Index([None]) # placeholder + else: + index = Index(range(result_arrays[0].shape[0])) + + return type(self)(result_arrays, [index, self.items]) + def operate_blockwise(self, other: ArrayManager, array_op) -> ArrayManager: """ Apply array_op blockwise with another (aligned) BlockManager. @@ -369,7 +395,7 @@ def apply_with_block(self: T, f, align_keys=None, **kwargs) -> T: if hasattr(arr, "tz") and arr.tz is None: # type: ignore[union-attr] # DatetimeArray needs to be converted to ndarray for DatetimeBlock arr = arr._data # type: ignore[union-attr] - elif arr.dtype.kind == "m": + elif arr.dtype.kind == "m" and not isinstance(arr, np.ndarray): # TimedeltaArray needs to be converted to ndarray for TimedeltaBlock arr = arr._data # type: ignore[union-attr] if isinstance(arr, np.ndarray): diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index ab287ca4087bc..e013a7f680d6f 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -234,16 +234,19 @@ def shape(self) -> Shape: def ndim(self) -> int: return len(self.axes) - def set_axis(self, axis: int, new_labels: Index) -> None: + def set_axis( + self, axis: int, new_labels: Index, verify_integrity: bool = True + ) -> None: # Caller is responsible for ensuring we have an Index object. - old_len = len(self.axes[axis]) - new_len = len(new_labels) + if verify_integrity: + old_len = len(self.axes[axis]) + new_len = len(new_labels) - if new_len != old_len: - raise ValueError( - f"Length mismatch: Expected axis has {old_len} elements, new " - f"values have {new_len} elements" - ) + if new_len != old_len: + raise ValueError( + f"Length mismatch: Expected axis has {old_len} elements, new " + f"values have {new_len} elements" + ) self.axes[axis] = new_labels @@ -282,7 +285,7 @@ def get_dtypes(self): return algos.take_nd(dtypes, self.blknos, allow_fill=False) @property - def arrays(self): + def arrays(self) -> List[ArrayLike]: """ Quick access to the backing arrays of the Blocks. @@ -290,8 +293,7 @@ def arrays(self): Not to be used in actual code, and return value is not the same as the ArrayManager method (list of 1D arrays vs iterator of 2D ndarrays / 1D EAs). """ - for blk in self.blocks: - yield blk.values + return [blk.values for blk in self.blocks] def __getstate__(self): block_values = [b.values for b in self.blocks] diff --git a/pandas/tests/groupby/aggregate/test_aggregate.py b/pandas/tests/groupby/aggregate/test_aggregate.py index 276c0adfdb485..96413758e9cb0 100644 --- a/pandas/tests/groupby/aggregate/test_aggregate.py +++ b/pandas/tests/groupby/aggregate/test_aggregate.py @@ -10,6 +10,7 @@ import pytest from pandas.errors import PerformanceWarning +import pandas.util._test_decorators as td from pandas.core.dtypes.common import is_integer_dtype @@ -45,6 +46,7 @@ def test_agg_regression1(tsframe): tm.assert_frame_equal(result, expected) +@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) quantile/describe def test_agg_must_agg(df): grouped = df.groupby("A")["C"] @@ -134,6 +136,7 @@ def test_groupby_aggregation_multi_level_column(): tm.assert_frame_equal(result, expected) +@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) non-cython agg def test_agg_apply_corner(ts, tsframe): # nothing to group, all NA grouped = ts.groupby(ts * np.nan) @@ -212,6 +215,7 @@ def test_aggregate_str_func(tsframe, groupbyfunc): tm.assert_frame_equal(result, expected) +@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) non-cython agg def test_agg_str_with_kwarg_axis_1_raises(df, reduction_func): gb = df.groupby(level=0) if reduction_func in ("idxmax", "idxmin"): @@ -491,6 +495,7 @@ def test_agg_index_has_complex_internals(index): tm.assert_frame_equal(result, expected) +@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback def test_agg_split_block(): # https://github.com/pandas-dev/pandas/issues/31522 df = DataFrame( @@ -508,6 +513,7 @@ def test_agg_split_block(): tm.assert_frame_equal(result, expected) +@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback def test_agg_split_object_part_datetime(): # https://github.com/pandas-dev/pandas/pull/31616 df = DataFrame( @@ -1199,6 +1205,7 @@ def test_aggregate_datetime_objects(): tm.assert_series_equal(result, expected) +@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback def test_aggregate_numeric_object_dtype(): # https://github.com/pandas-dev/pandas/issues/39329 # simplified case: multiple object columns where one is all-NaN diff --git a/pandas/tests/groupby/aggregate/test_cython.py b/pandas/tests/groupby/aggregate/test_cython.py index f9b45f4d9f4cf..4a8aabe41b754 100644 --- a/pandas/tests/groupby/aggregate/test_cython.py +++ b/pandas/tests/groupby/aggregate/test_cython.py @@ -281,7 +281,7 @@ def test_read_only_buffer_source_agg(agg): "species": ["setosa", "setosa", "setosa", "setosa", "setosa"], } ) - df._mgr.blocks[0].values.flags.writeable = False + df._mgr.arrays[0].flags.writeable = False result = df.groupby(["species"]).agg({"sepal_length": agg}) expected = df.copy().groupby(["species"]).agg({"sepal_length": agg}) diff --git a/pandas/tests/groupby/aggregate/test_other.py b/pandas/tests/groupby/aggregate/test_other.py index c566c45b582d7..8dd1ac33bf8ae 100644 --- a/pandas/tests/groupby/aggregate/test_other.py +++ b/pandas/tests/groupby/aggregate/test_other.py @@ -8,6 +8,8 @@ import numpy as np import pytest +import pandas.util._test_decorators as td + import pandas as pd from pandas import ( DataFrame, @@ -412,6 +414,7 @@ def __call__(self, x): tm.assert_frame_equal(result, expected) +@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) columns with ndarrays def test_agg_over_numpy_arrays(): # GH 3788 df = DataFrame(