Skip to content

Commit

Permalink
executor: support selecting store by tidb_replica_read for stale read (
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer committed Sep 15, 2021
1 parent 83445d6 commit f710f3d
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 15 deletions.
2 changes: 1 addition & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
if builder.ReadReplicaScope == "" {
builder.ReadReplicaScope = kv.GlobalReplicaScope
}
if builder.IsStaleness && builder.ReadReplicaScope != kv.GlobalReplicaScope {
if builder.ReplicaRead.IsClosestRead() && builder.ReadReplicaScope != kv.GlobalReplicaScope {
builder.MatchStoreLabels = []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down
5 changes: 3 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -1866,8 +1866,9 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot.SetOption(kv.IsolationLevel, kv.SI)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
readReplicaType := e.ctx.GetSessionVars().GetReplicaRead()
if readReplicaType.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, readReplicaType)
}

rander := rand.New(rand.NewSource(e.randSeed)) // #nosec G404
Expand Down
7 changes: 4 additions & 3 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,9 @@ func (e *BatchPointGetExec) Open(context.Context) error {
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
replicaReadType := e.ctx.GetSessionVars().GetReplicaRead()
if replicaReadType.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, replicaReadType)
}
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
snapshot.SetOption(kv.ReadReplicaScope, e.readReplicaScope)
Expand All @@ -138,7 +139,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
}
})

if e.isStaleness && e.readReplicaScope != kv.GlobalTxnScope {
if replicaReadType.IsClosestRead() && e.readReplicaScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down
7 changes: 4 additions & 3 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
readReplicaType := e.ctx.GetSessionVars().GetReplicaRead()
if readReplicaType.IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, readReplicaType)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
e.snapshot.SetOption(kv.ReadReplicaScope, e.readReplicaScope)
e.snapshot.SetOption(kv.IsStalenessReadOnly, e.isStaleness)
if e.isStaleness && e.readReplicaScope != kv.GlobalTxnScope {
if readReplicaType.IsClosestRead() && e.readReplicaScope != kv.GlobalTxnScope {
e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Expand Down
14 changes: 14 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
assert: "github.com/pingcap/tidb/executor/assertBatchPointStalenessOption",
},
}
tk.MustExec("set @@tidb_replica_read='closest-replicas'")
for _, testcase := range testcases {
failpoint.Enable(testcase.assert, `return("sh")`)
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`)
Expand All @@ -303,6 +304,19 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
tk.MustExec(`commit`)
failpoint.Disable(testcase.assert)
}
// assert follower read closest read
for _, testcase := range testcases {
failpoint.Enable(testcase.assert, `return("sh")`)
tk.MustExec(`begin;`)
tk.MustQuery(testcase.sql)
tk.MustExec(`commit`)
failpoint.Disable(testcase.assert)
}
for _, testcase := range testcases {
failpoint.Enable(testcase.assert, `return("sh")`)
tk.MustQuery(testcase.sql)
failpoint.Disable(testcase.assert)
}
tk.MustExec(`insert into t1 (c,d,e) values (1,1,1);`)
tk.MustExec(`insert into t1 (c,d,e) values (2,3,5);`)
time.Sleep(2 * time.Second)
Expand Down
9 changes: 8 additions & 1 deletion kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,18 @@ const (
ReplicaReadLeader ReplicaReadType = iota
// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadMixed stands for 'read from leader and follower and learner'.
// ReplicaReadMixed stands for 'read from leader and follower'.
ReplicaReadMixed
// ReplicaReadClosest stands for 'read from leader and follower which locates with the same zone'
ReplicaReadClosest
)

// IsFollowerRead checks if follower is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
return r != ReplicaReadLeader
}

// IsClosestRead checks whether is going to request closet store to read
func (r ReplicaReadType) IsClosestRead() bool {
return r == ReplicaReadClosest
}
10 changes: 6 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,8 +2019,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
s.txn.SetVars(s.sessionVars.KVVars)
if s.sessionVars.GetReplicaRead().IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
readReplicaType := s.sessionVars.GetReplicaRead()
if readReplicaType.IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, readReplicaType)
}
}
return &s.txn, nil
Expand Down Expand Up @@ -2072,8 +2073,9 @@ func (s *session) NewTxn(ctx context.Context) error {
return err
}
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().GetReplicaRead().IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
replicaReadType := s.GetSessionVars().GetReplicaRead()
if replicaReadType.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, replicaReadType)
}
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
Expand Down
4 changes: 3 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1443,13 +1443,15 @@ var defaultSysVars = []*SysVar{
s.EnableNoopFuncs = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBReplicaRead, Value: "leader", Type: TypeEnum, PossibleValues: []string{"leader", "follower", "leader-and-follower"}, SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession, Name: TiDBReplicaRead, Value: "leader", Type: TypeEnum, PossibleValues: []string{"leader", "follower", "leader-and-follower", "closest-replicas"}, SetSession: func(s *SessionVars, val string) error {
if strings.EqualFold(val, "follower") {
s.SetReplicaRead(kv.ReplicaReadFollower)
} else if strings.EqualFold(val, "leader-and-follower") {
s.SetReplicaRead(kv.ReplicaReadMixed)
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.SetReplicaRead(kv.ReplicaReadLeader)
} else if strings.EqualFold(val, "closest-replicas") {
s.SetReplicaRead(kv.ReplicaReadClosest)
}
return nil
}},
Expand Down

0 comments on commit f710f3d

Please sign in to comment.