
[FEA] try to right size hash repartition count for aggregates #11901

Open
revans2 opened this issue Dec 20, 2024 · 0 comments
Labels
? - Needs Triage Need team to review and classify feature request New feature or request performance A performance related task/issue reliability Features to improve reliability or bugs that severely impact the reliability of the plugin

Comments

@revans2
Collaborator

revans2 commented Dec 20, 2024

Is your feature request related to a problem? Please describe.
In the current hash aggregation code, if the intermediate data grows too large, we repartition it into smaller pieces that can be merged to produce a final result.

The code as of #11816 will recursively repartition outputs that are too large 16 ways, up to 10 levels deep. That could result in up to 16^10 partitions being output. Because 16^10 = 2^40, even with a single 1-byte partition this could handle 1 TiB of input, which is far more than we would ever need. It would also have to launch on the order of 16^9 kernels, and the output would require an enormous amount of host memory just to hold the metadata. That is an unrealistic case, but we should still optimize this: needing a second repartition pass might not be that uncommon, and if we ever hit a case that needs a depth of 3, we are wasting a lot of resources and might be spilling much more than we need to.
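To make the worst case concrete, a quick back-of-the-envelope check (plain Scala, not plugin code):

```scala
// 16-way fan-out recursed 10 levels deep gives 16^10 leaf partitions.
val worstCasePartitions = BigInt(16).pow(10)

// 16^10 == 2^40, so even at just 1 byte per final partition that
// already covers 1 TiB of input -- far more than is ever needed.
assert(worstCasePartitions == BigInt(2).pow(40))
```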

We don't have a lot of information during the first repartition pass: because we want to avoid spilling, we don't know how much data there is in total until we are done reading it. For this pass we probably still want to repartition 16 ways, but as soon as we have read all of the data we know:

  1. The size of the input data in bytes
  2. The number of rows total and per batch
  3. The number of input batches

Along with this, we know that all of the rows within a batch have unique keys (because each batch has already passed through the initial aggregation stage).

From this we can derive that the number of unique keys (cardinality) in a bucket is between aggregateBatches.map(_.numRows).max (lower bound) and aggregateBatches.map(_.numRows).sum (upper bound).
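These bounds can be sketched with a minimal stand-in for the real batch metadata (BatchInfo here is a hypothetical name, not a plugin type):

```scala
// Each batch holds only unique keys, so the largest batch is a lower
// bound on the bucket's cardinality; the same key may appear in many
// batches, so the total row count is an upper bound.
case class BatchInfo(numRows: Long, sizeBytes: Long)

def cardinalityBounds(batches: Seq[BatchInfo]): (Long, Long) =
  (batches.map(_.numRows).max, batches.map(_.numRows).sum)
```

For example, two batches with 10 and 7 rows give bounds of (10, 17) distinct keys.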

With this we can use a fairly simple algorithm to decide how many buckets we need to split the data into.

val minCardinality = aggregateBatches.map(_.numRows).max
val targetBatchesBySize = aggregateBatches.map(_.size).sum / targetBatchSize
val numBuckets = min(minCardinality, targetBatchesBySize)
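A self-contained sketch of that heuristic (the BatchInfo type, the ceiling division, and the floor of one bucket are assumptions added for illustration, not plugin code):

```scala
case class BatchInfo(numRows: Long, sizeBytes: Long)

// Pick the bucket count: no more buckets than the guaranteed number of
// distinct keys (minCardinality), and no more than the total data size
// needs to reach the target batch size.
def numBuckets(batches: Seq[BatchInfo], targetBatchSize: Long): Long = {
  val minCardinality = batches.map(_.numRows).max
  val totalBytes = batches.map(_.sizeBytes).sum
  // Round up so a final partial target batch still gets a bucket.
  val targetBatchesBySize = (totalBytes + targetBatchSize - 1) / targetBatchSize
  math.max(1L, math.min(minCardinality, targetBatchesBySize))
}
```

With 1000 guaranteed-unique rows totaling 2500 bytes and a 1000-byte target, this picks 3 buckets; with only 2 unique rows it would pick 2, since more buckets than keys cannot help.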

If, after repartitioning a second time, we still have batches that are too large, it probably means we have a lot of duplicate values across batches, and we can just merge the batches to output a result.

In addition, we should be more aggressive about releasing batches that are already small enough to process, instead of waiting for all of the data to be repartitioned. Skewed keys are not that uncommon in an aggregation. The skew is unlikely to be severe after an initial aggregation pass, but it is not impossible. So if some of the buckets from a first-pass repartition are small enough, we should just merge them and release them to be processed, instead of trying to repartition the data fully.
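One way to sketch that early release, again using the hypothetical BatchInfo stand-in: after the first-pass repartition, split the buckets into those whose combined size already fits the target batch size (merge and emit these now) and those that still need another pass.

```scala
case class BatchInfo(numRows: Long, sizeBytes: Long)

// Split first-pass buckets into (ready to merge-and-emit now,
// still needs another repartition pass) by total byte size.
def splitReady(
    buckets: Seq[Seq[BatchInfo]],
    targetBatchSize: Long): (Seq[Seq[BatchInfo]], Seq[Seq[BatchInfo]]) =
  buckets.partition(bucket => bucket.map(_.sizeBytes).sum <= targetBatchSize)
```

Releasing the ready buckets immediately frees their memory, which should reduce spilling while the oversized buckets are repartitioned again.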

@revans2 revans2 added ? - Needs Triage Need team to review and classify feature request New feature or request performance A performance related task/issue reliability Features to improve reliability or bugs that severely impact the reliability of the plugin labels Dec 20, 2024