Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: hehechen <chentongli@pingcap.com>
  • Loading branch information
hehechen committed Dec 27, 2022
1 parent 705a542 commit cfa2737
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 80 deletions.
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3527,16 +3527,16 @@ def go_deps():
name = "com_github_tiancaiamao_gp",
build_file_proto_mode = "disable",
importpath = "github.com/tiancaiamao/gp",
sum = "h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=",
version = "v0.0.0-20221214071713-abacb15f16f1",
sum = "h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=",
version = "v0.0.0-20221221095600-1a473d1f9b4b",
)

go_repository(
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:cbixEWKIx4bCvXE/kmriIv7q/eiZmGxAamcPiTbYd7I=",
version = "v2.0.4-0.20221219091653-fe3536dd5909",
sum = "h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=",
version = "v2.0.4-0.20221226080148-018c59dbd837",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
9 changes: 3 additions & 6 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ type executorBuilder struct {
// can return a correct value even if the session context has already been destroyed
forDataReaderBuilder bool
dataReaderTS uint64
queryTS uint64
localQueryID uint64
mppQueryID kv.MPPQueryID
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
Expand All @@ -132,8 +131,7 @@ func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *Te
isStaleness: staleread.IsStmtStaleness(ctx),
txnScope: txnManager.GetTxnScope(),
readReplicaScope: txnManager.GetReadReplicaScope(),
queryTS: getMPPQueryTS(ctx),
localQueryID: getMPPQueryID(ctx),
mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(ctx), LocalQueryID: getMPPQueryID(ctx), ServerID: domain.GetDomain(ctx).ServerID()},
}
}

