From 3d3055909a63e3bc25922c250cc5b1907f66607d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 7 Oct 2024 15:17:35 -0700 Subject: [PATCH 1/4] add custom code path to dask-cudf --- python/dask_cudf/dask_cudf/expr/_expr.py | 159 +++++++++++++++++++- python/dask_cudf/dask_cudf/expr/_groupby.py | 118 ++++++++++++++- 2 files changed, 275 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index af83a01da98..189d295175e 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -6,11 +6,20 @@ from dask_expr import new_collection from dask_expr._cumulative import CumulativeBlockwise from dask_expr._expr import Elemwise, Expr, RenameAxis, VarColumns +from dask_expr._groupby import ( + DecomposableGroupbyAggregation, + GroupbyAggregation, +) from dask_expr._reductions import Reduction, Var from dask_expr.io.io import FusedParquetIO from dask_expr.io.parquet import ReadParquetPyarrowFS -from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty +from dask.dataframe.core import ( + _concat, + is_dataframe_like, + make_meta, + meta_nonempty, +) from dask.dataframe.dispatch import is_categorical_dtype from dask.typing import no_default @@ -21,6 +30,154 @@ ## +def _groupby_meta(gb): + from dask.dataframe.dispatch import make_meta, meta_nonempty + + meta = meta_nonempty(gb.frame._meta) + meta = meta.groupby( + gb._by_meta, + # **_as_dict("observed", gb.observed), + # **_as_dict("dropna", gb.dropna), + ) + if gb._slice is not None: + meta = meta[gb._slice] + meta = meta.aggregate(gb.arg) + return make_meta(meta) + + +class CudfGroupbyAggregation(GroupbyAggregation): + @functools.cached_property + def _meta(self): + return _groupby_meta(self) + + def _lower(self): + return DecomposableCudfGroupbyAggregation( + self.frame, + self.arg, + self.observed, + self.dropna, + self.split_every, + self.split_out, + self.sort, + self.shuffle_method, + self._slice, + *self.by, + ) + + +class DecomposableCudfGroupbyAggregation(DecomposableGroupbyAggregation): + sep = "___" + + @functools.cached_property + def _meta(self): + return _groupby_meta(self) + + @property + def shuffle_by_index(self): + return False # We always group by column(s) + + @functools.cached_property + def spec_info(self): + if isinstance(self.arg, (dict, list)): + aggs = self.arg.copy() + else: + aggs = self.arg + + if self._slice and not isinstance(aggs, dict): + aggs = {self._slice: aggs} + + gb_cols = self._by_columns + if isinstance(gb_cols, str): + gb_cols = [gb_cols] + columns = [c for c in self.frame.columns if c not in gb_cols] + if not isinstance(aggs, dict): + aggs = {col: aggs for col in columns} + + # Assert if our output will have a MultiIndex; this will be the case if + # any value in the `aggs` dict is not a string (i.e. 
multiple/named + # aggregations per column) + str_cols_out = True + aggs_renames = {} + for col in aggs: + if isinstance(aggs[col], str) or callable(aggs[col]): + aggs[col] = [aggs[col]] + elif isinstance(aggs[col], dict): + str_cols_out = False + col_aggs = [] + for k, v in aggs[col].items(): + aggs_renames[col, v] = k + col_aggs.append(v) + aggs[col] = col_aggs + else: + str_cols_out = False + if col in gb_cols: + columns.append(col) + + return { + "aggs": aggs, + "columns": columns, + "str_cols_out": str_cols_out, + "aggs_renames": aggs_renames, + } + + @classmethod + def chunk(cls, df, *by, **kwargs): + from dask_cudf.groupby import _groupby_partition_agg + + return _groupby_partition_agg(df, **kwargs) + + @classmethod + def combine(cls, inputs, **kwargs): + from dask_cudf.groupby import _tree_node_agg + + return _tree_node_agg(_concat(inputs), **kwargs) + + @classmethod + def aggregate(cls, inputs, **kwargs): + from dask_cudf.groupby import _finalize_gb_agg + + return _finalize_gb_agg(_concat(inputs), **kwargs) + + @property + def chunk_kwargs(self) -> dict: + dropna = True if self.dropna is None else self.dropna + return { + "gb_cols": self._by_columns, + "aggs": self.spec_info["aggs"], + "columns": self.spec_info["columns"], + "dropna": dropna, + "sort": self.sort, + "sep": self.sep, + } + + @property + def combine_kwargs(self) -> dict: + dropna = True if self.dropna is None else self.dropna + return { + "gb_cols": self._by_columns, + "dropna": dropna, + "sort": self.sort, + "sep": self.sep, + } + + @property + def aggregate_kwargs(self) -> dict: + dropna = True if self.dropna is None else self.dropna + final_columns = self._slice or self._meta.columns + return { + "gb_cols": self._by_columns, + "aggs": self.spec_info["aggs"], + "columns": self.spec_info["columns"], + "final_columns": final_columns, + "as_index": True, + "dropna": dropna, + "sort": self.sort, + "sep": self.sep, + "str_cols_out": self.spec_info["str_cols_out"], + "aggs_renames": self.spec_info["aggs_renames"], + } + + class CudfFusedParquetIO(FusedParquetIO): @staticmethod def _load_multiple_files( diff --git a/python/dask_cudf/dask_cudf/expr/_groupby.py b/python/dask_cudf/dask_cudf/expr/_groupby.py index 65688115b59..f61bb6b1fe6 100644 --- a/python/dask_cudf/dask_cudf/expr/_groupby.py +++ b/python/dask_cudf/dask_cudf/expr/_groupby.py @@ -1,5 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. 
+from dask_expr._collection import new_collection from dask_expr._groupby import ( GroupBy as DXGroupBy, SeriesGroupBy as DXSeriesGroupBy, @@ -54,6 +55,64 @@ def _translate_arg(arg): return arg +# def _get_custom_arg(gb, arg): +# from dask_cudf.groupby import ( +# OPTIMIZED_AGGS, +# _aggs_optimized, +# _redirect_aggs, +# ) + +# _arg = _redirect_aggs(arg) +# if not _aggs_optimized(_arg, OPTIMIZED_AGGS) or not hasattr( +# gb.obj._meta, "to_pandas" +# ): +# # Not supported +# return None + +# # Convert all agg specs to dict +# use_list = False +# if not isinstance(_arg, dict): +# use_list = True +# gb_cols = gb._meta.grouping.keys.names +# columns = [c for c in gb.obj.columns if c not in gb_cols] +# _arg = {col: _arg for col in columns} + +# # Normalize the dict and count ops +# naggs = 0 +# str_cols_out = True +# for col in _arg: +# if isinstance(_arg[col], str) or callable(_arg[col]): +# _arg[col] = [_arg[col]] +# naggs += 1 +# elif isinstance(_arg[col], dict): +# # TODO: Support named aggs +# return None +# str_cols_out = False +# col_aggs = [] +# for k, v in _arg[col].items(): +# col_aggs.append(v) +# _arg[col] = col_aggs +# naggs += len(col_aggs) +# else: +# str_cols_out = False + +# # if str_cols_out: +# # # Metadata should use `str` for dict values if that is +# # # what the user originally specified (column names will +# # # be str, rather than tuples). +# # for col in _arg.keys(): +# # _arg[col] = _arg[col][0] +# # if use_list: +# # _arg = next(_arg.values()) + +# if naggs > 1: +# # Only use the custom code path if we +# # are performing multiple aggs at once +# return _arg +# else: +# return None + + # TODO: These classes are mostly a work-around for missing # `observed=False` support. # See: https://github.com/rapidsai/cudf/issues/15173 @@ -90,7 +149,64 @@ def collect(self, **kwargs): return self._single_agg(ListAgg, **kwargs) def aggregate(self, arg, **kwargs): - return super().aggregate(_translate_arg(arg), **kwargs) + from dask_cudf.expr._expr import CudfGroupbyAggregation + from dask_cudf.groupby import ( + OPTIMIZED_AGGS, + _aggs_optimized, + _redirect_aggs, + ) + + # Check if "custom" aggregation is supported/needed + _arg = _redirect_aggs(arg) + supported = _aggs_optimized(_arg, OPTIMIZED_AGGS) and hasattr( + self.obj._meta, "to_pandas" + ) + n_aggs = 0 + if supported: + if isinstance(_arg, list): + # TODO: Support named aggregations + supported = supported and all( + [isinstance(v, str) for v in _arg] + ) + n_aggs += len(_arg) + elif isinstance(_arg, dict): + for val in _arg.values(): + if isinstance(val, str): + n_aggs += 1 + elif isinstance(val, list): + n_aggs += len(val) + supported = supported and all( + [isinstance(v, str) for v in val] + ) + else: + # TODO: Support named aggregations + supported = False + if not supported: + break + else: + n_aggs = 1 + + if supported and n_aggs > 1: + # Use custom agg logic from "legacy" dask-cudf. + # This code path may be more efficient than dask-expr + # when we are performing multiple aggregations on the + # same DataFrame at once. 
+ return new_collection( + CudfGroupbyAggregation( + self.obj.expr, + _arg, + self.observed, + self.dropna, + kwargs.get("split_every"), + kwargs.get("split_out"), + self.sort, + kwargs.get("shuffle_method"), + self._slice, + *self.by, + ) + ) + else: + return super().aggregate(_translate_arg(arg), **kwargs) class SeriesGroupBy(DXSeriesGroupBy): From 60f2db88b2458b938d839677ab7d51d2341590d5 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 8 Oct 2024 09:28:45 -0700 Subject: [PATCH 2/4] cleanup --- python/dask_cudf/dask_cudf/expr/_expr.py | 214 ++++++++++++-------- python/dask_cudf/dask_cudf/expr/_groupby.py | 134 ++---------- 2 files changed, 151 insertions(+), 197 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index 189d295175e..4a9f4de8b9c 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -30,96 +30,87 @@ ## -def _groupby_meta(gb): - from dask.dataframe.dispatch import make_meta, meta_nonempty - - meta = meta_nonempty(gb.frame._meta) - meta = meta.groupby( - gb._by_meta, - # **_as_dict("observed", gb.observed), - # **_as_dict("dropna", gb.dropna), - ) - if gb._slice is not None: - meta = meta[gb._slice] - meta = meta.aggregate(gb.arg) - return make_meta(meta) - - -class CudfGroupbyAggregation(GroupbyAggregation): - @functools.cached_property - def _meta(self): - return _groupby_meta(self) - - def _lower(self): - return DecomposableCudfGroupbyAggregation( - self.frame, - self.arg, - self.observed, - self.dropna, - self.split_every, - self.split_out, - self.sort, - self.shuffle_method, - self._slice, - *self.by, - ) +def _get_spec_info(gb): + if isinstance(gb.arg, (dict, list)): + aggs = gb.arg.copy() + else: + aggs = gb.arg + + if gb._slice and not isinstance(aggs, dict): + aggs = {gb._slice: aggs} + + gb_cols = gb._by_columns + if isinstance(gb_cols, str): + gb_cols = [gb_cols] + columns = [c for c in gb.frame.columns if c not in gb_cols] + if not isinstance(aggs, dict): + aggs = {col: aggs for col in columns} + + # Assert if our output will have a MultiIndex; this will be the case if + # any value in the `aggs` dict is not a string (i.e. multiple/named + # aggregations per column) + str_cols_out = True + aggs_renames = {} + for col in aggs: + if isinstance(aggs[col], str) or callable(aggs[col]): + aggs[col] = [aggs[col]] + elif isinstance(aggs[col], dict): + str_cols_out = False + col_aggs = [] + for k, v in aggs[col].items(): + aggs_renames[col, v] = k + col_aggs.append(v) + aggs[col] = col_aggs + else: + str_cols_out = False + if col in gb_cols: + columns.append(col) + + return { + "aggs": aggs, + "columns": columns, + "str_cols_out": str_cols_out, + "aggs_renames": aggs_renames, + } -class DecomposableCudfGroupbyAggregation(DecomposableGroupbyAggregation): +def _get_meta(gb): + spec_info = gb.spec_info + gb_cols = gb._by_columns + aggs = spec_info["aggs"].copy() + aggs_renames = spec_info["aggs_renames"] + if spec_info["str_cols_out"]: + # Metadata should use `str` for dict values if that is + # what the user originally specified (column names will + # be str, rather than tuples). 
+ for col in aggs: + aggs[col] = aggs[col][0] + _meta = gb.frame._meta.groupby(gb_cols).agg(aggs) + if aggs_renames: + col_array = [] + agg_array = [] + for col, agg in _meta.columns: + col_array.append(col) + agg_array.append(aggs_renames.get((col, agg), agg)) + _meta.columns = pd.MultiIndex.from_arrays([col_array, agg_array]) + return _meta + + +class DecomposableCudfGroupbyAgg(DecomposableGroupbyAggregation): sep = "___" + @functools.cached_property + def spec_info(self): + return _get_spec_info(self) + @functools.cached_property def _meta(self): - return _groupby_meta(self) + return _get_meta(self) @property def shuffle_by_index(self): return False # We always group by column(s) - @functools.cached_property - def spec_info(self): - if isinstance(self.arg, (dict, list)): - aggs = self.arg.copy() - else: - aggs = self.arg - - if self._slice and not isinstance(aggs, dict): - aggs = {self._slice: aggs} - - gb_cols = self._by_columns - if isinstance(gb_cols, str): - gb_cols = [gb_cols] - columns = [c for c in self.frame.columns if c not in gb_cols] - if not isinstance(aggs, dict): - aggs = {col: aggs for col in columns} - - # Assert if our output will have a MultiIndex; this will be the case if - # any value in the `aggs` dict is not a string (i.e. multiple/named - # aggregations per column) - str_cols_out = True - aggs_renames = {} - for col in aggs: - if isinstance(aggs[col], str) or callable(aggs[col]): - aggs[col] = [aggs[col]] - elif isinstance(aggs[col], dict): - str_cols_out = False - col_aggs = [] - for k, v in aggs[col].items(): - aggs_renames[col, v] = k - col_aggs.append(v) - aggs[col] = col_aggs - else: - str_cols_out = False - if col in gb_cols: - columns.append(col) - - return { - "aggs": aggs, - "columns": columns, - "str_cols_out": str_cols_out, - "aggs_renames": aggs_renames, - } - @classmethod def chunk(cls, df, *by, **kwargs): from dask_cudf.groupby import _groupby_partition_agg @@ -178,6 +169,71 @@ def aggregate_kwargs(self) -> dict: } +class CudfGroupbyAgg(GroupbyAggregation): + @functools.cached_property + def spec_info(self): + return _get_spec_info(self) + + @functools.cached_property + def _meta(self): + return _get_meta(self) + + def _lower(self): + return DecomposableCudfGroupbyAgg( + self.frame, + self.arg, + self.observed, + self.dropna, + self.split_every, + self.split_out, + self.sort, + self.shuffle_method, + self._slice, + *self.by, + ) + + +def _maybe_get_custom_expr( + gb, + aggs, + split_every=None, + split_out=None, + shuffle_method=None, + **kwargs, +): + from dask_cudf.groupby import ( + OPTIMIZED_AGGS, + _aggs_optimized, + _redirect_aggs, + ) + + if kwargs: + # Unsupported key-word arguments + return None + + if not hasattr(gb.obj._meta, "to_pandas"): + # Not cuDF-backed data + return None + + _aggs = _redirect_aggs(aggs) + if not _aggs_optimized(_aggs, OPTIMIZED_AGGS): + # One or more aggregations are unsupported + return None + + return CudfGroupbyAgg( + gb.obj.expr, + _aggs, + gb.observed, + gb.dropna, + split_every, + split_out, + gb.sort, + shuffle_method, + gb._slice, + *gb.by, + ) + + class CudfFusedParquetIO(FusedParquetIO): @staticmethod def _load_multiple_files( diff --git a/python/dask_cudf/dask_cudf/expr/_groupby.py b/python/dask_cudf/dask_cudf/expr/_groupby.py index f61bb6b1fe6..82731efef59 100644 --- a/python/dask_cudf/dask_cudf/expr/_groupby.py +++ b/python/dask_cudf/dask_cudf/expr/_groupby.py @@ -12,6 +12,8 @@ from cudf.core.groupby.groupby import _deprecate_collect +from dask_cudf.expr._expr import _maybe_get_custom_expr + ## ## 
Custom groupby classes ## @@ -55,67 +57,16 @@ def _translate_arg(arg): return arg -# def _get_custom_arg(gb, arg): -# from dask_cudf.groupby import ( -# OPTIMIZED_AGGS, -# _aggs_optimized, -# _redirect_aggs, -# ) - -# _arg = _redirect_aggs(arg) -# if not _aggs_optimized(_arg, OPTIMIZED_AGGS) or not hasattr( -# gb.obj._meta, "to_pandas" -# ): -# # Not supported -# return None - -# # Convert all agg specs to dict -# use_list = False -# if not isinstance(_arg, dict): -# use_list = True -# gb_cols = gb._meta.grouping.keys.names -# columns = [c for c in gb.obj.columns if c not in gb_cols] -# _arg = {col: _arg for col in columns} - -# # Normalize the dict and count ops -# naggs = 0 -# str_cols_out = True -# for col in _arg: -# if isinstance(_arg[col], str) or callable(_arg[col]): -# _arg[col] = [_arg[col]] -# naggs += 1 -# elif isinstance(_arg[col], dict): -# # TODO: Support named aggs -# return None -# str_cols_out = False -# col_aggs = [] -# for k, v in _arg[col].items(): -# col_aggs.append(v) -# _arg[col] = col_aggs -# naggs += len(col_aggs) -# else: -# str_cols_out = False - -# # if str_cols_out: -# # # Metadata should use `str` for dict values if that is -# # # what the user originally specified (column names will -# # # be str, rather than tuples). -# # for col in _arg.keys(): -# # _arg[col] = _arg[col][0] -# # if use_list: -# # _arg = next(_arg.values()) - -# if naggs > 1: -# # Only use the custom code path if we -# # are performing multiple aggs at once -# return _arg -# else: -# return None - - -# TODO: These classes are mostly a work-around for missing -# `observed=False` support. -# See: https://github.com/rapidsai/cudf/issues/15173 +# We define our own GroupBy classes in Dask cuDF for +# the following reasons: +# (1) We want to use a custom `aggregate` algorithm +# that performs multiple aggregations on the +# same dataframe partition at once. The upstream +# algorithm breaks distinct aggregations into +# separate tasks. +# (2) We need to work around missing `observed=False` +# support: +# https://github.com/rapidsai/cudf/issues/15173 class GroupBy(DXGroupBy): @@ -149,64 +100,11 @@ def collect(self, **kwargs): return self._single_agg(ListAgg, **kwargs) def aggregate(self, arg, **kwargs): - from dask_cudf.expr._expr import CudfGroupbyAggregation - from dask_cudf.groupby import ( - OPTIMIZED_AGGS, - _aggs_optimized, - _redirect_aggs, - ) - - # Check if "custom" aggregation is supported/needed - _arg = _redirect_aggs(arg) - supported = _aggs_optimized(_arg, OPTIMIZED_AGGS) and hasattr( - self.obj._meta, "to_pandas" - ) - n_aggs = 0 - if supported: - if isinstance(_arg, list): - # TODO: Support named aggregations - supported = supported and all( - [isinstance(v, str) for v in _arg] - ) - n_aggs += len(_arg) - elif isinstance(_arg, dict): - for val in _arg.values(): - if isinstance(val, str): - n_aggs += 1 - elif isinstance(val, list): - n_aggs += len(val) - supported = supported and all( - [isinstance(v, str) for v in val] - ) - else: - # TODO: Support named aggregations - supported = False - if not supported: - break - else: - n_aggs = 1 - - if supported and n_aggs > 1: - # Use custom agg logic from "legacy" dask-cudf. - # This code path may be more efficient than dask-expr - # when we are performing multiple aggregations on the - # same DataFrame at once. 
- return new_collection( - CudfGroupbyAggregation( - self.obj.expr, - _arg, - self.observed, - self.dropna, - kwargs.get("split_every"), - kwargs.get("split_out"), - self.sort, - kwargs.get("shuffle_method"), - self._slice, - *self.by, - ) - ) - else: + expr = _maybe_get_custom_expr(self, arg, **kwargs) + if expr is None: return super().aggregate(_translate_arg(arg), **kwargs) + else: + return new_collection(expr) class SeriesGroupBy(DXSeriesGroupBy): From 84dbd3625020d5ab26258026bdaf6ce01fc15a14 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 8 Oct 2024 10:00:28 -0700 Subject: [PATCH 3/4] add fused arg --- python/dask_cudf/dask_cudf/expr/_groupby.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_groupby.py b/python/dask_cudf/dask_cudf/expr/_groupby.py index 82731efef59..8a16fe7615d 100644 --- a/python/dask_cudf/dask_cudf/expr/_groupby.py +++ b/python/dask_cudf/dask_cudf/expr/_groupby.py @@ -99,12 +99,15 @@ def collect(self, **kwargs): _deprecate_collect() return self._single_agg(ListAgg, **kwargs) - def aggregate(self, arg, **kwargs): - expr = _maybe_get_custom_expr(self, arg, **kwargs) - if expr is None: - return super().aggregate(_translate_arg(arg), **kwargs) - else: + def aggregate(self, arg, fused=True, **kwargs): + if ( + fused + and (expr := _maybe_get_custom_expr(self, arg, **kwargs)) + is not None + ): return new_collection(expr) + else: + return super().aggregate(_translate_arg(arg), **kwargs) class SeriesGroupBy(DXSeriesGroupBy): From 430c0f1db29db08804152a0cbf6cac531492d5d5 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 17 Oct 2024 08:46:58 -0700 Subject: [PATCH 4/4] add test coverage --- .../dask_cudf/dask_cudf/tests/test_groupby.py | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index e30474f6b94..042e69d86f4 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -14,7 +14,11 @@ import dask_cudf from dask_cudf.groupby import OPTIMIZED_AGGS, _aggs_optimized -from dask_cudf.tests.utils import QUERY_PLANNING_ON, xfail_dask_expr +from dask_cudf.tests.utils import ( + QUERY_PLANNING_ON, + require_dask_expr, + xfail_dask_expr, +) def assert_cudf_groupby_layers(ddf): @@ -556,10 +560,22 @@ def test_groupby_categorical_key(): ), ], ) +@pytest.mark.parametrize( + "fused", + [ + True, + pytest.param( + False, + marks=require_dask_expr("Not supported by legacy API"), + ), + ], +) @pytest.mark.parametrize("split_out", ["use_dask_default", 1, 2]) @pytest.mark.parametrize("split_every", [False, 4]) @pytest.mark.parametrize("npartitions", [1, 10]) -def test_groupby_agg_params(npartitions, split_every, split_out, as_index): +def test_groupby_agg_params( + npartitions, split_every, split_out, fused, as_index +): df = cudf.datasets.randomdata( nrows=150, dtypes={"name": str, "a": int, "b": int, "c": float}, @@ -574,6 +590,7 @@ def test_groupby_agg_params(npartitions, split_every, split_out, as_index): "c": ["mean", "std", "var"], } + fused_kwarg = {"fused": fused} if QUERY_PLANNING_ON else {} split_kwargs = {"split_every": split_every, "split_out": split_out} if split_out == "use_dask_default": split_kwargs.pop("split_out") @@ -593,6 +610,7 @@ def test_groupby_agg_params(npartitions, split_every, split_out, as_index): ddf.groupby(["name", "a"], sort=True, **maybe_as_index) .aggregate( agg_dict, + **fused_kwarg, 
**split_kwargs, ) .compute() @@ -614,6 +632,7 @@ def test_groupby_agg_params(npartitions, split_every, split_out, as_index): # Full check (`sort=False`) gr = ddf.groupby(["name", "a"], sort=False, **maybe_as_index).aggregate( agg_dict, + **fused_kwarg, **split_kwargs, ) pr = pddf.groupby(["name", "a"], sort=False).agg(
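
Illustrative usage sketch (not part of the patches above): how the new ``fused`` keyword introduced in patch 3 would be exercised, assuming a cuDF-backed Dask DataFrame with the query-planning (dask-expr) backend enabled.

    import cudf
    import dask_cudf

    df = cudf.DataFrame({"key": [1, 1, 2, 2], "x": [1.0, 2.0, 3.0, 4.0]})
    ddf = dask_cudf.from_cudf(df, npartitions=2)

    # With fused=True (the default), a supported multi-aggregation spec is
    # planned as a single CudfGroupbyAgg expression, so all aggregations on
    # the same partition run in one task; fused=False falls back to the
    # upstream dask-expr GroupbyAggregation path via _translate_arg.
    result = (
        ddf.groupby("key")
        .aggregate({"x": ["sum", "mean"]}, fused=True, split_out=1)
        .compute()
    )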