Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repartition-based fallback for hash aggregate #11116

Open
wants to merge 11 commits into
base: branch-24.10
Choose a base branch
from

Conversation

binmahone
Copy link
Collaborator

@binmahone binmahone commented Jul 1, 2024

this PR closes #8391.

this PR add a config called spark.rapids.sql.agg.fallbackAlgorithm to let user decide a sort-based algorithm or repartition-based algorithm to use when agg cannot be done in a single pass in memory.

This optimization is orthogonal to #10950

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone binmahone changed the title workable version without tests [FEA] Do a hash based re-partition instead of a sort based fallback for hash aggregate Jul 1, 2024
@binmahone binmahone marked this pull request as draft July 1, 2024 09:16
@binmahone binmahone changed the title [FEA] Do a hash based re-partition instead of a sort based fallback for hash aggregate repartition-based fallback for hash aggregate Jul 1, 2024
@binmahone
Copy link
Collaborator Author

build

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

2 similar comments
@binmahone
Copy link
Collaborator Author

build

@pxLi
Copy link
Collaborator

pxLi commented Jul 2, 2024

build

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

@binmahone binmahone requested a review from revans2 July 2, 2024 10:40
@binmahone binmahone marked this pull request as ready for review July 2, 2024 10:40
@revans2
Copy link
Collaborator

revans2 commented Jul 2, 2024

Can we please get a performance comparison for this change?

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a pass through this and I have a few concerns. Mainly that we don't have any performance numbers to share and it is not clear why/if we need to keep both the sort based fallback and the hash based fallback.

