Skip to content

Commit

Permalink
[fix](Nereids) ban right outer, right anti, full outer with bucket sh…
Browse files Browse the repository at this point in the history
…uffle (apache#26529) (apache#26702)

pick from master
PR: apache#26529
commit id: f80495d

If the left bucket has no data, we do not generate a left bucket instance.
These joins should preserve all right-side data, but because the left instance
does not exist, the right-side data is discarded since no destination is set.

We ban bucket shuffle for these joins temporarily, until we can generate all
instances for the left side in Coordinator.
  • Loading branch information
morrySnow authored and gnehil committed Dec 4, 2023
1 parent 0deb347 commit 5bb4627
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
Expand Down Expand Up @@ -169,6 +170,12 @@ public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, Void c
return true;
}

/**
 * Tells whether the given join type is one of the kinds that must not be
 * planned as a bucket shuffle join: right anti, right outer, or full outer.
 *
 * @param joinType the join type to inspect
 * @return {@code true} for RIGHT_ANTI_JOIN, RIGHT_OUTER_JOIN and FULL_OUTER_JOIN;
 *         {@code false} for every other value
 */
private boolean couldNotRightBucketShuffleJoin(JoinType joinType) {
    // Identity comparison on the enum constants, one guard per banned type.
    if (joinType == JoinType.RIGHT_ANTI_JOIN) {
        return true;
    }
    if (joinType == JoinType.RIGHT_OUTER_JOIN) {
        return true;
    }
    return joinType == JoinType.FULL_OUTER_JOIN;
}

@Override
public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
Void context) {
Expand Down Expand Up @@ -198,12 +205,22 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
Optional<PhysicalProperties> updatedForLeft = Optional.empty();
Optional<PhysicalProperties> updatedForRight = Optional.empty();

if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
// check colocate join with scan
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
return true;
}
return true;
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType())) {
// right anti, right outer, full outer join could not do bucket shuffle join
// TODO remove this after we refactor coordinator
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ PhysicalResultSink
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[RIGHT_OUTER_JOIN](catalog_returns.cr_item_sk = catalog_sales.cs_item_sk)(catalog_returns.cr_order_number = catalog_sales.cs_order_number)
------------------PhysicalProject
--------------------PhysicalOlapScan[catalog_returns]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------PhysicalOlapScan[catalog_returns]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[LEFT_OUTER_JOIN](catalog_sales.cs_promo_sk = promotion.p_promo_sk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ PhysicalResultSink
--------------------------hashAgg[LOCAL]
----------------------------PhysicalProject
------------------------------hashJoin[RIGHT_OUTER_JOIN](store_sales.ss_item_sk = store_returns.sr_item_sk)(store_sales.ss_ticket_number = store_returns.sr_ticket_number)
--------------------------------PhysicalOlapScan[store_returns]
--------------------------------PhysicalDistribute
----------------------------------PhysicalOlapScan[store_returns]
--------------------------------PhysicalDistribute
----------------------------------hashJoin[INNER_JOIN](store_sales.ss_store_sk = store.s_store_sk)
------------------------------------hashJoin[INNER_JOIN](store_sales.ss_item_sk = item.i_item_sk)
Expand All @@ -42,7 +43,8 @@ PhysicalResultSink
----------------------------PhysicalProject
------------------------------hashJoin[INNER_JOIN](catalog_sales.cs_catalog_page_sk = catalog_page.cp_catalog_page_sk)
--------------------------------hashJoin[RIGHT_OUTER_JOIN](catalog_sales.cs_item_sk = catalog_returns.cr_item_sk)(catalog_sales.cs_order_number = catalog_returns.cr_order_number)
----------------------------------PhysicalOlapScan[catalog_returns]
----------------------------------PhysicalDistribute
------------------------------------PhysicalOlapScan[catalog_returns]
----------------------------------PhysicalDistribute
------------------------------------hashJoin[INNER_JOIN](catalog_sales.cs_item_sk = item.i_item_sk)
--------------------------------------PhysicalDistribute
Expand All @@ -66,7 +68,8 @@ PhysicalResultSink
--------------------------hashAgg[LOCAL]
----------------------------PhysicalProject
------------------------------hashJoin[RIGHT_OUTER_JOIN](web_sales.ws_item_sk = web_returns.wr_item_sk)(web_sales.ws_order_number = web_returns.wr_order_number)
--------------------------------PhysicalOlapScan[web_returns]
--------------------------------PhysicalDistribute
----------------------------------PhysicalOlapScan[web_returns]
--------------------------------PhysicalDistribute
----------------------------------hashJoin[INNER_JOIN](web_sales.ws_web_site_sk = web_site.web_site_sk)
------------------------------------hashJoin[INNER_JOIN](web_sales.ws_item_sk = item.i_item_sk)
Expand Down
26 changes: 26 additions & 0 deletions regression-test/suites/nereids_p0/join/test_outer_join.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ suite("test_outer_join", "nereids_p0") {
sql "SET enable_fallback_to_original_planner=false"
def tbl1 = "test_outer_join1"
def tbl2 = "test_outer_join2"
def tbl3 = "test_outer_join3"

sql "DROP TABLE IF EXISTS ${tbl1}"
sql """
Expand All @@ -37,6 +38,15 @@ suite("test_outer_join", "nereids_p0") {
DISTRIBUTED BY RANDOM BUCKETS 30
PROPERTIES ("replication_num" = "1");
"""

sql "DROP TABLE IF EXISTS ${tbl3}"
sql """
CREATE TABLE IF NOT EXISTS ${tbl3} (
c0 DECIMALV3(8,3)
)
DISTRIBUTED BY HASH (c0) BUCKETS 1 PROPERTIES ("replication_num" = "1");
"""

sql """INSERT INTO ${tbl2} (c0) VALUES ('dr'), ('x7Tq'), ('');"""
sql """INSERT INTO ${tbl1} (c0) VALUES (0.47683432698249817), (0.8864791393280029);"""
sql """INSERT INTO ${tbl1} (c0) VALUES (0.11287713050842285);"""
Expand All @@ -56,6 +66,22 @@ suite("test_outer_join", "nereids_p0") {
qt_join """
SELECT * FROM ${tbl2} LEFT OUTER JOIN ${tbl1} ON (('') like ('15DScmSM')) WHERE ('abc' NOT LIKE 'abc');
"""

sql "set disable_join_reorder=true"
explain {
sql "SELECT * FROM ${tbl1} RIGHT OUTER JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0"
contains "RIGHT OUTER JOIN(PARTITIONED)"
}
explain {
sql "SELECT * FROM ${tbl1} RIGHT ANTI JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0"
contains "RIGHT ANTI JOIN(PARTITIONED)"
}
explain {
sql "SELECT * FROM ${tbl1} FULL OUTER JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0"
contains "FULL OUTER JOIN(PARTITIONED)"
}

sql "DROP TABLE IF EXISTS ${tbl1}"
sql "DROP TABLE IF EXISTS ${tbl2}"
sql "DROP TABLE IF EXISTS ${tbl3}"
}

0 comments on commit 5bb4627

Please sign in to comment.