Skip to content

Commit

Permalink
planner: enforce push mpp down (pingcap#24849)
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall committed Jul 20, 2021
1 parent 0192259 commit bcf9061
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 16 deletions.
2 changes: 1 addition & 1 deletion executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
if !ctx.GetSessionVars().AllowMPPExecution {
if !ctx.GetSessionVars().IsMPPAllowed() {
return false
}
_, ok := tr.GetTablePlan().(*plannercore.PhysicalExchangeSender)
Expand Down
10 changes: 5 additions & 5 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1701,7 +1701,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
}
joins := make([]PhysicalPlan, 0, 8)
canPushToTiFlash := p.canPushToCop(kv.TiFlash)
if p.ctx.GetSessionVars().AllowMPPExecution && canPushToTiFlash {
if p.ctx.GetSessionVars().IsMPPAllowed() && canPushToTiFlash {
if p.shouldUseMPPBCJ() {
mppJoins := p.tryToGetMppHashJoin(prop, true)
if (p.preferJoinType & preferBCJoin) > 0 {
Expand Down Expand Up @@ -2029,7 +2029,7 @@ func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPl
if !lt.limitHints.preferLimitToCop {
allTaskTypes = append(allTaskTypes, property.RootTaskType)
}
if lt.ctx.GetSessionVars().AllowMPPExecution {
if lt.ctx.GetSessionVars().IsMPPAllowed() {
allTaskTypes = append(allTaskTypes, property.MppTaskType)
}
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
Expand Down Expand Up @@ -2449,7 +2449,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
canPushDownToMPP := canPushDownToTiFlash && la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP()
canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
Expand Down Expand Up @@ -2562,7 +2562,7 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
if !p.limitHints.preferLimitToCop {
allTaskTypes = append(allTaskTypes, property.RootTaskType)
}
if p.canPushToCop(kv.TiFlash) && p.ctx.GetSessionVars().AllowMPPExecution {
if p.canPushToCop(kv.TiFlash) && p.ctx.GetSessionVars().IsMPPAllowed() {
allTaskTypes = append(allTaskTypes, property.MppTaskType)
}
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
Expand Down Expand Up @@ -2600,7 +2600,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)
if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType {
return nil, true
}
canUseMpp := p.ctx.GetSessionVars().AllowMPPExecution && p.canPushToCopImpl(kv.TiFlash, true)
canUseMpp := p.ctx.GetSessionVars().IsMPPAllowed() && p.canPushToCop(kv.TiFlash)
chReqProps := make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
if canUseMpp && prop.TaskTp == property.MppTaskType {
Expand Down
127 changes: 127 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3551,3 +3551,130 @@ func (s *testIntegrationSuite) TestIssue23839(c *C) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin AUTO_INCREMENT=2000001")
tk.Exec("explain SELECT OUTR . col2 AS X FROM (SELECT INNR . col1 as col1, SUM( INNR . col2 ) as col2 FROM (SELECT INNR . `col_int_not_null` + 1 as col1, INNR . `pk` as col2 FROM BB AS INNR) AS INNR GROUP BY col1) AS OUTR2 INNER JOIN (SELECT INNR . col1 as col1, MAX( INNR . col2 ) as col2 FROM (SELECT INNR . `col_int_not_null` + 1 as col1, INNR . `pk` as col2 FROM BB AS INNR) AS INNR GROUP BY col1) AS OUTR ON OUTR2.col1 = OUTR.col1 GROUP BY OUTR . col1, OUTR2 . col1 HAVING X <> 'b'")
}


func (s *testIntegrationSuite) TestEnforceMPP(c *C) {
tk := testkit.NewTestKit(c, s.store)

// test value limit of tidb_opt_tiflash_concurrency_factor
err := tk.ExecToErr("set @@tidb_opt_tiflash_concurrency_factor = 0")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_opt_tiflash_concurrency_factor' can't be set to the value of '0'`)

tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 1")
tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("1"))
tk.MustExec("set @@tidb_opt_tiflash_concurrency_factor = 24")
tk.MustQuery("select @@tidb_opt_tiflash_concurrency_factor").Check(testkit.Rows("24"))

// test set tidb_allow_mpp
tk.MustExec("set @@session.tidb_allow_mpp = 0")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
tk.MustExec("set @@session.tidb_allow_mpp = 2")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

tk.MustExec("set @@session.tidb_allow_mpp = off")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@session.tidb_allow_mpp = oN")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))
tk.MustExec("set @@session.tidb_allow_mpp = enForcE")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

tk.MustExec("set @@global.tidb_allow_mpp = faLsE")
tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("OFF"))
tk.MustExec("set @@global.tidb_allow_mpp = True")
tk.MustQuery("select @@global.tidb_allow_mpp").Check(testkit.Rows("ON"))

err = tk.ExecToErr("set @@global.tidb_allow_mpp = enforceWithTypo")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, `[variable:1231]Variable 'tidb_allow_mpp' can't be set to the value of 'enforceWithTypo'`)

// test query
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec("create index idx on t(a)")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

// ban mpp
tk.MustExec("set @@session.tidb_allow_mpp = 0")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("OFF"))

// read from tiflash, batch cop.
tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"StreamAgg_20 1.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_21 1.00 root data:StreamAgg_9",
" └─StreamAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_19 10.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_18 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// open mpp
tk.MustExec("set @@session.tidb_allow_mpp = 1")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ON"))

// should use tikv to index read
tk.MustQuery("explain select count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_30 1.00 root funcs:count(Column#6)->Column#3",
"└─IndexReader_31 1.00 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#6",
" └─IndexRangeScan_29 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tikv, indexRead
tk.MustQuery("explain select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_18 1.00 root funcs:count(Column#5)->Column#3",
"└─IndexReader_19 1.00 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#5",
" └─IndexRangeScan_17 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tiflash, mpp with large cost
tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// enforce mpp
tk.MustExec("set @@session.tidb_allow_mpp = 2")
tk.MustQuery("select @@session.tidb_allow_mpp").Check(testkit.Rows("ENFORCE"))

// should use mpp
tk.MustQuery("explain select count(*) from t where a=1;").Check(testkit.Rows(
"HashAgg_24 1.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_26 1.00 root data:ExchangeSender_25",
" └─ExchangeSender_25 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_23 10.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_22 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))

// read from tikv, indexRead
tk.MustQuery("explain select /*+ read_from_storage(tikv[t]) */ count(*) from t where a=1;").Check(testkit.Rows(
"StreamAgg_18 1.00 root funcs:count(Column#5)->Column#3",
"└─IndexReader_19 1.00 root index:StreamAgg_10",
" └─StreamAgg_10 1.00 cop[tikv] funcs:count(1)->Column#5",
" └─IndexRangeScan_17 10.00 cop[tikv] table:t, index:idx(a) range:[1,1], keep order:false, stats:pseudo"))

// read from tiflash
tk.MustQuery("explain select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))
}
6 changes: 4 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2068,8 +2068,10 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
}.Init(ctx, t.p.SelectBlockOffset())
p.stats = t.p.statsInfo()

cst := t.cst + t.count()*ctx.GetSessionVars().NetworkFactor
cst = cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
cst := t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
if p.ctx.GetSessionVars().IsMPPEnforced() {
cst = 0
}
rt := &rootTask{
p: p,
cst: cst,
Expand Down
21 changes: 16 additions & 5 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,12 @@ type SessionVars struct {
AllowWriteRowID bool

// AllowBatchCop means if we should send batch coprocessor to TiFlash. Default value is 1, means to use batch cop in case of aggregation and join.
// If value is set to 2 , which means to force to send batch cop for any query. Value is set to 0 means never use batch cop.
// Value set to 2 means to force to send batch cop for any query. Value set to 0 means never use batch cop.
AllowBatchCop int

// AllowMPPExecution will prefer using mpp way to execute a query.
AllowMPPExecution bool
// AllowMPPExecution means if we should use mpp way to execute query. Default value is "ON", means to be determined by the optimizer.
// Value set to "ENFORCE" means to use mpp whenever possible. Value set to "OFF" means never use mpp.
allowMPPExecution string

// TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed.
AllowAutoRandExplicitInsert bool
Expand Down Expand Up @@ -850,6 +851,16 @@ func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 {
return 1
}

// IsMPPAllowed returns whether mpp execution is allowed.
func (s *SessionVars) IsMPPAllowed() bool {
return s.allowMPPExecution != "OFF"
}

// IsMPPEnforced returns whether mpp execution is enforced.
func (s *SessionVars) IsMPPEnforced() bool {
return s.allowMPPExecution == "ENFORCE"
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
func (s *SessionVars) CheckAndGetTxnScope() string {
if s.InRestrictedSQL {
Expand Down Expand Up @@ -1077,7 +1088,7 @@ func NewSessionVars() *SessionVars {
terror.Log(vars.SetSystemVar(TiDBEnableStreaming, enableStreaming))

vars.AllowBatchCop = DefTiDBAllowBatchCop
vars.AllowMPPExecution = DefTiDBAllowMPPExecution
vars.allowMPPExecution = DefTiDBAllowMPPExecution

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
Expand Down Expand Up @@ -1501,7 +1512,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case TiDBAllowBatchCop:
s.AllowBatchCop = int(tidbOptInt64(val, DefTiDBAllowBatchCop))
case TiDBAllowMPPExecution:
s.AllowMPPExecution = TiDBOptOn(val)
s.allowMPPExecution = val
case TiDBIndexLookupSize:
s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize)
case TiDBHashJoinConcurrency:
Expand Down
4 changes: 2 additions & 2 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ var defaultSysVars = []*SysVar{
return oracle.LocalTxnScope
}()},
/* TiDB specific variables */
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "ENFORCE"}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdSize, Value: strconv.Itoa(DefBroadcastJoinThresholdSize), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64},
{Scope: ScopeSession, Name: TiDBSnapshot, Value: ""},
Expand All @@ -607,7 +607,7 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCorrelationThreshold, Value: strconv.FormatFloat(DefOptCorrelationThreshold, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: 1},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCorrelationExpFactor, Value: strconv.Itoa(DefOptCorrelationExpFactor), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCPUFactor, Value: strconv.FormatFloat(DefOptCPUFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptTiFlashConcurrencyFactor, Value: strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 1, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptCopCPUFactor, Value: strconv.FormatFloat(DefOptCopCPUFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptNetworkFactor, Value: strconv.FormatFloat(DefOptNetworkFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptScanFactor, Value: strconv.FormatFloat(DefOptScanFactor, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: math.MaxUint64},
Expand Down
4 changes: 3 additions & 1 deletion sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ const (
// The default value is 0
TiDBAllowBatchCop = "tidb_allow_batch_cop"

// TiDBAllowMPPExecution means if we should use mpp way to execute query. Default value is 1 (or 'ON'), means to be determined by the optimizer.
// Value set to 2 (or 'ENFORCE') which means to use mpp whenever possible. Value set to 2 (or 'OFF') means never use mpp.
TiDBAllowMPPExecution = "tidb_allow_mpp"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
Expand Down Expand Up @@ -622,7 +624,7 @@ const (
DefBroadcastJoinThresholdCount = 10 * 1024
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBAllowBatchCop = 1
DefTiDBAllowMPPExecution = true
DefTiDBAllowMPPExecution = "ON"
DefTiDBTxnMode = ""
DefTiDBRowFormatV1 = 1
DefTiDBRowFormatV2 = 2
Expand Down

0 comments on commit bcf9061

Please sign in to comment.