Expand Down Expand Up @@ -3406,8 +3404,7 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
is: b.is,
originalPlan: v.GetTablePlan(),
startTS: startTs,
queryTS: b.queryTS,
localQueryID: b.localQueryID,
mppQueryID: b.mppQueryID,
}
return gather
}
Expand Down
30 changes: 14 additions & 16 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -59,6 +58,7 @@ type MPPGather struct {
is infoschema.InfoSchema
originalPlan plannercore.PhysicalPlan
startTS uint64
mppQueryID kv.MPPQueryID
queryTS uint64
localQueryID uint64

Expand Down Expand Up @@ -94,21 +94,19 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
return errors.Trace(err)
}
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.QueryTs), zap.Uint64("LocalQueryId", mppTask.LocalQueryID),
zap.Uint64("ServerID", mppTask.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
QueryTs: mppTask.QueryTs,
LocalQueryID: mppTask.LocalQueryID,
ServerID: mppTask.ServerID,
State: kv.MppTaskReady,
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
MppQueryID: mppTask.MppQueryID,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
Expand All @@ -129,7 +127,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
// TODO: Move the construct tasks logic to planner, so we can see the explain results.
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender)
planIDs := collectPlanIDS(e.originalPlan, nil)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, e.queryTS, e.localQueryID, sender, e.is)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, e.mppQueryID, sender, e.is)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -144,7 +142,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
}
})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, kv.MPPQueryID{QueryTs: e.queryTS, LocalQueryID: e.localQueryID, ServerID: domain.GetDomain(e.ctx).ServerID()})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, e.mppQueryID)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions executor/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_test(
"//executor",
"//meta/autoid",
"//parser/terror",
"//planner/core",
"//store/mockstore",
"//store/mockstore/unistore",
"//testkit",
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.4-0.20221219091653-fe3536dd5909
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down Expand Up @@ -219,7 +219,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 // indirect
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -932,10 +932,10 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.4-0.20221219091653-fe3536dd5909 h1:cbixEWKIx4bCvXE/kmriIv7q/eiZmGxAamcPiTbYd7I=
github.com/tikv/client-go/v2 v2.0.4-0.20221219091653-fe3536dd5909/go.mod h1:CUlYic0IhmNy2WU2liHHOEK57Hw+2kQ+SRFcLsnjkPw=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837 h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837/go.mod h1:ptS8K+VBrEH2gIS3JxaiFSSLfDDyuS2xcdLozOtBWBw=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
31 changes: 14 additions & 17 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type MPPTaskMeta interface {
GetAddress() string
}

// MPPQueryID means the global unique id of a mpp query.
type MPPQueryID struct {
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64 // unique mpp query id in local tidb memory.
Expand All @@ -36,13 +37,11 @@ type MPPQueryID struct {

// MPPTask means the minimum execution unit of a mpp computation job.
type MPPTask struct {
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID
StartTs uint64
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64 // unique mpp query id in local tidb memory.
ServerID uint64
TableID int64 // physical table id
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID
StartTs uint64
MppQueryID MPPQueryID
TableID int64 // physical table id

PartitionTableIDs []int64
}
Expand All @@ -51,9 +50,9 @@ type MPPTask struct {
func (t *MPPTask) ToPB() *mpp.TaskMeta {
meta := &mpp.TaskMeta{
StartTs: t.StartTs,
QueryTs: t.QueryTs,
LocalQueryId: t.LocalQueryID,
ServerId: t.ServerID,
QueryTs: t.MppQueryID.QueryTs,
LocalQueryId: t.MppQueryID.LocalQueryID,
ServerId: t.MppQueryID.ServerID,
TaskId: t.ID,
}
if t.ID != -1 {
Expand Down Expand Up @@ -83,13 +82,11 @@ type MPPDispatchRequest struct {
IsRoot bool // root task returns data to tidb directly.
Timeout uint64 // If task is assigned but doesn't receive a connect request during timeout, the task should be destroyed.
// SchemaVer is for any schema-ful storage (like tiflash) to validate schema correctness if necessary.
SchemaVar int64
StartTs uint64
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64
ServerID uint64
ID int64 // identify a single task
State MppTaskStates
SchemaVar int64
StartTs uint64
MppQueryID MPPQueryID
ID int64 // identify a single task
State MppTaskStates
}

// MPPClient accepts and processes mpp requests.
Expand Down
52 changes: 22 additions & 30 deletions planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -81,28 +80,27 @@ type tasksAndFrags struct {
}

type mppTaskGenerator struct {
ctx sessionctx.Context
startTS uint64
localQueryID uint64
queryTS uint64
is infoschema.InfoSchema
frags []*Fragment
cache map[int]tasksAndFrags
ctx sessionctx.Context
startTS uint64
mppQueryID kv.MPPQueryID
is infoschema.InfoSchema
frags []*Fragment
cache map[int]tasksAndFrags
}

// GenerateRootMPPTasks generate all mpp tasks and return root ones.
func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, queryTs uint64, localQueryID uint64, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*Fragment, error) {
func GenerateRootMPPTasks(ctx sessionctx.Context, startTs uint64, mppQueryID kv.MPPQueryID, sender *PhysicalExchangeSender, is infoschema.InfoSchema) ([]*Fragment, error) {
g := &mppTaskGenerator{
ctx: ctx,
startTS: startTs,
queryTS: queryTs,
localQueryID: localQueryID,
is: is,
cache: make(map[int]tasksAndFrags),
ctx: ctx,
startTS: startTs,
mppQueryID: mppQueryID,
is: is,
cache: make(map[int]tasksAndFrags),
}
return g.generateMPPTasks(sender)
}

// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id when the query finished.
func AllocMPPTaskID(ctx sessionctx.Context) int64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
return mppQueryInfo.AllocatedMPPTaskID.Add(1)
Expand All @@ -118,11 +116,9 @@ func AllocMPPQueryID() uint64 {
func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*Fragment, error) {
logutil.BgLogger().Info("Mpp will generate tasks", zap.String("plan", ToString(s)))
tidbTask := &kv.MPPTask{
StartTs: e.startTS,
QueryTs: e.queryTS,
LocalQueryID: e.localQueryID,
ServerID: domain.GetDomain(e.ctx).ServerID(),
ID: -1,
StartTs: e.startTS,
MppQueryID: e.mppQueryID,
ID: -1,
}
_, frags, err := e.generateMPPTasksForExchangeSender(s)
if err != nil {
Expand Down Expand Up @@ -153,13 +149,11 @@ func (e *mppTaskGenerator) constructMPPTasksByChildrenTasks(tasks []*kv.MPPTask)
_, ok := addressMap[addr]
if !ok {
mppTask := &kv.MPPTask{
Meta: &mppAddr{addr: addr},
ID: AllocMPPTaskID(e.ctx),
QueryTs: e.queryTS,
LocalQueryID: e.localQueryID,
ServerID: domain.GetDomain(e.ctx).ServerID(),
StartTs: e.startTS,
TableID: -1,
Meta: &mppAddr{addr: addr},
ID: AllocMPPTaskID(e.ctx),
MppQueryID: e.mppQueryID,
StartTs: e.startTS,
TableID: -1,
}
newTasks = append(newTasks, mppTask)
addressMap[addr] = struct{}{}
Expand Down Expand Up @@ -412,9 +406,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
task := &kv.MPPTask{Meta: meta,
ID: AllocMPPTaskID(e.ctx),
StartTs: e.startTS,
QueryTs: e.queryTS,
LocalQueryID: e.localQueryID,
ServerID: domain.GetDomain(e.ctx).ServerID(),
MppQueryID: e.mppQueryID,
TableID: ts.Table.ID,
PartitionTableIDs: allPartitionsIDs}
tasks = append(tasks, task)
Expand Down
2 changes: 1 addition & 1 deletion store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
}

// meta for current task.
taskMeta := &mpp.TaskMeta{StartTs: req.StartTs, QueryTs: req.QueryTs, LocalQueryId: req.LocalQueryID, TaskId: req.ID, ServerId: req.ServerID,
taskMeta := &mpp.TaskMeta{StartTs: req.StartTs, QueryTs: req.MppQueryID.QueryTs, LocalQueryId: req.MppQueryID.LocalQueryID, TaskId: req.ID, ServerId: req.MppQueryID.ServerID,
Address: req.Meta.GetAddress()}

mppReq := &mpp.DispatchTaskRequest{
Expand Down

0 comments on commit cfa2737

Please sign in to comment.