
planner/core: support union all for mpp. (#24287) #25051

Merged
merged 11 commits on Jun 21, 2021
52 changes: 41 additions & 11 deletions distsql/select_result.go
@@ -295,13 +295,6 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}
-if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) {
-logutil.Logger(ctx).Error("invalid cop task execution summaries length",
-zap.Int("expected", len(r.copPlanIDs)),
-zap.Int("received", len(r.selectResp.GetExecutionSummaries())))
-
-return
-}
if r.stats == nil {
id := r.rootPlanID
r.stats = &selectResultRuntimeStats{
@@ -316,12 +309,49 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType.Name(), copStats.ScanDetail)
}

-for i, detail := range r.selectResp.GetExecutionSummaries() {
+// If hasExecutor is true, it means the summary is returned from TiFlash.
+hasExecutor := false
+for _, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
-planID := r.copPlanIDs[i]
-r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
-RecordOneCopTask(planID, r.storeType.Name(), callee, detail)
+if detail.ExecutorId != nil {
+hasExecutor = true
+}
+break
}
}
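// Inspecting the first valid summary is enough here: within a single response
// the summaries either all carry ExecutorId (TiFlash) or none do.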
if hasExecutor {
recordedPlanIDs := make(map[int]int)
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
recordedPlanIDs[r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, r.storeType.Name(), callee, detail)] = 0
}
}
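// TiFlash may omit the summary of some executors, so any plan ID that was not
// recorded above is back-filled with an all-zero dummy summary to keep the
// collected runtime stats complete.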
num := uint64(0)
dummySummary := &tipb.ExecutorExecutionSummary{TimeProcessedNs: &num, NumProducedRows: &num, NumIterations: &num, ExecutorId: nil}
for _, planID := range r.copPlanIDs {
if _, ok := recordedPlanIDs[planID]; !ok {
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneCopTask(planID, r.storeType.Name(), callee, dummySummary)
}
}
} else {
// For cop task cases, we still need this protection.
if len(r.selectResp.GetExecutionSummaries()) != len(r.copPlanIDs) {
logutil.Logger(ctx).Error("invalid cop task execution summaries length",
zap.Int("expected", len(r.copPlanIDs)),
zap.Int("received", len(r.selectResp.GetExecutionSummaries())))
return
}
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, r.storeType.Name(), callee, detail)
}
}
}
}
24 changes: 10 additions & 14 deletions executor/mpp_gather.go
@@ -50,20 +50,20 @@ type MPPGather struct {
respIter distsql.SelectResult
}

-func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.MPPTask, isRoot bool) error {
+func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
dagReq, _, err := constructDAGReq(e.ctx, []plannercore.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash)
if err != nil {
return errors.Trace(err)
}
for i := range pf.ExchangeSender.Schema().Columns {
dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
}
-if !isRoot {
+if !pf.IsRoot {
dagReq.EncodeType = tipb.EncodeType_TypeCHBlock
} else {
dagReq.EncodeType = tipb.EncodeType_TypeChunk
}
-for _, mppTask := range tasks {
+for _, mppTask := range pf.ExchangeSender.Tasks {
err := updateExecutorTableID(context.Background(), dagReq.RootExecutor, mppTask.TableID, true)
if err != nil {
return errors.Trace(err)
@@ -77,20 +77,14 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
-IsRoot: isRoot,
+IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
-for _, r := range pf.ExchangeReceivers {
-err = e.appendMPPDispatchReq(r.GetExchangeSender().Fragment, r.Tasks, false)
-if err != nil {
-return errors.Trace(err)
-}
-}
return nil
}

@@ -108,13 +102,15 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
// TODO: Move the construct tasks logic to planner, so we can see the explain results.
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender)
planIDs := collectPlanIDS(e.originalPlan, nil)
-rootTasks, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is)
+frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is)
if err != nil {
return errors.Trace(err)
}
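// Dispatch one request per fragment; each fragment carries its own tasks
// and IsRoot flag, so no recursion over exchange receivers is needed.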
-err = e.appendMPPDispatchReq(sender.Fragment, rootTasks, true)
-if err != nil {
-return errors.Trace(err)
+for _, frag := range frags {
+err = e.appendMPPDispatchReq(frag)
+if err != nil {
+return errors.Trace(err)
+}
}
failpoint.Inject("checkTotalMPPTasks", func(val failpoint.Value) {
if val.(int) != len(e.mppReqs) {
54 changes: 54 additions & 0 deletions executor/tiflash_test.go
@@ -447,6 +447,60 @@ func (s *tiflashTestSuite) TestMppGoroutinesExitFromErrors(c *C) {
c.Assert(failpoint.Disable(hang), IsNil)
}

func (s *tiflashTestSuite) TestMppUnionAll(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists x1")
tk.MustExec("create table x1(a int , b int);")
tk.MustExec("alter table x1 set tiflash replica 1")
tk.MustExec("drop table if exists x2")
tk.MustExec("create table x2(a int , b int);")
tk.MustExec("alter table x2 set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "x1")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tb = testGetTableByName(c, tk.Se, "test", "x2")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)

tk.MustExec("set @@session.tidb_opt_mpp_outer_join_fixed_build_side=1")

tk.MustExec("insert into x1 values (1, 1), (2, 2), (3, 3), (4, 4)")
tk.MustExec("insert into x2 values (5, 1), (2, 2), (3, 3), (4, 4)")

// test join + union (join + select)
tk.MustQuery("select x1.a, x.a from x1 left join (select x2.b a, x1.b from x1 join x2 on x1.a = x2.b union all select * from x1 ) x on x1.a = x.a order by x1.a").Check(testkit.Rows("1 1", "1 1", "2 2", "2 2", "3 3", "3 3", "4 4", "4 4"))
tk.MustQuery("select x1.a, x.a from x1 left join (select count(*) a, sum(b) b from x1 group by a union all select * from x2 ) x on x1.a = x.a order by x1.a").Check(testkit.Rows("1 1", "1 1", "1 1", "1 1", "2 2", "3 3", "4 4"))

tk.MustExec("drop table if exists x3")
tk.MustExec("create table x3(a int , b int);")
tk.MustExec("alter table x3 set tiflash replica 1")
tb = testGetTableByName(c, tk.Se, "test", "x3")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)

tk.MustExec("insert into x3 values (2, 2), (2, 3), (2, 4)")
// test nested union all
tk.MustQuery("select count(*) from (select a, b from x1 union all select a, b from x3 union all (select x1.a, x3.b from (select * from x3 union all select * from x2) x3 left join x1 on x3.a = x1.b))").Check(testkit.Rows("14"))
// test union all join union all
tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union all select * from x3) y on x.a = y.b").Check(testkit.Rows("29"))
tk.MustExec("set @@session.tidb_broadcast_join_threshold_count=100000")
failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(6)`)
tk.MustQuery("select count(*) from (select * from x1 union all select * from x2 union all select * from x3) x join (select * from x1 union all select * from x2 union all select * from x3) y on x.a = y.b").Check(testkit.Rows("29"))
failpoint.Disable("github.com/pingcap/tidb/executor/checkTotalMPPTasks")

tk.MustExec("drop table if exists x4")
tk.MustExec("create table x4(a int not null, b int not null);")
tk.MustExec("alter table x4 set tiflash replica 1")
tb = testGetTableByName(c, tk.Se, "test", "x4")
err = domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)

tk.MustExec("insert into x4 values (2, 2), (2, 3)")
tk.MustQuery("(select * from x1 union all select * from x4) order by a, b").Check(testkit.Rows("1 1", "2 2", "2 2", "2 3", "3 3", "4 4"))

}

func (s *tiflashTestSuite) TestMppApply(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
3 changes: 1 addition & 2 deletions expression/column.go
@@ -38,10 +38,9 @@ type CorrelatedColumn struct {

// Clone implements Expression interface.
func (col *CorrelatedColumn) Clone() Expression {
-var d types.Datum
return &CorrelatedColumn{
Column: col.Column,
-Data: &d,
+Data: col.Data,
Contributor: this change may cause a data race in ParallelNestedLoopApplyExec; it should be fixed later.
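A minimal sketch of the hazard (hypothetical stand-in types, not the real expression/executor code): once Clone copies the pointer instead of the Datum, every clone writes through the same shared value, so parallel apply workers can race.

```go
package main

import (
	"fmt"
	"sync"
)

// datum and correlatedColumn are simplified stand-ins for types.Datum and
// expression.CorrelatedColumn; they only model the shared-pointer behavior.
type datum struct{ val int64 }

type correlatedColumn struct{ data *datum }

// clone mirrors the new Clone: the Datum pointer is shared, not deep-copied.
func (c *correlatedColumn) clone() *correlatedColumn {
	return &correlatedColumn{data: c.data}
}

func main() {
	orig := &correlatedColumn{data: &datum{}}
	clones := []*correlatedColumn{orig.clone(), orig.clone()}
	var wg sync.WaitGroup
	for i, c := range clones {
		wg.Add(1)
		go func(v int64, c *correlatedColumn) {
			defer wg.Done()
			c.data.val = v // both goroutines write one shared datum; `go run -race` flags this
		}(int64(i), c)
	}
	wg.Wait()
	fmt.Println(clones[0].data.val == clones[1].data.val) // always true: one shared datum
}
```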

}
}

34 changes: 30 additions & 4 deletions planner/core/exhaust_physical_plans.go
@@ -2151,7 +2151,7 @@ func (p *baseLogicalPlan) canPushToCop(storeTp kv.StoreType) bool {
}
}
ret = ret && validDs
-case *LogicalAggregation, *LogicalProjection, *LogicalSelection, *LogicalJoin:
+case *LogicalAggregation, *LogicalProjection, *LogicalSelection, *LogicalJoin, *LogicalUnionAll:
if storeTp == kv.TiFlash {
ret = ret && c.canPushToCop(storeTp)
} else {
@@ -2519,15 +2519,41 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P

func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) {
// TODO: UnionAll can not pass any order, but we can change it to sort merge to keep order.
-if !prop.IsEmpty() || prop.IsFlashProp() {
+if !prop.IsEmpty() || (prop.IsFlashProp() && prop.TaskTp != property.MppTaskType) {
return nil, true
}
// TODO: UnionAll can pass partition info, but for briefness, we prevent it from pushing down.
if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType {
return nil, true
}
canUseMpp := p.ctx.GetSessionVars().AllowMPPExecution && p.canPushToCop(kv.TiFlash)
chReqProps := make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
-chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt})
+if canUseMpp && prop.TaskTp == property.MppTaskType {
+chReqProps = append(chReqProps, &property.PhysicalProperty{
+ExpectedCnt: prop.ExpectedCnt,
+TaskTp: property.MppTaskType,
+})
+} else {
+chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt})
+}
}
-ua := PhysicalUnionAll{}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...)
+ua := PhysicalUnionAll{
+mpp: canUseMpp && prop.TaskTp == property.MppTaskType,
+}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...)
ua.SetSchema(p.Schema())
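// In addition to the candidate above, offer a fully-MPP union when the caller
// asks for a root task and MPP is feasible; the cost model picks between them.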
if canUseMpp && prop.TaskTp == property.RootTaskType {
chReqProps = make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
chReqProps = append(chReqProps, &property.PhysicalProperty{
ExpectedCnt: prop.ExpectedCnt,
TaskTp: property.MppTaskType,
})
}
mppUA := PhysicalUnionAll{mpp: true}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, chReqProps...)
mppUA.SetSchema(p.Schema())
return []PhysicalPlan{ua, mppUA}, true
}
return []PhysicalPlan{ua}, true
}
