Conversation

@maryannxue
Contributor

What changes were proposed in this pull request?

This PR fixes a correctness issue that occurs when AQE is enabled for SQL cache. The issue was caused by AQE coalescing the top-level shuffle in the physical plan of InMemoryTableScan while the output partitioning of that InMemoryTableScan was still reported as HashPartitioning, as if it had not been coalesced. The caller query of that InMemoryTableScan in turn failed to align the partitions correctly and produced incorrect join results.

The fix addresses the issue by disabling coalescing in InMemoryTableScan for shuffles in the final stage. It also guarantees that enabling AQE for SQL cache is always a performance win over disabling it: AQE optimizations are still applied to all non-top-level stages, and no extra shuffle is introduced between the parent query and the cached relation. (If coalescing in the top-level shuffles of InMemoryTableScan were not disabled, an extra shuffle would have to be added on top of the cached relation, to avoid the correctness issue, whenever the cache is used in a join query and the partition key matches the join key.)
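
A minimal PySpark sketch of the scenario described above; this is an illustrative assumption-laden example rather than the exact test added in this PR (the dataset, column names, and session setup are hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf

spark = SparkSession.builder.appName("spark-45592-sketch").getOrCreate()

# A cached aggregate whose result is hash-partitioned on "key".
cached = (
    spark.range(0, 1000)
    .select((sf.col("id") % 10).alias("key"), sf.col("id").alias("value"))
    .groupBy("key")
    .agg(sf.count("*").alias("cnt"))
    .cache()
)
other = spark.range(0, 10).select(sf.col("id").alias("key"))

# Join on the cached plan's partition key. With the bug, AQE coalesces the cache's
# final shuffle, yet InMemoryTableScan still reports the original HashPartitioning,
# so the outer query can skip the exchange it actually needs and misalign partitions.
cached.join(other, "key").collect()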

Why are the changes needed?

To fix a correctness issue and to avoid potential AQE performance regressions in queries using SQL cache.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@maryannxue maryannxue changed the title SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE with InMemoryTableScanExec [SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE with InMemoryTableScanExec Nov 10, 2023
@github-actions github-actions bot added the SQL label Nov 10, 2023
Member

@dongjoon-hyun dongjoon-hyun left a comment

Thank you, @maryannxue . Could you re-trigger the CI, please?

Member

@dongjoon-hyun dongjoon-hyun left a comment

As I mentioned in #43729 (comment) and #43729 (comment), we are going to wait for your PR across all branches (master/3.5/3.4). Please let us know when this PR is ready for review again, @maryannxue.

@HyukjinKwon
Member

For some reason, it failed to link the test. The test is: https://github.com/maryannxue/spark/actions/runs/6827551601/job/18569949036

.createWithDefault(false)

val ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS =
buildConf("spark.sql.adaptive.applyFinalStageShuffleOptimizations")
Contributor

@ulysses-you is this useful even for non-table-cache queries?

Contributor

Bucket table write? But I think people would specify the partition number explicitly if there is a shuffle on the bucket column. I can't find any other case.

Contributor

Yea, this can be a use case. People may want to fully control the partitioning of the final write stage, which can affect the number of files.
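
To make that use case concrete, a hedged PySpark sketch, assuming the config from the diff above applied to such a write query; the table and column names are hypothetical, and whether this covers writes is exactly the open question in this thread:

# Keep AQE from rewriting the final shuffle so an explicitly requested partitioning
# (and hence the number of output files) is preserved.
spark.conf.set("spark.sql.adaptive.applyFinalStageShuffleOptimizations", "false")

(
    spark.table("events")                    # hypothetical source table
    .repartition(200, "bucket_col")          # explicit partitioning the user wants kept
    .write.mode("overwrite")
    .saveAsTable("events_by_bucket")         # hypothetical output table
)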

@dongjoon-hyun
Member

Also, cc @viirya and @sunchao too.

Member

@dongjoon-hyun dongjoon-hyun left a comment

Could you fix the compilation failure?

[error] (sql / Test / compileIncremental) Compilation failed
[error] Total time: 518 s (08:38), completed Nov 13, 2023, 5:03:40 PM
[error] running /home/runner/work/spark/spark/build/sbt -Phadoop-3 -Pspark-ganglia-lgpl -Pkinesis-asl -Pvolcano -Pdocker-integration-tests -Pkubernetes -Phive-thriftserver -Pyarn -Pconnect -Phive -Phadoop-cloud Test/package streaming-kinesis-asl-assembly/assembly connect/assembly ; received return code 1
Error: Process completed with exit code 16.

assert(ds.count() == 2)
}

test("SPARK-45592: Coaleasced shuffle read is not compatible with hash partitioning") {
Member

@maryannxue This test seems to pass on the master branch without your patch. May I ask why this PR changes it?

Contributor Author

Because this bug has already been fixed by the other PR, right?
This patch disables last-stage coalescing in SQL cache, so it won't cause a perf regression. The test would still pass if you reverted the other fix, but the modified tests in CachedTableSuite verify the new behavior of this PR.

Contributor Author

I simply changed this test to make it more robust in terms of reproducing the original bug.

Member

Got it.

session, SQLConf.AUTO_BUCKETED_SCAN_ENABLED :: Nil)
SparkSession.getOrCloneSessionWithConfigsOff(session,
SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS ::
SQLConf.AUTO_BUCKETED_SCAN_ENABLED :: Nil)
Contributor

I think a question here is: do we want strictly better performance, or generally better performance?

Contributor

Adding an extra shuffle can be very expensive, so I'm inclined toward this change.

Contributor Author

This approach guarantees better performance compared to having AQE disabled for SQL cache.

Member

Thank you, @maryannxue .

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM.

dongjoon-hyun pushed a commit that referenced this pull request Nov 14, 2023
…yTableScanExec

Closes #43760 from maryannxue/spark-45592.

Authored-by: Maryann Xue <maryann.xue@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 128f552)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@dongjoon-hyun
Member

Merged to master/3.5.

Could you make a backporting PR to branch-3.4, please, @maryannxue ? There is a test case conflict on branch-3.4.

@dongjoon-hyun
Member

Gentle ping~, @maryannxue .

dongjoon-hyun pushed a commit that referenced this pull request Nov 15, 2023
…yTableScanExec

Closes #43760 from maryannxue/spark-45592.

Authored-by: Maryann Xue <maryann.xue@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 128f552)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@dongjoon-hyun
Member

For the record, I landed this on branch-3.4 after resolving conflicts.

cloud-fan pushed a commit that referenced this pull request Feb 7, 2024
### What changes were proposed in this pull request?

#43435 and #43760 fix a correctness issue that is triggered when AQE is applied to a cached query plan, specifically when AQE coalesces the final result stage of the cached plan.

The current semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning`

([source code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411)):

- when true, we enable AQE but disable coalescing the final stage (default)
- when false, we disable AQE

But let's revisit the semantics of this config: for the caller, the only thing that matters is whether we change the output partitioning of the cached plan, and we should try to apply AQE whenever possible. Thus we want to modify the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning`:

- when true, we enable AQE and allow coalescing the final stage; this might lead to a perf regression, because it introduces an extra shuffle
- when false, we enable AQE but disable coalescing the final stage (this is actually the `true` semantics of the old behavior)

Also, to keep the default behavior unchanged, we might want to flip the default value of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` to `false`.
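
A hedged sketch of the two settings described above (behavior as described in this commit message; the `spark` session is assumed to exist):

# Proposed default: AQE still runs on the cached plan, but its final stage is not
# coalesced, so the cache keeps its declared output partitioning.
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")

# Opt-in: AQE may also coalesce the cache's final stage; a downstream query that
# relied on the cached partitioning may then need an extra shuffle.
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")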

### Why are the changes needed?

To allow AQE to coalesce the final stage in a SQL cached plan, and to make the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` more reasonable.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Updated UTs.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45054 from liuzqt/SPARK-46995.

Authored-by: Ziqi Liu <ziqi.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…yTableScanExec

Closes apache#43760 from maryannxue/spark-45592.

Authored-by: Maryann Xue <maryann.xue@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 128f552)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@Tom-Newton

The fix addresses the issue by disabling coalescing in InMemoryTableScan for shuffles in the final stage.

This PR seems to indicate that there will be a correctness bug if we allow coalescing in the final stage. Is this still true after #43435, which seems like it's fixing the same issue? I have not noticed any correctness issues in this area, but I have a use case where this PR causes a major performance regression.

I notice that #45054 brings back the option to enable coalescing in InMemoryTableScan for shuffles in the final stage. If I do this, my performance problem is resolved, but will I be at risk of the correctness bug again?

Details on my performance regression case

If you configure spark.sql.shuffle.partitions or spark.sql.adaptive.coalescePartitions.initialPartitionNum to a large number (we currently use 8192), the following code ends up using 8192 partitions and is really slow as a result.

from pyspark.sql import functions as sf

# `spark` is an existing SparkSession (e.g. the PySpark shell session).
df = spark.createDataFrame(
    [
        {
            "group": "group0",
            "df1_column": 1,
        },
        {
            "group": "group0",
            "df1_column": 1,
        },
    ]
)
df = df.groupBy("group").agg(sf.max("df1_column"))
df.cache()
df.explain()
df.show()

With this PR: 40 seconds
Without this PR: 2 seconds

Maybe it's our mistake to use spark.sql.adaptive.coalescePartitions.initialPartitionNum: 8192, but I don't really see any generic way to configure this appropriately. My strategy has just been to make it bigger than we would ever need and rely on AQE to coalesce to something sensible, but my understanding could be lacking.
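
For reference, a hedged sketch of the configuration pattern described above (these are standard AQE settings; the values match the ones quoted in this comment):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Deliberately large; AQE is expected to coalesce the shuffle back down.
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "8192")
# With this PR, the cached plan's final stage is no longer coalesced, so the
# cached aggregate above materializes with 8192 partitions.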

@cloud-fan
Contributor

Does #45054 fix your perf issue?

@Tom-Newton

Does #45054 fix your perf issue?

Assuming I set spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=true, then yes.

@cloud-fan
Contributor

cloud-fan commented Nov 20, 2024

There should be no correctness issues. canChangeCachedPlanOutputPartitioning is just for performance. Setting it to true usually leads to better performance, but it may introduce an extra shuffle for the cache-reading query, as the cached plan has changed its output partitioning.
