Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix colocated join when no mapping project is used #13666

Merged

Conversation

gortiz
Copy link
Contributor

@gortiz gortiz commented Jul 19, 2024

This PR fixes an issue related to colocated join.

For example, in ColocatedJoinEngineQuickStart, the following query should be executed in colocated fashion given:

  1. We are specifying the correct tableOptions
  2. We are joining by the partition key
EXPLAIN IMPLEMENTATION PLAN FOR
SELECT a.mySum, b.totalTrips
FROM (select userUUID, totalTrips + daysSinceFirstTrip as mySum
      FROM userAttributes /*+ tableOptions(partition_key='userUUID', partition_size='4') */) as a
JOIN userAttributes /*+ tableOptions(partition_key='userUUID', partition_size='4') */ as b
ON a.userUUID = b.userUUID

But the actual plan is:

[0]@192.168.1.42:46431|[0] MAIL_RECEIVE(BROADCAST_DISTRIBUTED)
├── [1]@192.168.1.42:37333|[3] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46431|[0]} (Subtree Omitted)
├── [1]@192.168.1.42:42455|[0] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46431|[0]} (Subtree Omitted)
├── [1]@192.168.1.42:37933|[2] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46431|[0]} (Subtree Omitted)
└── [1]@192.168.1.42:43961|[1] MAIL_SEND(BROADCAST_DISTRIBUTED)->{[0]@192.168.1.42:46431|[0]}
    └── [1]@192.168.1.42:43961|[1] PROJECT
        └── [1]@192.168.1.42:43961|[1] JOIN
            ├── [1]@192.168.1.42:43961|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)
            │   ├── [2]@192.168.1.42:37333|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
            │   ├── [2]@192.168.1.42:37333|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
            │   ├── [2]@192.168.1.42:37933|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
            │   └── [2]@192.168.1.42:37933|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]}
            │       └── [2]@192.168.1.42:37933|[3] PROJECT
            │           └── [2]@192.168.1.42:37933|[3] TABLE SCAN (userAttributes) null
            └── [1]@192.168.1.42:43961|[1] MAIL_RECEIVE(HASH_DISTRIBUTED)
                ├── [3]@192.168.1.42:37333|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
                ├── [3]@192.168.1.42:37333|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
                ├── [3]@192.168.1.42:37933|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]} (Subtree Omitted)
                └── [3]@192.168.1.42:37933|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@192.168.1.42:34523|[0],[1]@192.168.1.42:42163|[1],[1]@192.168.1.42:43717|[2],[1]@192.168.1.42:46477|[3]}
                    └── [3]@192.168.1.42:37933|[3] PROJECT
                        └── [3]@192.168.1.42:37933|[3] TABLE SCAN (userAttributes) null

Which is not colocated (see the first join input).

The reason is that PinotRelDistributionTraitRule.deriveDistribution is doing inputRelDistribution.apply(project.getMapping()) for projects. project.getMapping() returns null if the project is not a mapping, which is defined as _all inputs are references. In our case the project includes the projection key, but it also includes a call to + so it is not mapping.

What PinotRelDistributionTraitRule.deriveDistribution should be doing is similar to Calcites RelMdDistribution#L165 (in fact we should be using that class, but that would require some refactors) which is ask for a partial mapping instead of complete mapping.

I've tried that solution, but found what it looks to be a bug in Calcite. I've reported the issue in Calcite's dev mailist (see https://lists.apache.org/thread/qz18qxrfp5bqldnoln2tg4582g402zyv). But there is a simple solution that consist on creating a copy of the mapping itself and that is what I'm adding here (including a note to try to remove my hack once the issue or my lack of knowledge is solved).

@gortiz gortiz added bugfix performance multi-stage Related to the multi-stage query engine labels Jul 19, 2024
@gortiz gortiz requested a review from Jackie-Jiang July 19, 2024 14:06
@gortiz gortiz force-pushed the fix-colocated-join-with-no-mapping-project branch from 7e54702 to 316ae09 Compare July 19, 2024 14:07
@gortiz gortiz force-pushed the fix-colocated-join-with-no-mapping-project branch from 316ae09 to dae4f3d Compare July 19, 2024 14:07
Comment on lines +465 to +468
" │ ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[2]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:2|[3]} (Subtree Omitted)\n",
" │ ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[0]} (Subtree Omitted)\n",
" │ └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)[PARTITIONED]->{[1]@localhost:1|[1]}\n",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In master this is:

            │   ├── [2]@localhost:2|[2] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)
            │   ├── [2]@localhost:2|[3] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)
            │   ├── [2]@localhost:1|[0] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]} (Subtree Omitted)
            │   └── [2]@localhost:1|[1] MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:1|[1],[1]@localhost:2|[0]}

Comment on lines +481 to +482
"description": "explain plan with colocated join and a projection that doesn't keep the key column",
"sql": "EXPLAIN IMPLEMENTATION PLAN FOR SELECT a.mySum, b.col3 FROM (select col3 as col2, col3 + col2 as mySum from a /*+ tableOptions(partition_function='hashcode', partition_key='col2', partition_size='4') */) as a JOIN b /*+ tableOptions(partition_function='hashcode', partition_key='col1', partition_size='4') */ ON a.col2 = b.col1",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an extra test to verify that when projection is removing the partition key, colocated is not being used.

@gortiz gortiz requested review from xiangfu0 and yashmayya July 19, 2024 14:10
@codecov-commenter
Copy link

codecov-commenter commented Jul 19, 2024

Codecov Report

Attention: Patch coverage is 90.00000% with 1 line in your changes missing coverage. Please review.

Project coverage is 62.00%. Comparing base (59551e4) to head (ea21a5a).
Report is 806 commits behind head on master.

Files Patch % Lines
...lcite/rel/rules/PinotRelDistributionTraitRule.java 90.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13666      +/-   ##
============================================
+ Coverage     61.75%   62.00%   +0.25%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2554     +118     
  Lines        133233   140620    +7387     
  Branches      20636    21875    +1239     
============================================
+ Hits          82274    87195    +4921     
- Misses        44911    46795    +1884     
- Partials       6048     6630     +582     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 61.97% <90.00%> (+0.26%) ⬆️
java-21 61.89% <90.00%> (+0.26%) ⬆️
skip-bytebuffers-false 61.99% <90.00%> (+0.24%) ⬆️
skip-bytebuffers-true 61.86% <90.00%> (+34.14%) ⬆️
temurin 62.00% <90.00%> (+0.25%) ⬆️
unittests 62.00% <90.00%> (+0.25%) ⬆️
unittests1 46.44% <90.00%> (-0.45%) ⬇️
unittests2 27.78% <0.00%> (+0.05%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this!

(In a separate PR) In order to have a way to work around similar bugs, can we enforce using/not using colocated join if query hint is explicitly provided?

@gortiz gortiz merged commit 6ad08a7 into apache:master Jul 30, 2024
19 of 20 checks passed
@gortiz gortiz deleted the fix-colocated-join-with-no-mapping-project branch July 30, 2024 09:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bugfix multi-stage Related to the multi-stage query engine performance
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants