From f710f3d9a3f6b4bbaa63afb8761ccafd66ad6113 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Wed, 15 Sep 2021 14:52:41 +0800 Subject: [PATCH] executor: support selecting store by tidb_replica_read for stale read (#28033) --- distsql/request_builder.go | 2 +- executor/analyze.go | 5 +++-- executor/batch_point_get.go | 7 ++++--- executor/point_get.go | 7 ++++--- executor/stale_txn_test.go | 14 ++++++++++++++ kv/option.go | 9 ++++++++- session/session.go | 10 ++++++---- sessionctx/variable/sysvar.go | 4 +++- 8 files changed, 43 insertions(+), 15 deletions(-) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index d5aea026ad922..f5e99ec111002 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -50,7 +50,7 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) { if builder.ReadReplicaScope == "" { builder.ReadReplicaScope = kv.GlobalReplicaScope } - if builder.IsStaleness && builder.ReadReplicaScope != kv.GlobalReplicaScope { + if builder.ReplicaRead.IsClosestRead() && builder.ReadReplicaScope != kv.GlobalReplicaScope { builder.MatchStoreLabels = []*metapb.StoreLabel{ { Key: placement.DCLabelKey, diff --git a/executor/analyze.go b/executor/analyze.go index 9ac1332e848ef..cb82f50886a64 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -1866,8 +1866,9 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { snapshot.SetOption(kv.IsolationLevel, kv.SI) snapshot.SetOption(kv.Priority, kv.PriorityLow) setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot) - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + readReplicaType := e.ctx.GetSessionVars().GetReplicaRead() + if readReplicaType.IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, readReplicaType) } rander := rand.New(rand.NewSource(e.randSeed)) // #nosec G404 diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 27b72cc47f55c..4e6c7a5a21c9c 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -123,8 +123,9 @@ func (e *BatchPointGetExec) Open(context.Context) error { snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + replicaReadType := e.ctx.GetSessionVars().GetReplicaRead() + if replicaReadType.IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, replicaReadType) } snapshot.SetOption(kv.TaskID, stmtCtx.TaskID) snapshot.SetOption(kv.ReadReplicaScope, e.readReplicaScope) @@ -138,7 +139,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { } }) - if e.isStaleness && e.readReplicaScope != kv.GlobalTxnScope { + if replicaReadType.IsClosestRead() && e.readReplicaScope != kv.GlobalTxnScope { snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ { Key: placement.DCLabelKey, diff --git a/executor/point_get.go b/executor/point_get.go index e3baedf0ff560..a356087eff099 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -161,13 +161,14 @@ func (e *PointGetExecutor) Open(context.Context) error { e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + readReplicaType := e.ctx.GetSessionVars().GetReplicaRead() + if readReplicaType.IsFollowerRead() { + e.snapshot.SetOption(kv.ReplicaRead, readReplicaType) } e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) e.snapshot.SetOption(kv.ReadReplicaScope, e.readReplicaScope) e.snapshot.SetOption(kv.IsStalenessReadOnly, e.isStaleness) - if e.isStaleness && e.readReplicaScope != kv.GlobalTxnScope { + if readReplicaType.IsClosestRead() && e.readReplicaScope != kv.GlobalTxnScope { e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{ { Key: placement.DCLabelKey, diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 3a929da0e7b61..622aac1905e9e 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -288,6 +288,7 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { assert: "github.com/pingcap/tidb/executor/assertBatchPointStalenessOption", }, } + tk.MustExec("set @@tidb_replica_read='closest-replicas'") for _, testcase := range testcases { failpoint.Enable(testcase.assert, `return("sh")`) tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`) @@ -303,6 +304,19 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) { tk.MustExec(`commit`) failpoint.Disable(testcase.assert) } + // assert follower read closest read + for _, testcase := range testcases { + failpoint.Enable(testcase.assert, `return("sh")`) + tk.MustExec(`begin;`) + tk.MustQuery(testcase.sql) + tk.MustExec(`commit`) + failpoint.Disable(testcase.assert) + } + for _, testcase := range testcases { + failpoint.Enable(testcase.assert, `return("sh")`) + tk.MustQuery(testcase.sql) + failpoint.Disable(testcase.assert) + } tk.MustExec(`insert into t1 (c,d,e) values (1,1,1);`) tk.MustExec(`insert into t1 (c,d,e) values (2,3,5);`) time.Sleep(2 * time.Second) diff --git a/kv/option.go b/kv/option.go index f4c931c025ed4..682c2be4f2d60 100644 --- a/kv/option.go +++ b/kv/option.go @@ -78,11 +78,18 @@ const ( ReplicaReadLeader ReplicaReadType = iota // ReplicaReadFollower stands for 'read from follower'. ReplicaReadFollower - // ReplicaReadMixed stands for 'read from leader and follower and learner'. + // ReplicaReadMixed stands for 'read from leader and follower'. ReplicaReadMixed + // ReplicaReadClosest stands for 'read from leader and follower which locates with the same zone' + ReplicaReadClosest ) // IsFollowerRead checks if follower is going to be used to read data. func (r ReplicaReadType) IsFollowerRead() bool { return r != ReplicaReadLeader } + +// IsClosestRead checks whether is going to request closet store to read +func (r ReplicaReadType) IsClosestRead() bool { + return r == ReplicaReadClosest +} diff --git a/session/session.go b/session/session.go index 279b82f98ec2d..03b045388f20a 100644 --- a/session/session.go +++ b/session/session.go @@ -2019,8 +2019,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { } s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable() s.txn.SetVars(s.sessionVars.KVVars) - if s.sessionVars.GetReplicaRead().IsFollowerRead() { - s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + readReplicaType := s.sessionVars.GetReplicaRead() + if readReplicaType.IsFollowerRead() { + s.txn.SetOption(kv.ReplicaRead, readReplicaType) } } return &s.txn, nil @@ -2072,8 +2073,9 @@ func (s *session) NewTxn(ctx context.Context) error { return err } txn.SetVars(s.sessionVars.KVVars) - if s.GetSessionVars().GetReplicaRead().IsFollowerRead() { - txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + replicaReadType := s.GetSessionVars().GetReplicaRead() + if replicaReadType.IsFollowerRead() { + txn.SetOption(kv.ReplicaRead, replicaReadType) } s.txn.changeInvalidToValid(txn) is := domain.GetDomain(s).InfoSchema() diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 75f4cc1695e1a..211e852676520 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1443,13 +1443,15 @@ var defaultSysVars = []*SysVar{ s.EnableNoopFuncs = TiDBOptOn(val) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBReplicaRead, Value: "leader", Type: TypeEnum, PossibleValues: []string{"leader", "follower", "leader-and-follower"}, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBReplicaRead, Value: "leader", Type: TypeEnum, PossibleValues: []string{"leader", "follower", "leader-and-follower", "closest-replicas"}, SetSession: func(s *SessionVars, val string) error { if strings.EqualFold(val, "follower") { s.SetReplicaRead(kv.ReplicaReadFollower) } else if strings.EqualFold(val, "leader-and-follower") { s.SetReplicaRead(kv.ReplicaReadMixed) } else if strings.EqualFold(val, "leader") || len(val) == 0 { s.SetReplicaRead(kv.ReplicaReadLeader) + } else if strings.EqualFold(val, "closest-replicas") { + s.SetReplicaRead(kv.ReplicaReadClosest) } return nil }},