
FEAT-#5925: Enable grouping on categoricals with range-partitioning impl #6862

Merged: 22 commits into modin-project:master on Jan 29, 2024

Conversation

dchigarev (Collaborator) commented Jan 17, 2024

What do these changes do?

This PR allows grouping on categorical columns using the range-partitioning implementation. The main challenge was supporting the proper behavior of the default value of the groupby(observed=False) parameter. observed=False as a default value for groupby is deprecated, but it will only be replaced in pandas 3.0, so we still need an implementation for it.
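
Below is a minimal usage sketch (the toy dataframe is made up for illustration; cfg.RangePartitioningGroupby is the same config flag used in the benchmark scripts later in this description):

import modin.pandas as pd
import modin.config as cfg

# enable the range-partitioning groupby implementation
cfg.RangePartitioningGroupby.put(True)

df = pd.DataFrame({"by_col": [1, 2, 2, 3], "b": [3, 4, 5, 6], "c": [6, 5, 4, 3]})
df["by_col"] = df["by_col"].astype("category")

# grouping on a categorical column now goes through the range-partitioning
# implementation; observed=False keeps missing categories in the result index
print(df.groupby("by_col", observed=False).sum())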

What's observed=False?
This parameter includes missing categories in the result index; the missing groups are then filled with the default value for the particular aggregation. Consider this example for more details:

An example of how `observed=False` works
# we have a categorical 'by_col', containing values {1, 2, 3}
>>> df
  by_col  b  c
0      1  3  6
1      2  4  5
2      2  5  4
3      3  6  3
>>> df.dtypes
by_col    category
b            int64
c            int64
# if we take the following row slice, 'by_col' now contains only the values {1, 2}
>>> df.iloc[:3]
  by_col  b  c
0      1  3  6
1      2  4  5
2      2  5  4
# however, the categorical dtype of the column still contains {1, 2, 3}, meaning that for this particular dataframe
# {3} is now considered a missing categorical value
>>> df.iloc[:3].dtypes["by_col"]
CategoricalDtype(categories=[1, 2, 3], ordered=False, categories_dtype=int64)
# if we then perform a groupby with `observed=False`, we'll see that the missing categorical value
# actually appears in the result with a default value ('0')
>>> df.iloc[:3].groupby("by_col", observed=False).sum()
        b  c
by_col
1       3  6
2       9  9
3       0  0  <--- result for a missing categorical value
# if `observed=True` is specified, the result contains only the values actually present in the dataframe,
# discarding missing categories
>>> df.iloc[:3].groupby("by_col", observed=True).sum()
        b  c
by_col
1       3  6
2       9  9
              <--- nothing here
# in the case of a multi-column groupby, the resulting index contains the cartesian
# product of (missing_categorical_values X values_of_another_by_column)
>>> df.iloc[:3].groupby(["by_col", "b"], observed=False).sum()
          c
by_col b
1      3  6
       4  0 <--- result for a missing categorical value
       5  0 <--- result for a missing categorical value
2      3  0 <--- result for a missing categorical value
       4  5
       5  4
3      3  0 <--- result for a missing categorical value
       4  0 <--- result for a missing categorical value
       5  0 <--- result for a missing categorical value

How observed=False is implemented in this PR
Groupby itself is always called with the observed=True parameter, meaning that the result won't contain missing categories. I then added a post-processing procedure for the groupby results that determines the missing categories and inserts them into the proper partitions in the proper order. Two kernels are submitted to perform this:

  1. The first kernel calls the add_missing_categories_to_groupby() function, which takes all of the resulting partitions together with some metadata and determines both the missing categories and the fill value that should be used as the aggregation result for those groups. The kernel then determines which missing categories should go to which partitions so that the result remains sorted. As its result, it returns a dictionary mapping partition indices to the missing categorical values to be inserted into that partition.
  2. The second kernel is applied as a map function to the groupby result and takes as an argument the dictionary returned at the previous step. It concatenates the partition's content with the missing categorical values and sorts the partition (this doesn't take much time, since only a small piece of the result is sorted). A rough sketch of both kernels is shown below.
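
A rough sketch of the idea in plain pandas (the helper names, signatures, and routing logic below are illustrative only and don't match the actual add_missing_categories_to_groupby() API):

import pandas as pd

def split_missing_categories(parts, categories, fill_value=0):
    # Kernel 1 (sketch): inspect the observed=True groupby results of all
    # partitions, find the categories that are absent everywhere, and decide
    # which partition each missing group should be inserted into so that the
    # global sort order is preserved.
    present = set()
    for part in parts:
        present.update(part.index)
    missing = [cat for cat in categories if cat not in present]
    routing = {i: [] for i in range(len(parts))}
    for cat in missing:
        dest = 0
        for i, part in enumerate(parts):
            if len(part) and part.index[0] <= cat:
                dest = i
        routing[dest].append(cat)
    # map partition index -> dataframe of missing groups filled with the default value
    return {
        i: pd.DataFrame(
            fill_value,
            index=pd.Index(cats, name=parts[i].index.name),
            columns=parts[i].columns,
        )
        for i, cats in routing.items()
        if cats
    }

def insert_missing(part, missing_rows=None):
    # Kernel 2 (sketch): a map function over the result partitions that
    # concatenates the missing groups and sorts this small piece of the result.
    if missing_rows is None:
        return part
    return pd.concat([part, missing_rows]).sort_index()

In the real implementation the first kernel also derives per-aggregation fill values instead of taking a constant, and it works on Modin's internal partition metadata rather than on plain indices.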

Is it possible to run groupby with the observed=False parameter from the beginning and avoid this post-processing step?
It's possible, but then we would need to filter out spurious missing values in an additional post-processing stage, which, combined with the fact that each kernel now returns much bigger dataframes, makes such an implementation slower than the presented one:

# in this example, the total dataframe doesn't have missing categories; however, each partition will
# individually fill the groupby result with nulls for categories that aren't present in that partition,
# and these will need to be filtered out later
>>> part1
  by_col  b  c
0      1  3  3
1      2  4  2
>>> part2
  by_col  b  c
2      3  3  3
3      4  4  4
>>> part1.groupby("by_col", observed=False).sum()
        b  c
by_col
1       3  3
2       4  2
3       0  0 <--- result for a missing categorical value
4       0  0 <--- result for a missing categorical value
>>> part2.groupby("by_col", observed=False).sum()
        b  c
by_col
1       0  0 <--- result for a missing categorical value
2       0  0 <--- result for a missing categorical value
3       3  3
4       4  4

How is this parameter handled in Modin's other groupby implementations?
In the full-axis implementation, we always pass observed=False to the groupby, since we're dealing with a full-column partition there, and simply let pandas deal with it.

In the MapReduce implementation, the map stage is always performed with observed=True, and the reduce stage (the stage where we build a full-column partition) passes observed=False to the reduction groupby and gets the proper result.
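
In plain pandas terms, the MapReduce handling looks roughly like this (a conceptual sketch, not the actual Modin kernels; it relies on the categorical dtype of the map results still remembering all categories):

import pandas as pd

df = pd.DataFrame(
    {"by_col": pd.Categorical([1, 2, 2], categories=[1, 2, 3]), "b": [3, 4, 5]}
)

# map stage: each partition aggregates only the categories it actually contains
map_result = df.groupby("by_col", observed=True).sum()

# reduce stage: the full-column partition re-groups the concatenated map results
# with observed=False; the categorical dtype still carries all the categories,
# so the missing ones reappear with the default value
reduce_result = map_result.groupby(level=0, observed=False).sum()
print(reduce_result)  # category 3 appears with 0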

Can this be a single full-column kernel performing the post-processing instead of two map kernels?
The overhead of launching two kernels turned out to be much smaller than that of running a full-column operation: the full-column kernel approach was ~1.5x slower than the two map kernels approach.

Performance results

I performed testing on two datasets: the original H2O dataset and a slightly modified one. The results for the modified dataset are quite good, but for the original one they're quite disappointing.

1. Tests on modified H2O dataset
Modifications I made to the dataset:

  • id3 column is cast from categorical to str.
    Why?: the column has 1_000_000 unique values, which doesn't work well with the current implementation of categories in Modin (see the 2nd problem here). In particular, having a column with that many unique values as a categorical makes every operation much slower because of the overhead of carrying all 1_000_000 categorical values in each partition.
  • id1 originally had only 10 categorical values; now it has 10_000
  • id2 originally had only 10 categorical values; now it has 100

There were several test cases; their descriptions can be found in the code:

code I used to measure the results
import modin.pandas as pd
import modin.config as cfg

from modin.utils import execute
from timeit import default_timer as timer
cfg.RangePartitioningGroupby.put(True)
import numpy as np


dtypes = {
    **{n: "category" for n in ["id1", "id2"]},
    **{n: "int32" for n in ["id4", "id5", "id6", "v1", "v2"]},
    "v3": "float64",
}

is_1_5gb_data = False
use_apply_method = False
path = "h2o/G1_1e7_1e1_0_0.csv"

t1 = timer()
df = pd.read_csv(path)

# original h2o data has only 10 unique values in each 'id1' and 'id2'
new_id1_values = [f"id{i}" for i in range(10_000)] * 1_000
new_id2_values = [f"id{i}" for i in range(100)] * 100_000
np.random.shuffle(new_id1_values)
np.random.shuffle(new_id2_values)
df["id1"] = new_id1_values
df["id2"] = new_id2_values

if is_1_5gb_data:
    df = pd.concat([df, df, df])
df = df.astype(dtypes)
execute(df)
print("reading took:", timer() - t1)

gb_params = {"observed": False}

def aggregate(df, by):
    if use_apply_method:
        res = df.groupby(by, **gb_params).apply(lambda df: df[["id6", "v1", "v2", "v3"]].sum())
    else:
        res = df.groupby(by, **gb_params).agg({key: ["sum", "mean", "max"] for key in ("id6", "v1", "v2", "v3")})
    execute(res)
    return res

def case1_1(df):
    """
    MultiIndex with small amount of missing categories.
        [id1, id2] -> 44 / 1_000_000 = <1% missing categories
    """
    return aggregate(df, ["id1", "id2"])

def case1_2(df):
    """
    MultiIndex with small amount of missing categories:
        [id1, id2 + 10% filter] -> 100_036 / 1_000_000 (~10% missing categories)
    """
    to_exclude = np.random.choice(df["id2"].unique(), 10, replace=False)
    df = df[~df["id2"].isin(to_exclude)]
    execute(df)
    return aggregate(df, ["id1", "id2"])

def case2_1(df):
    """
    MultiIndex with a lot of missing categories:
        [id1 + 50% filter, id2] -> 500_017 / 1_000_000 = 50% missing categories
    """
    to_exclude = np.random.choice(df["id1"].unique(), 5_000, replace=False)
    df = df[~df["id1"].isin(to_exclude)]
    execute(df)
    return aggregate(df, ["id1", "id2"])

def case2_2(df):
    """
    MultiIndex with a lot of missing categories:
        [id2 + 50% filter, id3] -> 94_575_508 / 99_332_700 = 95% missing categories
    """
    to_exclude = np.random.choice(df["id2"].unique(), 50, replace=False)
    df = df[~df["id2"].isin(to_exclude)]
    execute(df)
    return aggregate(df, ["id2", "id3"])

def case3_1(df):
    """Single Index with no missing categories (10_000 categories)."""
    return aggregate(df, ["id1"])

def case3_2(df):
    """Single Index with no missing categories (100 categories)."""
    return aggregate(df, ["id2"])

def case4(df):
    """Single Index with a lot of missing categories:
        [id1 + 50% filter] = 5_000 / 10_000 (~50% missing categories).
    """
    to_exclude = np.random.choice(df["id1"].unique(), 5_000, replace=False)
    df = df[~df["id1"].isin(to_exclude)]
    execute(df)
    return aggregate(df, ["id1"])

def case5(df):
    """Single Index with small amount of missing categories:
        [id1 + 10% filter] = 1_000 / 10_000 (~10% missing categories).
    """
    to_exclude = np.random.choice(df["id1"].unique(), 1_000, replace=False)
    df = df[~df["id1"].isin(to_exclude)]
    execute(df)
    return aggregate(df, ["id1"])

cases = [case1_1, case1_2, case2_1, case2_2, case3_1, case3_2, case4, case5]
results = {}

for case in cases:
    t1 = timer()
    res = case(df)
    results[case.__name__] = timer() - t1
    print(case.__name__,":", results[case.__name__])

print(results)
print("=====formatted=====")
for val in results.values():
    print(val)
500 MB data, aggregation functions: `grp.agg(["mean", "sum", "max"])`

[image]

In this scenario, the only case where the compa-ratio changes its color after enabling observed=False is case2_1. However, the absolute difference is not that big.

The really disappointing part is that in cases where the grouping column has no missing categories (case3_1 and case3_2), we still see a 20-40% overhead just to verify at the post-processing stage that there are no missing categories.

It's also worth mentioning that the overhead of case3_1 and case4 is almost the same, meaning that the post-processing kernels themselves don't add much overhead; the main cost comes from the mere fact of submitting an extra post-processing kernel.

1.5 GB data, aggregation functions: `grp.agg(["mean", "sum", "max"])`

[image]

The relative overhead of observed=False drops from ~40% on average down to ~15% on average when compared with the 500 MB dataset.

500 MB data, aggregation functions: `grp.apply(lambda df: df.sum())`

[image]

Here we're mostly interested in case4 and case5, since for apply we manually run the applied function on every missing group in order to compute its individual default value.

2. Tests on original H2O dataset

As described above, the original H2O dataset has a categorical column with a lot of unique values (id3). The high uniqueness makes Modin struggle because of its implementation of categoricals, which stores all unique values in each partition (see the 2nd problem here).
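
The per-partition cost is easy to illustrate with plain pandas (a toy example; the real id3 has on the order of 1_000_000 categories, and in Modin the full category set travels with every partition):

import pandas as pd

s = pd.Series([f"id{i}" for i in range(1_000_000)], dtype="category")

# a small row slice, analogous to what a single Modin partition would hold
part = s.iloc[:10]

# the slice's dtype still references all 1_000_000 categories, so every
# partition effectively carries (and has to serialize) the full category set
print(len(part), len(part.dtype.categories))  # 10 1000000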

I ran the following test cases on the original H2O data in two scenarios: with id3 being categorical and with id3 being a string type.

Script to measure
import modin.pandas as pd
import modin.config as cfg

from modin.utils import execute
from timeit import default_timer as timer
cfg.RangePartitioningGroupby.put(True)
import numpy as np

# import pandas as pd

dtypes = {
    **{n: "category" for n in ["id1", "id2", "id3"]},
    **{n: "int32" for n in ["id4", "id5", "id6", "v1", "v2"]},
    "v3": "float64",
}

path = "h2o/G1_1e7_1e1_0_0.csv"

t1 = timer()
df = pd.read_csv(path)
df = df.astype(dtypes)
execute(df)
print("reading took:", timer() - t1)

gb_params = {"observed": False}

def aggregate(df, by):
    res = df.groupby(by, **gb_params).agg({key: ["sum", "mean"] for key in ("id6", "v1", "v2", "v3")})
    execute(res)
    return res

def case1(df):
    """MultiIndex with no missing categories."""
    return aggregate(df, ["id1", "id2"])

def case2_1(df):
    """
    MultiIndex with a lot of missing categories:
        [id1, id2, id3] = 90_477_932 / 99_995_100 (~90% missing categories)
    """
    return aggregate(df, ["id1", "id2", "id3"])

def case2_2(df):
    """
    MultiIndex with a lot of missing categories:
        [id2, id3] = 3_678_510 / 9_999_510 (~36% missing categories)
    """
    return aggregate(df, ["id2", "id3"])

def case3(df):
    """
    MultiIndex with small amount of missing categories:
        [id1, id2] -> id2 != 'id007' and id2 != 'id010' = 20 / 100 (~20% missing categories)
    """
    df = df.query("id2 != 'id007' and id2 != 'id010'")
    execute(df)
    return aggregate(df, ["id1", "id2"])

def case4_1(df):
    """Single Index with no missing categories (10 categories)."""
    return aggregate(df, ["id2"])

def case4_2(df):
    """Single Index with no missing categories (991_951 categories)."""
    return aggregate(df, ["id3"])

def case5(df):
    """Single Index with a lot of missing categories:
        [id3 + filtering] = 500_000 / 999_951 (~50% missing categories).
    """
    to_exclude = np.random.choice(df["id3"].unique(), 500_000, replace=False)
    df = df[~df["id3"].isin(to_exclude)]
    execute(df)
    return aggregate(df, ["id3"])

def case6_1(df):
    """Single Index with small amount of missing categories:
        [id3 + filtering] = 1_000 / 999_951 (~1% missing categories).
    """
    to_exclude = np.random.choice(df["id3"].unique(), 1_000, replace=False)
    df = df[~df["id3"].isin(to_exclude)]
    execute(df)
    return aggregate(df, ["id3"])

def case6_2(df):
    """Single Index with small amount of missing categories:
        [id2 + filtering] = 2 / 10 (~20% missing categories).
    """
    df = df.query("id2 != 'id007' and id2 != 'id010'")
    execute(df)
    return aggregate(df, ["id2"])


cases = [case1, case2_1, case2_2, case3, case4_1, case4_2, case5, case6_1, case6_2]
results = {}

for case in cases:
    t1 = timer()
    case(df)
    results[case.__name__] = timer() - t1
    print(case.__name__,":", results[case.__name__])

print(results)
print("=====formatted=====")
for val in results.values():
    print(val)
H2O original data (~500 MB), 'id3' is categorical

[image]

H2O original data (~500 MB), 'id3' is a string type

[image]

H2O original data (~500 MB), 'id3_cat' vs 'id3_str'

[image]

In this comparison we see that casting id3 to an object type makes things much faster. This raises the question of whether we should declare that creating a categorical with such high uniqueness is an antipattern for Modin.

[image]

BTW, for pandas, when comparing id3_cat vs id3_str, the categorical approach wins.

[image]

However, when comparing modin_id3_str vs pandas_id3_cat, Modin is still faster in some cases.

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves #5925: Reshuffling groupby doesn't handle grouping on a categorical column correctly
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

dchigarev marked this pull request as ready for review January 19, 2024 19:42
@@ -38,6 +38,8 @@ def concatenate(dfs):
assert df.columns.equals(dfs[0].columns)
for i in dfs[0].columns.get_indexer_for(dfs[0].select_dtypes("category").columns):
columns = [df.iloc[:, i] for df in dfs]
if not all(isinstance(col.dtype, pandas.CategoricalDtype) for col in columns):
Collaborator:
Why?

Collaborator Author (dchigarev):
There was a bug in this function that was never triggered before; I decided to fix it in this PR. I tried to make the fix more explicit by adding comments:

all_categorical_parts_are_empty = None
has_non_categorical_parts = False
for col in columns:
    if isinstance(col.dtype, pandas.CategoricalDtype):
        if all_categorical_parts_are_empty is None:
            all_categorical_parts_are_empty = len(col) == 0
            continue
        all_categorical_parts_are_empty &= len(col) == 0
    else:
        has_non_categorical_parts = True
# 'union_categoricals' raises an error if some of the passed values don't have a categorical dtype.
# If that happens, we only want to continue when all parts with categorical dtypes are actually empty.
# This can happen if there was an aggregation that discards categorical dtypes but doesn't
# properly do so for empty partitions.
if has_non_categorical_parts and all_categorical_parts_are_empty:
    continue

original_names = df.index.names
df = pandas.concat([df, values])
if kwargs["sort"]:
# TODO: write search-sorted insertion or sort the result after insertion
Collaborator:
Can you provide more details? I can't understand what you want here.

Collaborator Author (dchigarev):
updated the comment:

# TODO: inserting 'values' based on 'searchsorted' result might be more efficient
# in cases of small amount of 'values'
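
For reference, the searchsorted-based insertion the TODO refers to could look roughly like this (a sketch on a plain sorted index with made-up data, not the actual kernel code):

import pandas as pd

# a sorted groupby result partition and a couple of missing groups to insert
result = pd.DataFrame({"b": [3, 9]}, index=pd.Index([1, 2], name="by_col"))
missing = pd.DataFrame({"b": [0, 0]}, index=pd.Index([3, 5], name="by_col"))

# find the positions where the (sorted) missing groups belong and splice them
# in, instead of concatenating and re-sorting the whole partition
positions = result.index.searchsorted(missing.index)
pieces, prev = [], 0
for i, pos in enumerate(positions):
    pieces.append(result.iloc[prev:pos])
    pieces.append(missing.iloc[[i]])
    prev = pos
pieces.append(result.iloc[prev:])
print(pd.concat(pieces))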

anmyachev previously approved these changes Jan 25, 2024

@anmyachev (Collaborator) left a comment:
@dchigarev very detailed comments, thank you!

modin/core/dataframe/pandas/dataframe/utils.py
Comment on lines 650 to 656
# If the aggregation has failed, the result would be empty. Assuming the
# fill value to be `np.NaN` here (this may not always be correct!!!)
Collaborator:
So is this also a hack?

Collaborator Author (dchigarev):
Yes, this is a hack; I added a note accordingly.

@anmyachev (Collaborator) left a comment:
LGTM!

anmyachev merged commit 46dc0a5 into modin-project:master on Jan 29, 2024
37 checks passed
@dchigarev (Collaborator Author):

CI on master started to fail after merging this PR: https://github.com/modin-project/modin/actions/runs/7701338108/job/20995335180

I'll prepare a separate PR to revert this
