Add multi-partition Shuffle operation to cuDF Polars #17744
Conversation
cc @wence- - It may make sense to get this in before
Some suggestions for discussion
A Shuffle node may have either one or two children. In both cases, the first child corresponds to the DataFrame we are shuffling. The optional second child corresponds to a distinct DataFrame to extract the shuffle keys from. For example, it may be useful to reference a distinct DataFrame in the case of sorting.

The type of argument `keys` controls whether or not hash partitioning will be applied. If `keys` is a tuple, we assume that the corresponding columns must be hashed. If `keys` is a `NamedExpr`, we assume that the corresponding column already contains a direct partition mapping.
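As a minimal, hypothetical sketch of the two `keys` cases described above (the `partition_mapping` helper, plain-Python `hash`, and the dict-of-lists "DataFrame" are illustrative stand-ins, not the actual cuDF-Polars implementation, where the second case would be a `NamedExpr` rather than a column name):

```python
from __future__ import annotations

def partition_mapping(
    df: dict[str, list], keys: tuple[str, ...] | str, num_partitions: int
) -> list[int]:
    """Return the output partition for each row of ``df``."""
    if isinstance(keys, tuple):
        # Tuple of column names: hash-partition on those columns.
        rows = zip(*(df[name] for name in keys))
        return [hash(row) % num_partitions for row in rows]
    # Otherwise, assume ``keys`` refers to a column that already
    # contains the destination partition of every row.
    return [int(v) for v in df[keys]]

df = {"A": [1, 2, 3, 4], "map": [0, 1, 0, 1]}
print(partition_mapping(df, ("A",), 2))  # hash-based mapping
print(partition_mapping(df, "map", 2))   # [0, 1, 0, 1] (direct mapping)
```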
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Under what circumstances do we shuffle one dataframe with the keys/expressions from another dataframe? In the case of a `sortby`, all the referenced columns must live in the same dataframe.
It seems like this would be simpler if we always took a dataframe that is being shuffled and a dataframe that is being used to compute the partitioning keys (these can be the same), along with a `NamedExpr` (or just an `Expr`) that can produce the partition mapping?
> Under what circumstances do we shuffle one dataframe with the keys/expressions from another dataframe? In the case of a `sortby`, all the referenced columns must live in the same dataframe.
My thinking is that we want the `Shuffle` design to be something that we can use to "lower" both a hash-based shuffle (for a join or groupby) and a sortby. In the case of sortby, we don't actually care whether the referenced columns live in the same dataframe being sorted, because we need to do something like a global quantiles calculation on the referenced columns to figure out which partition each row corresponds to. Therefore, when we sort `df` on column `"A"`, we will probably want to add a new graph that transforms `df["A"]` into the final partition mapping.
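A rough sketch of that two-stage idea follows. The helper name and plain-Python lists are hypothetical, and it uses exact quantiles for illustration; a real implementation would use an approximate, distributed quantile calculation:

```python
import bisect

def lower_sort_shuffle(
    partitions: list[list[int]], npartitions: int
) -> list[list[int]]:
    """Map every row of every input partition to an output partition."""
    # Stage 1 (non-pointwise): a global calculation on the key column
    # picks the boundaries between output partitions. Exact quantiles
    # here, for illustration only.
    allvals = sorted(v for part in partitions for v in part)
    boundaries = [
        allvals[(i * len(allvals)) // npartitions] for i in range(1, npartitions)
    ]
    # Stage 2 (pointwise, given the boundaries): each row's output
    # partition follows from a binary search against the boundaries.
    return [
        [bisect.bisect_right(boundaries, v) for v in part] for part in partitions
    ]

print(lower_sort_shuffle([[8, 4], [10, 2, 1]], 3))  # [[2, 1], [2, 1, 0]]
```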
Hmmm, I guess somehow the thing we're using to shuffle the dataframe does come from that dataframe (otherwise it seems like you would have had to do a join first, at least morally).
So are you kind of asking for an extension of the expression language to express the computation on the input dataframe that results in a new column with appropriate partition keys?
> So are you kind of asking for an extension of the expression language to express the computation on the input dataframe that results in a new column with appropriate partition keys?
Yes, that is probably a reasonable way to think about it. For a simple hash-based shuffle, the hypothetical expression for finding the output partition of each row is pointwise. In the case of a sort, the expression requires global data movement (i.e. the histogram/quantiles).
At the moment, it's trivial to evaluate a pointwise expression to calculate the partition mapping. However, it is not possible to evaluate a non-pointwise expression without offloading that calculation to a distinct `IR` node.

Relevant context: We don't currently support multi-partition expressions unless they are "pointwise". We spent some time refactoring the `IR` class so that we can "lower" the evaluation of an `IR` node into tasks that execute the (static) `IR.do_evaluate` method. However, we cannot do this for `Expr.do_evaluate` yet. My impression was that we are not planning to refactor the `Expr` class. If so, we will probably need to decompose a single `IR` node containing a non-pointwise expression into one or more `IR` nodes that we know how to map onto a task graph.
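To illustrate the shape of the decomposition being described (none of these node names are real cuDF-Polars IR classes; this is only a sketch of one possible rewrite):

```
Sort(df, by="A")
    boundaries = GlobalQuantiles(df["A"])           # non-pointwise, own IR node
    mapping    = SearchSorted(df["A"], boundaries)  # pointwise, given boundaries
    Shuffle(df, keys=mapping)                       # the actual data movement
```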
Thanks for all your work so far @rjzamora! My apologies, I don't have anything to add to the review. I'm adding this comment just to check my understanding.
> At the moment, it's trivial to evaluate a pointwise expression to calculate the partition mapping.

So we've got hash-based shuffles, which are pointwise. This makes it relatively straightforward to determine the partition mapping. E.g. `hash(df["A"]) % num_partitions` only depends on each row's value in column `"A"`.

Sort-based shuffles are non-pointwise because you'd need to know the ranges that divide the dataframe into partitions. E.g. `[8, 4, 10, 2, 1]` into 3 partitions -> `{0: [1, 2], 1: [4], 2: [8, 10]}`. How would we calculate the boundaries? (which I think is the quantile calculation)
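Working that example by hand (the boundary values below are hypothetical, standing in for whatever the quantile calculation would produce):

```python
import bisect

values = [8, 4, 10, 2, 1]
boundaries = [4, 8]  # hypothetical output of a quantile calculation
mapping = [bisect.bisect_right(boundaries, v) for v in values]
print(mapping)  # [2, 1, 2, 0, 0]
# i.e. {0: [2, 1], 1: [4], 2: [8, 10]} -- the partitions in the example
# above, up to ordering within each partition.
```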
> However, it is not possible to evaluate a non-pointwise expression without offloading that calculation to a distinct IR node.
Would you use multiple IR nodes to do the calculation?
Sorry for the delayed response here @Matt711!
> So we've got hash-based shuffles which are pointwise.
Exactly right. Just to state this a slightly different way: Any shuffle operation is actually two distinct operations. First, we need to figure out where each row is going, then we perform the actual shuffle. Let's call that first step the "partition-mapping" calculation. For a hash-based shuffle, the partition-mapping step is indeed pointwise. For a sort, the partition-mapping step is not.
> Sort-based shuffles ... How would we calculate the boundaries? (which I think is the quantile calculation)
In Dask DataFrame, we essentially calculate a list of N quantiles on each partition independently (where N >= the number of output partitions). Since the data may not be balanced, we then compute approximate "global" quantiles by merging these independent quantile calculations together (the code is generally in dask/dataframe/partitionquantiles.py).

In Dask DataFrame, we reduce these "global" quantiles on the client. However, for cudf-polars we may want to write it as more of an all-reduce pattern (TBD).
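A heavily simplified sketch of that merge step (loosely modeled on the idea in dask/dataframe/partitionquantiles.py; the helper names, oversampling factor, and re-sampling strategy are illustrative assumptions, not the real algorithm):

```python
import numpy as np

def local_quantiles(col: np.ndarray, n: int) -> np.ndarray:
    # Each partition computes its own quantiles independently.
    return np.quantile(col, np.linspace(0.0, 1.0, n))

def global_boundaries(partitions: list[np.ndarray], nparts: int) -> np.ndarray:
    # Merge the per-partition quantiles, then re-sample to approximate
    # global quantiles; the interior values become shuffle boundaries.
    merged = np.sort(
        np.concatenate([local_quantiles(p, 2 * nparts) for p in partitions])
    )
    picks = np.linspace(0, merged.size - 1, nparts + 1).astype(int)
    return merged[picks][1:-1]

parts = [np.array([8.0, 4.0]), np.array([10.0, 2.0, 1.0])]
print(global_boundaries(parts, 3))
```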
> Would you use multiple IR nodes to do the calculation?
Yes, I think so. But this is just a design choice that allows us to keep "Shuffle" logic separate from "partition-mapping" logic. There is no fundamental requirement for us to do this.
@wence- - As we discussed offline, I decided to simplify the
@wence- Are we good here? (should I re-target 25.04?)
Some small suggestions, but let's go for 25.04
self.schema = schema
self.keys = keys
self.options = options
self._non_child_args = ()
Should this be `(schema, keys, options)`?
I feel that a `Shuffle` IR node is a "special" case where we don't actually want the `do_evaluate` method to be used at all. I actually just changed `Shuffle.do_evaluate` to raise a `NotImplementedError`, since a single-partition shuffle should never occur.
FWIW, I think it would be useful to be able to evaluate it, because then one can test the rewrites on a single partition independent of the partitioning and dask backend
Okay, seems reasonable to me. I changed `Shuffle.do_evaluate` to be a no-op for now.
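A minimal sketch of that no-op behavior (the class and signature here are stand-ins, not the real cuDF-Polars `Shuffle` node, whose `do_evaluate` takes the node's actual arguments):

```python
class Shuffle:
    @staticmethod
    def do_evaluate(df):
        # Single-partition "shuffle": every row is already in the only
        # partition, so there is nothing to move; return the input.
        return df
```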
Thanks @galipremsagar - Does anyone know what's going on with the "pre-commit.ci" check? Do I need to do something to update my local pre-commit hooks?
CI is unblocked. They are optional for now. But @bdice will know more about it.
You need to merge the latest changes in from 25.02. 25.04 is a bit behind because the forward merger was blocked. We should be able to get that resolved this morning.
@wence- We happy here once CI is clear?
/merge
Description
This PR pulls out the `Shuffle` logic from #17518 to simplify the review process. The goal is to establish the shuffle groundwork for multi-partition `Join` and `Sort` operations.
Checklist