Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: TiFlash supports stale read #40048

Merged
merged 13 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2923,8 +2923,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=",
version = "v0.0.0-20221130022225-6c56ac56fe5f",
sum = "h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=",
version = "v0.0.0-20221213093948-9ccc6beaf0aa",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3523,12 +3523,20 @@ def go_deps():
sum = "h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=",
version = "v0.0.0-20181126055449-889f96f722a2",
)
go_repository(
name = "com_github_tiancaiamao_gp",
hehechen marked this conversation as resolved.
Show resolved Hide resolved
build_file_proto_mode = "disable",
importpath = "github.com/tiancaiamao/gp",
sum = "h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=",
version = "v0.0.0-20221214071713-abacb15f16f1",
)

go_repository(
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=",
version = "v2.0.3",
sum = "h1:cbixEWKIx4bCvXE/kmriIv7q/eiZmGxAamcPiTbYd7I=",
version = "v2.0.4-0.20221219091653-fe3536dd5909",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
4 changes: 2 additions & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64) (SelectResult, error) {
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64, rootDestinationTask *kv.MPPTask) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
ctx = SetTiFlashMaxThreadsInContext(ctx, sctx)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, rootDestinationTask)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
10 changes: 6 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ type executorBuilder struct {
// can return a correct value even if the session context has already been destroyed
forDataReaderBuilder bool
dataReaderTS uint64
queryTS uint64
localQueryID uint64
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
Expand All @@ -130,6 +132,8 @@ func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *Te
isStaleness: staleread.IsStmtStaleness(ctx),
txnScope: txnManager.GetTxnScope(),
readReplicaScope: txnManager.GetReadReplicaScope(),
queryTS: uint64(time.Now().UnixNano()),
localQueryID: plannercore.AllocMPPQueryID(),
hehechen marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -3402,17 +3406,15 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
is: b.is,
originalPlan: v.GetTablePlan(),
startTS: startTs,
queryTS: b.queryTS,
localQueryID: b.localQueryID,
}
return gather
}

