Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: show accessed partition when explain mpp query over partition table #30367

Merged
merged 10 commits into from
Dec 7, 2021
31 changes: 31 additions & 0 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -1726,6 +1727,36 @@ func (s *partitionTableSuite) TestAddDropPartitions(c *C) {
tk.MustPartition(`select * from t where a < 20`, "p1,p2,p3").Sort().Check(testkit.Rows("12", "15", "7"))
}

func (s *partitionTableSuite) TestMPPQueryExplainInfo(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
}

tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create database tiflash_partition_test")
tk.MustExec("use tiflash_partition_test")
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")

tk.MustExec(`create table t(a int) partition by range(a) (
partition p0 values less than (5),
partition p1 values less than (10),
partition p2 values less than (15))`)
tb := testGetTableByName(c, tk.Se, "tiflash_partition_test", "t")
for _, partition := range tb.Meta().GetPartitionInfo().Definitions {
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, partition.ID, true)
c.Assert(err, IsNil)
}
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec(`insert into t values (2), (7), (12)`)
tk.MustExec("set tidb_enforce_mpp=1")
tk.MustPartition(`select * from t where a < 3`, "p0").Sort().Check(testkit.Rows("2"))
tk.MustPartition(`select * from t where a < 8`, "p0,p1").Sort().Check(testkit.Rows("2", "7"))
tk.MustPartition(`select * from t where a < 20`, "all").Sort().Check(testkit.Rows("12", "2", "7"))
tk.MustPartition(`select * from t where a < 5 union all select * from t where a > 10`, "p0").Sort().Check(testkit.Rows("12", "2"))
tk.MustPartition(`select * from t where a < 5 union all select * from t where a > 10`, "p2").Sort().Check(testkit.Rows("12", "2"))
}

func (s *partitionTableSuite) PartitionPruningInTransaction(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
Expand Down
49 changes: 46 additions & 3 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,7 @@ func (p *PhysicalTableReader) ExplainNormalizedInfo() string {
return ""
}

func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) string {
ts := p.TablePlans[0].(*PhysicalTableScan)
func getAccessObjectForTableScan(sctx sessionctx.Context, ts *PhysicalTableScan, partitionInfo PartitionInfo) string {
pi := ts.Table.GetPartitionInfo()
if pi == nil || !sctx.GetSessionVars().UseDynamicPartitionPrune() {
return ""
Expand All @@ -346,7 +345,51 @@ func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) string {
}
tbl := tmp.(table.PartitionedTable)

return partitionAccessObject(sctx, tbl, pi, &p.PartitionInfo)
return partitionAccessObject(sctx, tbl, pi, &partitionInfo)
}

func (p *PhysicalTableReader) accessObject(sctx sessionctx.Context) string {
if !sctx.GetSessionVars().UseDynamicPartitionPrune() {
return ""
}
if len(p.PartitionInfos) == 0 {
ts := p.TablePlans[0].(*PhysicalTableScan)
return getAccessObjectForTableScan(sctx, ts, p.PartitionInfo)
}
if len(p.PartitionInfos) == 1 {
return getAccessObjectForTableScan(sctx, p.PartitionInfos[0].tableScan, p.PartitionInfos[0].partitionInfo)
}
containsPartitionTable := false
for _, info := range p.PartitionInfos {
if info.tableScan.Table.GetPartitionInfo() != nil {
containsPartitionTable = true
break
}
}
if !containsPartitionTable {
return ""
}
var buffer bytes.Buffer
for index, info := range p.PartitionInfos {
if index > 0 {
buffer.WriteString(", ")
}

tblName := info.tableScan.Table.Name.O
if info.tableScan.TableAsName != nil && info.tableScan.TableAsName.O != "" {
tblName = info.tableScan.TableAsName.O
}

if info.tableScan.Table.GetPartitionInfo() == nil {
buffer.WriteString("table of ")
buffer.WriteString(tblName)
continue
}
buffer.WriteString(getAccessObjectForTableScan(sctx, info.tableScan, info.partitionInfo))
buffer.WriteString(" of ")
buffer.WriteString(tblName)
}
return buffer.String()
}

func partitionAccessObject(sctx sessionctx.Context, tbl table.PartitionedTable, pi *model.PartitionInfo, partTable *PartitionInfo) string {
Expand Down
7 changes: 7 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ var (
_ PhysicalPlan = &PhysicalTableSample{}
)

type tableScanAndPartitionInfo struct {
tableScan *PhysicalTableScan
partitionInfo PartitionInfo
}

// PhysicalTableReader is the table reader in tidb.
type PhysicalTableReader struct {
physicalSchemaProducer
Expand All @@ -86,6 +91,8 @@ type PhysicalTableReader struct {

// Used by partition table.
PartitionInfo PartitionInfo
// Used by MPP, because MPP plan may contain join/union/union all, it is possible that a physical table reader contains more than 1 table scan
PartitionInfos []tableScanAndPartitionInfo
}

// PartitionInfo indicates partition helper info in physical plan.
Expand Down
12 changes: 12 additions & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,17 @@ func (t *mppTask) convertToRootTask(ctx sessionctx.Context) *rootTask {
return t.copy().(*mppTask).convertToRootTaskImpl(ctx)
}

func collectPartitionInfosFromMPPPlan(p *PhysicalTableReader, mppPlan PhysicalPlan) {
switch x := mppPlan.(type) {
case *PhysicalTableScan:
p.PartitionInfos = append(p.PartitionInfos, tableScanAndPartitionInfo{x, x.PartitionInfo})
default:
for _, ch := range mppPlan.Children() {
collectPartitionInfosFromMPPPlan(p, ch)
}
}
}

func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
sender := PhysicalExchangeSender{
ExchangeType: tipb.ExchangeType_PassThrough,
Expand All @@ -2266,6 +2277,7 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
StoreType: kv.TiFlash,
}.Init(ctx, t.p.SelectBlockOffset())
p.stats = t.p.statsInfo()
collectPartitionInfosFromMPPPlan(p, t.p)

cst := t.cst + t.count()*ctx.GetSessionVars().GetNetworkFactor(nil)
p.cost = cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
Expand Down