From 1d1d34c8514117f73840d694826ac826027c53f9 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 6 Dec 2023 13:50:25 +0100 Subject: [PATCH 01/21] test Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 59 ++++++++++++++----- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 20003c6f20d..738169eb75d 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3809,6 +3809,8 @@ def groupby( if not isinstance(by, list): by = [by] + kwargs = kwargs.copy() + kwargs["observed"] = True skip_on_aligning_flag = "__skip_me_on_aligning__" def apply_func(df): # pragma: no cover @@ -3831,9 +3833,10 @@ def apply_func(df): # pragma: no cover key_columns=by, func=apply_func, ) + drop_dupl_categories = True # no need aligning columns if there's only one row partition - if align_result_columns and result._partitions.shape[0] > 1: + if (drop_dupl_categories or align_result_columns) and result._partitions.shape[0] > 1: # FIXME: the current reshuffling implementation guarantees us that there's only one column # partition in the result, so we should never hit this exception for now, however # in the future, we might want to make this implementation more broader @@ -3851,20 +3854,38 @@ def apply_func(df): # pragma: no cover def compute_aligned_columns(*dfs): """Take row partitions, filter empty ones, and return joined columns for them.""" - valid_dfs = [ - df - for df in dfs - if not df.attrs.get(skip_on_aligning_flag, False) - ] - if len(valid_dfs) == 0 and len(dfs) != 0: - valid_dfs = dfs - - # Using '.concat()' on empty-slices instead of 'Index.join()' - # in order to get identical behavior to pandas when it joins - # results of different groups - return pandas.concat( - [df.iloc[:0] for df in valid_dfs], axis=0, join="outer" - ).columns + combined_cols = None + mask = None + if align_result_columns: + valid_dfs = [ + df + for df in dfs + if not df.attrs.get(skip_on_aligning_flag, False) + ] + if len(valid_dfs) == 0 and len(dfs) != 0: + valid_dfs = dfs + + # Using '.concat()' on empty-slices instead of 'Index.join()' + # in order to get identical behavior to pandas when it joins + # results of different groups + combined_cols = pandas.concat( + [df.iloc[:0] for df in valid_dfs], axis=0, join="outer" + ).columns + if drop_dupl_categories: + indices = [df.index for df in dfs] + total_index = indices[0].append(indices[1:]) + missing_cats = total_index.categories.difference(total_index.values) + if not kwargs["sort"]: + mask = {len(indices) - 1: missing_cats} + return (combined_cols, mask) + bins = [idx[0] for idx in indices] + parts = (np.digitize(missing_cats, bins) - 1) + parts[parts < 0] = 0 + masks = {idx: [] for idx in np.unique(parts)} + for idx, value in zip(parts, missing_cats): + masks[idx].append(value) + + return (combined_cols, mask) # Passing all partitions to the 'compute_aligned_columns' kernel to get # aligned columns @@ -3873,6 +3894,14 @@ def compute_aligned_columns(*dfs): compute_aligned_columns, *[part._data for part in parts[1:]] ) + def apply_aligned(df, args, partition_idx): + combined_cols, mask = args + if combined_cols is not None: + df = df.reindex(columns=combined_cols) + if mask is not None and mask.get(partition_idx) is not None: + values = mask[partition_idx] + + # Lazily applying aligned columns to partitions new_partitions = self._partition_mgr_cls.lazy_map_partitions( result._partitions, From c00870e13073cf635d42a8c8006f9fc1ae9eac38 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Fri, 8 Dec 2023 17:20:38 +0100 Subject: [PATCH 02/21] ss Signed-off-by: Dmitry Chigarev --- .../core/dataframe/pandas/dataframe/dataframe.py | 8 ++++---- .../storage_formats/pandas/query_compiler.py | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 738169eb75d..0bf0fd44549 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3758,7 +3758,8 @@ def groupby( by: Union[str, List[str]], operator: Callable, result_schema: Optional[Dict[Hashable, type]] = None, - align_result_columns=False, + align_result_columns : bool=False, + add_missing_cats : bool=False, **kwargs: dict, ) -> "PandasDataframe": """ @@ -3833,10 +3834,9 @@ def apply_func(df): # pragma: no cover key_columns=by, func=apply_func, ) - drop_dupl_categories = True # no need aligning columns if there's only one row partition - if (drop_dupl_categories or align_result_columns) and result._partitions.shape[0] > 1: + if (add_missing_cats or align_result_columns) and result._partitions.shape[0] > 1: # FIXME: the current reshuffling implementation guarantees us that there's only one column # partition in the result, so we should never hit this exception for now, however # in the future, we might want to make this implementation more broader @@ -3871,7 +3871,7 @@ def compute_aligned_columns(*dfs): combined_cols = pandas.concat( [df.iloc[:0] for df in valid_dfs], axis=0, join="outer" ).columns - if drop_dupl_categories: + if add_missing_cats: indices = [df.index for df in dfs] total_index = indices[0].append(indices[1:]) missing_cats = total_index.categories.difference(total_index.values) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 292e696e6da..e07caaea917 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3788,15 +3788,14 @@ def _groupby_shuffle( ) # This check materializes dtypes for 'by' columns - if isinstance(self._modin_frame._dtypes, ModinDtypes): - by_dtypes = self._modin_frame._dtypes.lazy_get(by).get() + if not groupby_kwargs.get("observed", False): + if isinstance(self._modin_frame._dtypes, ModinDtypes): + by_dtypes = self._modin_frame._dtypes.lazy_get(by).get() + else: + by_dtypes = self.dtypes[by] + add_missing_cats = any(isinstance(dtype, pandas.CategoricalDtype) for dtype in by_dtypes): else: - by_dtypes = self.dtypes[by] - if any(isinstance(dtype, pandas.CategoricalDtype) for dtype in by_dtypes): - raise NotImplementedError( - "Range-partitioning groupby is not yet supported when grouping on a categorical column. " - + "https://github.com/modin-project/modin/issues/5925" - ) + add_missing_cats = False is_transform = how == "transform" or GroupBy.is_transformation_kernel(agg_func) @@ -3842,6 +3841,7 @@ def agg_func(grp, *args, **kwargs): # that's why we have to align the partition's shapes/labeling across different # row partitions align_result_columns=how == "group_wise", + add_missing_cats=add_missing_cats, **groupby_kwargs, ) result_qc = self.__constructor__(result) From 1e5059ced04dc08ae919da0b8a9989b2ae5af8d4 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 12 Dec 2023 10:40:25 +0100 Subject: [PATCH 03/21] tmp Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 0bf0fd44549..d058e7c7ad6 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3852,7 +3852,7 @@ def apply_func(df): # pragma: no cover # so there should be less stress on the network. if not IsRayCluster.get(): - def compute_aligned_columns(*dfs): + def compute_aligned_columns(*dfs, initial_columns=None): """Take row partitions, filter empty ones, and return joined columns for them.""" combined_cols = None mask = None @@ -3875,6 +3875,7 @@ def compute_aligned_columns(*dfs): indices = [df.index for df in dfs] total_index = indices[0].append(indices[1:]) missing_cats = total_index.categories.difference(total_index.values) + pandas.Categorical(missing_cats) if not kwargs["sort"]: mask = {len(indices) - 1: missing_cats} return (combined_cols, mask) @@ -3896,10 +3897,10 @@ def compute_aligned_columns(*dfs): def apply_aligned(df, args, partition_idx): combined_cols, mask = args - if combined_cols is not None: - df = df.reindex(columns=combined_cols) if mask is not None and mask.get(partition_idx) is not None: values = mask[partition_idx] + if combined_cols is not None: + df = df.reindex(columns=combined_cols) # Lazily applying aligned columns to partitions From 21c2133b88ad43fd92fe2d00b42d404c2d4a6359 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 10 Jan 2024 15:34:19 +0100 Subject: [PATCH 04/21] ss Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index d058e7c7ad6..a5b644fee30 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3875,12 +3875,17 @@ def compute_aligned_columns(*dfs, initial_columns=None): indices = [df.index for df in dfs] total_index = indices[0].append(indices[1:]) missing_cats = total_index.categories.difference(total_index.values) - pandas.Categorical(missing_cats) + cats = pandas.Categorical(missing_cats) + empty_df = pandas.DataFrame(columns=initial_columns) + empty_df = empty_df.astype({by: cats}) + kwargs = kwargs.copy() + kwargs["observed"] = False + missing_values = operator(empty_df.groupby(by, **kwargs)) if not kwargs["sort"]: - mask = {len(indices) - 1: missing_cats} + mask = {len(indices) - 1: missing_values} return (combined_cols, mask) bins = [idx[0] for idx in indices] - parts = (np.digitize(missing_cats, bins) - 1) + parts = (np.digitize(missing_values.index, bins) - 1) parts[parts < 0] = 0 masks = {idx: [] for idx in np.unique(parts)} for idx, value in zip(parts, missing_cats): @@ -3892,7 +3897,7 @@ def compute_aligned_columns(*dfs, initial_columns=None): # aligned columns parts = result._partitions.flatten() aligned_columns = parts[0].apply( - compute_aligned_columns, *[part._data for part in parts[1:]] + compute_aligned_columns, *[part._data for part in parts[1:]], initial_columns=self.columns, ) def apply_aligned(df, args, partition_idx): From c70d3e392ebda20e06888cafc282e93c05ac880a Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 10 Jan 2024 16:50:43 +0100 Subject: [PATCH 05/21] ss Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 65 +++++++++++-------- .../pandas/partitioning/partition_manager.py | 10 ++- .../storage_formats/pandas/query_compiler.py | 7 +- 3 files changed, 47 insertions(+), 35 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index a5b644fee30..2d532cf7403 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3834,9 +3834,9 @@ def apply_func(df): # pragma: no cover key_columns=by, func=apply_func, ) - + # breakpoint() # no need aligning columns if there's only one row partition - if (add_missing_cats or align_result_columns) and result._partitions.shape[0] > 1: + if (add_missing_cats or align_result_columns): # and result._partitions.shape[0] > 1: # FIXME: the current reshuffling implementation guarantees us that there's only one column # partition in the result, so we should never hit this exception for now, however # in the future, we might want to make this implementation more broader @@ -3855,13 +3855,42 @@ def apply_func(df): # pragma: no cover def compute_aligned_columns(*dfs, initial_columns=None): """Take row partitions, filter empty ones, and return joined columns for them.""" combined_cols = None - mask = None + masks = None + if add_missing_cats: + indices = [df.index for df in dfs] + total_index = indices[0].append(indices[1:]) + missing_cats = total_index.categories.difference(total_index.values) + cats = pandas.CategoricalDtype(missing_cats) + empty_df = pandas.DataFrame(columns=initial_columns) + empty_df = empty_df.astype({col: cats for col in by}) + nonlocal kwargs + kwargs = kwargs.copy() + kwargs["observed"] = False + breakpoint() + missing_values = operator(empty_df.groupby(by, **kwargs)) + if len(missing_values.columns.intersection(by)) == len(by): + missing_values = missing_values.astype({col: total_index.dtype for col in by}) + elif isinstance(missing_values.index, pandas.MultiIndex): + missing_values.index = pandas.MultiIndex.from_frame(missing_values.index.to_frame().astype({col: total_index.dtype for col in by})) + else: + missing_values.index = missing_values.index.astype(total_index.dtype) + if not kwargs["sort"]: + mask = {len(indices) - 1: missing_values} + return (combined_cols, mask) + bins = [idx[0] for idx in indices] + parts = (np.digitize(cats.categories, bins) - 1) + parts[parts < 0] = 0 + masks = {idx: pandas.DataFrame() for idx in np.unique(parts)} + for i, idx in enumerate(parts): + masks[idx] = pandas.concat([masks[idx], missing_values.iloc[[i]]]) if align_result_columns: valid_dfs = [ df for df in dfs if not df.attrs.get(skip_on_aligning_flag, False) ] + if add_missing_cats: + valid_dfs += missing_values if len(valid_dfs) == 0 and len(dfs) != 0: valid_dfs = dfs @@ -3871,27 +3900,8 @@ def compute_aligned_columns(*dfs, initial_columns=None): combined_cols = pandas.concat( [df.iloc[:0] for df in valid_dfs], axis=0, join="outer" ).columns - if add_missing_cats: - indices = [df.index for df in dfs] - total_index = indices[0].append(indices[1:]) - missing_cats = total_index.categories.difference(total_index.values) - cats = pandas.Categorical(missing_cats) - empty_df = pandas.DataFrame(columns=initial_columns) - empty_df = empty_df.astype({by: cats}) - kwargs = kwargs.copy() - kwargs["observed"] = False - missing_values = operator(empty_df.groupby(by, **kwargs)) - if not kwargs["sort"]: - mask = {len(indices) - 1: missing_values} - return (combined_cols, mask) - bins = [idx[0] for idx in indices] - parts = (np.digitize(missing_values.index, bins) - 1) - parts[parts < 0] = 0 - masks = {idx: [] for idx in np.unique(parts)} - for idx, value in zip(parts, missing_cats): - masks[idx].append(value) - - return (combined_cols, mask) + # breakpoint() + return (combined_cols, masks) # Passing all partitions to the 'compute_aligned_columns' kernel to get # aligned columns @@ -3904,15 +3914,18 @@ def apply_aligned(df, args, partition_idx): combined_cols, mask = args if mask is not None and mask.get(partition_idx) is not None: values = mask[partition_idx] + df = pandas.concat([df, values]) if combined_cols is not None: df = df.reindex(columns=combined_cols) - + return df # Lazily applying aligned columns to partitions new_partitions = self._partition_mgr_cls.lazy_map_partitions( result._partitions, - lambda df, columns: df.reindex(columns=columns), + apply_aligned, + # lambda df, columns: df.reindex(columns=columns), func_args=(aligned_columns._data,), + enumerate_partitions=True, ) else: diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 421b58f29a6..d2546aeb74b 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -611,9 +611,7 @@ def map_partitions( @classmethod @wait_computations_if_benchmark_mode - def lazy_map_partitions( - cls, partitions, map_func, func_args=None, func_kwargs=None - ): + def lazy_map_partitions(cls, partitions, map_func, func_args=None, enumerate_partitions=False): """ Apply `map_func` to every partition in `partitions` *lazily*. @@ -639,12 +637,12 @@ def lazy_map_partitions( [ part.add_to_apply_calls( preprocessed_map_func, - *func_args if func_args is not None else (), - **func_kwargs if func_kwargs is not None else {}, + *(tuple() if func_args is None else func_args), + **({"partition_idx": i} if enumerate_partitions else {}) ) for part in row ] - for row in partitions + for i, row in enumerate(partitions) ] ) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index e07caaea917..45c0cf20f68 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3786,14 +3786,15 @@ def _groupby_shuffle( "Range-partitioning groupby is only supported when grouping on a column(s) of the same frame. " + "https://github.com/modin-project/modin/issues/5926" ) - + from pandas.api.extensions import no_default + # breakpoint() # This check materializes dtypes for 'by' columns - if not groupby_kwargs.get("observed", False): + if not groupby_kwargs.get("observed", False) or groupby_kwargs.get("observed", False) is no_default: if isinstance(self._modin_frame._dtypes, ModinDtypes): by_dtypes = self._modin_frame._dtypes.lazy_get(by).get() else: by_dtypes = self.dtypes[by] - add_missing_cats = any(isinstance(dtype, pandas.CategoricalDtype) for dtype in by_dtypes): + add_missing_cats = any(isinstance(dtype, pandas.CategoricalDtype) for dtype in by_dtypes) else: add_missing_cats = False From 340ea78e0e6b71299990d0aa1d5e9f3d5cfab0e7 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Fri, 12 Jan 2024 10:55:59 +0100 Subject: [PATCH 06/21] ss Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 66 +++++++++++++++---- modin/pandas/test/test_groupby.py | 66 +++++++++++++++++++ 2 files changed, 118 insertions(+), 14 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 2d532cf7403..607bb597289 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3857,32 +3857,66 @@ def compute_aligned_columns(*dfs, initial_columns=None): combined_cols = None masks = None if add_missing_cats: + # TODO-now: + # 1. when grouping on multiple BYs, the result of apply is always considered to be None + # 2. in other cases we can simply sample for the filling value and fill by mask indices = [df.index for df in dfs] + # breakpoint() total_index = indices[0].append(indices[1:]) - missing_cats = total_index.categories.difference(total_index.values) - cats = pandas.CategoricalDtype(missing_cats) + if isinstance(total_index, pandas.MultiIndex): + # TODO: create complete matrix and filter out valid results + missing_cats = {} + for level, name in zip(total_index.levels, total_index.names): + if isinstance(level.dtype, pandas.CategoricalDtype): + missing_cats[name] = level.dtype + else: + missing_cats[name] = pandas.CategoricalDtype(level) + else: + missing_cats = total_index.categories.difference(total_index.values) + missing_cats = {by[0]: pandas.CategoricalDtype(missing_cats)} empty_df = pandas.DataFrame(columns=initial_columns) - empty_df = empty_df.astype({col: cats for col in by}) + empty_df = empty_df.astype(missing_cats) nonlocal kwargs kwargs = kwargs.copy() kwargs["observed"] = False - breakpoint() + # breakpoint() missing_values = operator(empty_df.groupby(by, **kwargs)) - if len(missing_values.columns.intersection(by)) == len(by): - missing_values = missing_values.astype({col: total_index.dtype for col in by}) - elif isinstance(missing_values.index, pandas.MultiIndex): - missing_values.index = pandas.MultiIndex.from_frame(missing_values.index.to_frame().astype({col: total_index.dtype for col in by})) + # breakpoint() + if isinstance(missing_values.index, pandas.MultiIndex): + missing_values = missing_values[~missing_values.index.isin(total_index)] + missing_values.index = pandas.MultiIndex.from_frame( + missing_values.index.to_frame().astype({name: dtype for name, dtype in total_index.dtypes.items()}) + ) else: missing_values.index = missing_values.index.astype(total_index.dtype) if not kwargs["sort"]: mask = {len(indices) - 1: missing_values} return (combined_cols, mask) - bins = [idx[0] for idx in indices] - parts = (np.digitize(cats.categories, bins) - 1) - parts[parts < 0] = 0 + bins = [] + old_bins_to_new = {} + offset = 0 + for i, idx in enumerate(indices[:-1]): + if len(idx) == 0: + offset += 1 + continue + old_bins_to_new[len(bins)] = offset + bins.append(idx.levels[0][-1]) + old_bins_to_new[len(bins)] = offset + breakpoint() + if len(bins) == 0: + parts = np.digitize(missing_values.index.levels[0], bins) + else: + parts = np.zeros(len(missing_values.index.levels[0]), dtype=int) masks = {idx: pandas.DataFrame() for idx in np.unique(parts)} - for i, idx in enumerate(parts): - masks[idx] = pandas.concat([masks[idx], missing_values.iloc[[i]]]) + if isinstance(missing_values.index, pandas.MultiIndex): + frame_idx = missing_values.index.to_frame() + for lvl_val, idx in zip(missing_values.index.levels[0], parts): + masks[idx] = pandas.concat([masks[idx], missing_values[frame_idx.iloc[:, 0] == lvl_val]]) + else: + for i, idx in enumerate(parts): + masks[idx] = pandas.concat([masks[idx], missing_values.iloc[[i]]]) + + masks = {key: masks[value] for key, value in old_bins_to_new.items()} if align_result_columns: valid_dfs = [ df @@ -3914,7 +3948,11 @@ def apply_aligned(df, args, partition_idx): combined_cols, mask = args if mask is not None and mask.get(partition_idx) is not None: values = mask[partition_idx] - df = pandas.concat([df, values]) + if kwargs["sort"]: + # TODO: write search-sorted insertion or sort the result after insertion + df = pandas.concat([df, values]).sort_values(by) + else: + df = pandas.concat([df, values]) if combined_cols is not None: df = df.reindex(columns=combined_cols) return df diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 34493e1ac34..b12b517c891 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -3146,3 +3146,69 @@ def test_groupby_agg_provided_callable_warning(): match="In a future version of pandas, the provided callable will be used directly", ): pandas_groupby.agg(func) + +def _apply_transform(df): + if len(df) == 0: + df = df.copy() + df.loc[0] = 10 + return df.squeeze() + return df.sum() + +@pytest.mark.parametrize("observed", [False]) +@pytest.mark.parametrize( + "func", + [ + pytest.param(lambda grp: grp.sum(), id="sum"), + pytest.param(lambda grp: grp.size(), id="size"), + pytest.param(lambda grp: grp.apply(lambda df: df.sum()), id="apply_sum"), + pytest.param(lambda grp: grp.apply(_apply_transform), id="apply_transform"), + ] +) +@pytest.mark.parametrize( + "by_cols, cat_cols", + [ + # ("a", ["a"]), + # ("b", ["b"]), + # ("e", ["e"]), + # (["a", "e"], ["a"]), + # (["a", "e"], ["e"]), + # (["a", "e"], ["a", "e"]), + # (["b", "e"], ["b"]), + # (["b", "e"], ["e"]), + # (["b", "e"], ["b", "e"]), + # (["a", "b", "e"], ["a"]), + # (["a", "b", "e"], ["b"]), + # (["a", "b", "e"], ["e"]), + # (["a", "b", "e"], ["a", "e"]), + (["a", "b", "e"], ["a", "b", "e"]), + ] +) +@pytest.mark.parametrize( + "exclude_values", + [ + pytest.param(lambda row: ~row["a"].isin(["a", "e"]), id="exclude_from_a"), + pytest.param(lambda row: ~row["b"].isin([4]), id="exclude_from_b"), + pytest.param(lambda row: ~row["e"].isin(["x"]), id="exclude_from_e"), + pytest.param(lambda row: ~row["a"].isin(["a", "e"]) & ~row["b"].isin([4]), id="exclude_from_a_b"), + pytest.param(lambda row: ~row["b"].isin([4]) & ~row["e"].isin(["x"]), id="exclude_from_b_e"), + pytest.param(lambda row: ~row["a"].isin(["a", "e"]) & ~row["b"].isin([4]) & ~row["e"].isin(["x"]), id="exclude_from_a_b_e"), + ] +) +def test_range_groupby_categories(observed, func, by_cols, cat_cols, exclude_values): + data = { + "a": ["a", "b", "c", "d", "e", "b", "g", "a"] * 32, + "b": [1, 2, 3, 4] * 64, + "c": range(256), + "d": range(256), + "e": ["x", "y"] * 128, + } + + md_df, pd_df = create_test_dfs(data) + md_df = md_df.astype({col: "category" for col in cat_cols})[exclude_values] + pd_df = pd_df.astype({col: "category" for col in cat_cols})[exclude_values] + + # md_res = func(md_df.groupby(by_cols, observed=observed)) + pd_res = func(pd_df.groupby(by_cols, observed=observed)) + breakpoint() + print("lel") + df_equals(md_res, pd_res) From ac6b454c899d965568178f9af3122f457ca7d3f9 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Fri, 12 Jan 2024 14:52:19 +0100 Subject: [PATCH 07/21] as_index=true works Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 123 ++++++++++++------ modin/pandas/test/test_groupby.py | 37 +++--- setup.cfg | 2 +- 3 files changed, 106 insertions(+), 56 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 607bb597289..56adc540df3 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3856,14 +3856,37 @@ def compute_aligned_columns(*dfs, initial_columns=None): """Take row partitions, filter empty ones, and return joined columns for them.""" combined_cols = None masks = None + if align_result_columns: + valid_dfs = [ + df + for df in dfs + if not df.attrs.get(skip_on_aligning_flag, False) + ] + + if len(valid_dfs) == 0 and len(dfs) != 0: + valid_dfs = dfs + + # Using '.concat()' on empty-slices instead of 'Index.join()' + # in order to get identical behavior to pandas when it joins + # results of different groups + combined_cols = pandas.concat( + [df.iloc[:0] for df in valid_dfs], axis=0, join="outer" + ).columns + else: + combined_cols = dfs[0].columns if add_missing_cats: # TODO-now: # 1. when grouping on multiple BYs, the result of apply is always considered to be None # 2. in other cases we can simply sample for the filling value and fill by mask - indices = [df.index for df in dfs] + if kwargs["as_index"]: + indices = [df.index for df in dfs] # breakpoint() - total_index = indices[0].append(indices[1:]) - if isinstance(total_index, pandas.MultiIndex): + total_index = indices[0].append(indices[1:]) + else: + indices = [df[by] for df in dfs] + total_index = pandas.concat(indices).squeeze(axis=1) + # if isinstance(total_index, pandas.MultiIndex): + if total_index.ndim == 2: # TODO: create complete matrix and filter out valid results missing_cats = {} for level, name in zip(total_index.levels, total_index.names): @@ -3873,17 +3896,53 @@ def compute_aligned_columns(*dfs, initial_columns=None): missing_cats[name] = pandas.CategoricalDtype(level) else: missing_cats = total_index.categories.difference(total_index.values) + if len(missing_cats) == 0: + no_missing_cats = True missing_cats = {by[0]: pandas.CategoricalDtype(missing_cats)} - empty_df = pandas.DataFrame(columns=initial_columns) - empty_df = empty_df.astype(missing_cats) nonlocal kwargs - kwargs = kwargs.copy() - kwargs["observed"] = False - # breakpoint() - missing_values = operator(empty_df.groupby(by, **kwargs)) - # breakpoint() + + if isinstance(total_index, pandas.MultiIndex): + complete_index = pandas.MultiIndex.from_product([val.categories for val in missing_cats.values()], names=by) + missing_index = complete_index[~complete_index.isin(total_index)] + else: + missing_index = missing_cats[by[0]].categories + + if len(missing_index) == 0: + return (combined_cols, {}) if align_result_columns else (None, {}) + + if align_result_columns and not isinstance(total_index, pandas.MultiIndex): + # actually execute operator on empty df + empty_df = pandas.DataFrame(columns=initial_columns) + empty_df = empty_df.astype(missing_cats) + # nonlocal kwargs + kwargs = kwargs.copy() + kwargs["observed"] = False + # breakpoint() + missing_values = operator(empty_df.groupby(by, **kwargs)) + missing_values = missing_values.drop(columns=by, errors="ignore") + combined_cols = pandas.concat( + [pandas.DataFrame(columns=combined_cols), missing_values.iloc[:0]], axis=0, join="outer" + ).columns + else: + if align_result_columns: + fill_value = np.NaN + else: + # get fill value by sample + missing_cats_sample = {key: pandas.CategoricalDtype(value.categories[:1]) for key, value in missing_cats.items()} + empty_df = pandas.DataFrame(columns=initial_columns) + empty_df = empty_df.astype(missing_cats_sample) + + kwargs = kwargs.copy() + kwargs["observed"] = False + missing_values = operator(empty_df.groupby(by, **kwargs)) + # breakpoint() + fill_value = missing_values.iloc[0, 0] + # breakpoint() + # breakpoint() + missing_values = pandas.DataFrame(index=missing_index, columns=combined_cols).fillna(fill_value) + if isinstance(missing_values.index, pandas.MultiIndex): - missing_values = missing_values[~missing_values.index.isin(total_index)] + # breakpoint() missing_values.index = pandas.MultiIndex.from_frame( missing_values.index.to_frame().astype({name: dtype for name, dtype in total_index.dtypes.items()}) ) @@ -3900,42 +3959,29 @@ def compute_aligned_columns(*dfs, initial_columns=None): offset += 1 continue old_bins_to_new[len(bins)] = offset - bins.append(idx.levels[0][-1]) + bins.append(idx.levels[0][-1] if isinstance(idx, pandas.MultiIndex) else idx[-1]) old_bins_to_new[len(bins)] = offset - breakpoint() - if len(bins) == 0: - parts = np.digitize(missing_values.index.levels[0], bins) + lvl_zero = missing_values.index if isinstance(missing_values.index, pandas.CategoricalIndex) else missing_values.index.levels[0] + # breakpoint() + if len(bins) != 0: + if pandas.api.types.is_any_real_numeric_dtype(lvl_zero): + parts = np.digitize(lvl_zero, bins, right=True) + else: + parts = np.searchsorted(bins, lvl_zero) else: - parts = np.zeros(len(missing_values.index.levels[0]), dtype=int) + parts = np.zeros(len(lvl_zero), dtype=int) masks = {idx: pandas.DataFrame() for idx in np.unique(parts)} if isinstance(missing_values.index, pandas.MultiIndex): frame_idx = missing_values.index.to_frame() - for lvl_val, idx in zip(missing_values.index.levels[0], parts): + for lvl_val, idx in zip(lvl_zero, parts): masks[idx] = pandas.concat([masks[idx], missing_values[frame_idx.iloc[:, 0] == lvl_val]]) else: for i, idx in enumerate(parts): masks[idx] = pandas.concat([masks[idx], missing_values.iloc[[i]]]) - masks = {key: masks[value] for key, value in old_bins_to_new.items()} - if align_result_columns: - valid_dfs = [ - df - for df in dfs - if not df.attrs.get(skip_on_aligning_flag, False) - ] - if add_missing_cats: - valid_dfs += missing_values - if len(valid_dfs) == 0 and len(dfs) != 0: - valid_dfs = dfs - - # Using '.concat()' on empty-slices instead of 'Index.join()' - # in order to get identical behavior to pandas when it joins - # results of different groups - combined_cols = pandas.concat( - [df.iloc[:0] for df in valid_dfs], axis=0, join="outer" - ).columns + masks = {key + old_bins_to_new[key]: value for key, value in masks.items()} # breakpoint() - return (combined_cols, masks) + return (combined_cols, masks) if align_result_columns else (None, masks) # Passing all partitions to the 'compute_aligned_columns' kernel to get # aligned columns @@ -3948,11 +3994,14 @@ def apply_aligned(df, args, partition_idx): combined_cols, mask = args if mask is not None and mask.get(partition_idx) is not None: values = mask[partition_idx] + # breakpoint() + original_names = df.index.names if kwargs["sort"]: # TODO: write search-sorted insertion or sort the result after insertion - df = pandas.concat([df, values]).sort_values(by) + df = pandas.concat([df, values]).sort_index(axis=0) else: df = pandas.concat([df, values]) + df.index.names = original_names if combined_cols is not None: df = df.reindex(columns=combined_cols) return df diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index b12b517c891..bc3e4842981 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -3155,6 +3155,7 @@ def _apply_transform(df): return df.sum() @pytest.mark.parametrize("observed", [False]) +@pytest.mark.parametrize("as_index", [True, False]) @pytest.mark.parametrize( "func", [ @@ -3167,19 +3168,19 @@ def _apply_transform(df): @pytest.mark.parametrize( "by_cols, cat_cols", [ - # ("a", ["a"]), - # ("b", ["b"]), - # ("e", ["e"]), - # (["a", "e"], ["a"]), - # (["a", "e"], ["e"]), - # (["a", "e"], ["a", "e"]), - # (["b", "e"], ["b"]), - # (["b", "e"], ["e"]), - # (["b", "e"], ["b", "e"]), - # (["a", "b", "e"], ["a"]), - # (["a", "b", "e"], ["b"]), - # (["a", "b", "e"], ["e"]), - # (["a", "b", "e"], ["a", "e"]), + ("a", ["a"]), + ("b", ["b"]), + ("e", ["e"]), + (["a", "e"], ["a"]), + (["a", "e"], ["e"]), + (["a", "e"], ["a", "e"]), + (["b", "e"], ["b"]), + (["b", "e"], ["e"]), + (["b", "e"], ["b", "e"]), + (["a", "b", "e"], ["a"]), + (["a", "b", "e"], ["b"]), + (["a", "b", "e"], ["e"]), + (["a", "b", "e"], ["a", "e"]), (["a", "b", "e"], ["a", "b", "e"]), ] ) @@ -3194,7 +3195,7 @@ def _apply_transform(df): pytest.param(lambda row: ~row["a"].isin(["a", "e"]) & ~row["b"].isin([4]) & ~row["e"].isin(["x"]), id="exclude_from_a_b_e"), ] ) -def test_range_groupby_categories(observed, func, by_cols, cat_cols, exclude_values): +def test_range_groupby_categories(observed, func, by_cols, cat_cols, exclude_values, as_index): data = { "a": ["a", "b", "c", "d", "e", "b", "g", "a"] * 32, "b": [1, 2, 3, 4] * 64, @@ -3207,8 +3208,8 @@ def test_range_groupby_categories(observed, func, by_cols, cat_cols, exclude_val md_df = md_df.astype({col: "category" for col in cat_cols})[exclude_values] pd_df = pd_df.astype({col: "category" for col in cat_cols})[exclude_values] - # md_res = func(md_df.groupby(by_cols, observed=observed)) - pd_res = func(pd_df.groupby(by_cols, observed=observed)) - breakpoint() - print("lel") + md_res = func(md_df.groupby(by_cols, observed=observed, as_index=as_index)) + pd_res = func(pd_df.groupby(by_cols, observed=observed, as_index=as_index)) + # breakpoint() + # print("lel") df_equals(md_res, pd_res) diff --git a/setup.cfg b/setup.cfg index 3acc554836f..38cc37bc13d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,7 +12,7 @@ tag_prefix = parentdir_prefix = modin- [tool:pytest] -addopts = --cov-config=setup.cfg --cov=modin --cov-append --cov-report= -m "not exclude_by_default" +addopts = xfail_strict=true markers = exclude_in_sanity From 710da6a2836591d639d3c7cc6c18eb698c42d31b Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 15 Jan 2024 14:57:30 +0100 Subject: [PATCH 08/21] wtf Signed-off-by: Dmitry Chigarev --- .../algebra/default2pandas/groupby.py | 2 +- .../dataframe/pandas/dataframe/dataframe.py | 70 ++++++++++++------- .../storage_formats/pandas/query_compiler.py | 30 +++++--- modin/pandas/test/test_groupby.py | 26 ++++--- 4 files changed, 86 insertions(+), 42 deletions(-) diff --git a/modin/core/dataframe/algebra/default2pandas/groupby.py b/modin/core/dataframe/algebra/default2pandas/groupby.py index 8e2e4de062d..f0a6715b1a4 100644 --- a/modin/core/dataframe/algebra/default2pandas/groupby.py +++ b/modin/core/dataframe/algebra/default2pandas/groupby.py @@ -55,7 +55,7 @@ def is_transformation_kernel(agg_func: Any) -> bool: ------- bool """ - return hashable(agg_func) and agg_func in transformation_kernels + return hashable(agg_func) and agg_func in transformation_kernels.union({"nth", "head", "tail"}) @classmethod def _call_groupby(cls, df, *args, **kwargs): # noqa: PR01 diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 56adc540df3..37d013f9773 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3829,7 +3829,7 @@ def apply_func(df): # pragma: no cover # that shouldn't be considered on the aligning phase result.attrs[skip_on_aligning_flag] = True return result - + # breakpoint() result = self._apply_func_to_range_partitioning( key_columns=by, func=apply_func, @@ -3874,19 +3874,19 @@ def compute_aligned_columns(*dfs, initial_columns=None): ).columns else: combined_cols = dfs[0].columns + breakpoint() if add_missing_cats: + nonlocal kwargs # TODO-now: # 1. when grouping on multiple BYs, the result of apply is always considered to be None # 2. in other cases we can simply sample for the filling value and fill by mask if kwargs["as_index"]: indices = [df.index for df in dfs] - # breakpoint() - total_index = indices[0].append(indices[1:]) else: - indices = [df[by] for df in dfs] - total_index = pandas.concat(indices).squeeze(axis=1) + indices = [pandas.MultiIndex.from_frame(df[by]) if len(by) > 1 else pandas.Index(df[by].squeeze(axis=1)) for df in dfs] + total_index = indices[0].append(indices[1:]) # if isinstance(total_index, pandas.MultiIndex): - if total_index.ndim == 2: + if isinstance(total_index, pandas.MultiIndex): # TODO: create complete matrix and filter out valid results missing_cats = {} for level, name in zip(total_index.levels, total_index.names): @@ -3895,11 +3895,11 @@ def compute_aligned_columns(*dfs, initial_columns=None): else: missing_cats[name] = pandas.CategoricalDtype(level) else: + if not isinstance(total_index, pandas.CategoricalIndex): + return (combined_cols, {}) if align_result_columns else (None, {}) missing_cats = total_index.categories.difference(total_index.values) - if len(missing_cats) == 0: - no_missing_cats = True missing_cats = {by[0]: pandas.CategoricalDtype(missing_cats)} - nonlocal kwargs + if isinstance(total_index, pandas.MultiIndex): complete_index = pandas.MultiIndex.from_product([val.categories for val in missing_cats.values()], names=by) @@ -3909,16 +3909,18 @@ def compute_aligned_columns(*dfs, initial_columns=None): if len(missing_index) == 0: return (combined_cols, {}) if align_result_columns else (None, {}) - + # breakpoint() if align_result_columns and not isinstance(total_index, pandas.MultiIndex): # actually execute operator on empty df empty_df = pandas.DataFrame(columns=initial_columns) empty_df = empty_df.astype(missing_cats) # nonlocal kwargs - kwargs = kwargs.copy() - kwargs["observed"] = False + local_kwargs = kwargs.copy() + local_kwargs["observed"] = False + local_kwargs["as_index"] = True + # breakpoint() + missing_values = operator(empty_df.groupby(by, **local_kwargs)) # breakpoint() - missing_values = operator(empty_df.groupby(by, **kwargs)) missing_values = missing_values.drop(columns=by, errors="ignore") combined_cols = pandas.concat( [pandas.DataFrame(columns=combined_cols), missing_values.iloc[:0]], axis=0, join="outer" @@ -3929,17 +3931,32 @@ def compute_aligned_columns(*dfs, initial_columns=None): else: # get fill value by sample missing_cats_sample = {key: pandas.CategoricalDtype(value.categories[:1]) for key, value in missing_cats.items()} - empty_df = pandas.DataFrame(columns=initial_columns) + empty_df = pandas.DataFrame(columns=initial_columns, dtype="float64") empty_df = empty_df.astype(missing_cats_sample) - kwargs = kwargs.copy() - kwargs["observed"] = False - missing_values = operator(empty_df.groupby(by, **kwargs)) - # breakpoint() - fill_value = missing_values.iloc[0, 0] + local_kwargs = kwargs.copy() + local_kwargs["observed"] = False + missing_values = operator(empty_df.groupby(by, **local_kwargs)) + if len(missing_values) == 0: + # potentially incorrect missing value + # breakpoint() + fill_value = np.NaN + else: + try: + fill_value = missing_values.iloc[0, -1] + except: + breakpoint() + print("ss") + # breakpoint() # breakpoint() - missing_values = pandas.DataFrame(index=missing_index, columns=combined_cols).fillna(fill_value) + + if isinstance(combined_cols, pandas.MultiIndex): + cols = combined_cols[~combined_cols.to_frame().iloc[:, 0].isin(by)] + else: + cols = combined_cols.difference(by) + missing_values = pandas.DataFrame(index=missing_index, columns=cols).fillna(fill_value) + missing_values.index.names = by if isinstance(missing_values.index, pandas.MultiIndex): # breakpoint() @@ -3959,7 +3976,8 @@ def compute_aligned_columns(*dfs, initial_columns=None): offset += 1 continue old_bins_to_new[len(bins)] = offset - bins.append(idx.levels[0][-1] if isinstance(idx, pandas.MultiIndex) else idx[-1]) + # breakpoint() + bins.append(idx[-1][0] if isinstance(idx, pandas.MultiIndex) else idx[-1]) old_bins_to_new[len(bins)] = offset lvl_zero = missing_values.index if isinstance(missing_values.index, pandas.CategoricalIndex) else missing_values.index.levels[0] # breakpoint() @@ -3992,15 +4010,19 @@ def compute_aligned_columns(*dfs, initial_columns=None): def apply_aligned(df, args, partition_idx): combined_cols, mask = args + # breakpoint() if mask is not None and mask.get(partition_idx) is not None: values = mask[partition_idx] # breakpoint() + if not kwargs["as_index"]: + values = values.reset_index(drop=False) + # breakpoint() original_names = df.index.names + df = pandas.concat([df, values]) + # breakpoint() if kwargs["sort"]: # TODO: write search-sorted insertion or sort the result after insertion - df = pandas.concat([df, values]).sort_index(axis=0) - else: - df = pandas.concat([df, values]) + df = df.sort_index(axis=0) if kwargs["as_index"] else df.sort_values(by) df.index.names = original_names if combined_cols is not None: df = df.reindex(columns=combined_cols) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 45c0cf20f68..1441dd6fcb2 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3781,6 +3781,15 @@ def _groupby_shuffle( all(col in self.columns for col in by) if is_all_labels else False ) + is_transform = how == "transform" or GroupBy.is_transformation_kernel(agg_func) + + if is_transform: + # https://github.com/modin-project/modin/issues/5924 + ErrorMessage.missmatch_with_pandas( + operation="range-partitioning groupby", + message="the order of rows may be shuffled for the result", + ) + if not is_all_column_names: raise NotImplementedError( "Range-partitioning groupby is only supported when grouping on a column(s) of the same frame. " @@ -3789,7 +3798,7 @@ def _groupby_shuffle( from pandas.api.extensions import no_default # breakpoint() # This check materializes dtypes for 'by' columns - if not groupby_kwargs.get("observed", False) or groupby_kwargs.get("observed", False) is no_default: + if not is_transform and (not groupby_kwargs.get("observed", False) or groupby_kwargs.get("observed", False) is no_default): if isinstance(self._modin_frame._dtypes, ModinDtypes): by_dtypes = self._modin_frame._dtypes.lazy_get(by).get() else: @@ -3798,13 +3807,12 @@ def _groupby_shuffle( else: add_missing_cats = False - is_transform = how == "transform" or GroupBy.is_transformation_kernel(agg_func) - - if is_transform: - # https://github.com/modin-project/modin/issues/5924 - ErrorMessage.missmatch_with_pandas( - operation="range-partitioning groupby", - message="the order of rows may be shuffled for the result", + if add_missing_cats and not groupby_kwargs.get("as_index", True): + raise NotImplementedError( + "Range-partitioning groupby is not implemented for grouping on categorical columns with " + + "the following set of parameters \{'as_index': False, 'observed': False\}. Change either 'as_index' " + + "or 'observed' to True and try again. " + + "https://github.com/modin-project/modin/issues/5926" ) if isinstance(agg_func, dict): @@ -3848,7 +3856,11 @@ def agg_func(grp, *args, **kwargs): result_qc = self.__constructor__(result) if not is_transform and not groupby_kwargs.get("as_index", True): - return result_qc.reset_index(drop=True) + try: + return result_qc.reset_index(drop=True) + except: + breakpoint() + print("sas") return result_qc diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index bc3e4842981..d9ca08f471f 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -106,6 +106,10 @@ ] +def df_equals_fillna(df1, df2, fill_value=0): + df_equals(df1.fillna(fill_value), df2.fillna(fill_value)) + + def modin_groupby_equals_pandas(modin_groupby, pandas_groupby): eval_general( modin_groupby, pandas_groupby, lambda grp: grp.indices, comparator=dict_equals @@ -452,7 +456,9 @@ def maybe_get_columns(df, by): lambda df: df.sem(), modin_df_almost_equals_pandas, ) + # breakpoint() eval_mean(modin_groupby, pandas_groupby, numeric_only=True) + eval_any(modin_groupby, pandas_groupby) eval_min(modin_groupby, pandas_groupby) eval_general(modin_groupby, pandas_groupby, lambda df: df.idxmax()) @@ -481,7 +487,7 @@ def maybe_get_columns(df, by): ] for func in apply_functions: eval_apply(modin_groupby, pandas_groupby, func) - + eval_dtypes(modin_groupby, pandas_groupby) eval_general(modin_groupby, pandas_groupby, lambda df: df.first()) eval_general(modin_groupby, pandas_groupby, lambda df: df.bfill()) @@ -514,18 +520,20 @@ def maybe_get_columns(df, by): # because of this bug: https://github.com/pandas-dev/pandas/issues/36698 # Modin correctly processes the result, that's why `check_exception_type=None` in some cases is_pandas_bug_case = not as_index and col1_category and isinstance(func, dict) - + # breakpoint() eval_general( modin_groupby, pandas_groupby, lambda grp: grp.agg(func), check_exception_type=None if is_pandas_bug_case else True, + comparator=df_equals_fillna, ) eval_general( modin_groupby, pandas_groupby, lambda grp: grp.aggregate(func), check_exception_type=None if is_pandas_bug_case else True, + comparator=df_equals_fillna, ) eval_general(modin_groupby, pandas_groupby, lambda df: df.last()) @@ -618,6 +626,7 @@ def maybe_get_columns(df, by): if isinstance(by, list) else ["col3", "col4"] ) + # breakpoint() eval___getitem__(modin_groupby, pandas_groupby, non_by_cols) # When GroupBy.__getitem__ meets an intersection of the selection and 'by' columns # it throws a warning with the suggested workaround. The following code tests @@ -1243,8 +1252,8 @@ def eval_cummin(modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only ) -def eval_apply(modin_groupby, pandas_groupby, func): - df_equals(modin_groupby.apply(func), pandas_groupby.apply(func)) +def eval_apply(modin_groupby, pandas_groupby, func, comparator=df_equals): + comparator(modin_groupby.apply(func), pandas_groupby.apply(func)) def eval_dtypes(modin_groupby, pandas_groupby): @@ -1413,7 +1422,7 @@ def test(grp): return res return test - + md_grp[item].agg(["mean"]) eval_general( md_grp, pd_grp, @@ -2432,7 +2441,8 @@ def test_groupby_sort(sort, is_categorical_by): eval_general(md_grp, pd_grp, lambda grp: grp.sum(numeric_only=True)) eval_general(md_grp, pd_grp, lambda grp: grp.size()) eval_general(md_grp, pd_grp, lambda grp: grp.agg(lambda df: df.mean())) - eval_general(md_grp, pd_grp, lambda grp: grp.dtypes) + # breakpoint() + # eval_general(md_grp, pd_grp, lambda grp: grp.dtypes) eval_general(md_grp, pd_grp, lambda grp: grp.first()) @@ -2978,7 +2988,7 @@ def test_groupby_apply_series_result(modify_config): np.random.randint(5, 10, size=5), index=[f"s{i+1}" for i in range(5)] ) df["group"] = [1, 1, 2, 2, 3] - + # breakpoint() # res = df.groupby('group').apply(lambda x: x.name+2) eval_general( df, df._to_pandas(), lambda df: df.groupby("group").apply(lambda x: x.name + 2) @@ -3155,7 +3165,7 @@ def _apply_transform(df): return df.sum() @pytest.mark.parametrize("observed", [False]) -@pytest.mark.parametrize("as_index", [True, False]) +@pytest.mark.parametrize("as_index", [True]) @pytest.mark.parametrize( "func", [ From c8300a1b4fd2268dc17e1bd6dfc930b213080f25 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 17 Jan 2024 11:32:42 +0100 Subject: [PATCH 09/21] allmost all tests are passing Signed-off-by: Dmitry Chigarev --- .../algebra/default2pandas/groupby.py | 4 +- .../dataframe/pandas/dataframe/dataframe.py | 165 +++--------------- .../core/dataframe/pandas/dataframe/utils.py | 164 +++++++++++++++++ .../pandas/partitioning/partition_manager.py | 6 +- .../storage_formats/pandas/query_compiler.py | 10 +- modin/pandas/test/test_groupby.py | 41 +++-- 6 files changed, 238 insertions(+), 152 deletions(-) diff --git a/modin/core/dataframe/algebra/default2pandas/groupby.py b/modin/core/dataframe/algebra/default2pandas/groupby.py index f0a6715b1a4..a1d8a603b1a 100644 --- a/modin/core/dataframe/algebra/default2pandas/groupby.py +++ b/modin/core/dataframe/algebra/default2pandas/groupby.py @@ -55,7 +55,9 @@ def is_transformation_kernel(agg_func: Any) -> bool: ------- bool """ - return hashable(agg_func) and agg_func in transformation_kernels.union({"nth", "head", "tail"}) + return hashable(agg_func) and agg_func in transformation_kernels.union( + {"nth", "head", "tail"} + ) @classmethod def _call_groupby(cls, df, *args, **kwargs): # noqa: PR01 diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 37d013f9773..efa5a2121c9 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -33,6 +33,7 @@ from modin.core.dataframe.pandas.dataframe.utils import ( ShuffleSortFunctions, lazy_metadata_decorator, + add_missing_categories_to_groupby, ) from modin.core.dataframe.pandas.metadata import ( DtypesDescriptor, @@ -3758,8 +3759,8 @@ def groupby( by: Union[str, List[str]], operator: Callable, result_schema: Optional[Dict[Hashable, type]] = None, - align_result_columns : bool=False, - add_missing_cats : bool=False, + align_result_columns: bool = False, + add_missing_cats: bool = False, **kwargs: dict, ) -> "PandasDataframe": """ @@ -3829,6 +3830,7 @@ def apply_func(df): # pragma: no cover # that shouldn't be considered on the aligning phase result.attrs[skip_on_aligning_flag] = True return result + # breakpoint() result = self._apply_func_to_range_partitioning( key_columns=by, @@ -3836,7 +3838,9 @@ def apply_func(df): # pragma: no cover ) # breakpoint() # no need aligning columns if there's only one row partition - if (add_missing_cats or align_result_columns): # and result._partitions.shape[0] > 1: + if ( + add_missing_cats or align_result_columns + ): # and result._partitions.shape[0] > 1: # FIXME: the current reshuffling implementation guarantees us that there's only one column # partition in the result, so we should never hit this exception for now, however # in the future, we might want to make this implementation more broader @@ -3851,6 +3855,7 @@ def apply_func(df): # pragma: no cover # 2. The second one works slower, but only gathers light pandas.Index objects, # so there should be less stress on the network. if not IsRayCluster.get(): + original_dtypes = self.dtypes if self.has_materialized_dtypes else None def compute_aligned_columns(*dfs, initial_columns=None): """Take row partitions, filter empty ones, and return joined columns for them.""" @@ -3874,155 +3879,43 @@ def compute_aligned_columns(*dfs, initial_columns=None): ).columns else: combined_cols = dfs[0].columns - breakpoint() + if add_missing_cats: - nonlocal kwargs - # TODO-now: - # 1. when grouping on multiple BYs, the result of apply is always considered to be None - # 2. in other cases we can simply sample for the filling value and fill by mask - if kwargs["as_index"]: - indices = [df.index for df in dfs] - else: - indices = [pandas.MultiIndex.from_frame(df[by]) if len(by) > 1 else pandas.Index(df[by].squeeze(axis=1)) for df in dfs] - total_index = indices[0].append(indices[1:]) - # if isinstance(total_index, pandas.MultiIndex): - if isinstance(total_index, pandas.MultiIndex): - # TODO: create complete matrix and filter out valid results - missing_cats = {} - for level, name in zip(total_index.levels, total_index.names): - if isinstance(level.dtype, pandas.CategoricalDtype): - missing_cats[name] = level.dtype - else: - missing_cats[name] = pandas.CategoricalDtype(level) - else: - if not isinstance(total_index, pandas.CategoricalIndex): - return (combined_cols, {}) if align_result_columns else (None, {}) - missing_cats = total_index.categories.difference(total_index.values) - missing_cats = {by[0]: pandas.CategoricalDtype(missing_cats)} - - - if isinstance(total_index, pandas.MultiIndex): - complete_index = pandas.MultiIndex.from_product([val.categories for val in missing_cats.values()], names=by) - missing_index = complete_index[~complete_index.isin(total_index)] - else: - missing_index = missing_cats[by[0]].categories - - if len(missing_index) == 0: - return (combined_cols, {}) if align_result_columns else (None, {}) - # breakpoint() - if align_result_columns and not isinstance(total_index, pandas.MultiIndex): - # actually execute operator on empty df - empty_df = pandas.DataFrame(columns=initial_columns) - empty_df = empty_df.astype(missing_cats) - # nonlocal kwargs - local_kwargs = kwargs.copy() - local_kwargs["observed"] = False - local_kwargs["as_index"] = True - # breakpoint() - missing_values = operator(empty_df.groupby(by, **local_kwargs)) - # breakpoint() - missing_values = missing_values.drop(columns=by, errors="ignore") - combined_cols = pandas.concat( - [pandas.DataFrame(columns=combined_cols), missing_values.iloc[:0]], axis=0, join="outer" - ).columns - else: - if align_result_columns: - fill_value = np.NaN - else: - # get fill value by sample - missing_cats_sample = {key: pandas.CategoricalDtype(value.categories[:1]) for key, value in missing_cats.items()} - empty_df = pandas.DataFrame(columns=initial_columns, dtype="float64") - empty_df = empty_df.astype(missing_cats_sample) - - local_kwargs = kwargs.copy() - local_kwargs["observed"] = False - missing_values = operator(empty_df.groupby(by, **local_kwargs)) - if len(missing_values) == 0: - # potentially incorrect missing value - # breakpoint() - fill_value = np.NaN - else: - try: - fill_value = missing_values.iloc[0, -1] - except: - breakpoint() - print("ss") - - # breakpoint() - # breakpoint() - - if isinstance(combined_cols, pandas.MultiIndex): - cols = combined_cols[~combined_cols.to_frame().iloc[:, 0].isin(by)] - else: - cols = combined_cols.difference(by) - missing_values = pandas.DataFrame(index=missing_index, columns=cols).fillna(fill_value) - missing_values.index.names = by - - if isinstance(missing_values.index, pandas.MultiIndex): - # breakpoint() - missing_values.index = pandas.MultiIndex.from_frame( - missing_values.index.to_frame().astype({name: dtype for name, dtype in total_index.dtypes.items()}) - ) - else: - missing_values.index = missing_values.index.astype(total_index.dtype) - if not kwargs["sort"]: - mask = {len(indices) - 1: missing_values} - return (combined_cols, mask) - bins = [] - old_bins_to_new = {} - offset = 0 - for i, idx in enumerate(indices[:-1]): - if len(idx) == 0: - offset += 1 - continue - old_bins_to_new[len(bins)] = offset - # breakpoint() - bins.append(idx[-1][0] if isinstance(idx, pandas.MultiIndex) else idx[-1]) - old_bins_to_new[len(bins)] = offset - lvl_zero = missing_values.index if isinstance(missing_values.index, pandas.CategoricalIndex) else missing_values.index.levels[0] - # breakpoint() - if len(bins) != 0: - if pandas.api.types.is_any_real_numeric_dtype(lvl_zero): - parts = np.digitize(lvl_zero, bins, right=True) - else: - parts = np.searchsorted(bins, lvl_zero) - else: - parts = np.zeros(len(lvl_zero), dtype=int) - masks = {idx: pandas.DataFrame() for idx in np.unique(parts)} - if isinstance(missing_values.index, pandas.MultiIndex): - frame_idx = missing_values.index.to_frame() - for lvl_val, idx in zip(lvl_zero, parts): - masks[idx] = pandas.concat([masks[idx], missing_values[frame_idx.iloc[:, 0] == lvl_val]]) - else: - for i, idx in enumerate(parts): - masks[idx] = pandas.concat([masks[idx], missing_values.iloc[[i]]]) - - masks = {key + old_bins_to_new[key]: value for key, value in masks.items()} - # breakpoint() - return (combined_cols, masks) if align_result_columns else (None, masks) + masks, combined_cols = add_missing_categories_to_groupby( + dfs, + by, + operator, + initial_columns, + combined_cols, + is_udf_agg=align_result_columns, + kwargs=kwargs.copy(), + initial_dtypes=original_dtypes, + ) + return ( + (combined_cols, masks) + if align_result_columns + else (None, masks) + ) # Passing all partitions to the 'compute_aligned_columns' kernel to get # aligned columns parts = result._partitions.flatten() aligned_columns = parts[0].apply( - compute_aligned_columns, *[part._data for part in parts[1:]], initial_columns=self.columns, + compute_aligned_columns, + *[part._data for part in parts[1:]], + initial_columns=self.columns, ) def apply_aligned(df, args, partition_idx): combined_cols, mask = args - # breakpoint() if mask is not None and mask.get(partition_idx) is not None: values = mask[partition_idx] - # breakpoint() - if not kwargs["as_index"]: - values = values.reset_index(drop=False) - # breakpoint() + original_names = df.index.names df = pandas.concat([df, values]) - # breakpoint() if kwargs["sort"]: # TODO: write search-sorted insertion or sort the result after insertion - df = df.sort_index(axis=0) if kwargs["as_index"] else df.sort_values(by) + df = df.sort_index(axis=0) df.index.names = original_names if combined_cols is not None: df = df.reindex(columns=combined_cols) diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index 6bb84df49bb..6fdf226c1a2 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -338,6 +338,7 @@ def split_partitions_using_pivots_for_sort( tuple[pandas.DataFrame] A tuple of the splits from this partition. """ + # breakpoint() if len(columns_info) == 0: # We can return the dataframe with zero changes if there were no pivots passed return (df,) @@ -519,3 +520,166 @@ def run_f_on_minimally_updated_metadata(self, *args, **kwargs): return run_f_on_minimally_updated_metadata return decorator + + +def add_missing_categories_to_groupby( + dfs, by, operator, initial_columns, combined_cols, is_udf_agg, kwargs, initial_dtypes=None +): + kwargs["observed"] = False + new_combined_cols = combined_cols + + ### At first we need to compute missing categorical values + indices = [df.index for df in dfs] + # total_index contains all categorical values that resided in the result, + # missing values are computed differently depending on whether we're grouping + # on multiple groupers or not + total_index = indices[0].append(indices[1:]) + if isinstance(total_index, pandas.MultiIndex): + if all(not isinstance(level, pandas.CategoricalIndex) for level in total_index.levels): + return {} + missing_cats_dtype = { + name: level.dtype + if isinstance(level.dtype, pandas.CategoricalDtype) + # it's a bit confusing but we have to convert the remaining columns to categoricals + # in order to compute a proper fill value later in the code + else pandas.CategoricalDtype(level) + for level, name in zip(total_index.levels, total_index.names) + } + # if we're grouping on multiple groupers, then the missing categorical values is a + # carthesian product of (actual_missing_categorical_values X all_values_of_another_groupers) + complete_index = pandas.MultiIndex.from_product( + [value.categories for value in missing_cats_dtype.values()], + names=by, + ) + missing_index = complete_index[~complete_index.isin(total_index)] + else: + if not isinstance(total_index, pandas.CategoricalIndex): + return {}, new_combined_cols + # if we're grouping on a single grouper then we simply compute the difference + # between categorical values in the result and the values defined in categorical dtype + missing_index = total_index.categories.difference(total_index.values) + missing_cats_dtype = {by[0]: pandas.CategoricalDtype(missing_index)} + missing_index.names = by + + if len(missing_index) == 0: + return {}, new_combined_cols + # breakpoint() + ### At this stage we want to get a fill_value for missing categorical values + if is_udf_agg and isinstance(total_index, pandas.MultiIndex): + # if grouping on multiple columns and aggregating with an UDF, then the + # fill value is always `np.NaN` + missing_values = pandas.DataFrame({0: [np.NaN]}) + else: + # In case of a udf aggregation we're forced to run the operator against each + # missing category, as in theory it can return different results for each + # empty group. In other cases it's enough to run the operator against a single + # missing categorical and then broadcast the fill value to each missing value + if not is_udf_agg: + missing_cats_dtype = { + key: pandas.CategoricalDtype(value.categories[:1]) + for key, value in missing_cats_dtype.items() + } + + empty_df = pandas.DataFrame(columns=initial_columns) + # HACK: default 'object' dtype doesn't fit our needs, as most of the aggregations + # fail on a non-numeric columns, ideally, we need dtypes of the original dataframe, + # however, 'int64' also works fine here if the original schema is not available + empty_df = empty_df.astype("int64" if initial_dtypes is None else initial_dtypes) + empty_df = empty_df.astype(missing_cats_dtype) + missing_values = operator(empty_df.groupby(by, **kwargs)) + + if is_udf_agg and not isinstance(total_index, pandas.MultiIndex): + missing_values = missing_values.drop( + columns=by, errors="ignore" + ) + new_combined_cols = pandas.concat( + [ + pandas.DataFrame(columns=combined_cols), + missing_values.iloc[:0], + ], + axis=0, + join="outer", + ).columns + else: + # If the aggregation has failed, the result would be empty. Assuming the + # fill value to be `np.NaN` here (this may not always be correct!!!) + fill_value = np.NaN if len(missing_values) == 0 else missing_values.iloc[0, 0] + missing_values = pandas.DataFrame(index=missing_index, columns=combined_cols).fillna( + fill_value + ) + + # restoring original categorical dtypes for the indices + if isinstance(missing_values.index, pandas.MultiIndex): + # MultiIndex.astype() only takes a single dtype, the only way to cast + # individual levels to different dtypes is to convert MI to DF do the + # casting then + missing_values.index = pandas.MultiIndex.from_frame( + missing_values.index.to_frame().astype( + {name: dtype for name, dtype in total_index.dtypes.items()} + ) + ) + else: + missing_values.index = missing_values.index.astype(total_index.dtype) + + ### Then we decide to which missing categorical values should go to which partition + if not kwargs["sort"]: + # If the result is allowed to be unsorted, simply insert all the missing + # categories to the last partition + mask = {len(indices) - 1: missing_values} + return mask, new_combined_cols + + # If the result has to be sorted, we have to assign missing categoricals to proper partitions. + # For that purpose we define bins with corner values of each partition and then using either + # np.digitize or np.searchsorted find correct bins for each missing categorical value. + # Example: part0-> [0, 1, 2]; part1-> [3, 4, 10, 12]; part2-> [15, 17, 20, 100] + # bins -> [2, 12] # took last values of each partition excluding the last partition + # (every value that's matching 'x > part[-2][-1]' should go to the + # last partition, meaning that including the last value of the last + # partitions doesn't make sense) + # missing_cats -> [-2, 5, 6, 14, 21, 120] + # np.digitize(missing_cats, bins) -> [ 0, 1, 1, 2, 2, 2] + # ^-- mapping between values and partition idx to insert + bins = [] + old_bins_to_new = {} + offset = 0 + # building bins by taking last values of each partition excluding the last partition + for idx in indices[:-1]: + if len(idx) == 0: + # if a partition is empty, we can't use its values to define a bin, thus we simply + # skip it and remember the number of skipped partitions as an 'offset' + offset += 1 + continue + # remember the number of skipped partitions before this bin, in order to restore original + # indexing at the end + old_bins_to_new[len(bins)] = offset + # for MultiIndices we always use the very first level for bins as using multiple levels + # doesn't affect the result + bins.append(idx[-1][0] if isinstance(idx, pandas.MultiIndex) else idx[-1]) + old_bins_to_new[len(bins)] = offset + + if len(bins) == 0: + # insert values to the first non-empty partition + return {old_bins_to_new.get(0, 0): missing_values}, new_combined_cols + + # we used the very first level of MultiIndex to build bins, meaning that we also have + # to use values of the first index's level for 'digitize' + lvl_zero = ( + missing_values.index.levels[0] + if isinstance(missing_values.index, pandas.MultiIndex) + else missing_values.index + ) + if pandas.api.types.is_any_real_numeric_dtype(lvl_zero): + part_idx = np.digitize(lvl_zero, bins, right=True) + else: + part_idx = np.searchsorted(bins, lvl_zero) + + ### In the end we build a dictionary mapping partition index to a dataframe with missing categoricals + ### to be inserted into this partition + masks = {} + frame_idx = missing_values.index.to_frame() + for idx, values in lvl_zero.groupby(part_idx).items(): + masks[idx] = missing_values[frame_idx.iloc[:, 0].isin(values)] + + # Restore the original indexing by adding the amount of skipped missing partitions + masks = {key + old_bins_to_new[key]: value for key, value in masks.items()} + return masks, new_combined_cols diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index d2546aeb74b..ce7348d9da9 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -611,7 +611,9 @@ def map_partitions( @classmethod @wait_computations_if_benchmark_mode - def lazy_map_partitions(cls, partitions, map_func, func_args=None, enumerate_partitions=False): + def lazy_map_partitions( + cls, partitions, map_func, func_args=None, enumerate_partitions=False + ): """ Apply `map_func` to every partition in `partitions` *lazily*. @@ -638,7 +640,7 @@ def lazy_map_partitions(cls, partitions, map_func, func_args=None, enumerate_par part.add_to_apply_calls( preprocessed_map_func, *(tuple() if func_args is None else func_args), - **({"partition_idx": i} if enumerate_partitions else {}) + **({"partition_idx": i} if enumerate_partitions else {}), ) for part in row ] diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 1441dd6fcb2..7ba98593627 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3796,14 +3796,20 @@ def _groupby_shuffle( + "https://github.com/modin-project/modin/issues/5926" ) from pandas.api.extensions import no_default + # breakpoint() # This check materializes dtypes for 'by' columns - if not is_transform and (not groupby_kwargs.get("observed", False) or groupby_kwargs.get("observed", False) is no_default): + if not is_transform and ( + not groupby_kwargs.get("observed", False) + or groupby_kwargs.get("observed", False) is no_default + ): if isinstance(self._modin_frame._dtypes, ModinDtypes): by_dtypes = self._modin_frame._dtypes.lazy_get(by).get() else: by_dtypes = self.dtypes[by] - add_missing_cats = any(isinstance(dtype, pandas.CategoricalDtype) for dtype in by_dtypes) + add_missing_cats = any( + isinstance(dtype, pandas.CategoricalDtype) for dtype in by_dtypes + ) else: add_missing_cats = False diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index d9ca08f471f..625b3cc3209 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -458,7 +458,7 @@ def maybe_get_columns(df, by): ) # breakpoint() eval_mean(modin_groupby, pandas_groupby, numeric_only=True) - + eval_any(modin_groupby, pandas_groupby) eval_min(modin_groupby, pandas_groupby) eval_general(modin_groupby, pandas_groupby, lambda df: df.idxmax()) @@ -487,7 +487,7 @@ def maybe_get_columns(df, by): ] for func in apply_functions: eval_apply(modin_groupby, pandas_groupby, func) - + eval_dtypes(modin_groupby, pandas_groupby) eval_general(modin_groupby, pandas_groupby, lambda df: df.first()) eval_general(modin_groupby, pandas_groupby, lambda df: df.bfill()) @@ -1422,6 +1422,7 @@ def test(grp): return res return test + md_grp[item].agg(["mean"]) eval_general( md_grp, @@ -2441,8 +2442,7 @@ def test_groupby_sort(sort, is_categorical_by): eval_general(md_grp, pd_grp, lambda grp: grp.sum(numeric_only=True)) eval_general(md_grp, pd_grp, lambda grp: grp.size()) eval_general(md_grp, pd_grp, lambda grp: grp.agg(lambda df: df.mean())) - # breakpoint() - # eval_general(md_grp, pd_grp, lambda grp: grp.dtypes) + eval_general(md_grp, pd_grp, lambda grp: grp.dtypes) eval_general(md_grp, pd_grp, lambda grp: grp.first()) @@ -3157,6 +3157,7 @@ def test_groupby_agg_provided_callable_warning(): ): pandas_groupby.agg(func) + def _apply_transform(df): if len(df) == 0: df = df.copy() @@ -3164,6 +3165,10 @@ def _apply_transform(df): return df.squeeze() return df.sum() + +import os + + @pytest.mark.parametrize("observed", [False]) @pytest.mark.parametrize("as_index", [True]) @pytest.mark.parametrize( @@ -3173,7 +3178,7 @@ def _apply_transform(df): pytest.param(lambda grp: grp.size(), id="size"), pytest.param(lambda grp: grp.apply(lambda df: df.sum()), id="apply_sum"), pytest.param(lambda grp: grp.apply(_apply_transform), id="apply_transform"), - ] + ], ) @pytest.mark.parametrize( "by_cols, cat_cols", @@ -3192,7 +3197,7 @@ def _apply_transform(df): (["a", "b", "e"], ["e"]), (["a", "b", "e"], ["a", "e"]), (["a", "b", "e"], ["a", "b", "e"]), - ] + ], ) @pytest.mark.parametrize( "exclude_values", @@ -3200,12 +3205,26 @@ def _apply_transform(df): pytest.param(lambda row: ~row["a"].isin(["a", "e"]), id="exclude_from_a"), pytest.param(lambda row: ~row["b"].isin([4]), id="exclude_from_b"), pytest.param(lambda row: ~row["e"].isin(["x"]), id="exclude_from_e"), - pytest.param(lambda row: ~row["a"].isin(["a", "e"]) & ~row["b"].isin([4]), id="exclude_from_a_b"), - pytest.param(lambda row: ~row["b"].isin([4]) & ~row["e"].isin(["x"]), id="exclude_from_b_e"), - pytest.param(lambda row: ~row["a"].isin(["a", "e"]) & ~row["b"].isin([4]) & ~row["e"].isin(["x"]), id="exclude_from_a_b_e"), - ] + pytest.param( + lambda row: ~row["a"].isin(["a", "e"]) & ~row["b"].isin([4]), + id="exclude_from_a_b", + ), + pytest.param( + lambda row: ~row["b"].isin([4]) & ~row["e"].isin(["x"]), + id="exclude_from_b_e", + ), + pytest.param( + lambda row: ~row["a"].isin(["a", "e"]) + & ~row["b"].isin([4]) + & ~row["e"].isin(["x"]), + id="exclude_from_a_b_e", + ), + ], ) -def test_range_groupby_categories(observed, func, by_cols, cat_cols, exclude_values, as_index): +def test_range_groupby_categories( + observed, func, by_cols, cat_cols, exclude_values, as_index +): + np.random.seed(int(os.environ.get("TEST_SEED", 0))) data = { "a": ["a", "b", "c", "d", "e", "b", "g", "a"] * 32, "b": [1, 2, 3, 4] * 64, From c4d4905c676afe890a27359785a3607b6e359818 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 17 Jan 2024 13:30:37 +0100 Subject: [PATCH 10/21] TESTS WORKgit add -u! Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 8 ++------ modin/core/dataframe/pandas/utils.py | 2 ++ modin/pandas/test/test_groupby.py | 14 ++++++-------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index efa5a2121c9..ca4ebd77a22 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3831,16 +3831,12 @@ def apply_func(df): # pragma: no cover result.attrs[skip_on_aligning_flag] = True return result - # breakpoint() result = self._apply_func_to_range_partitioning( key_columns=by, func=apply_func, ) - # breakpoint() # no need aligning columns if there's only one row partition - if ( - add_missing_cats or align_result_columns - ): # and result._partitions.shape[0] > 1: + if add_missing_cats or align_result_columns and result._partitions.shape[0] > 1: # FIXME: the current reshuffling implementation guarantees us that there's only one column # partition in the result, so we should never hit this exception for now, however # in the future, we might want to make this implementation more broader @@ -3854,7 +3850,7 @@ def apply_func(df): # pragma: no cover # it gathers all the dataframes in a single ray-kernel. # 2. The second one works slower, but only gathers light pandas.Index objects, # so there should be less stress on the network. - if not IsRayCluster.get(): + if add_missing_cats or not IsRayCluster.get(): original_dtypes = self.dtypes if self.has_materialized_dtypes else None def compute_aligned_columns(*dfs, initial_columns=None): diff --git a/modin/core/dataframe/pandas/utils.py b/modin/core/dataframe/pandas/utils.py index c1703d6f2db..019d8470727 100644 --- a/modin/core/dataframe/pandas/utils.py +++ b/modin/core/dataframe/pandas/utils.py @@ -38,6 +38,8 @@ def concatenate(dfs): assert df.columns.equals(dfs[0].columns) for i in dfs[0].columns.get_indexer_for(dfs[0].select_dtypes("category").columns): columns = [df.iloc[:, i] for df in dfs] + if not all(isinstance(col.dtype, pandas.CategoricalDtype) for col in columns): + continue union = union_categoricals(columns) for df in dfs: df.isetitem( diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 625b3cc3209..662fa94c208 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -3165,10 +3165,9 @@ def _apply_transform(df): return df.squeeze() return df.sum() - -import os - - +@pytest.mark.parametrize( + "modify_config", [{RangePartitioningGroupby: True}], indirect=True +) @pytest.mark.parametrize("observed", [False]) @pytest.mark.parametrize("as_index", [True]) @pytest.mark.parametrize( @@ -3222,9 +3221,10 @@ def _apply_transform(df): ], ) def test_range_groupby_categories( - observed, func, by_cols, cat_cols, exclude_values, as_index + observed, func, by_cols, cat_cols, exclude_values, as_index, modify_config ): - np.random.seed(int(os.environ.get("TEST_SEED", 0))) + # HACK: there's a bug + np.random.seed(0) data = { "a": ["a", "b", "c", "d", "e", "b", "g", "a"] * 32, "b": [1, 2, 3, 4] * 64, @@ -3239,6 +3239,4 @@ def test_range_groupby_categories( md_res = func(md_df.groupby(by_cols, observed=observed, as_index=as_index)) pd_res = func(pd_df.groupby(by_cols, observed=observed, as_index=as_index)) - # breakpoint() - # print("lel") df_equals(md_res, pd_res) From 97399103c5acdeaba6039cebbcb2e7d601e9dfe2 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 17 Jan 2024 17:44:06 +0100 Subject: [PATCH 11/21] version 1 is ready Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 8 +++ .../core/dataframe/pandas/dataframe/utils.py | 56 ++++++++++++------- modin/pandas/test/test_groupby.py | 1 + 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index ca4ebd77a22..2fdcffbc2c7 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -18,6 +18,8 @@ for pandas storage format. """ import datetime +from timeit import default_timer as timer +from collections import OrderedDict from typing import TYPE_CHECKING, Callable, Dict, Hashable, List, Optional, Union import numpy as np @@ -3904,14 +3906,20 @@ def compute_aligned_columns(*dfs, initial_columns=None): def apply_aligned(df, args, partition_idx): combined_cols, mask = args + t1 = timer() if mask is not None and mask.get(partition_idx) is not None: values = mask[partition_idx] original_names = df.index.names + # values = pandas.DataFrame(np.NaN, index=values.index, columns=df.columns) df = pandas.concat([df, values]) + + print("concating", timer() - t1) + t1 = timer() if kwargs["sort"]: # TODO: write search-sorted insertion or sort the result after insertion df = df.sort_index(axis=0) + print("sorting", timer() - t1) df.index.names = original_names if combined_cols is not None: df = df.reindex(columns=combined_cols) diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index 6fdf226c1a2..a1de82ff8bc 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -521,10 +521,11 @@ def run_f_on_minimally_updated_metadata(self, *args, **kwargs): return decorator - +from timeit import default_timer as timer def add_missing_categories_to_groupby( dfs, by, operator, initial_columns, combined_cols, is_udf_agg, kwargs, initial_dtypes=None ): + t1 = timer() kwargs["observed"] = False new_combined_cols = combined_cols @@ -540,15 +541,16 @@ def add_missing_categories_to_groupby( missing_cats_dtype = { name: level.dtype if isinstance(level.dtype, pandas.CategoricalDtype) - # it's a bit confusing but we have to convert the remaining columns to categoricals + # it's a bit confusing but we have to convert the remaining 'by' columns to categoricals # in order to compute a proper fill value later in the code else pandas.CategoricalDtype(level) for level, name in zip(total_index.levels, total_index.names) } # if we're grouping on multiple groupers, then the missing categorical values is a # carthesian product of (actual_missing_categorical_values X all_values_of_another_groupers) + # breakpoint() complete_index = pandas.MultiIndex.from_product( - [value.categories for value in missing_cats_dtype.values()], + [value.categories.astype(total_level.dtype) for total_level, value in zip(total_index.levels, missing_cats_dtype.values())], names=by, ) missing_index = complete_index[~complete_index.isin(total_index)] @@ -560,7 +562,9 @@ def add_missing_categories_to_groupby( missing_index = total_index.categories.difference(total_index.values) missing_cats_dtype = {by[0]: pandas.CategoricalDtype(missing_index)} missing_index.names = by - + print("generating missing", timer() - t1) + print(len(missing_index)) + t1 = timer() if len(missing_index) == 0: return {}, new_combined_cols # breakpoint() @@ -587,7 +591,8 @@ def add_missing_categories_to_groupby( empty_df = empty_df.astype("int64" if initial_dtypes is None else initial_dtypes) empty_df = empty_df.astype(missing_cats_dtype) missing_values = operator(empty_df.groupby(by, **kwargs)) - + print("getting fill value", timer() - t1) + t1 = timer() if is_udf_agg and not isinstance(total_index, pandas.MultiIndex): missing_values = missing_values.drop( columns=by, errors="ignore" @@ -604,23 +609,24 @@ def add_missing_categories_to_groupby( # If the aggregation has failed, the result would be empty. Assuming the # fill value to be `np.NaN` here (this may not always be correct!!!) fill_value = np.NaN if len(missing_values) == 0 else missing_values.iloc[0, 0] - missing_values = pandas.DataFrame(index=missing_index, columns=combined_cols).fillna( - fill_value - ) - + missing_values = pandas.DataFrame(fill_value, index=missing_index, columns=combined_cols) + print("generating missing values", timer() - t1) + t1 = timer() # restoring original categorical dtypes for the indices if isinstance(missing_values.index, pandas.MultiIndex): # MultiIndex.astype() only takes a single dtype, the only way to cast # individual levels to different dtypes is to convert MI to DF do the # casting then - missing_values.index = pandas.MultiIndex.from_frame( - missing_values.index.to_frame().astype( - {name: dtype for name, dtype in total_index.dtypes.items()} - ) - ) + pass + # missing_values.index = pandas.MultiIndex.from_frame( + # missing_values.index.to_frame().astype( + # {name: dtype for name, dtype in total_index.dtypes.items()} + # ) + # ) else: missing_values.index = missing_values.index.astype(total_index.dtype) - + print("casting to original dtype", timer() - t1) + t1 = timer() ### Then we decide to which missing categorical values should go to which partition if not kwargs["sort"]: # If the result is allowed to be unsorted, simply insert all the missing @@ -656,13 +662,14 @@ def add_missing_categories_to_groupby( # doesn't affect the result bins.append(idx[-1][0] if isinstance(idx, pandas.MultiIndex) else idx[-1]) old_bins_to_new[len(bins)] = offset - + # breakpoint() if len(bins) == 0: # insert values to the first non-empty partition return {old_bins_to_new.get(0, 0): missing_values}, new_combined_cols # we used the very first level of MultiIndex to build bins, meaning that we also have # to use values of the first index's level for 'digitize' + # breakpoint() lvl_zero = ( missing_values.index.levels[0] if isinstance(missing_values.index, pandas.MultiIndex) @@ -672,14 +679,23 @@ def add_missing_categories_to_groupby( part_idx = np.digitize(lvl_zero, bins, right=True) else: part_idx = np.searchsorted(bins, lvl_zero) - + print("binning", timer() - t1) + t1 = timer() ### In the end we build a dictionary mapping partition index to a dataframe with missing categoricals ### to be inserted into this partition masks = {} - frame_idx = missing_values.index.to_frame() - for idx, values in lvl_zero.groupby(part_idx).items(): - masks[idx] = missing_values[frame_idx.iloc[:, 0].isin(values)] + # if isinstance(total_index, pandas.MultiIndex): + # breakpoint() + # frame_idx = missing_values.index.to_frame() + if isinstance(total_index, pandas.MultiIndex): + for idx, values in pandas.RangeIndex(len(lvl_zero)).groupby(part_idx).items(): + masks[idx] = missing_values[pandas.Index(missing_values.index.codes[0]).isin(values)] + else: + frame_idx = missing_values.index.to_frame() + for idx, values in lvl_zero.groupby(part_idx).items(): + masks[idx] = missing_values[frame_idx.iloc[:, 0].isin(values)] # Restore the original indexing by adding the amount of skipped missing partitions masks = {key + old_bins_to_new[key]: value for key, value in masks.items()} + print("generating masks", timer() - t1) return masks, new_combined_cols diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 662fa94c208..fd562bd3928 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -3239,4 +3239,5 @@ def test_range_groupby_categories( md_res = func(md_df.groupby(by_cols, observed=observed, as_index=as_index)) pd_res = func(pd_df.groupby(by_cols, observed=observed, as_index=as_index)) + # breakpoint() df_equals(md_res, pd_res) From 4b835fa25900ee8953605c97bad0e862c21b1430 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 17 Jan 2024 18:04:30 +0100 Subject: [PATCH 12/21] FEAT-#5925: Enable grouping on categoricals with range-partitioning impl Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 2 ++ modin/core/dataframe/pandas/dataframe/utils.py | 3 --- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 2fdcffbc2c7..835e28507cd 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -36,6 +36,7 @@ ShuffleSortFunctions, lazy_metadata_decorator, add_missing_categories_to_groupby, + missing_cats_insert, ) from modin.core.dataframe.pandas.metadata import ( DtypesDescriptor, @@ -3837,6 +3838,7 @@ def apply_func(df): # pragma: no cover key_columns=by, func=apply_func, ) + # no need aligning columns if there's only one row partition if add_missing_cats or align_result_columns and result._partitions.shape[0] > 1: # FIXME: the current reshuffling implementation guarantees us that there's only one column diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index a1de82ff8bc..ac85602ed27 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -684,9 +684,6 @@ def add_missing_categories_to_groupby( ### In the end we build a dictionary mapping partition index to a dataframe with missing categoricals ### to be inserted into this partition masks = {} - # if isinstance(total_index, pandas.MultiIndex): - # breakpoint() - # frame_idx = missing_values.index.to_frame() if isinstance(total_index, pandas.MultiIndex): for idx, values in pandas.RangeIndex(len(lvl_zero)).groupby(part_idx).items(): masks[idx] = missing_values[pandas.Index(missing_values.index.codes[0]).isin(values)] From 2d003bbf8b3b5ac34b2ffc539d7f9debcb4354a1 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 17 Jan 2024 18:21:57 +0100 Subject: [PATCH 13/21] fix formatting Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 9 ++-- .../core/dataframe/pandas/dataframe/utils.py | 45 +++++++++++++------ .../storage_formats/pandas/query_compiler.py | 8 +--- modin/pandas/test/test_groupby.py | 1 + 4 files changed, 39 insertions(+), 24 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 835e28507cd..12028c4c5fe 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -18,8 +18,8 @@ for pandas storage format. """ import datetime -from timeit import default_timer as timer from collections import OrderedDict +from timeit import default_timer as timer from typing import TYPE_CHECKING, Callable, Dict, Hashable, List, Optional, Union import numpy as np @@ -34,9 +34,8 @@ from modin.core.dataframe.base.dataframe.utils import Axis, JoinType from modin.core.dataframe.pandas.dataframe.utils import ( ShuffleSortFunctions, - lazy_metadata_decorator, add_missing_categories_to_groupby, - missing_cats_insert, + lazy_metadata_decorator, ) from modin.core.dataframe.pandas.metadata import ( DtypesDescriptor, @@ -3838,7 +3837,7 @@ def apply_func(df): # pragma: no cover key_columns=by, func=apply_func, ) - + # no need aligning columns if there's only one row partition if add_missing_cats or align_result_columns and result._partitions.shape[0] > 1: # FIXME: the current reshuffling implementation guarantees us that there's only one column @@ -3915,7 +3914,7 @@ def apply_aligned(df, args, partition_idx): original_names = df.index.names # values = pandas.DataFrame(np.NaN, index=values.index, columns=df.columns) df = pandas.concat([df, values]) - + print("concating", timer() - t1) t1 = timer() if kwargs["sort"]: diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index ac85602ed27..8bc6440f9da 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -15,6 +15,7 @@ import abc from collections import namedtuple +from timeit import default_timer as timer from typing import TYPE_CHECKING, Callable, Optional, Union import numpy as np @@ -521,9 +522,16 @@ def run_f_on_minimally_updated_metadata(self, *args, **kwargs): return decorator -from timeit import default_timer as timer + def add_missing_categories_to_groupby( - dfs, by, operator, initial_columns, combined_cols, is_udf_agg, kwargs, initial_dtypes=None + dfs, + by, + operator, + initial_columns, + combined_cols, + is_udf_agg, + kwargs, + initial_dtypes=None, ): t1 = timer() kwargs["observed"] = False @@ -536,11 +544,13 @@ def add_missing_categories_to_groupby( # on multiple groupers or not total_index = indices[0].append(indices[1:]) if isinstance(total_index, pandas.MultiIndex): - if all(not isinstance(level, pandas.CategoricalIndex) for level in total_index.levels): + if all( + not isinstance(level, pandas.CategoricalIndex) + for level in total_index.levels + ): return {} missing_cats_dtype = { - name: level.dtype - if isinstance(level.dtype, pandas.CategoricalDtype) + name: level.dtype if isinstance(level.dtype, pandas.CategoricalDtype) # it's a bit confusing but we have to convert the remaining 'by' columns to categoricals # in order to compute a proper fill value later in the code else pandas.CategoricalDtype(level) @@ -550,7 +560,12 @@ def add_missing_categories_to_groupby( # carthesian product of (actual_missing_categorical_values X all_values_of_another_groupers) # breakpoint() complete_index = pandas.MultiIndex.from_product( - [value.categories.astype(total_level.dtype) for total_level, value in zip(total_index.levels, missing_cats_dtype.values())], + [ + value.categories.astype(total_level.dtype) + for total_level, value in zip( + total_index.levels, missing_cats_dtype.values() + ) + ], names=by, ) missing_index = complete_index[~complete_index.isin(total_index)] @@ -558,7 +573,7 @@ def add_missing_categories_to_groupby( if not isinstance(total_index, pandas.CategoricalIndex): return {}, new_combined_cols # if we're grouping on a single grouper then we simply compute the difference - # between categorical values in the result and the values defined in categorical dtype + # between categorical values in the result and the values defined in categorical dtype missing_index = total_index.categories.difference(total_index.values) missing_cats_dtype = {by[0]: pandas.CategoricalDtype(missing_index)} missing_index.names = by @@ -588,15 +603,15 @@ def add_missing_categories_to_groupby( # HACK: default 'object' dtype doesn't fit our needs, as most of the aggregations # fail on a non-numeric columns, ideally, we need dtypes of the original dataframe, # however, 'int64' also works fine here if the original schema is not available - empty_df = empty_df.astype("int64" if initial_dtypes is None else initial_dtypes) + empty_df = empty_df.astype( + "int64" if initial_dtypes is None else initial_dtypes + ) empty_df = empty_df.astype(missing_cats_dtype) missing_values = operator(empty_df.groupby(by, **kwargs)) print("getting fill value", timer() - t1) t1 = timer() if is_udf_agg and not isinstance(total_index, pandas.MultiIndex): - missing_values = missing_values.drop( - columns=by, errors="ignore" - ) + missing_values = missing_values.drop(columns=by, errors="ignore") new_combined_cols = pandas.concat( [ pandas.DataFrame(columns=combined_cols), @@ -609,7 +624,9 @@ def add_missing_categories_to_groupby( # If the aggregation has failed, the result would be empty. Assuming the # fill value to be `np.NaN` here (this may not always be correct!!!) fill_value = np.NaN if len(missing_values) == 0 else missing_values.iloc[0, 0] - missing_values = pandas.DataFrame(fill_value, index=missing_index, columns=combined_cols) + missing_values = pandas.DataFrame( + fill_value, index=missing_index, columns=combined_cols + ) print("generating missing values", timer() - t1) t1 = timer() # restoring original categorical dtypes for the indices @@ -686,7 +703,9 @@ def add_missing_categories_to_groupby( masks = {} if isinstance(total_index, pandas.MultiIndex): for idx, values in pandas.RangeIndex(len(lvl_zero)).groupby(part_idx).items(): - masks[idx] = missing_values[pandas.Index(missing_values.index.codes[0]).isin(values)] + masks[idx] = missing_values[ + pandas.Index(missing_values.index.codes[0]).isin(values) + ] else: frame_idx = missing_values.index.to_frame() for idx, values in lvl_zero.groupby(part_idx).items(): diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 7ba98593627..a929b8acfc3 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3816,7 +3816,7 @@ def _groupby_shuffle( if add_missing_cats and not groupby_kwargs.get("as_index", True): raise NotImplementedError( "Range-partitioning groupby is not implemented for grouping on categorical columns with " - + "the following set of parameters \{'as_index': False, 'observed': False\}. Change either 'as_index' " + + "the following set of parameters {'as_index': False, 'observed': False}. Change either 'as_index' " + "or 'observed' to True and try again. " + "https://github.com/modin-project/modin/issues/5926" ) @@ -3862,11 +3862,7 @@ def agg_func(grp, *args, **kwargs): result_qc = self.__constructor__(result) if not is_transform and not groupby_kwargs.get("as_index", True): - try: - return result_qc.reset_index(drop=True) - except: - breakpoint() - print("sas") + return result_qc.reset_index(drop=True) return result_qc diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index fd562bd3928..2c0ea392446 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -3165,6 +3165,7 @@ def _apply_transform(df): return df.squeeze() return df.sum() + @pytest.mark.parametrize( "modify_config", [{RangePartitioningGroupby: True}], indirect=True ) From fa3fe0a49571d1c605185821311398ebd7a643fa Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Thu, 18 Jan 2024 16:15:12 +0100 Subject: [PATCH 14/21] test if all good Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 6 --- .../core/dataframe/pandas/dataframe/utils.py | 43 +++++++++++-------- modin/pandas/test/test_groupby.py | 15 +------ setup.cfg | 2 +- 4 files changed, 28 insertions(+), 38 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 12028c4c5fe..233ceb405c9 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3837,7 +3837,6 @@ def apply_func(df): # pragma: no cover key_columns=by, func=apply_func, ) - # no need aligning columns if there's only one row partition if add_missing_cats or align_result_columns and result._partitions.shape[0] > 1: # FIXME: the current reshuffling implementation guarantees us that there's only one column @@ -3907,20 +3906,15 @@ def compute_aligned_columns(*dfs, initial_columns=None): def apply_aligned(df, args, partition_idx): combined_cols, mask = args - t1 = timer() if mask is not None and mask.get(partition_idx) is not None: values = mask[partition_idx] original_names = df.index.names # values = pandas.DataFrame(np.NaN, index=values.index, columns=df.columns) df = pandas.concat([df, values]) - - print("concating", timer() - t1) - t1 = timer() if kwargs["sort"]: # TODO: write search-sorted insertion or sort the result after insertion df = df.sort_index(axis=0) - print("sorting", timer() - t1) df.index.names = original_names if combined_cols is not None: df = df.reindex(columns=combined_cols) diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index 8bc6440f9da..79a2ddccd42 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -15,7 +15,6 @@ import abc from collections import namedtuple -from timeit import default_timer as timer from typing import TYPE_CHECKING, Callable, Optional, Union import numpy as np @@ -533,7 +532,24 @@ def add_missing_categories_to_groupby( kwargs, initial_dtypes=None, ): - t1 = timer() + """ + Generate missing categories. + + Parameters + ---------- + dfs : list of pandas.DataFrames + by : list of hashable + operator : callable + initial_columns : pandas.Index + combined_cols : pandas.Index + is_udf_agg : bool + kwargs : dict + initial_dtypes : pandas.Series, optional + + Returns + ------- + tuple[dict, pandas.Index] + """ kwargs["observed"] = False new_combined_cols = combined_cols @@ -558,7 +574,6 @@ def add_missing_categories_to_groupby( } # if we're grouping on multiple groupers, then the missing categorical values is a # carthesian product of (actual_missing_categorical_values X all_values_of_another_groupers) - # breakpoint() complete_index = pandas.MultiIndex.from_product( [ value.categories.astype(total_level.dtype) @@ -577,12 +592,10 @@ def add_missing_categories_to_groupby( missing_index = total_index.categories.difference(total_index.values) missing_cats_dtype = {by[0]: pandas.CategoricalDtype(missing_index)} missing_index.names = by - print("generating missing", timer() - t1) - print(len(missing_index)) - t1 = timer() + if len(missing_index) == 0: return {}, new_combined_cols - # breakpoint() + ### At this stage we want to get a fill_value for missing categorical values if is_udf_agg and isinstance(total_index, pandas.MultiIndex): # if grouping on multiple columns and aggregating with an UDF, then the @@ -608,8 +621,7 @@ def add_missing_categories_to_groupby( ) empty_df = empty_df.astype(missing_cats_dtype) missing_values = operator(empty_df.groupby(by, **kwargs)) - print("getting fill value", timer() - t1) - t1 = timer() + if is_udf_agg and not isinstance(total_index, pandas.MultiIndex): missing_values = missing_values.drop(columns=by, errors="ignore") new_combined_cols = pandas.concat( @@ -627,8 +639,7 @@ def add_missing_categories_to_groupby( missing_values = pandas.DataFrame( fill_value, index=missing_index, columns=combined_cols ) - print("generating missing values", timer() - t1) - t1 = timer() + # restoring original categorical dtypes for the indices if isinstance(missing_values.index, pandas.MultiIndex): # MultiIndex.astype() only takes a single dtype, the only way to cast @@ -642,8 +653,7 @@ def add_missing_categories_to_groupby( # ) else: missing_values.index = missing_values.index.astype(total_index.dtype) - print("casting to original dtype", timer() - t1) - t1 = timer() + ### Then we decide to which missing categorical values should go to which partition if not kwargs["sort"]: # If the result is allowed to be unsorted, simply insert all the missing @@ -679,14 +689,13 @@ def add_missing_categories_to_groupby( # doesn't affect the result bins.append(idx[-1][0] if isinstance(idx, pandas.MultiIndex) else idx[-1]) old_bins_to_new[len(bins)] = offset - # breakpoint() + if len(bins) == 0: # insert values to the first non-empty partition return {old_bins_to_new.get(0, 0): missing_values}, new_combined_cols # we used the very first level of MultiIndex to build bins, meaning that we also have # to use values of the first index's level for 'digitize' - # breakpoint() lvl_zero = ( missing_values.index.levels[0] if isinstance(missing_values.index, pandas.MultiIndex) @@ -696,8 +705,7 @@ def add_missing_categories_to_groupby( part_idx = np.digitize(lvl_zero, bins, right=True) else: part_idx = np.searchsorted(bins, lvl_zero) - print("binning", timer() - t1) - t1 = timer() + ### In the end we build a dictionary mapping partition index to a dataframe with missing categoricals ### to be inserted into this partition masks = {} @@ -713,5 +721,4 @@ def add_missing_categories_to_groupby( # Restore the original indexing by adding the amount of skipped missing partitions masks = {key + old_bins_to_new[key]: value for key, value in masks.items()} - print("generating masks", timer() - t1) return masks, new_combined_cols diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 2c0ea392446..fb0956dbabf 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -106,10 +106,6 @@ ] -def df_equals_fillna(df1, df2, fill_value=0): - df_equals(df1.fillna(fill_value), df2.fillna(fill_value)) - - def modin_groupby_equals_pandas(modin_groupby, pandas_groupby): eval_general( modin_groupby, pandas_groupby, lambda grp: grp.indices, comparator=dict_equals @@ -456,7 +452,6 @@ def maybe_get_columns(df, by): lambda df: df.sem(), modin_df_almost_equals_pandas, ) - # breakpoint() eval_mean(modin_groupby, pandas_groupby, numeric_only=True) eval_any(modin_groupby, pandas_groupby) @@ -520,20 +515,17 @@ def maybe_get_columns(df, by): # because of this bug: https://github.com/pandas-dev/pandas/issues/36698 # Modin correctly processes the result, that's why `check_exception_type=None` in some cases is_pandas_bug_case = not as_index and col1_category and isinstance(func, dict) - # breakpoint() eval_general( modin_groupby, pandas_groupby, lambda grp: grp.agg(func), check_exception_type=None if is_pandas_bug_case else True, - comparator=df_equals_fillna, ) eval_general( modin_groupby, pandas_groupby, lambda grp: grp.aggregate(func), check_exception_type=None if is_pandas_bug_case else True, - comparator=df_equals_fillna, ) eval_general(modin_groupby, pandas_groupby, lambda df: df.last()) @@ -626,7 +618,6 @@ def maybe_get_columns(df, by): if isinstance(by, list) else ["col3", "col4"] ) - # breakpoint() eval___getitem__(modin_groupby, pandas_groupby, non_by_cols) # When GroupBy.__getitem__ meets an intersection of the selection and 'by' columns # it throws a warning with the suggested workaround. The following code tests @@ -1252,8 +1243,8 @@ def eval_cummin(modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only ) -def eval_apply(modin_groupby, pandas_groupby, func, comparator=df_equals): - comparator(modin_groupby.apply(func), pandas_groupby.apply(func)) +def eval_apply(modin_groupby, pandas_groupby, func): + df_equals(modin_groupby.apply(func), pandas_groupby.apply(func)) def eval_dtypes(modin_groupby, pandas_groupby): @@ -2988,7 +2979,6 @@ def test_groupby_apply_series_result(modify_config): np.random.randint(5, 10, size=5), index=[f"s{i+1}" for i in range(5)] ) df["group"] = [1, 1, 2, 2, 3] - # breakpoint() # res = df.groupby('group').apply(lambda x: x.name+2) eval_general( df, df._to_pandas(), lambda df: df.groupby("group").apply(lambda x: x.name + 2) @@ -3240,5 +3230,4 @@ def test_range_groupby_categories( md_res = func(md_df.groupby(by_cols, observed=observed, as_index=as_index)) pd_res = func(pd_df.groupby(by_cols, observed=observed, as_index=as_index)) - # breakpoint() df_equals(md_res, pd_res) diff --git a/setup.cfg b/setup.cfg index 38cc37bc13d..3acc554836f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -12,7 +12,7 @@ tag_prefix = parentdir_prefix = modin- [tool:pytest] -addopts = +addopts = --cov-config=setup.cfg --cov=modin --cov-append --cov-report= -m "not exclude_by_default" xfail_strict=true markers = exclude_in_sanity From f5a424f68f8c7c0241b0ecfae5377cc7a7516fcb Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Thu, 18 Jan 2024 16:20:31 +0100 Subject: [PATCH 15/21] fix styling Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/algebra/default2pandas/groupby.py | 2 ++ modin/core/dataframe/pandas/dataframe/dataframe.py | 2 -- modin/core/dataframe/pandas/dataframe/utils.py | 1 - modin/core/dataframe/pandas/partitioning/partition_manager.py | 1 + modin/core/storage_formats/pandas/query_compiler.py | 3 +-- modin/pandas/test/test_groupby.py | 4 ++-- 6 files changed, 6 insertions(+), 7 deletions(-) diff --git a/modin/core/dataframe/algebra/default2pandas/groupby.py b/modin/core/dataframe/algebra/default2pandas/groupby.py index a1d8a603b1a..4c8271df555 100644 --- a/modin/core/dataframe/algebra/default2pandas/groupby.py +++ b/modin/core/dataframe/algebra/default2pandas/groupby.py @@ -56,6 +56,8 @@ def is_transformation_kernel(agg_func: Any) -> bool: bool """ return hashable(agg_func) and agg_func in transformation_kernels.union( + # these methods are also producing transpose-like result in a sense we understand it + # (they're non-aggregative functions), however are missing in the pandas dictionary {"nth", "head", "tail"} ) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 233ceb405c9..9bb0d24956c 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3910,7 +3910,6 @@ def apply_aligned(df, args, partition_idx): values = mask[partition_idx] original_names = df.index.names - # values = pandas.DataFrame(np.NaN, index=values.index, columns=df.columns) df = pandas.concat([df, values]) if kwargs["sort"]: # TODO: write search-sorted insertion or sort the result after insertion @@ -3924,7 +3923,6 @@ def apply_aligned(df, args, partition_idx): new_partitions = self._partition_mgr_cls.lazy_map_partitions( result._partitions, apply_aligned, - # lambda df, columns: df.reindex(columns=columns), func_args=(aligned_columns._data,), enumerate_partitions=True, ) diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index 79a2ddccd42..a7461a0daa4 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -338,7 +338,6 @@ def split_partitions_using_pivots_for_sort( tuple[pandas.DataFrame] A tuple of the splits from this partition. """ - # breakpoint() if len(columns_info) == 0: # We can return the dataframe with zero changes if there were no pivots passed return (df,) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index ce7348d9da9..e9494aeec10 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -627,6 +627,7 @@ def lazy_map_partitions( Positional arguments for the 'map_func'. func_kwargs : dict, optional Keyword arguments for the 'map_func'. + enumerate_partitions : bool, default: False Returns ------- diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index a929b8acfc3..41aad099efa 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -28,6 +28,7 @@ import numpy as np import pandas from pandas._libs import lib +from pandas.api.extensions import no_default from pandas.api.types import is_scalar from pandas.core.apply import reconstruct_func from pandas.core.common import is_bool_indexer @@ -3795,9 +3796,7 @@ def _groupby_shuffle( "Range-partitioning groupby is only supported when grouping on a column(s) of the same frame. " + "https://github.com/modin-project/modin/issues/5926" ) - from pandas.api.extensions import no_default - # breakpoint() # This check materializes dtypes for 'by' columns if not is_transform and ( not groupby_kwargs.get("observed", False) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index fb0956dbabf..adfe6cd5879 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -453,7 +453,6 @@ def maybe_get_columns(df, by): modin_df_almost_equals_pandas, ) eval_mean(modin_groupby, pandas_groupby, numeric_only=True) - eval_any(modin_groupby, pandas_groupby) eval_min(modin_groupby, pandas_groupby) eval_general(modin_groupby, pandas_groupby, lambda df: df.idxmax()) @@ -515,6 +514,7 @@ def maybe_get_columns(df, by): # because of this bug: https://github.com/pandas-dev/pandas/issues/36698 # Modin correctly processes the result, that's why `check_exception_type=None` in some cases is_pandas_bug_case = not as_index and col1_category and isinstance(func, dict) + eval_general( modin_groupby, pandas_groupby, @@ -1414,7 +1414,6 @@ def test(grp): return test - md_grp[item].agg(["mean"]) eval_general( md_grp, pd_grp, @@ -2979,6 +2978,7 @@ def test_groupby_apply_series_result(modify_config): np.random.randint(5, 10, size=5), index=[f"s{i+1}" for i in range(5)] ) df["group"] = [1, 1, 2, 2, 3] + # res = df.groupby('group').apply(lambda x: x.name+2) eval_general( df, df._to_pandas(), lambda df: df.groupby("group").apply(lambda x: x.name + 2) From 1e7edbfb0d81405bdd1a890a85f7b7394147da87 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Thu, 18 Jan 2024 16:23:17 +0100 Subject: [PATCH 16/21] add missing docs Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 9bb0d24956c..ae1cf3ad13d 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3784,6 +3784,7 @@ def groupby( Whether to manually align columns between all the resulted row partitions. This flag is helpful when dealing with UDFs as they can change the partition's shape and labeling unpredictably, resulting in an invalid dataframe. + add_missing_cats : bool, default: False **kwargs : dict Additional arguments to pass to the ``df.groupby`` method (besides the 'by' argument). From 40d637f10ea5c101479ef052f3c1140ef0106b58 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Fri, 19 Jan 2024 20:38:23 +0100 Subject: [PATCH 17/21] fix docstrings Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 1 + .../core/dataframe/pandas/dataframe/utils.py | 37 +++++++++++-------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index ae1cf3ad13d..68cbed09d2b 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3785,6 +3785,7 @@ def groupby( This flag is helpful when dealing with UDFs as they can change the partition's shape and labeling unpredictably, resulting in an invalid dataframe. add_missing_cats : bool, default: False + Whether to add missing categories from `by` columns to the result. **kwargs : dict Additional arguments to pass to the ``df.groupby`` method (besides the 'by' argument). diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index a7461a0daa4..471fa2d3ea1 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -532,22 +532,39 @@ def add_missing_categories_to_groupby( initial_dtypes=None, ): """ - Generate missing categories. + Generate values for missing categorical values to be inserted into groupby result. + + This function is used to emulate behavior of ``groupby(observed=False)`` parameter. + The function takes groupby result that was computed using ``groupby(observed=True)`` + and computes results for categorical values that are not presented in `dfs`. Parameters ---------- dfs : list of pandas.DataFrames + Row partitions containing groupby results. by : list of hashable + Column labels that were used to perform groupby. operator : callable + Aggregation function that was used during groupby. initial_columns : pandas.Index + Column labels of the original dataframe. combined_cols : pandas.Index + Column labels of the groupby result. is_udf_agg : bool + Whether ``operator`` is a UDF. kwargs : dict + Parameters that were passed to ``groupby(by, **kwargs)``. initial_dtypes : pandas.Series, optional + Dtypes of the original dataframe. If not specified, assume it's ``int64``. Returns ------- - tuple[dict, pandas.Index] + masks : dict[int, pandas.DataFrame] + Mapping between partition idx and a dataframe with results for missing categorical values + to insert to this partition. + new_combined_cols : pandas.Index + New column labels of the groupby result. If ``is_udf_agg is True``, then ``operator`` + may change the resulted columns. """ kwargs["observed"] = False new_combined_cols = combined_cols @@ -563,7 +580,7 @@ def add_missing_categories_to_groupby( not isinstance(level, pandas.CategoricalIndex) for level in total_index.levels ): - return {} + return {}, new_combined_cols missing_cats_dtype = { name: level.dtype if isinstance(level.dtype, pandas.CategoricalDtype) # it's a bit confusing but we have to convert the remaining 'by' columns to categoricals @@ -639,18 +656,8 @@ def add_missing_categories_to_groupby( fill_value, index=missing_index, columns=combined_cols ) - # restoring original categorical dtypes for the indices - if isinstance(missing_values.index, pandas.MultiIndex): - # MultiIndex.astype() only takes a single dtype, the only way to cast - # individual levels to different dtypes is to convert MI to DF do the - # casting then - pass - # missing_values.index = pandas.MultiIndex.from_frame( - # missing_values.index.to_frame().astype( - # {name: dtype for name, dtype in total_index.dtypes.items()} - # ) - # ) - else: + # restoring original categorical dtypes for the indices (MultiIndex already have proper dtypes) + if not isinstance(missing_values.index, pandas.MultiIndex): missing_values.index = missing_values.index.astype(total_index.dtype) ### Then we decide to which missing categorical values should go to which partition From 9b5fd71d8ec2572517bf39deb9388cfa2b4754f8 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 22 Jan 2024 19:53:44 +0100 Subject: [PATCH 18/21] add a link to the bug Signed-off-by: Dmitry Chigarev --- modin/pandas/test/test_groupby.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index adfe6cd5879..4e2a7c536c4 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -3214,7 +3214,9 @@ def _apply_transform(df): def test_range_groupby_categories( observed, func, by_cols, cat_cols, exclude_values, as_index, modify_config ): - # HACK: there's a bug + # HACK: there's a bug in range-partitioning impl that can be triggered + # here on certain seeds, manually setting the seed so it won't show up + # https://github.com/modin-project/modin/issues/6875 np.random.seed(0) data = { "a": ["a", "b", "c", "d", "e", "b", "g", "a"] * 32, From e0d3919bb54f605adf9a05f5dc5ec8a85c8694d1 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 29 Jan 2024 14:38:19 +0100 Subject: [PATCH 19/21] apply review suggestions1 Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 11 ++++------- modin/core/dataframe/pandas/dataframe/utils.py | 6 +++--- modin/core/dataframe/pandas/utils.py | 18 +++++++++++++++++- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 68cbed09d2b..b4b37712ba4 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -18,8 +18,6 @@ for pandas storage format. """ import datetime -from collections import OrderedDict -from timeit import default_timer as timer from typing import TYPE_CHECKING, Callable, Dict, Hashable, List, Optional, Union import numpy as np @@ -3815,7 +3813,6 @@ def groupby( if not isinstance(by, list): by = [by] - kwargs = kwargs.copy() kwargs["observed"] = True skip_on_aligning_flag = "__skip_me_on_aligning__" @@ -3858,9 +3855,7 @@ def apply_func(df): # pragma: no cover original_dtypes = self.dtypes if self.has_materialized_dtypes else None def compute_aligned_columns(*dfs, initial_columns=None): - """Take row partitions, filter empty ones, and return joined columns for them.""" - combined_cols = None - masks = None + """Take row partitions, filter empty ones, and return joined columns for them.""" if align_result_columns: valid_dfs = [ df @@ -3880,6 +3875,7 @@ def compute_aligned_columns(*dfs, initial_columns=None): else: combined_cols = dfs[0].columns + masks = None if add_missing_cats: masks, combined_cols = add_missing_categories_to_groupby( dfs, @@ -3912,9 +3908,10 @@ def apply_aligned(df, args, partition_idx): values = mask[partition_idx] original_names = df.index.names + # TODO: inserting 'values' based on 'searchsorted' result might be more efficient + # in cases of small amount of 'values' df = pandas.concat([df, values]) if kwargs["sort"]: - # TODO: write search-sorted insertion or sort the result after insertion df = df.sort_index(axis=0) df.index.names = original_names if combined_cols is not None: diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index 471fa2d3ea1..cd4c2c5f244 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -534,8 +534,8 @@ def add_missing_categories_to_groupby( """ Generate values for missing categorical values to be inserted into groupby result. - This function is used to emulate behavior of ``groupby(observed=False)`` parameter. - The function takes groupby result that was computed using ``groupby(observed=True)`` + This function is used to emulate behavior of ``groupby(observed=False)`` parameter, + it takes groupby result that was computed using ``groupby(observed=True)`` and computes results for categorical values that are not presented in `dfs`. Parameters @@ -649,7 +649,7 @@ def add_missing_categories_to_groupby( join="outer", ).columns else: - # If the aggregation has failed, the result would be empty. Assuming the + # HACK: If the aggregation has failed, the result would be empty. Assuming the # fill value to be `np.NaN` here (this may not always be correct!!!) fill_value = np.NaN if len(missing_values) == 0 else missing_values.iloc[0, 0] missing_values = pandas.DataFrame( diff --git a/modin/core/dataframe/pandas/utils.py b/modin/core/dataframe/pandas/utils.py index 019d8470727..01cc2fc6c10 100644 --- a/modin/core/dataframe/pandas/utils.py +++ b/modin/core/dataframe/pandas/utils.py @@ -17,6 +17,8 @@ import pandas from pandas.api.types import union_categoricals +from modin.error_message import ErrorMessage + def concatenate(dfs): """ @@ -38,7 +40,21 @@ def concatenate(dfs): assert df.columns.equals(dfs[0].columns) for i in dfs[0].columns.get_indexer_for(dfs[0].select_dtypes("category").columns): columns = [df.iloc[:, i] for df in dfs] - if not all(isinstance(col.dtype, pandas.CategoricalDtype) for col in columns): + all_categorical_parts_are_empty = None + has_non_categorical_parts = False + for col in columns: + if isinstance(col.dtype, pandas.CategoricalDtype): + if all_categorical_parts_are_empty is None: + all_categorical_parts_are_empty = len(col) == 0 + continue + all_categorical_parts_are_empty &= len(col) == 0 + else: + has_non_categorical_parts = True + # 'union_categoricals' raises an error if some of the passed values don't have categorical dtype, + # if it happens, we only want to continue when all parts with categorical dtypes are actually empty. + # This can happen if there were an aggregation that discards categorical dtypes and that aggregation + # doesn't properly do so for empty partitions + if has_non_categorical_parts and all_categorical_parts_are_empty: continue union = union_categoricals(columns) for df in dfs: From 7bda8ea9bb51688130ebf8dd74aba2b79eb6ab72 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 29 Jan 2024 15:10:35 +0100 Subject: [PATCH 20/21] add transform tests Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 2 +- modin/core/dataframe/pandas/dataframe/utils.py | 11 +++++++---- modin/core/dataframe/pandas/utils.py | 4 +--- modin/core/storage_formats/pandas/query_compiler.py | 7 +++---- modin/pandas/test/test_groupby.py | 10 +++++++--- 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index b4b37712ba4..6ed18805336 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3855,7 +3855,7 @@ def apply_func(df): # pragma: no cover original_dtypes = self.dtypes if self.has_materialized_dtypes else None def compute_aligned_columns(*dfs, initial_columns=None): - """Take row partitions, filter empty ones, and return joined columns for them.""" + """Take row partitions, filter empty ones, and return joined columns for them.""" if align_result_columns: valid_dfs = [ df diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py index cd4c2c5f244..08bbd6894de 100644 --- a/modin/core/dataframe/pandas/dataframe/utils.py +++ b/modin/core/dataframe/pandas/dataframe/utils.py @@ -582,10 +582,13 @@ def add_missing_categories_to_groupby( ): return {}, new_combined_cols missing_cats_dtype = { - name: level.dtype if isinstance(level.dtype, pandas.CategoricalDtype) - # it's a bit confusing but we have to convert the remaining 'by' columns to categoricals - # in order to compute a proper fill value later in the code - else pandas.CategoricalDtype(level) + name: ( + level.dtype + if isinstance(level.dtype, pandas.CategoricalDtype) + # it's a bit confusing but we have to convert the remaining 'by' columns to categoricals + # in order to compute a proper fill value later in the code + else pandas.CategoricalDtype(level) + ) for level, name in zip(total_index.levels, total_index.names) } # if we're grouping on multiple groupers, then the missing categorical values is a diff --git a/modin/core/dataframe/pandas/utils.py b/modin/core/dataframe/pandas/utils.py index 01cc2fc6c10..98304ba89c3 100644 --- a/modin/core/dataframe/pandas/utils.py +++ b/modin/core/dataframe/pandas/utils.py @@ -17,8 +17,6 @@ import pandas from pandas.api.types import union_categoricals -from modin.error_message import ErrorMessage - def concatenate(dfs): """ @@ -49,7 +47,7 @@ def concatenate(dfs): continue all_categorical_parts_are_empty &= len(col) == 0 else: - has_non_categorical_parts = True + has_non_categorical_parts = True # 'union_categoricals' raises an error if some of the passed values don't have categorical dtype, # if it happens, we only want to continue when all parts with categorical dtypes are actually empty. # This can happen if there were an aggregation that discards categorical dtypes and that aggregation diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 41aad099efa..56bfbcdd5bd 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -28,7 +28,6 @@ import numpy as np import pandas from pandas._libs import lib -from pandas.api.extensions import no_default from pandas.api.types import is_scalar from pandas.core.apply import reconstruct_func from pandas.core.common import is_bool_indexer @@ -3798,9 +3797,9 @@ def _groupby_shuffle( ) # This check materializes dtypes for 'by' columns - if not is_transform and ( - not groupby_kwargs.get("observed", False) - or groupby_kwargs.get("observed", False) is no_default + if not is_transform and groupby_kwargs.get("observed", False) in ( + False, + lib.no_default, ): if isinstance(self._modin_frame._dtypes, ModinDtypes): by_dtypes = self._modin_frame._dtypes.lazy_get(by).get() diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 4e2a7c536c4..c28ccad1832 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -2963,9 +2963,13 @@ def test_reshuffling_groupby_on_strings(modify_config): modin_df = modin_df.astype({"col1": "string"}) pandas_df = pandas_df.astype({"col1": "string"}) - eval_general( - modin_df.groupby("col1"), pandas_df.groupby("col1"), lambda grp: grp.mean() - ) + md_grp = modin_df.groupby("col1") + pd_grp = pandas_df.groupby("col1") + + eval_general(md_grp, pd_grp, lambda grp: grp.mean()) + eval_general(md_grp, pd_grp, lambda grp: grp.nth()) + eval_general(md_grp, pd_grp, lambda grp: grp.head(10)) + eval_general(md_grp, pd_grp, lambda grp: grp.tail(10)) @pytest.mark.parametrize( From 1aba0fa151f2d1c26c5542323bad6f3753884c33 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 29 Jan 2024 15:19:52 +0100 Subject: [PATCH 21/21] revert undesired changes Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/partitioning/partition_manager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index e9494aeec10..f7b3899550c 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -612,7 +612,12 @@ def map_partitions( @classmethod @wait_computations_if_benchmark_mode def lazy_map_partitions( - cls, partitions, map_func, func_args=None, enumerate_partitions=False + cls, + partitions, + map_func, + func_args=None, + func_kwargs=None, + enumerate_partitions=False, ): """ Apply `map_func` to every partition in `partitions` *lazily*. @@ -641,6 +646,7 @@ def lazy_map_partitions( part.add_to_apply_calls( preprocessed_map_func, *(tuple() if func_args is None else func_args), + **func_kwargs if func_kwargs is not None else {}, **({"partition_idx": i} if enumerate_partitions else {}), ) for part in row