Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](nereids)adjust distribution cost for better choice of broadcast join and shuffle join #27113

Merged
merged 5 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public Cost visitPhysicalDistribute(
}

// any
// cost of randome shuffle is lower than hash shuffle.
// cost of random shuffle is lower than hash shuffle.
return CostV1.of(context.getSessionVariable(),
0,
0,
Expand Down Expand Up @@ -290,8 +290,13 @@ public Cost visitPhysicalHashJoin(
int parallelInstance = Math.max(1, context.getSessionVariable().getParallelExecInstanceNum());
int totalInstanceNumber = parallelInstance * beNumber;
if (buildSideFactor <= 1.0) {
// use totalInstanceNumber to the power of 2 as the default factor value
buildSideFactor = Math.pow(totalInstanceNumber, 0.5);
if (buildStats.computeSize() < 1024 * 1024) {
// no penalty to broadcast if build side is small
buildSideFactor = 1.0;
} else {
// use the square root of totalInstanceNumber (i.e. to the power of 0.5) as the default factor value
buildSideFactor = Math.pow(totalInstanceNumber, 0.5);
}
}
return CostV1.of(context.getSessionVariable(),
leftRowCount + rightRowCount * buildSideFactor + outputRowCount * probeSideFactor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public double computeSize() {
}

public double dataSizeFactor() {
return computeTupleSize() / K_BYTES;
double lowerBound = 0.03;
double upperBound = 0.07;
return Math.min(Math.max(computeTupleSize() / K_BYTES, lowerBound), upperBound);
}

@Override
Expand Down
15 changes: 7 additions & 8 deletions regression-test/data/nereids_hint_tpch_p0/shape/q16.out
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ PhysicalResultSink
----------PhysicalDistribute
------------hashAgg[LOCAL]
--------------hashJoin[LEFT_ANTI_JOIN] hashCondition=((partsupp.ps_suppkey = supplier.s_suppkey)) otherCondition=()
----------------PhysicalDistribute
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((part.p_partkey = partsupp.ps_partkey)) otherCondition=() build RFs:RF0 p_partkey->[ps_partkey]
----------------------PhysicalProject
------------------------PhysicalOlapScan[partsupp] apply RFs: RF0
----------------------PhysicalProject
------------------------filter(( not (p_brand = 'Brand#45')) and ( not (p_type like 'MEDIUM POLISHED%')) and p_size IN (14, 19, 23, 3, 36, 45, 49, 9))
--------------------------PhysicalOlapScan[part]
----------------PhysicalProject
------------------hashJoin[INNER_JOIN] hashCondition=((part.p_partkey = partsupp.ps_partkey)) otherCondition=() build RFs:RF0 p_partkey->[ps_partkey]
--------------------PhysicalProject
----------------------PhysicalOlapScan[partsupp] apply RFs: RF0
--------------------PhysicalProject
----------------------filter(( not (p_brand = 'Brand#45')) and ( not (p_type like 'MEDIUM POLISHED%')) and p_size IN (14, 19, 23, 3, 36, 45, 49, 9))
------------------------PhysicalOlapScan[part]
----------------PhysicalDistribute
------------------PhysicalProject
--------------------filter((s_comment like '%Customer%Complaints%'))
Expand Down
55 changes: 16 additions & 39 deletions regression-test/data/nereids_hint_tpch_p0/shape/q5.out
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,22 @@
-- !select --
PhysicalResultSink
--PhysicalQuickSort[MERGE_SORT]
----PhysicalDistribute
------PhysicalQuickSort[LOCAL_SORT]
--------hashAgg[GLOBAL]
----------PhysicalDistribute
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((customer.c_custkey = orders.o_custkey) and (customer.c_nationkey = supplier.s_nationkey)) otherCondition=() build RFs:RF4 c_nationkey->[s_nationkey];RF5 c_custkey->[o_custkey]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((lineitem.l_orderkey = orders.o_orderkey)) otherCondition=() build RFs:RF3 o_orderkey->[l_orderkey]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------hashJoin[INNER_JOIN] hashCondition=((lineitem.l_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF2 s_suppkey->[l_suppkey]
------------------------------PhysicalDistribute
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[lineitem] apply RFs: RF2 RF3
------------------------------PhysicalDistribute
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN] hashCondition=((supplier.s_nationkey = nation.n_nationkey)) otherCondition=() build RFs:RF1 n_nationkey->[s_nationkey]
------------------------------------PhysicalDistribute
--------------------------------------PhysicalProject
----------------------------------------PhysicalOlapScan[supplier] apply RFs: RF1 RF4
------------------------------------PhysicalDistribute
--------------------------------------PhysicalProject
----------------------------------------hashJoin[INNER_JOIN] hashCondition=((nation.n_regionkey = region.r_regionkey)) otherCondition=() build RFs:RF0 r_regionkey->[n_regionkey]
------------------------------------------PhysicalDistribute
--------------------------------------------PhysicalProject
----------------------------------------------PhysicalOlapScan[nation] apply RFs: RF0
------------------------------------------PhysicalDistribute
--------------------------------------------PhysicalProject
----------------------------------------------filter((region.r_name = 'ASIA'))
------------------------------------------------PhysicalOlapScan[region]
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((orders.o_orderdate < '1995-01-01') and (orders.o_orderdate >= '1994-01-01'))
------------------------------PhysicalOlapScan[orders] apply RFs: RF5
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------PhysicalOlapScan[customer]
----PhysicalQuickSort[LOCAL_SORT]
------hashAgg[GLOBAL]
--------hashAgg[LOCAL]
----------hashJoin[INNER_JOIN] hashCondition=((customer.c_custkey = orders.o_custkey) and (customer.c_nationkey = supplier.s_nationkey)) otherCondition=() build RFs:RF4 c_nationkey->[s_nationkey];RF5 c_custkey->[o_custkey]
------------hashJoin[INNER_JOIN] hashCondition=((lineitem.l_orderkey = orders.o_orderkey)) otherCondition=() build RFs:RF3 o_orderkey->[l_orderkey]
--------------hashJoin[INNER_JOIN] hashCondition=((lineitem.l_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF2 s_suppkey->[l_suppkey]
----------------PhysicalOlapScan[lineitem] apply RFs: RF2 RF3
----------------hashJoin[INNER_JOIN] hashCondition=((supplier.s_nationkey = nation.n_nationkey)) otherCondition=() build RFs:RF1 n_nationkey->[s_nationkey]
------------------PhysicalOlapScan[supplier] apply RFs: RF1 RF4
------------------hashJoin[INNER_JOIN] hashCondition=((nation.n_regionkey = region.r_regionkey)) otherCondition=() build RFs:RF0 r_regionkey->[n_regionkey]
--------------------PhysicalOlapScan[nation] apply RFs: RF0
--------------------filter((region.r_name = 'ASIA'))
----------------------PhysicalOlapScan[region]
--------------filter((orders.o_orderdate < '1995-01-01') and (orders.o_orderdate >= '1994-01-01'))
----------------PhysicalOlapScan[orders] apply RFs: RF5
------------PhysicalOlapScan[customer]

Used: leading(lineitem { supplier { nation region } } orders customer)
UnUsed:
Expand Down
5 changes: 2 additions & 3 deletions regression-test/data/nereids_hint_tpch_p0/shape/q9.out
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ PhysicalResultSink
------------------------------PhysicalDistribute
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN] hashCondition=((part.p_partkey = lineitem.l_partkey)) otherCondition=() build RFs:RF1 p_partkey->[l_partkey]
------------------------------------PhysicalDistribute
--------------------------------------PhysicalProject
----------------------------------------PhysicalOlapScan[lineitem] apply RFs: RF1 RF2 RF4 RF5
------------------------------------PhysicalProject
--------------------------------------PhysicalOlapScan[lineitem] apply RFs: RF1 RF2 RF4 RF5
------------------------------------PhysicalDistribute
--------------------------------------PhysicalProject
----------------------------------------filter((p_name like '%green%'))
Expand Down
37 changes: 37 additions & 0 deletions regression-test/data/nereids_shape_check/load.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !bc1 --
PhysicalResultSink
--PhysicalDistribute
----PhysicalProject
------hashJoin[INNER_JOIN] hashCondition=((t1.code = t2.ACCEPT_ORG_CODE)) otherCondition=() build RFs:RF0 code->[ACCEPT_ORG_CODE]
--------PhysicalOlapScan[t2] apply RFs: RF0
--------PhysicalDistribute
----------PhysicalOlapScan[t1]

-- !bc2 --
PhysicalResultSink
--PhysicalDistribute
----PhysicalProject
------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.code = t2.ACCEPT_ORG_CODE)) otherCondition=()
--------PhysicalOlapScan[t2]
--------PhysicalDistribute
----------PhysicalOlapScan[t1]

-- !bc3 --
PhysicalResultSink
--PhysicalDistribute
----PhysicalProject
------hashJoin[INNER_JOIN] hashCondition=((t1.code = t2.ACCEPT_ORG_CODE)) otherCondition=() build RFs:RF0 code->[ACCEPT_ORG_CODE]
--------PhysicalOlapScan[t2] apply RFs: RF0
--------PhysicalDistribute
----------PhysicalOlapScan[t1]

-- !bc4 --
PhysicalResultSink
--PhysicalDistribute
----PhysicalProject
------hashJoin[LEFT_OUTER_JOIN] hashCondition=((t1.code = t2.ACCEPT_ORG_CODE)) otherCondition=()
--------PhysicalOlapScan[t2]
--------PhysicalDistribute
----------PhysicalOlapScan[t1]

17 changes: 17 additions & 0 deletions regression-test/data/nereids_ssb_shape_sf100_p0/shape/flat.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
PhysicalResultSink
--PhysicalDistribute
----PhysicalProject
------hashJoin[INNER_JOIN] hashCondition=((s.s_suppkey = l.lo_suppkey)) otherCondition=() build RFs:RF2 s_suppkey->[lo_suppkey]
--------PhysicalProject
----------hashJoin[INNER_JOIN] hashCondition=((c.c_custkey = l.lo_custkey)) otherCondition=() build RFs:RF1 c_custkey->[lo_custkey]
------------hashJoin[INNER_JOIN] hashCondition=((p.p_partkey = l.lo_partkey)) otherCondition=() build RFs:RF0 p_partkey->[lo_partkey]
--------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
--------------PhysicalDistribute
----------------PhysicalOlapScan[part]
------------PhysicalDistribute
--------------PhysicalOlapScan[customer]
--------PhysicalDistribute
----------PhysicalOlapScan[supplier]

24 changes: 11 additions & 13 deletions regression-test/data/nereids_ssb_shape_sf100_p0/shape/q2.2.out
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@ PhysicalResultSink
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_orderdate = dates.d_datekey)) otherCondition=() build RFs:RF2 d_datekey->[lo_orderdate]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF1 s_suppkey->[lo_suppkey]
------------------------PhysicalDistribute
--------------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_partkey = part.p_partkey)) otherCondition=() build RFs:RF0 p_partkey->[lo_partkey]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------filter((part.p_brand <= 'MFGR#2228') and (part.p_brand >= 'MFGR#2221'))
----------------------------------PhysicalOlapScan[part]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF1 s_suppkey->[lo_suppkey]
----------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_partkey = part.p_partkey)) otherCondition=() build RFs:RF0 p_partkey->[lo_partkey]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((supplier.s_region = 'ASIA'))
------------------------------PhysicalOlapScan[supplier]
----------------------------filter((part.p_brand <= 'MFGR#2228') and (part.p_brand >= 'MFGR#2221'))
------------------------------PhysicalOlapScan[part]
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------filter((supplier.s_region = 'ASIA'))
----------------------------PhysicalOlapScan[supplier]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------PhysicalOlapScan[dates]
Expand Down
24 changes: 11 additions & 13 deletions regression-test/data/nereids_ssb_shape_sf100_p0/shape/q2.3.out
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@ PhysicalResultSink
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_orderdate = dates.d_datekey)) otherCondition=() build RFs:RF2 d_datekey->[lo_orderdate]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF1 s_suppkey->[lo_suppkey]
------------------------PhysicalDistribute
--------------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_partkey = part.p_partkey)) otherCondition=() build RFs:RF0 p_partkey->[lo_partkey]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------filter((part.p_brand = 'MFGR#2239'))
----------------------------------PhysicalOlapScan[part]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF1 s_suppkey->[lo_suppkey]
----------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_partkey = part.p_partkey)) otherCondition=() build RFs:RF0 p_partkey->[lo_partkey]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((supplier.s_region = 'EUROPE'))
------------------------------PhysicalOlapScan[supplier]
----------------------------filter((part.p_brand = 'MFGR#2239'))
------------------------------PhysicalOlapScan[part]
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------filter((supplier.s_region = 'EUROPE'))
----------------------------PhysicalOlapScan[supplier]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------PhysicalOlapScan[dates]
Expand Down
24 changes: 11 additions & 13 deletions regression-test/data/nereids_ssb_shape_sf100_p0/shape/q3.3.out
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@ PhysicalResultSink
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_orderdate = dates.d_datekey)) otherCondition=() build RFs:RF2 d_datekey->[lo_orderdate]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_custkey = customer.c_custkey)) otherCondition=() build RFs:RF1 c_custkey->[lo_custkey]
------------------------PhysicalDistribute
--------------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF0 s_suppkey->[lo_suppkey]
----------------------------PhysicalProject
------------------------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
----------------------------PhysicalDistribute
------------------------------PhysicalProject
--------------------------------filter(s_city IN ('UNITED KI1', 'UNITED KI5'))
----------------------------------PhysicalOlapScan[supplier]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_custkey = customer.c_custkey)) otherCondition=() build RFs:RF1 c_custkey->[lo_custkey]
----------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF0 s_suppkey->[lo_suppkey]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter(c_city IN ('UNITED KI1', 'UNITED KI5'))
------------------------------PhysicalOlapScan[customer]
----------------------------filter(s_city IN ('UNITED KI1', 'UNITED KI5'))
------------------------------PhysicalOlapScan[supplier]
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------filter(c_city IN ('UNITED KI1', 'UNITED KI5'))
----------------------------PhysicalOlapScan[customer]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------filter((dates.d_year <= 1997) and (dates.d_year >= 1992))
Expand Down
29 changes: 14 additions & 15 deletions regression-test/data/nereids_ssb_shape_sf100_p0/shape/q3.4.out
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@ PhysicalResultSink
----------PhysicalDistribute
------------hashAgg[LOCAL]
--------------PhysicalProject
----------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_custkey = customer.c_custkey)) otherCondition=() build RFs:RF2 c_custkey->[lo_custkey]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_orderdate = dates.d_datekey)) otherCondition=() build RFs:RF1 d_datekey->[lo_orderdate]
------------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF0 s_suppkey->[lo_suppkey]
--------------------------PhysicalProject
----------------------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
--------------------------PhysicalDistribute
----------------------------PhysicalProject
------------------------------filter(s_city IN ('UNITED KI1', 'UNITED KI5'))
--------------------------------PhysicalOlapScan[supplier]
----------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_orderdate = dates.d_datekey)) otherCondition=() build RFs:RF2 d_datekey->[lo_orderdate]
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_custkey = customer.c_custkey)) otherCondition=() build RFs:RF1 c_custkey->[lo_custkey]
----------------------hashJoin[INNER_JOIN] hashCondition=((lineorder.lo_suppkey = supplier.s_suppkey)) otherCondition=() build RFs:RF0 s_suppkey->[lo_suppkey]
------------------------PhysicalProject
--------------------------PhysicalOlapScan[lineorder] apply RFs: RF0 RF1 RF2
------------------------PhysicalDistribute
--------------------------PhysicalProject
----------------------------filter((dates.d_yearmonth = 'Dec1997'))
------------------------------PhysicalOlapScan[dates]
----------------------------filter(s_city IN ('UNITED KI1', 'UNITED KI5'))
------------------------------PhysicalOlapScan[supplier]
----------------------PhysicalDistribute
------------------------PhysicalProject
--------------------------filter(c_city IN ('UNITED KI1', 'UNITED KI5'))
----------------------------PhysicalOlapScan[customer]
------------------PhysicalDistribute
--------------------PhysicalProject
----------------------filter(c_city IN ('UNITED KI1', 'UNITED KI5'))
------------------------PhysicalOlapScan[customer]
----------------------filter((dates.d_yearmonth = 'Dec1997'))
englefly marked this conversation as resolved.
Show resolved Hide resolved
------------------------PhysicalOlapScan[dates]

Loading
Loading