Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: TiFlash supports stale read #40048

Merged
merged 13 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2923,8 +2923,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=",
version = "v0.0.0-20221130022225-6c56ac56fe5f",
sum = "h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=",
version = "v0.0.0-20221213093948-9ccc6beaf0aa",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3523,12 +3523,20 @@ def go_deps():
sum = "h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=",
version = "v0.0.0-20181126055449-889f96f722a2",
)
go_repository(
name = "com_github_tiancaiamao_gp",
hehechen marked this conversation as resolved.
Show resolved Hide resolved
build_file_proto_mode = "disable",
importpath = "github.com/tiancaiamao/gp",
sum = "h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=",
version = "v0.0.0-20221221095600-1a473d1f9b4b",
)

go_repository(
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=",
version = "v2.0.3",
sum = "h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=",
version = "v2.0.4-0.20221226080148-018c59dbd837",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
4 changes: 2 additions & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ import (
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64) (SelectResult, error) {
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int, startTs uint64, mppQueryID kv.MPPQueryID) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
ctx = SetTiFlashMaxThreadsInContext(ctx, sctx)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs)
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback, startTs, mppQueryID)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
4 changes: 4 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,10 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finish
sessVars.StmtCtx.IsStaleness = false
// Clean the MPP query info
sessVars.StmtCtx.MPPQueryInfo.QueryID.Store(0)
sessVars.StmtCtx.MPPQueryInfo.QueryTS.Store(0)
sessVars.StmtCtx.MPPQueryInfo.AllocatedMPPTaskID.Store(0)

if sessVars.StmtCtx.ReadFromTableCache {
metrics.ReadFromTableCacheCounter.Inc()
Expand Down
5 changes: 1 addition & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3402,17 +3402,14 @@ func (b *executorBuilder) buildMPPGather(v *plannercore.PhysicalTableReader) Exe
is: b.is,
originalPlan: v.GetTablePlan(),
startTS: startTs,
mppQueryID: kv.MPPQueryID{QueryTs: getMPPQueryTS(b.ctx), LocalQueryID: getMPPQueryID(b.ctx), ServerID: domain.GetDomain(b.ctx).ServerID()},
}
return gather
}