// buildTableReader builds a table reader executor. It first build a no range table reader,
// and then update it ranges from table scan plan.
func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) Executor {
if v.StoreType != kv.TiKV && b.isStaleness {
b.err = errors.New("stale requests require tikv backend")
return nil
}
failpoint.Inject("checkUseMPP", func(val failpoint.Value) {
if !b.ctx.GetSessionVars().InRestrictedSQL && val.(bool) != useMPPExecution(b.ctx, v) {
if val.(bool) {
Expand Down
29 changes: 18 additions & 11 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type MPPGather struct {
is infoschema.InfoSchema
originalPlan plannercore.PhysicalPlan
startTS uint64
queryTS uint64
localQueryID uint64
hehechen marked this conversation as resolved.
Show resolved Hide resolved

mppReqs []*kv.MPPDispatchRequest

Expand Down Expand Up @@ -78,17 +80,21 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
return errors.Trace(err)
}
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.QueryTs), zap.Uint64("LocalQueryId", mppTask.LocalQueryID),
zap.Uint64("ServerID", mppTask.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
State: kv.MppTaskReady,
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
QueryTs: mppTask.QueryTs,
LocalQueryID: mppTask.LocalQueryID,
ServerID: mppTask.ServerID,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
Expand All @@ -109,10 +115,11 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
// TODO: Move the construct tasks logic to planner, so we can see the explain results.
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender)
planIDs := collectPlanIDS(e.originalPlan, nil)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, e.queryTS, e.localQueryID, sender, e.is)
if err != nil {
return errors.Trace(err)
}
rootDestinationTask := frags[0].ExchangeSender.TargetTasks[0]
for _, frag := range frags {
err = e.appendMPPDispatchReq(frag)
if err != nil {
Expand All @@ -124,7 +131,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
}
})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS)
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, rootDestinationTask)
if err != nil {
return errors.Trace(err)
}
Expand Down
14 changes: 4 additions & 10 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,8 @@ func TestMppExecution(t *testing.T) {
tk.MustExec("begin")
tk.MustQuery("select count(*) from ( select * from t2 group by a, b) A group by A.b").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from t1 where t1.a+100 > ( select count(*) from t2 where t1.a=t2.a and t1.b=t2.b) group by t1.b").Check(testkit.Rows("4"))
txn, err := tk.Session().Txn(true)
require.NoError(t, err)
ts := txn.StartTS()
taskID := tk.Session().GetSessionVars().AllocMPPTaskID(ts)
require.Equal(t, int64(6), taskID)

tk.MustExec("commit")
taskID = tk.Session().GetSessionVars().AllocMPPTaskID(ts + 1)
require.Equal(t, int64(1), taskID)

failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(3)`)
// all the data is related to one store, so there are three tasks.
Expand Down Expand Up @@ -1043,7 +1037,7 @@ func TestTiFlashPartitionTableBroadcastJoin(t *testing.T) {
}
}

func TestForbidTiflashDuringStaleRead(t *testing.T) {
func TestTiflashSupportStaleRead(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -1075,8 +1069,8 @@ func TestForbidTiflashDuringStaleRead(t *testing.T) {
fmt.Fprintf(resBuff, "%s\n", row)
}
res = resBuff.String()
require.NotContains(t, res, "tiflash")
require.Contains(t, res, "tikv")
require.Contains(t, res, "tiflash")
require.NotContains(t, res, "tikv")
}

func TestForbidTiFlashIfExtraPhysTableIDIsNeeded(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand All @@ -89,7 +89,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.3
github.com/tikv/client-go/v2 v2.0.4-0.20221219091653-fe3536dd5909
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down Expand Up @@ -219,6 +219,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -781,8 +781,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -932,8 +932,10 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3 h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=
github.com/tikv/client-go/v2 v2.0.3/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.4-0.20221219091653-fe3536dd5909 h1:cbixEWKIx4bCvXE/kmriIv7q/eiZmGxAamcPiTbYd7I=
github.com/tikv/client-go/v2 v2.0.4-0.20221219091653-fe3536dd5909/go.mod h1:CUlYic0IhmNy2WU2liHHOEK57Hw+2kQ+SRFcLsnjkPw=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
31 changes: 20 additions & 11 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,25 @@ type MPPTaskMeta interface {

// MPPTask means the minimum execution unit of a mpp computation job.
type MPPTask struct {
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID
StartTs uint64
TableID int64 // physical table id
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID
StartTs uint64
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64 // unique mpp query id in local tidb memory.
ServerID uint64
TableID int64 // physical table id
hehechen marked this conversation as resolved.
Show resolved Hide resolved

PartitionTableIDs []int64
}

// ToPB generates the pb structure.
func (t *MPPTask) ToPB() *mpp.TaskMeta {
meta := &mpp.TaskMeta{
StartTs: t.StartTs,
TaskId: t.ID,
StartTs: t.StartTs,
QueryTs: t.QueryTs,
LocalQueryId: t.LocalQueryID,
ServerId: t.ServerID,
TaskId: t.ID,
}
if t.ID != -1 {
meta.Address = t.Meta.GetAddress()
Expand Down Expand Up @@ -71,10 +77,13 @@ type MPPDispatchRequest struct {
IsRoot bool // root task returns data to tidb directly.
Timeout uint64 // If task is assigned but doesn't receive a connect request during timeout, the task should be destroyed.
// SchemaVer is for any schema-ful storage (like tiflash) to validate schema correctness if necessary.
SchemaVar int64
StartTs uint64
ID int64 // identify a single task
State MppTaskStates
SchemaVar int64
StartTs uint64
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64
ServerID uint64
ID int64 // identify a single task
State MppTaskStates
}

// MPPClient accepts and processes mpp requests.
Expand All @@ -84,7 +93,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, *sync.Map, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, rootDestinationTask *MPPTask) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
Loading