batches: mutable.ArrayBuffer[SpillableColumnarBatch],
metrics: GpuHashAggregateMetrics,
concatAndMergeHelper: AggHelper): SpillableColumnarBatch = {
// TODO: concatenateAndMerge (and calling code) could output a sequence
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there plans to deal with this TODO comment? I see that this is a copy and paste, so if there isn't it is fine. I just wanted to check

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the TODO is there before my PR. In this PR I refactored tryMergeAggregatedBatches and its related functions into object AggregateUtils, so that tryMergeAggregatedBatches can be called with different parameters. (Previously it's a member function of GpuMergeAggregateIterator and coupled with GpuMergeAggregateIterator's local fields)

spillableBatch.getColumnarBatch()
}
}
})
} else {
// fallback to sort agg, this is the third pass agg
fallbackIter = Some(buildSortFallbackIterator())
aggFallbackAlgorithm.toLowerCase match {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we use an enum or something like it here? a string comparison feels potentially problematic.


def totalSize(): Long = batches.map(_.sizeInBytes).sum

def isAllBatchesSingleRow: Boolean = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit areAllBatchesSingleRow


def split(): ListBuffer[AggregatePartition] = {
withResource(new NvtxWithMetrics("agg repartition", NvtxColor.CYAN, repartitionTime)) { _ =>
if (seed >= hashSeed + 100) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we ever need to repartition the data more than once?

The current code does a single aggregation pass through the data. Once you have done that pass you know the statistics about the data and should be able to make a very good guess about how to combine the data based on the number of shuffle partitions. Is this because there might be a large number of hash collisions? I think in practice that would never happen, but I would like to understand the reasoning here.

Copy link
Collaborator Author

@binmahone binmahone Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is mostly for integrations tests. In hash_aggregate_test.py there're some cases with where one round of repartition cannot make meet the terminate criterial. By terminate criterial I mean either criterial is met:

  1. the new partition is less than targetMergeBatchSize in size (https://github.com/binmahone/spark-rapids/blob/4cf4a4566008321f6bc9f600365563daa11614cf/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala#L1095) , however in integration tests, with a very small batchsize (250 bytes as in https://github.com/binmahone/spark-rapids/blob/0b50434faba9ca526cfbfea560fd2e50058e7bcd/integration_tests/src/main/python/hash_aggregate_test.py#L35), the new partition is usually larger than 250 bytes (considering the size overhead for each parititon), that leads to:
  2. isAllBatchesSingleRow in https://github.com/binmahone/spark-rapids/blob/4cf4a4566008321f6bc9f600365563daa11614cf/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala#L1094C28-L1094C49, if all batches are single row, we treat it as a terminate state as well. However this state is harder to achieve as it requires more rounds of repartition.

It's also worth mentioning that this PR tends to be conservative in determining the number of new partitions (https://github.com/binmahone/spark-rapids/blob/4cf4a4566008321f6bc9f600365563daa11614cf/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala#L1070). It is designed so for performance consideration. Using a larger partition number for eah repartition may speed up meeting the terminate criterial. But still we cannot guarantee one round of repartition is sufficent.

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@binmahone
Copy link
Collaborator Author

build

@@ -60,6 +60,7 @@ Name | Description | Default Value | Applicable at
<a name="shuffle.ucx.activeMessages.forceRndv"></a>spark.rapids.shuffle.ucx.activeMessages.forceRndv|Set to true to force 'rndv' mode for all UCX Active Messages. This should only be required with UCX 1.10.x. UCX 1.11.x deployments should set to false.|false|Startup
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null|Startup
<a name="shuffle.ucx.useWakeup"></a>spark.rapids.shuffle.ucx.useWakeup|When set to true, use UCX's event-based progress (epoll) in order to wake up the progress thread when needed, instead of a hot loop.|true|Startup
<a name="sql.agg.fallbackAlgorithm"></a>spark.rapids.sql.agg.fallbackAlgorithm|When agg cannot be done in a single pass, use sort-based fallback or repartition-based fallback.|sort|Runtime
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood we make sort default to understand regression. Can we set this by default on to gain more exercises? And eventually, we should deprecate sort based fallback within aggregation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per our offline discussion , this should be unnecessary now, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Particular for this one, I still felt to make it default ON and this is more like an alternative to sort-based approach.

@@ -43,7 +43,8 @@ object Arm extends ArmScalaSpecificImpl {
}

/** Executes the provided code block and then closes the sequence of resources */
def withResource[T <: AutoCloseable, V](r: Seq[T])(block: Seq[T] => V): V = {
def withResource[T <: AutoCloseable, V](r: Seq[T])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: any idea code formatter changed here?

@@ -99,6 +98,7 @@ object AggregateUtils {
isReductionOnly: Boolean): Long = {
def typesToSize(types: Seq[DataType]): Long =
types.map(GpuBatchUtils.estimateGpuMemory(_, nullable = false, rowCount = 1)).sum

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unnecessary change.

// this only happens in test cases). Doing more re-partitioning will not help to reduce
// the partition size anymore. In this case we should merge all the batches into one
// regardless of the target size.
logWarning(s"Unable to merge aggregated batches within " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Will this be more friendly to turn this into debug metric.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually this line is not added by this PR, I just refactored the functions.

logInfo(s"Falling back to sort-based aggregation with ${aggregatedBatches.size()} batches")
private def buildRepartitionFallbackIterator(): Iterator[ColumnarBatch] = {
logInfo(s"Falling back to repartition-based aggregation with " +
s"${aggregatedBatches.size} batches")
metrics.numTasksFallBacked += 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As offline mentioned, remove this one as no more sort fallback.

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
@revans2
Copy link
Collaborator

revans2 commented Jul 10, 2024

I think there may be something wrong with your metrics for the repartition case. If I run.

spark.conf.set("spark.sql.shuffle.partitions", 2)
spark.conf.set("spark.rapids.sql.agg.singlePassPartialSortEnabled", false)
spark.time(spark.range(0, 3000000000L, 1, 2).selectExpr("CAST(rand(0) * 3000000000 AS LONG) DIV 2 as id", "id % 2 as data").groupBy("id").agg(count(lit(1)), avg(col("data"))).orderBy("id").show())

with repartition then the metrics for aggregations are all very large compared to running it for sort. But the total run time is actually smaller.

@binmahone
Copy link
Collaborator Author

binmahone commented Jul 15, 2024

Hi @revans2 , do you mean the op time metrics? I did some investigation, and found that for sort-based fallback, op time can be very inaccurate in terms of failing to capture many spill times. E.g. if a spill is triggered by https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala#L969, the time for the spill is not counted to op time. If we take a look at NSYS, we can see many NVTX ranges named "device memory sync spill". These ranges do not hava a parent NVTX and seem not captured by op time metrics.

image

On the other side, op time can be inaccurate for repartition-based fallback as well. (But may not miss as many ranges as sort-based do). Actually, the inaccurancy is rooted in the way we measure op time. Do you think we need to refine how op time is measured, so that we can make sure the sum of all operators' op time is equals to wall time?

@binmahone
Copy link
Collaborator Author

binmahone commented Jul 15, 2024

I think there may be something wrong with your metrics for the repartition case. If I run.

spark.conf.set("spark.sql.shuffle.partitions", 2)
spark.conf.set("spark.rapids.sql.agg.singlePassPartialSortEnabled", false)
spark.time(spark.range(0, 3000000000L, 1, 2).selectExpr("CAST(rand(0) * 3000000000 AS LONG) DIV 2 as id", "id % 2 as data").groupBy("id").agg(count(lit(1)), avg(col("data"))).orderBy("id").show())

with repartition then the metrics for aggregations are all very large compared to running it for sort. But the total run time is actually smaller.

I also found that, for this synthetic case, sort-based fallback beats repartition-based fallback on my PC (it's about 6.2 min vs. 6.6 min), with following configs:

bin/spark-shell \
       --master 'local[10]' \
       --driver-memory 20g \
       --conf spark.rapids.memory.pinnedPool.size=20G \
       --conf spark.sql.files.maxPartitionBytes=2g \
       --conf spark.driver.extraJavaOptions=-Dai.rapids.cudf.nvtx.enabled=true \
       --conf spark.plugins=com.nvidia.spark.SQLPlugin \
      --conf  spark.rapids.sql.metrics.level='DEBUG' \
      --conf spark.rapids.sql.agg.fallbackAlgorithm='repartition' \
       --conf spark.eventLog.enabled=true \
       --jars /home/hongbin/code/spark-rapids2/dist/target/rapids-4-spark_2.12-24.08.0-SNAPSHOT-cuda11.jar 

I also compared the repartition-based fallbacks over the sort-based fallback on NDS, and found that despite the total duration has a little improvement, we CANNOT garantee repartition-based fallbacks always wins. I haven't found a simple rule/heusitic to decided when to use repartition-based and when to use the other, so it would be difficult for us to explain which is better to the users.

For now, I would suggest users to try repartition-based fallback if a lot of buffer spills are observed. However it's still not a rule of thumb because a lot spill also occurred in your synthetic case (where repartition-based fallback is slower).

Any thoughts?

@revans2
Copy link
Collaborator

revans2 commented Jul 15, 2024

Any thoughts?

I think we need to do some profiling of cases when the partition based code is worse than the sort based code to understand what is happening. Ideally we get it down to something like a micro-benchmark so we can better isolate it when doing profiling. I have a few ideas about what it could be, but this is just speculation.

  1. Sort of a single numeric field can be very fast. It might be fast enough to beat the partitioning code for the same path.
  2. The partitioning code might have a bug in it where it ends up doing extra work, or there are some kernels that are not as optimized as the sort case.
  3. Spilling/repartitioning/sorting has a high enough run to run variance that we see it lose some of the time but overall it is a win.

@binmahone If you get some profiling info I am happy to look into it with you.

@binmahone
Copy link
Collaborator Author

binmahone commented Jul 26, 2024

Per our offline discussion with @revans2 and @jlowe

Even though current repartition-based fallback has already showcased a significant win over sort-based in our customer query, we need to : 1. further compare repartition-based vs. sort-based on NDS, and check in what situation sort-based will surpass repartition-based (i.e. regression), and if the regression is acceptable. 2. try some more radical improvement for repartition-based, e.g. skip the first pass of aggregation entirely.

With above done, we may able to rip out the sort-based code entirely, and check in this PR.

Suggest to move this PR from 2408 to 2410 to allow above items being done. @GaryShen2008

@sameerz sameerz added the performance A performance related task/issue label Jul 29, 2024
@sameerz
Copy link
Collaborator

sameerz commented Jul 29, 2024

Please retarget to 24.10

@binmahone
Copy link
Collaborator Author

Please retarget to 24.10

got. Meanwhile I'm still refactoring this PR to see if there's more potentials

@binmahone binmahone changed the base branch from branch-24.08 to branch-24.10 August 6, 2024 05:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
performance A performance related task/issue
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEA] Do a hash based re-partition instead of a sort based fallback for hash aggregate
5 participants