Possible native shuffle optimization #977

Open
andygrove opened this issue Sep 27, 2024 · 2 comments
andygrove (Member) commented Sep 27, 2024

What is the problem the feature request solves?

I noticed that we execute each query stage with two separate native plans.

For example, here is the first query stage for TPC-H q1:

+- CometExchange: Hash partitioning on [l_returnflag, l_linestatus]
   +- CometHashAggregate (Partial): keys = [l_returnflag, l_linestatus]
      +- CometProject
         +- CometFilter: l_shipdate NOT NULL AND l_shipdate <= 1998-09-24
            +- CometScan: lineitem.parquet
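
For reference, this stage comes from the partial aggregation in TPC-H q1. A simplified form of the query, assuming a SparkSession named spark with the lineitem table registered (the 1998-09-24 cutoff is the query's date '1998-12-01' - interval DELTA days predicate instantiated for this run):

// Simplified TPC-H q1: four sums, three avgs, and a count, grouped by
// l_returnflag/l_linestatus, matching the aggr list in the plans below.
val q1 = spark.sql("""
  SELECT l_returnflag,
         l_linestatus,
         sum(l_quantity),
         sum(l_extendedprice),
         sum(l_extendedprice * (1 - l_discount)),
         sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)),
         avg(l_quantity),
         avg(l_extendedprice),
         avg(l_discount),
         count(*)
  FROM lineitem
  WHERE l_shipdate <= date '1998-09-24'
  GROUP BY l_returnflag, l_linestatus
""")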

We execute one plan for the aggregate:

AggregateExec: mode=Partial, gby=[col_4@4 as col_0, col_5@5 as col_1], aggr=[sum, sum, sum, sum, avg, avg, avg, count]
  ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5]
    FilterExec: col_6@6 IS NOT NULL AND col_6@6 <= 1998-09-24
      ScanExec: source=[CometScan parquet  (unknown)]

We then stream those results back into the JVM, only to stream them back out to the following native plan, which performs the shuffle write:

ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }], 200)
  ScanExec: source=[], schema=[col_0: Utf8, col_1: Utf8, ..]
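
Each batch therefore crosses the JNI boundary twice: once coming out of the aggregate plan into the JVM, and once going back into the shuffle plan. A minimal sketch of that loop in Scala; the Native facade and its methods (nextBatch, enqueueBatch) are hypothetical stand-ins for Comet's JNI bindings, not the real API:

import org.apache.arrow.vector.VectorSchemaRoot

// Hypothetical JNI facade, for illustration only.
object Native {
  def nextBatch(planHandle: Long): VectorSchemaRoot = ???
  def enqueueBatch(planHandle: Long, batch: VectorSchemaRoot): Unit = ???
}

def runStage(aggPlanHandle: Long, shufflePlanHandle: Long): Unit = {
  var batch = Native.nextBatch(aggPlanHandle)     // JNI crossing: native to JVM
  while (batch != null) {
    Native.enqueueBatch(shufflePlanHandle, batch) // JNI crossing: JVM to native
    batch = Native.nextBatch(aggPlanHandle)
  }
}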

Would it be possible to combine these so that we just have the following plan? This would avoid a lot of JNI back and forth between the aggregate and the shuffle write.

ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, Column { name: "col_1", index: 1 }], 200)
  AggregateExec: mode=Partial, gby=[col_4@4 as col_0, col_5@5 as col_1], aggr=[sum, sum, sum, sum, avg, avg, avg, count]
    ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5]
      FilterExec: col_6@6 IS NOT NULL AND col_6@6 <= 1998-09-24
        ScanExec: source=[CometScan parquet  (unknown)]
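
Under the combined plan, the shuffle writer would pull batches from the aggregate entirely inside the native runtime, so the per-batch JNI round trip disappears. Extending the hypothetical Native facade from the sketch above:

// Hypothetical: the whole stage, scan through shuffle write, runs natively;
// the JVM only drives execution to completion.
def runCombinedStage(combinedPlanHandle: Long): Unit =
  Native.executeToCompletion(combinedPlanHandle)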

Describe the potential solution

No response

Additional context

No response

@andygrove andygrove added enhancement New feature or request performance labels Sep 27, 2024
@andygrove andygrove changed the title Possible shuffle optimization Possible native shuffle optimization Sep 27, 2024
@andygrove andygrove added this to the 0.4.0 milestone Sep 27, 2024
viirya (Member) commented Sep 27, 2024

Shuffle is not a SQL operator like Project or Filter in Spark. It is not specific to Spark SQL; it is a fundamental building block of Spark's distributed execution model, and Spark has its own purpose-built mechanisms for it that live outside Spark SQL. That said, there is no way to simply add another SQL operator as a child node of ShuffleWriterExec and expect the two to execute as a single stream.
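
To make the boundary concrete: in Spark, the shuffle write is driven by the task runtime rather than by the SQL operator tree. The following is a rough paraphrase (not the exact source) of the write path in Spark's ShuffleMapTask, which pulls the stage's output iterator and hands it to a ShuffleWriter obtained from the ShuffleManager:

import org.apache.spark.{ShuffleDependency, SparkEnv, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter

// Paraphrase of ShuffleMapTask's write path: the stage's plan only produces
// an iterator; the shuffle write itself happens outside the operator tree.
def writeStageOutput[K, V](
    dep: ShuffleDependency[K, V, V],
    rdd: RDD[Product2[K, V]],
    partitionId: Int,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): Unit = {
  val writer = SparkEnv.get.shuffleManager
    .getWriter[K, V](dep.shuffleHandle, partitionId.toLong, context, metrics)
  writer.write(rdd.iterator(rdd.partitions(partitionId), context))
  writer.stop(success = true)
}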

andygrove (Member, Author) commented Dec 1, 2024

I have been learning more about Spark shuffle and now understand why this issue does not make sense.

edit: I thought I understood this, but now I am not so sure, so I will take another look and at least document why we cannot do this before closing this issue

@andygrove andygrove reopened this Dec 2, 2024
Labels: enhancement (New feature or request), performance
Projects: None yet
Development: No branches or pull requests
2 participants