Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: disable tidb_prefer_broadcast_join_by_exchange_data_size by default; set scale factor to optimize estimating broadcast join; #42915

Merged
merged 17 commits into from
Apr 12, 2023
Merged
56 changes: 56 additions & 0 deletions planner/core/casetest/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,62 @@ func TestMPPBCJModel(t *testing.T) {
}
}

func TestMPPPreferBCJ(t *testing.T) {
store := testkit.CreateMockStore(t, internal.WithMockTiFlash(3))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int)")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t2 (b int)")

tk.MustExec("insert into t1 values (1);")
tk.MustExec("insert into t2 values (1), (2), (3), (4), (5), (6), (7), (8);")

{
tk.MustExec("alter table t1 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t1")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
}
{
tk.MustExec("alter table t2 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "t2")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
}
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;")
{
var input []string
var output []struct {
SQL string
Plan []string
Warn []string
}
planSuiteData := GetPlanSuiteData()
planSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
testdata.OnRecord(func() {
output[i].SQL = tt
})
if strings.HasPrefix(tt, "set") || strings.HasPrefix(tt, "insert") {
tk.MustExec(tt)
continue
}
testdata.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
})
res := tk.MustQuery(tt)
res.Check(testkit.Rows(output[i].Plan...))
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}
}

func TestMPPBCJModelOneTiFlash(t *testing.T) {
/*
if there are 1 mpp stores, planner should choose broadcast join if `tidb_prefer_broadcast_join_by_exchange_data_size` is ON
Expand Down
10 changes: 10 additions & 0 deletions planner/core/casetest/testdata/plan_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@
"explain select * from t t1, t t2 where t1.a=t2.a"
]
},
{
"name": "TestMPPPreferBCJ",
"cases": [
"explain select * from t1, t2 where t1.a=t2.b",
"set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=1",
"explain select * from t1, t2 where t1.a=t2.b",
"insert into t2 values (9); analyze table t2;",
"explain select * from t1, t2 where t1.a=t2.b"
]
},
{
"name": "TestMPPBCJModelOneTiFlash",
"cases": [
Expand Down
62 changes: 62 additions & 0 deletions planner/core/casetest/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,68 @@
}
]
},
{
"Name": "TestMPPPreferBCJ",
"Cases": [
{
"SQL": "explain select * from t1, t2 where t1.a=t2.b",
"Plan": [
"TableReader_34 1.00 root MppVersion: 1, data:ExchangeSender_33",
"└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashJoin_32 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]",
" ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ",
" │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST",
" │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))",
" │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false",
" └─Selection_17(Probe) 8.00 mpp[tiflash] not(isnull(test.t2.b))",
" └─TableFullScan_16 8.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false"
],
"Warn": null
},
{
"SQL": "set @@session.tidb_prefer_broadcast_join_by_exchange_data_size=1",
"Plan": null,
"Warn": null
},
{
"SQL": "explain select * from t1, t2 where t1.a=t2.b",
"Plan": [
"TableReader_36 1.00 root MppVersion: 1, data:ExchangeSender_35",
"└─ExchangeSender_35 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashJoin_34 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]",
" ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ",
" │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t1.a, collate: binary]",
" │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))",
" │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false",
" └─ExchangeReceiver_19(Probe) 8.00 mpp[tiflash] ",
" └─ExchangeSender_18 8.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.t2.b, collate: binary]",
" └─Selection_17 8.00 mpp[tiflash] not(isnull(test.t2.b))",
" └─TableFullScan_16 8.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false"
],
"Warn": null
},
{
"SQL": "insert into t2 values (9); analyze table t2;",
"Plan": null,
"Warn": null
},
{
"SQL": "explain select * from t1, t2 where t1.a=t2.b",
"Plan": [
"TableReader_34 1.00 root MppVersion: 1, data:ExchangeSender_33",
"└─ExchangeSender_33 1.00 mpp[tiflash] ExchangeType: PassThrough",
" └─HashJoin_32 1.00 mpp[tiflash] inner join, equal:[eq(test.t1.a, test.t2.b)]",
" ├─ExchangeReceiver_15(Build) 1.00 mpp[tiflash] ",
" │ └─ExchangeSender_14 1.00 mpp[tiflash] ExchangeType: Broadcast, Compression: FAST",
" │ └─Selection_13 1.00 mpp[tiflash] not(isnull(test.t1.a))",
" │ └─TableFullScan_12 1.00 mpp[tiflash] table:t1 pushed down filter:empty, keep order:false",
" └─Selection_17(Probe) 9.00 mpp[tiflash] not(isnull(test.t2.b))",
" └─TableFullScan_16 9.00 mpp[tiflash] table:t2 pushed down filter:empty, keep order:false"
],
"Warn": null
}
]
},
{
"Name": "TestMPPBCJModelOneTiFlash",
"Cases": [
Expand Down
13 changes: 9 additions & 4 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2129,23 +2129,28 @@ func calcHashExchangeSizeByChild(p1 Plan, p2 Plan, mppStoreCnt int) (float64, fl
return row1 + row2, 0, false
}

// The size of `Build` hash table when using broadcast join is `X`.
// The size of `Build` hash table when using shuffle join is `X / (mppStoreCnt)`.
// It will cost more time to search `Probe` data in hash table.
// Set a scale factor (`mppStoreCnt^*`) when estimating broadcast join in `isJoinFitMPPBCJ` and `isJoinChildFitMPPBCJ` (based on TPCH benchmark, it has been verified in Q9).

solotzg marked this conversation as resolved.
Show resolved Hide resolved
func isJoinFitMPPBCJ(p *LogicalJoin, mppStoreCnt int) bool {
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
if hasSizeBC && hasSizeHash {
return szBC <= szHash
return szBC*float64(mppStoreCnt) <= szHash
}
return rowBC <= rowHash
return rowBC*float64(mppStoreCnt) <= rowHash
}

func isJoinChildFitMPPBCJ(p *LogicalJoin, childIndexToBC int, mppStoreCnt int) bool {
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSize(p.children[childIndexToBC], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)

if hasSizeBC && hasSizeHash {
return szBC <= szHash
return szBC*float64(mppStoreCnt) <= szHash
}
return rowBC <= rowHash
return rowBC*float64(mppStoreCnt) <= rowHash
}

// If we can use mpp broadcast join, that's our first choice.
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ const (
DefTiDBProjectionConcurrency = ConcurrencyUnset
DefBroadcastJoinThresholdSize = 100 * 1024 * 1024
DefBroadcastJoinThresholdCount = 10 * 1024
DefPreferBCJByExchangeDataSize = true
DefPreferBCJByExchangeDataSize = false
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBOptimizerEnableNewOFGB = false
DefTiDBEnableOuterJoinReorder = true
Expand Down