Skip to content

Commit

Permalink
FEAT-#5925: Enable grouping on categoricals with range-partitioning i…
Browse files Browse the repository at this point in the history
…mpl (#6862)

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
  • Loading branch information
dchigarev authored Jan 29, 2024
1 parent fb4a19f commit 46dc0a5
Show file tree
Hide file tree
Showing 7 changed files with 432 additions and 43 deletions.
6 changes: 5 additions & 1 deletion modin/core/dataframe/algebra/default2pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ def is_transformation_kernel(agg_func: Any) -> bool:
-------
bool
"""
return hashable(agg_func) and agg_func in transformation_kernels
return hashable(agg_func) and agg_func in transformation_kernels.union(
# these methods are also producing transpose-like result in a sense we understand it
# (they're non-aggregative functions), however are missing in the pandas dictionary
{"nth", "head", "tail"}
)

@classmethod
def _call_groupby(cls, df, *args, **kwargs): # noqa: PR01
Expand Down
88 changes: 67 additions & 21 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from modin.core.dataframe.base.dataframe.utils import Axis, JoinType
from modin.core.dataframe.pandas.dataframe.utils import (
ShuffleSortFunctions,
add_missing_categories_to_groupby,
lazy_metadata_decorator,
)
from modin.core.dataframe.pandas.metadata import (
Expand Down Expand Up @@ -3758,7 +3759,8 @@ def groupby(
by: Union[str, List[str]],
operator: Callable,
result_schema: Optional[Dict[Hashable, type]] = None,
align_result_columns=False,
align_result_columns: bool = False,
add_missing_cats: bool = False,
**kwargs: dict,
) -> "PandasDataframe":
"""
Expand All @@ -3780,6 +3782,8 @@ def groupby(
Whether to manually align columns between all the resulted row partitions.
This flag is helpful when dealing with UDFs as they can change the partition's shape
and labeling unpredictably, resulting in an invalid dataframe.
add_missing_cats : bool, default: False
Whether to add missing categories from `by` columns to the result.
**kwargs : dict
Additional arguments to pass to the ``df.groupby`` method (besides the 'by' argument).
Expand Down Expand Up @@ -3809,6 +3813,7 @@ def groupby(
if not isinstance(by, list):
by = [by]

kwargs["observed"] = True
skip_on_aligning_flag = "__skip_me_on_aligning__"

def apply_func(df): # pragma: no cover
Expand All @@ -3831,9 +3836,8 @@ def apply_func(df): # pragma: no cover
key_columns=by,
func=apply_func,
)

# no need aligning columns if there's only one row partition
if align_result_columns and result._partitions.shape[0] > 1:
if add_missing_cats or align_result_columns and result._partitions.shape[0] > 1:
# FIXME: the current reshuffling implementation guarantees us that there's only one column
# partition in the result, so we should never hit this exception for now, however
# in the future, we might want to make this implementation more broader
Expand All @@ -3847,37 +3851,79 @@ def apply_func(df): # pragma: no cover
# it gathers all the dataframes in a single ray-kernel.
# 2. The second one works slower, but only gathers light pandas.Index objects,
# so there should be less stress on the network.
if not IsRayCluster.get():
if add_missing_cats or not IsRayCluster.get():
original_dtypes = self.dtypes if self.has_materialized_dtypes else None

def compute_aligned_columns(*dfs):
def compute_aligned_columns(*dfs, initial_columns=None):
"""Take row partitions, filter empty ones, and return joined columns for them."""
valid_dfs = [
df
for df in dfs
if not df.attrs.get(skip_on_aligning_flag, False)
]
if len(valid_dfs) == 0 and len(dfs) != 0:
valid_dfs = dfs

# Using '.concat()' on empty-slices instead of 'Index.join()'
# in order to get identical behavior to pandas when it joins
# results of different groups
return pandas.concat(
[df.iloc[:0] for df in valid_dfs], axis=0, join="outer"
).columns
if align_result_columns:
valid_dfs = [
df
for df in dfs
if not df.attrs.get(skip_on_aligning_flag, False)
]

if len(valid_dfs) == 0 and len(dfs) != 0:
valid_dfs = dfs

# Using '.concat()' on empty-slices instead of 'Index.join()'
# in order to get identical behavior to pandas when it joins
# results of different groups
combined_cols = pandas.concat(
[df.iloc[:0] for df in valid_dfs], axis=0, join="outer"
).columns
else:
combined_cols = dfs[0].columns

masks = None
if add_missing_cats:
masks, combined_cols = add_missing_categories_to_groupby(
dfs,
by,
operator,
initial_columns,
combined_cols,
is_udf_agg=align_result_columns,
kwargs=kwargs.copy(),
initial_dtypes=original_dtypes,
)
return (
(combined_cols, masks)
if align_result_columns
else (None, masks)
)

# Passing all partitions to the 'compute_aligned_columns' kernel to get
# aligned columns
parts = result._partitions.flatten()
aligned_columns = parts[0].apply(
compute_aligned_columns, *[part._data for part in parts[1:]]
compute_aligned_columns,
*[part._data for part in parts[1:]],
initial_columns=self.columns,
)

def apply_aligned(df, args, partition_idx):
combined_cols, mask = args
if mask is not None and mask.get(partition_idx) is not None:
values = mask[partition_idx]

original_names = df.index.names
# TODO: inserting 'values' based on 'searchsorted' result might be more efficient
# in cases of small amount of 'values'
df = pandas.concat([df, values])
if kwargs["sort"]:
df = df.sort_index(axis=0)
df.index.names = original_names
if combined_cols is not None:
df = df.reindex(columns=combined_cols)
return df

# Lazily applying aligned columns to partitions
new_partitions = self._partition_mgr_cls.lazy_map_partitions(
result._partitions,
lambda df, columns: df.reindex(columns=columns),
apply_aligned,
func_args=(aligned_columns._data,),
enumerate_partitions=True,
)
else:

Expand Down
212 changes: 212 additions & 0 deletions modin/core/dataframe/pandas/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,3 +519,215 @@ def run_f_on_minimally_updated_metadata(self, *args, **kwargs):
return run_f_on_minimally_updated_metadata

return decorator


def add_missing_categories_to_groupby(
dfs,
by,
operator,
initial_columns,
combined_cols,
is_udf_agg,
kwargs,
initial_dtypes=None,
):
"""
Generate values for missing categorical values to be inserted into groupby result.
This function is used to emulate behavior of ``groupby(observed=False)`` parameter,
it takes groupby result that was computed using ``groupby(observed=True)``
and computes results for categorical values that are not presented in `dfs`.
Parameters
----------
dfs : list of pandas.DataFrames
Row partitions containing groupby results.
by : list of hashable
Column labels that were used to perform groupby.
operator : callable
Aggregation function that was used during groupby.
initial_columns : pandas.Index
Column labels of the original dataframe.
combined_cols : pandas.Index
Column labels of the groupby result.
is_udf_agg : bool
Whether ``operator`` is a UDF.
kwargs : dict
Parameters that were passed to ``groupby(by, **kwargs)``.
initial_dtypes : pandas.Series, optional
Dtypes of the original dataframe. If not specified, assume it's ``int64``.
Returns
-------
masks : dict[int, pandas.DataFrame]
Mapping between partition idx and a dataframe with results for missing categorical values
to insert to this partition.
new_combined_cols : pandas.Index
New column labels of the groupby result. If ``is_udf_agg is True``, then ``operator``
may change the resulted columns.
"""
kwargs["observed"] = False
new_combined_cols = combined_cols

### At first we need to compute missing categorical values
indices = [df.index for df in dfs]
# total_index contains all categorical values that resided in the result,
# missing values are computed differently depending on whether we're grouping
# on multiple groupers or not
total_index = indices[0].append(indices[1:])
if isinstance(total_index, pandas.MultiIndex):
if all(
not isinstance(level, pandas.CategoricalIndex)
for level in total_index.levels
):
return {}, new_combined_cols
missing_cats_dtype = {
name: (
level.dtype
if isinstance(level.dtype, pandas.CategoricalDtype)
# it's a bit confusing but we have to convert the remaining 'by' columns to categoricals
# in order to compute a proper fill value later in the code
else pandas.CategoricalDtype(level)
)
for level, name in zip(total_index.levels, total_index.names)
}
# if we're grouping on multiple groupers, then the missing categorical values is a
# carthesian product of (actual_missing_categorical_values X all_values_of_another_groupers)
complete_index = pandas.MultiIndex.from_product(
[
value.categories.astype(total_level.dtype)
for total_level, value in zip(
total_index.levels, missing_cats_dtype.values()
)
],
names=by,
)
missing_index = complete_index[~complete_index.isin(total_index)]
else:
if not isinstance(total_index, pandas.CategoricalIndex):
return {}, new_combined_cols
# if we're grouping on a single grouper then we simply compute the difference
# between categorical values in the result and the values defined in categorical dtype
missing_index = total_index.categories.difference(total_index.values)
missing_cats_dtype = {by[0]: pandas.CategoricalDtype(missing_index)}
missing_index.names = by

if len(missing_index) == 0:
return {}, new_combined_cols

### At this stage we want to get a fill_value for missing categorical values
if is_udf_agg and isinstance(total_index, pandas.MultiIndex):
# if grouping on multiple columns and aggregating with an UDF, then the
# fill value is always `np.NaN`
missing_values = pandas.DataFrame({0: [np.NaN]})
else:
# In case of a udf aggregation we're forced to run the operator against each
# missing category, as in theory it can return different results for each
# empty group. In other cases it's enough to run the operator against a single
# missing categorical and then broadcast the fill value to each missing value
if not is_udf_agg:
missing_cats_dtype = {
key: pandas.CategoricalDtype(value.categories[:1])
for key, value in missing_cats_dtype.items()
}

empty_df = pandas.DataFrame(columns=initial_columns)
# HACK: default 'object' dtype doesn't fit our needs, as most of the aggregations
# fail on a non-numeric columns, ideally, we need dtypes of the original dataframe,
# however, 'int64' also works fine here if the original schema is not available
empty_df = empty_df.astype(
"int64" if initial_dtypes is None else initial_dtypes
)
empty_df = empty_df.astype(missing_cats_dtype)
missing_values = operator(empty_df.groupby(by, **kwargs))

if is_udf_agg and not isinstance(total_index, pandas.MultiIndex):
missing_values = missing_values.drop(columns=by, errors="ignore")
new_combined_cols = pandas.concat(
[
pandas.DataFrame(columns=combined_cols),
missing_values.iloc[:0],
],
axis=0,
join="outer",
).columns
else:
# HACK: If the aggregation has failed, the result would be empty. Assuming the
# fill value to be `np.NaN` here (this may not always be correct!!!)
fill_value = np.NaN if len(missing_values) == 0 else missing_values.iloc[0, 0]
missing_values = pandas.DataFrame(
fill_value, index=missing_index, columns=combined_cols
)

# restoring original categorical dtypes for the indices (MultiIndex already have proper dtypes)
if not isinstance(missing_values.index, pandas.MultiIndex):
missing_values.index = missing_values.index.astype(total_index.dtype)

### Then we decide to which missing categorical values should go to which partition
if not kwargs["sort"]:
# If the result is allowed to be unsorted, simply insert all the missing
# categories to the last partition
mask = {len(indices) - 1: missing_values}
return mask, new_combined_cols

# If the result has to be sorted, we have to assign missing categoricals to proper partitions.
# For that purpose we define bins with corner values of each partition and then using either
# np.digitize or np.searchsorted find correct bins for each missing categorical value.
# Example: part0-> [0, 1, 2]; part1-> [3, 4, 10, 12]; part2-> [15, 17, 20, 100]
# bins -> [2, 12] # took last values of each partition excluding the last partition
# (every value that's matching 'x > part[-2][-1]' should go to the
# last partition, meaning that including the last value of the last
# partitions doesn't make sense)
# missing_cats -> [-2, 5, 6, 14, 21, 120]
# np.digitize(missing_cats, bins) -> [ 0, 1, 1, 2, 2, 2]
# ^-- mapping between values and partition idx to insert
bins = []
old_bins_to_new = {}
offset = 0
# building bins by taking last values of each partition excluding the last partition
for idx in indices[:-1]:
if len(idx) == 0:
# if a partition is empty, we can't use its values to define a bin, thus we simply
# skip it and remember the number of skipped partitions as an 'offset'
offset += 1
continue
# remember the number of skipped partitions before this bin, in order to restore original
# indexing at the end
old_bins_to_new[len(bins)] = offset
# for MultiIndices we always use the very first level for bins as using multiple levels
# doesn't affect the result
bins.append(idx[-1][0] if isinstance(idx, pandas.MultiIndex) else idx[-1])
old_bins_to_new[len(bins)] = offset

if len(bins) == 0:
# insert values to the first non-empty partition
return {old_bins_to_new.get(0, 0): missing_values}, new_combined_cols

# we used the very first level of MultiIndex to build bins, meaning that we also have
# to use values of the first index's level for 'digitize'
lvl_zero = (
missing_values.index.levels[0]
if isinstance(missing_values.index, pandas.MultiIndex)
else missing_values.index
)
if pandas.api.types.is_any_real_numeric_dtype(lvl_zero):
part_idx = np.digitize(lvl_zero, bins, right=True)
else:
part_idx = np.searchsorted(bins, lvl_zero)

### In the end we build a dictionary mapping partition index to a dataframe with missing categoricals
### to be inserted into this partition
masks = {}
if isinstance(total_index, pandas.MultiIndex):
for idx, values in pandas.RangeIndex(len(lvl_zero)).groupby(part_idx).items():
masks[idx] = missing_values[
pandas.Index(missing_values.index.codes[0]).isin(values)
]
else:
frame_idx = missing_values.index.to_frame()
for idx, values in lvl_zero.groupby(part_idx).items():
masks[idx] = missing_values[frame_idx.iloc[:, 0].isin(values)]

# Restore the original indexing by adding the amount of skipped missing partitions
masks = {key + old_bins_to_new[key]: value for key, value in masks.items()}
return masks, new_combined_cols
Loading

0 comments on commit 46dc0a5

Please sign in to comment.