// buildTableReader builds a table reader executor. It first build a no range table reader,
// and then update it ranges from table scan plan.
func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) Executor {
if v.StoreType != kv.TiKV && b.isStaleness {
b.err = errors.New("stale requests require tikv backend")
return nil
}
failpoint.Inject("checkUseMPP", func(val failpoint.Value) {
if !b.ctx.GetSessionVars().InRestrictedSQL && val.(bool) != useMPPExecution(b.ctx, v) {
if val.(bool) {
Expand Down
2 changes: 2 additions & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -211,6 +212,7 @@ func defaultCtx() sessionctx.Context {
ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, ctx.GetSessionVars().MemQuotaQuery)
ctx.GetSessionVars().StmtCtx.DiskTracker = disk.NewTracker(-1, -1)
ctx.GetSessionVars().SnapshotTS = uint64(1)
domain.BindDomain(ctx, domain.NewMockDomain())
return ctx
}

Expand Down
38 changes: 27 additions & 11 deletions executor/mpp_gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -38,13 +39,26 @@ func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader
return ok
}

func getMPPQueryID(ctx sessionctx.Context) uint64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
mppQueryInfo.QueryID.CompareAndSwap(0, plannercore.AllocMPPQueryID())
return mppQueryInfo.QueryID.Load()
}

func getMPPQueryTS(ctx sessionctx.Context) uint64 {
mppQueryInfo := &ctx.GetSessionVars().StmtCtx.MPPQueryInfo
mppQueryInfo.QueryTS.CompareAndSwap(0, uint64(time.Now().UnixNano()))
return mppQueryInfo.QueryTS.Load()
}

// MPPGather dispatch MPP tasks and read data from root tasks.
type MPPGather struct {
// following fields are construct needed
baseExecutor
is infoschema.InfoSchema
originalPlan plannercore.PhysicalPlan
startTS uint64
mppQueryID kv.MPPQueryID

mppReqs []*kv.MPPDispatchRequest

Expand Down Expand Up @@ -78,17 +92,19 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment) error {
return errors.Trace(err)
}
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", mppTask.StartTs),
zap.Int64("ID", mppTask.ID), zap.String("address", mppTask.Meta.GetAddress()),
zap.Int64("ID", mppTask.ID), zap.Uint64("QueryTs", mppTask.MppQueryID.QueryTs), zap.Uint64("LocalQueryId", mppTask.MppQueryID.LocalQueryID),
zap.Uint64("ServerID", mppTask.MppQueryID.ServerID), zap.String("address", mppTask.Meta.GetAddress()),
zap.String("plan", plannercore.ToString(pf.ExchangeSender)))
req := &kv.MPPDispatchRequest{
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
State: kv.MppTaskReady,
Data: pbData,
Meta: mppTask.Meta,
ID: mppTask.ID,
IsRoot: pf.IsRoot,
Timeout: 10,
SchemaVar: e.is.SchemaMetaVersion(),
StartTs: e.startTS,
MppQueryID: mppTask.MppQueryID,
State: kv.MppTaskReady,
}
e.mppReqs = append(e.mppReqs, req)
}
Expand All @@ -109,7 +125,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
// TODO: Move the construct tasks logic to planner, so we can see the explain results.
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender)
planIDs := collectPlanIDS(e.originalPlan, nil)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.is)
frags, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, e.mppQueryID, sender, e.is)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -124,7 +140,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
failpoint.Return(errors.Errorf("The number of tasks is not right, expect %d tasks but actually there are %d tasks", val.(int), len(e.mppReqs)))
}
})
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS)
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id, e.startTS, e.mppQueryID)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions executor/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_test(
"//executor",
"//meta/autoid",
"//parser/terror",
"//planner/core",
"//store/mockstore",
"//store/mockstore/unistore",
"//testkit",
Expand Down
16 changes: 6 additions & 10 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -267,14 +268,9 @@ func TestMppExecution(t *testing.T) {
tk.MustExec("begin")
tk.MustQuery("select count(*) from ( select * from t2 group by a, b) A group by A.b").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from t1 where t1.a+100 > ( select count(*) from t2 where t1.a=t2.a and t1.b=t2.b) group by t1.b").Check(testkit.Rows("4"))
txn, err := tk.Session().Txn(true)
require.NoError(t, err)
ts := txn.StartTS()
taskID := tk.Session().GetSessionVars().AllocMPPTaskID(ts)
require.Equal(t, int64(6), taskID)
tk.MustExec("commit")
taskID = tk.Session().GetSessionVars().AllocMPPTaskID(ts + 1)
taskID := plannercore.AllocMPPTaskID(tk.Session())
require.Equal(t, int64(1), taskID)
tk.MustExec("commit")

failpoint.Enable("github.com/pingcap/tidb/executor/checkTotalMPPTasks", `return(3)`)
// all the data is related to one store, so there are three tasks.
Expand Down Expand Up @@ -1043,7 +1039,7 @@ func TestTiFlashPartitionTableBroadcastJoin(t *testing.T) {
}
}

func TestForbidTiflashDuringStaleRead(t *testing.T) {
func TestTiflashSupportStaleRead(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -1075,8 +1071,8 @@ func TestForbidTiflashDuringStaleRead(t *testing.T) {
fmt.Fprintf(resBuff, "%s\n", row)
}
res = resBuff.String()
require.NotContains(t, res, "tiflash")
require.Contains(t, res, "tikv")
require.Contains(t, res, "tiflash")
require.NotContains(t, res, "tikv")
}

func TestForbidTiFlashIfExtraPhysTableIDIsNeeded(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand All @@ -89,7 +89,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.3
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down Expand Up @@ -219,6 +219,7 @@ require (
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -781,8 +781,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f h1:46ZD6xzQWJ8Jkeal/U7SqkX030Mgs8DAn6QV/9zbqOQ=
github.com/pingcap/kvproto v0.0.0-20221130022225-6c56ac56fe5f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa h1:v0Z0nC0knwWHn3e9br8EMNfLBB14QDULn142UGjiTMQ=
github.com/pingcap/kvproto v0.0.0-20221213093948-9ccc6beaf0aa/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -932,8 +932,10 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3 h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=
github.com/tikv/client-go/v2 v2.0.3/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837 h1:m6glgBGCIds9QURbk8Mn+8mjLKDcv6nWrNwYh92fydQ=
github.com/tikv/client-go/v2 v2.0.4-0.20221226080148-018c59dbd837/go.mod h1:ptS8K+VBrEH2gIS3JxaiFSSLfDDyuS2xcdLozOtBWBw=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
34 changes: 23 additions & 11 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,32 @@ type MPPTaskMeta interface {
GetAddress() string
}

// MPPQueryID means the global unique id of a mpp query.
type MPPQueryID struct {
QueryTs uint64 // timestamp of query execution, used for TiFlash minTSO schedule
LocalQueryID uint64 // unique mpp query id in local tidb memory.
ServerID uint64
}

// MPPTask means the minimum execution unit of a mpp computation job.
type MPPTask struct {
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID
StartTs uint64
TableID int64 // physical table id
Meta MPPTaskMeta // on which store this task will execute
ID int64 // mppTaskID
StartTs uint64
MppQueryID MPPQueryID
TableID int64 // physical table id

PartitionTableIDs []int64
}

// ToPB generates the pb structure.
func (t *MPPTask) ToPB() *mpp.TaskMeta {
meta := &mpp.TaskMeta{
StartTs: t.StartTs,
TaskId: t.ID,
StartTs: t.StartTs,
QueryTs: t.MppQueryID.QueryTs,
LocalQueryId: t.MppQueryID.LocalQueryID,
ServerId: t.MppQueryID.ServerID,
TaskId: t.ID,
}
if t.ID != -1 {
meta.Address = t.Meta.GetAddress()
Expand Down Expand Up @@ -70,10 +81,11 @@ type MPPDispatchRequest struct {
IsRoot bool // root task returns data to tidb directly.
Timeout uint64 // If task is assigned but doesn't receive a connect request during timeout, the task should be destroyed.
// SchemaVer is for any schema-ful storage (like tiflash) to validate schema correctness if necessary.
SchemaVar int64
StartTs uint64
ID int64 // identify a single task
State MppTaskStates
SchemaVar int64
StartTs uint64
MppQueryID MPPQueryID
ID int64 // identify a single task
State MppTaskStates
}

// MPPClient accepts and processes mpp requests.
Expand All @@ -83,7 +95,7 @@ type MPPClient interface {
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64, mppQueryID MPPQueryID) Response
}

// MPPBuildTasksRequest request the stores allocation for a mpp plan fragment.
Expand Down
Loading