Add multi-partition Shuffle operation to cuDF Polars #17744

Merged: 23 commits merged into branch-25.04 from cudf-polars-multi-shuffle on Jan 29, 2025
Changes from 9 commits
Commits (23)
89392c0
try importing dask_expr from dask.dataframe
rjzamora Jan 9, 2025
2a6821d
Merge remote-tracking branch 'upstream/branch-25.02' into dask-expr-m…
rjzamora Jan 9, 2025
5743030
Merge branch 'branch-25.02' into dask-expr-migration
rjzamora Jan 11, 2025
7d36d3b
Merge remote-tracking branch 'upstream/branch-25.02' into dask-expr-m…
rjzamora Jan 14, 2025
88e078d
update the error message
rjzamora Jan 14, 2025
1f77ec4
add basic shuffle support
rjzamora Jan 14, 2025
8c52fde
major revision
rjzamora Jan 15, 2025
0886ab7
Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars…
rjzamora Jan 15, 2025
f714a51
roll back unrelated changes
rjzamora Jan 15, 2025
677ef36
Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars…
rjzamora Jan 22, 2025
6b0b9f1
address some code review
rjzamora Jan 22, 2025
4da24b1
Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars…
rjzamora Jan 22, 2025
c7b81e3
check the result
rjzamora Jan 22, 2025
fd6e39c
fix test
rjzamora Jan 22, 2025
ecba98d
Merge remote-tracking branch 'upstream/branch-25.02' into cudf-polars…
rjzamora Jan 23, 2025
f02c146
simplify Shuffle (only handle hash-based partitioning for now)
rjzamora Jan 23, 2025
8604e1b
remove multi-child validation
rjzamora Jan 23, 2025
86fad9d
Merge remote-tracking branch 'upstream/branch-25.04' into cudf-polars…
rjzamora Jan 27, 2025
9624396
address code review
rjzamora Jan 27, 2025
264fcfd
avoid shuffling single partition
rjzamora Jan 27, 2025
82f9c78
fix test bug
rjzamora Jan 27, 2025
a502f71
turn do_evaluate back into a no-op
rjzamora Jan 28, 2025
08b4db5
Merge branch 'branch-25.04' into cudf-polars-multi-shuffle
rjzamora Jan 28, 2025
12 changes: 9 additions & 3 deletions python/cudf_polars/cudf_polars/experimental/base.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition base classes."""

@@ -12,6 +12,7 @@
    from collections.abc import Iterator, Sequence

    from cudf_polars.containers import DataFrame
+    from cudf_polars.dsl.expr import NamedExpr
    from cudf_polars.dsl.nodebase import Node


@@ -22,10 +23,15 @@ class PartitionInfo:
    This class only tracks the partition count (for now).
    """

-    __slots__ = ("count",)
+    __slots__ = ("count", "partitioned_on")

-    def __init__(self, count: int):
+    def __init__(
+        self,
+        count: int,
+        partitioned_on: tuple[NamedExpr, ...] = (),
+    ):
        self.count = count
+        self.partitioned_on = partitioned_on

    def keys(self, node: Node) -> Iterator[tuple[str, int]]:
        """Return the partitioned keys for a given node."""
5 changes: 3 additions & 2 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
+# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition Dask execution."""

@@ -10,7 +10,8 @@
from typing import TYPE_CHECKING, Any

import cudf_polars.experimental.io
-import cudf_polars.experimental.select  # noqa: F401
+import cudf_polars.experimental.select
+import cudf_polars.experimental.shuffle  # noqa: F401
from cudf_polars.dsl.ir import IR, Cache, Filter, HStack, Projection, Select, Union
from cudf_polars.dsl.traversal import CachingVisitor, traversal
from cudf_polars.experimental.base import PartitionInfo, _concat, get_key_name
245 changes: 245 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/shuffle.py
@@ -0,0 +1,245 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Shuffle Logic."""

from __future__ import annotations

import json
import operator
from functools import reduce
from typing import TYPE_CHECKING, Any

import pyarrow as pa

import pylibcudf as plc

from cudf_polars.containers import DataFrame
from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import _concat, get_key_name
from cudf_polars.experimental.dispatch import generate_ir_tasks, lower_ir_node

if TYPE_CHECKING:
    from collections.abc import Hashable, MutableMapping

    from cudf_polars.dsl.expr import NamedExpr
    from cudf_polars.experimental.dispatch import LowerIRTransformer
    from cudf_polars.experimental.parallel import PartitionInfo
    from cudf_polars.typing import Schema


class Shuffle(IR):
    """
    Shuffle multi-partition data.

    Notes
    -----
    A Shuffle node may have either one or two children. In both
    cases, the first child corresponds to the DataFrame we are
    shuffling. The optional second child corresponds to a distinct
    DataFrame to extract the shuffle keys from. For example, it
    may be useful to reference a distinct DataFrame in the case
    of sorting.

    The type of argument `keys` controls whether or not hash
    partitioning will be applied. If `keys` is a tuple, we
    assume that the corresponding columns must be hashed. If
    `keys` is a `NamedExpr`, we assume that the corresponding
    column already contains a direct partition mapping.
Contributor:

Under what circumstances do we shuffle one dataframe with the keys/expressions from another dataframe?

In the case of a sortby then all the referenced columns must live in the same dataframe.

Contributor:

It seems like this would be simpler if we always took a dataframe that is being shuffled and a dataframe that is being used to compute the partitioning keys (these can be the same), along with a NamedExpr (or just an Expr) that can produce the partition mapping?

Member Author (rjzamora):

> Under what circumstances do we shuffle one dataframe with the keys/expressions from another dataframe?
>
> In the case of a sortby then all the referenced columns must live in the same dataframe.

My thinking is that we want the Shuffle design to be something that we can use to "lower" both a hash-based shuffle (for a join or groupby) and a sortby. In the case of sortby, we don't actually care whether the referenced columns live in the same dataframe being sorted, because we need to do something like a global quantiles calculation on the referenced columns to figure out which partition each row corresponds to. Therefore, when we sort df on column "A", we will probably want to add a new graph that transforms df["A"] into the final partition mapping.

Contributor:

Hmmm, I guess somehow the thing we're using to shuffle the dataframe does come from that dataframe (otherwise it seems like you would have had to do a join first, at least morally).

So are you kind of asking for an extension of the expression language to express the computation on the input dataframe that results in a new column with appropriate partition keys?

Member Author (rjzamora):

> So are you kind of asking for an extension of the expression language to express the computation on the input dataframe that results in a new column with appropriate partition keys?

Yes, that is probably a reasonable way to think about it. For a simple hash-based shuffle, the hypothetical expression for finding the output partition of each row is pointwise. In the case of a sort, the expression requires global data movement (i.e. the histogram/quantiles).

At the moment, it's trivial to evaluate a pointwise expression to calculate the partition mapping. However, it is not possible to evaluate a non-pointwise expression without offloading that calculation to a distinct IR node.

Relevant context: We don't currently support multi-partition expressions unless they are "pointwise". We spent some time refactoring the IR class so that we can "lower" the evaluation of an IR node into tasks that execute the (static) IR.do_evaluate method. However, we cannot do this for Expr.do_evaluate yet. My impression was that we are not planning to refactor the Expr class. If so, we will probably need to decompose a single IR node containing a non-pointwise expression into one or more IR nodes that we know how to map onto a task graph.
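
To make the pointwise case concrete, here is a minimal sketch in plain Python (illustrative names, not cudf-polars code) of a hash-based partition-mapping step:

def hash_partition_map(key_column: list[int], count: int) -> list[int]:
    # Each row's output partition depends only on that row's own key
    # value, so this can run on every partition independently.
    # hash() stands in for the murmurhash used by the real implementation.
    return [hash(key) % count for key in key_column]

# e.g. hash_partition_map([10, 11, 12], 2) needs no global data movement.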

Contributor (Matt711):

Thanks for all your work so far @rjzamora! My apologies, I don't have anything to add to the review. I'm adding this comment just to check my understanding.

> At the moment, it's trivial to evaluate a pointwise expression to calculate the partition mapping.

So we've got hash-based shuffles, which are pointwise. This makes it relatively straightforward to determine the partition mapping. E.g. `hash(df["A"]) % num_partitions` only depends on each row's value in column "A".

Sort-based shuffles are non-pointwise because you'd need to know the ranges that divide the dataframe into partitions. E.g. partitioning [8, 4, 10, 2, 1] into 3 partitions -> {0: [1, 2], 1: [4], 2: [8, 10]}. How would we calculate the boundaries? (which I think is the quantile calculation)

> However, it is not possible to evaluate a non-pointwise expression without offloading that calculation to a distinct IR node.

Would you use multiple IR nodes to do the calculation?

Member Author (rjzamora):

Sorry for the delayed response here @Matt711!

> So we've got hash-based shuffles which are pointwise.

Exactly right. Just to state this a slightly different way: Any shuffle operation is actually two distinct operations. First, we need to figure out where each row is going, then we perform the actual shuffle. Let's call that first step the "partition-mapping" calculation. For a hash-based shuffle, the partition-mapping step is indeed pointwise. For a sort, the partition-mapping step is not.

> Sort-based shuffles ... How would we calculate the boundaries? (which I think is the quantile calculation)

In Dask DataFrame, we essentially calculate a list of N quantiles on each partition independently (where N is >= the number of output partitions). Since the data may not be balanced, we then compute approximate "global" quantiles by merging these independent quantile calculations together (the code is generally in dask/dataframe/partitionquantiles.py).

In Dask DataFrame, we reduce these "global" quantiles on the client. However, for cudf-polars we may want to write it as more of an all-reduce pattern (TBD).

> Would you use multiple IR nodes to do the calculation?

Yes, I think so. But this is just a design choice that allows us to keep "Shuffle" logic separate from "partition-mapping" logic. There is no fundamental requirement for us to do this.
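
As a minimal sketch of the non-pointwise case, assume (unrealistically) that the whole column is visible at once; the real implementation merges per-partition quantile estimates instead:

import bisect

def sort_partition_map(values: list[int], count: int) -> list[int]:
    # Global step: the boundaries depend on all rows, not just one.
    ordered = sorted(values)
    boundaries = [ordered[(i * len(ordered)) // count] for i in range(1, count)]
    # Pointwise step: bucket each row using the (global) boundaries.
    return [bisect.bisect_right(boundaries, v) for v in values]

# sort_partition_map([8, 4, 10, 2, 1], 3) -> [2, 1, 2, 1, 0],
# i.e. partitions {0: [1], 1: [4, 2], 2: [8, 10]}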

"""

__slots__ = ("keys", "options")
_non_child = ("schema", "keys", "options")
keys: tuple[NamedExpr, ...] | NamedExpr
"""Keys to shuffle on."""
options: dict[str, Any]
"""Shuffling options."""

def __init__(
self,
schema: Schema,
keys: tuple[NamedExpr, ...] | NamedExpr,
options: dict[str, Any],
*children: IR,
):
self.schema = schema
self.keys = keys
self.options = options
self._non_child_args = ()
Contributor:

Should this be (schema, keys, options)?

Member Author (rjzamora):

I feel that a Shuffle IR node is a "special" case where we don't actually want the do_evaluate method to be used at all. I actually just changed Shuffle.do_evaluate to raise NotImplementedError, since a single-partition shuffle should never occur.

Contributor:

FWIW, I think it would be useful to be able to evaluate it, because then one can test the rewrites on a single partition independent of the partitioning and dask backend

Member Author (rjzamora):

Okay, seems reasonable to me. I changed Shuffle.do_evaluate to be a no-op for now.

        self.children = children
        if len(children) > 2:  # pragma: no cover
            raise ValueError(f"Expected a maximum of two children, got {children}")

    def get_hashable(self) -> Hashable:
        """Hashable representation of the node."""
        return (
            type(self),
            tuple(self.schema.items()),
            self.keys,
            json.dumps(self.options),
            self.children,
        )

    @classmethod
    def do_evaluate(cls, df: DataFrame):  # pragma: no cover
        """Evaluate and return a dataframe."""
        # Single-partition logic is a no-op
        return df


def _partition_dataframe(
    df: DataFrame,
    index: DataFrame | None,
    keys: tuple[NamedExpr, ...] | NamedExpr,
    count: int,
) -> dict[int, DataFrame]:
    """
    Partition an input DataFrame for shuffling.

    Parameters
    ----------
    df
        DataFrame to partition.
    index
        Optional DataFrame from which to extract partitioning
        keys. If None, keys will be extracted from `df`.
    keys
        Shuffle key(s) to extract from index or df.
    count
        Total number of output partitions.

    Returns
    -------
    A dictionary mapping between int partition indices and
    DataFrame fragments.
    """
    # Extract output-partition mapping
    if isinstance(keys, tuple):
        # Hash the specified keys to calculate the output
        # partition for each row (partition_map)
        partition_map = plc.binaryop.binary_operation(
            plc.hashing.murmurhash3_x86_32(
                DataFrame([expr.evaluate(index or df) for expr in keys]).table
            ),
            plc.interop.from_arrow(pa.scalar(count, type="uint32")),
            plc.binaryop.BinaryOperator.PYMOD,
            plc.types.DataType(plc.types.TypeId.UINT32),
        )
    else:  # pragma: no cover
        # Specified key column already contains the
        # output-partition index in each row
        partition_map = keys.evaluate(index or df).obj

    # Apply partitioning
    t, offsets = plc.partitioning.partition(
        df.table,
        partition_map,
        count,
    )

    # Split and return the partitioned result
    return {
        i: DataFrame.from_table(
            split,
            df.column_names,
        )
        for i, split in enumerate(plc.copying.split(t, offsets[1:-1]))
    }


def _simple_shuffle_graph(
    name_out: str,
    name_in: str,
    name_index: str | None,
    keys: tuple[NamedExpr, ...] | NamedExpr,
    count_in: int,
    count_out: int,
) -> MutableMapping[Any, Any]:
    """Make a simple all-to-all shuffle graph."""
    split_name = f"split-{name_out}"
    inter_name = f"inter-{name_out}"

    graph: MutableMapping[Any, Any] = {}
    for part_out in range(count_out):
        _concat_list = []
        for part_in in range(count_in):
            graph[(split_name, part_in)] = (
                _partition_dataframe,
                (name_in, part_in),
                None if name_index is None else (name_index, part_in),
                keys,
                count_out,
            )
            _concat_list.append((inter_name, part_out, part_in))
            graph[_concat_list[-1]] = (
                operator.getitem,
                (split_name, part_in),
                part_out,
            )
        graph[(name_out, part_out)] = (_concat, _concat_list)
    return graph


@lower_ir_node.register(Shuffle)
def _(
    ir: Shuffle, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
    # Simple lower_ir_node handling for the default hash-based shuffle.
    # More-complex logic (e.g. joining and sorting) should
    # be handled separately.
    from cudf_polars.experimental.parallel import PartitionInfo

    # Check ir.keys
    if not isinstance(ir.keys, tuple):  # pragma: no cover
        raise NotImplementedError(
            f"Default hash Shuffle does not support NamedExpr keys argument. Got {ir.keys}"
        )

    # Extract child partitioning
    children, _partition_info = zip(*(rec(c) for c in ir.children), strict=True)
    partition_info = reduce(operator.or_, _partition_info)
    pi = partition_info[children[0]]

    # Check child count
    if len(children) > 1:  # pragma: no cover
        raise NotImplementedError(
            f"Default hash Shuffle does not support multiple children. Got {children}"
        )

    # Check if we are already shuffled or update partition_info
    if ir.keys == pi.partitioned_on:
        # Already shuffled!
        new_node = children[0]
    else:
        new_node = ir.reconstruct(children)
        partition_info[new_node] = PartitionInfo(
            # Default shuffle preserves partition count
            count=pi.count,
            # Add partitioned_on info
            partitioned_on=ir.keys,
        )

    return new_node, partition_info


@generate_ir_tasks.register(Shuffle)
def _(
    ir: Shuffle, partition_info: MutableMapping[IR, PartitionInfo]
) -> MutableMapping[Any, Any]:
    # Use a simple all-to-all shuffle graph.

    # TODO: Optionally use rapidsmp.
    if len(ir.children) > 1:  # pragma: no cover
        child_ir, index_ir = ir.children
        index_name = get_key_name(index_ir)
    else:
        child_ir = ir.children[0]
        index_name = None

    return _simple_shuffle_graph(
        get_key_name(ir),
        get_key_name(child_ir),
        index_name,
        ir.keys,
        partition_info[child_ir].count,
        partition_info[ir].count,
    )
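
For reference, the task graph built by _simple_shuffle_graph for two input and two output partitions has the following shape (a sketch assuming name_out="shuffle" and name_in="df"; `keys` stands for the shuffle keys). Each input partition is split once, and each output partition gathers one slice from every split before concatenating:

# _simple_shuffle_graph("shuffle", "df", None, keys, 2, 2) produces:
{
    ("split-shuffle", 0): (_partition_dataframe, ("df", 0), None, keys, 2),
    ("split-shuffle", 1): (_partition_dataframe, ("df", 1), None, keys, 2),
    ("inter-shuffle", 0, 0): (operator.getitem, ("split-shuffle", 0), 0),
    ("inter-shuffle", 0, 1): (operator.getitem, ("split-shuffle", 1), 0),
    ("inter-shuffle", 1, 0): (operator.getitem, ("split-shuffle", 0), 1),
    ("inter-shuffle", 1, 1): (operator.getitem, ("split-shuffle", 1), 1),
    ("shuffle", 0): (_concat, [("inter-shuffle", 0, 0), ("inter-shuffle", 0, 1)]),
    ("shuffle", 1): (_concat, [("inter-shuffle", 1, 0), ("inter-shuffle", 1, 1)]),
}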
63 changes: 63 additions & 0 deletions python/cudf_polars/tests/experimental/test_shuffle.py
@@ -0,0 +1,63 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import pytest

import polars as pl

from cudf_polars import Translator
from cudf_polars.dsl.expr import Col, NamedExpr
from cudf_polars.experimental.parallel import evaluate_dask, lower_ir_graph
from cudf_polars.experimental.shuffle import Shuffle


@pytest.fixture(scope="module")
def engine():
    return pl.GPUEngine(
        raise_on_fail=True,
        executor="dask-experimental",
        executor_options={"max_rows_per_partition": 30000},
    )


@pytest.fixture(scope="module")
def df():
    return pl.LazyFrame(
        {
            "x": [1, 2, 3, 4, 5, 6, 7],
            "y": [1, 1, 1, 1, 1, 1, 1],
            "z": ["a", "b", "c", "d", "e", "f", "g"],
        }
    )


def test_hash_shuffle(df, engine):
    # Extract translated IR
    qir = Translator(df._ldf.visit(), engine).translate_ir()

    # Add first Shuffle node
    keys = (NamedExpr("x", Col(qir.schema["x"], "x")),)
    options = {}
    qir1 = Shuffle(qir.schema, keys, options, qir)

    # Add second Shuffle node (on the same keys)
    qir2 = Shuffle(qir.schema, keys, options, qir1)

    # Check that sequential shuffles on the same keys
    # are replaced with a single shuffle node
    partition_info = lower_ir_graph(qir2)[1]
    assert len([node for node in partition_info if isinstance(node, Shuffle)]) == 1

    # Add second Shuffle node (on different keys)
    keys2 = (NamedExpr("z", Col(qir.schema["z"], "z")),)
    qir3 = Shuffle(qir2.schema, keys2, options, qir2)

    # Check that we have an additional shuffle
    # node after shuffling on different keys
    partition_info = lower_ir_graph(qir3)[1]
    assert len([node for node in partition_info if isinstance(node, Shuffle)]) == 2

    # Check that Dask evaluation works
    evaluate_dask(qir3)