Skip to content

Commit

Permalink
planner, sessionvar: avoid sending same task id to TiFlash (#23747) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Apr 14, 2021
1 parent b69c9ed commit f9a6c56
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 15 deletions.
2 changes: 0 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ type executorBuilder struct {
snapshotTSCached bool
err error // err is set when there is error happened during Executor building process.
hasLock bool
mppTaskID int64
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder {
Expand Down Expand Up @@ -2630,7 +2629,6 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
is: b.is,
originalPlan: v.GetTablePlan(),
startTS: startTs,
allocTaskID: &b.mppTaskID,
}
return gather
}
Expand Down
5 changes: 2 additions & 3 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ type MPPGather struct {
originalPlan plannercore.PhysicalPlan
startTS uint64

allocTaskID *int64
mppReqs []*kv.MPPDispatchRequest
mppReqs []*kv.MPPDispatchRequest

respIter distsql.SelectResult
}
Expand Down Expand Up @@ -109,7 +108,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
// TODO: Move the construct tasks logic to planner, so we can see the explain results.
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender)
planIDs := collectPlanIDS(e.originalPlan, nil)
rootTasks, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.allocTaskID, e.is)
rootTasks, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is)
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 10 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,18 @@ func (s *tiflashTestSuite) TestMppExecution(c *C) {
tk.MustQuery("select count(*) k, t2.a/2 m from t2 group by t2.a / 2 order by m").Check(testkit.Rows("1 0.5000", "1 1.0000", "1 1.5000"))
tk.MustQuery("select count(*) k, t2.a div 2 from t2 group by t2.a div 2 order by k").Check(testkit.Rows("1 0", "2 1"))

// test task id for same start ts.
tk.MustExec("begin")
tk.MustQuery("select count(*) from ( select * from t2 group by a, b) A group by A.b").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from t1 where t1.a+100 > ( select count(*) from t2 where t1.a=t2.a and t1.b=t2.b) group by t1.b").Check(testkit.Rows("4"))
txn, err := tk.Se.Txn(true)
c.Assert(err, IsNil)
ts := txn.StartTS()
taskID := tk.Se.GetSessionVars().AllocMPPTaskID(ts)
c.Assert(taskID, Equals, int64(6))
tk.MustExec("commit")
taskID = tk.Se.GetSessionVars().AllocMPPTaskID(ts + 1)
c.Assert(taskID, Equals, int64(1))

failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(3)`)
// all the data is related to one store, so there are three tasks.
Expand Down
17 changes: 7 additions & 10 deletions planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,14 @@ type Fragment struct {
}

type mppTaskGenerator struct {
ctx sessionctx.Context
startTS uint64
allocTaskID *int64
is infoschema.InfoSchema
ctx sessionctx.Context
startTS uint64
is infoschema.InfoSchema
}

// GenerateRootMPPTasks generate all mpp tasks and return root ones.
func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, allocTaskID *int64, is infoschema.InfoSchema) ([]*kv.MPPTask, error) {
g := &mppTaskGenerator{ctx: ctx, startTS: startTs, allocTaskID: allocTaskID, is: is}
func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*kv.MPPTask, error) {
g := &mppTaskGenerator{ctx: ctx, startTS: startTs, is: is}
return g.generateMPPTasks(sender)
}

Expand Down Expand Up @@ -84,10 +83,9 @@ func (e *mppTaskGenerator) constructMPPTasksByChildrenTasks(tasks []*kv.MPPTask)
addr := task.Meta.GetAddress()
_, ok := addressMap[addr]
if !ok {
*e.allocTaskID++
mppTask := &kv.MPPTask{
Meta: &mppAddr{addr: addr},
ID: *e.allocTaskID,
ID: e.ctx.GetSessionVars().AllocMPPTaskID(e.startTS),
StartTs: e.startTS,
TableID: -1,
}
Expand Down Expand Up @@ -240,8 +238,7 @@ func (e *mppTaskGenerator) constructMPPTasksForSinglePartitionTable(ctx context.
}
tasks := make([]*kv.MPPTask, 0, len(metas))
for _, meta := range metas {
*e.allocTaskID++
tasks = append(tasks, &kv.MPPTask{Meta: meta, ID: *e.allocTaskID, StartTs: e.startTS, TableID: tableID})
tasks = append(tasks, &kv.MPPTask{Meta: meta, ID: e.ctx.GetSessionVars().AllocMPPTaskID(e.startTS), StartTs: e.startTS, TableID: tableID})
}
return tasks, nil
}
18 changes: 18 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ type SessionVars struct {
value string
}

// mppTaskIDAllocator is used to allocate mpp task id for a session.
mppTaskIDAllocator struct {
lastTS uint64
taskID int64
}

// Status stands for the session status. e.g. in transaction or not, auto commit is on or off, and so on.
Status uint16

Expand Down Expand Up @@ -820,6 +826,18 @@ type SessionVars struct {
AllowFallbackToTiKV map[kv.StoreType]struct{}
}

// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
// startTs is different.
func (s *SessionVars) AllocMPPTaskID(startTS uint64) int64 {
if s.mppTaskIDAllocator.lastTS == startTS {
s.mppTaskIDAllocator.taskID++
return s.mppTaskIDAllocator.taskID
}
s.mppTaskIDAllocator.lastTS = startTS
s.mppTaskIDAllocator.taskID = 1
return 1
}

// CheckAndGetTxnScope will return the transaction scope we should use in the current session.
func (s *SessionVars) CheckAndGetTxnScope() string {
if s.InRestrictedSQL {
Expand Down
14 changes: 14 additions & 0 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,20 @@ func (*testSessionSuite) TestSession(c *C) {
c.Assert(ss.WarningCount(), Equals, uint16(0))
}

func (*testSessionSuite) TestAllocMPPID(c *C) {
ctx := mock.NewContext()

seVar := ctx.GetSessionVars()
c.Assert(seVar, NotNil)

c.Assert(seVar.AllocMPPTaskID(1), Equals, int64(1))
c.Assert(seVar.AllocMPPTaskID(1), Equals, int64(2))
c.Assert(seVar.AllocMPPTaskID(1), Equals, int64(3))
c.Assert(seVar.AllocMPPTaskID(2), Equals, int64(1))
c.Assert(seVar.AllocMPPTaskID(2), Equals, int64(2))
c.Assert(seVar.AllocMPPTaskID(2), Equals, int64(3))
}

func (*testSessionSuite) TestSlowLogFormat(c *C) {
ctx := mock.NewContext()

Expand Down

0 comments on commit f9a6c56

Please sign in to comment.