Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick follower read to TiDB (#11347) #12535

Merged
merged 5 commits into from
Oct 12, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,13 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
// "Concurrency", "IsolationLevel", "NotFillCache".
// "Concurrency", "IsolationLevel", "NotFillCache", "ReplicaRead".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.ReplicaRead
return builder
}

Expand Down
33 changes: 33 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -385,6 +386,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -429,6 +431,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -474,6 +477,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
Streaming: true,
NotFillCache: false,
SyncLog: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -554,3 +558,32 @@ func (s *testSuite) TestRequestBuilder6(c *C) {

c.Assert(actual, DeepEquals, expect)
}

func (s *testSuite) TestRequestBuilder7(c *C) {
vars := variable.NewSessionVars()
vars.ReplicaRead = kv.ReplicaReadFollower

concurrency := 10

actual, err := (&RequestBuilder{}).
SetFromSessionVars(vars).
SetConcurrency(concurrency).
Build()
c.Assert(err, IsNil)

expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
KeepOrder: false,
Desc: false,
Concurrency: concurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadFollower,
}

c.Assert(actual, DeepEquals, expect)
}
12 changes: 10 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
}
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region)
// we always use the first follower when follower read is enabled
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0)
if *err != nil {
return
}
Expand Down Expand Up @@ -914,6 +915,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(t.StartKey, t.EndKey)
if err != nil {
Expand All @@ -931,10 +935,14 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
defer e.wg.Done()
var snapshot kv.Snapshot
snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
if *err != nil {
return
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))

for i := workID; i < len(e.sampTasks); i += e.concurrency {
task := e.sampTasks[i]
if task.SampSize == 0 {
Expand Down
10 changes: 10 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ PARTITION BY RANGE ( a ) (
}
}

func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
ctx := tk.Se.(sessionctx.Context)
ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower
tk.MustExec("analyze table t")
}

func (s *testSuite1) TestAnalyzeParameters(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 3 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
if e.idxInfo != nil {
idxKey, err1 := e.encodeIndexKey()
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190917040145-a90dba59f50d
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d h1:vgmgBxetJN6YXdlbpjC0sxVceFrCV2aWlXETt7hGwH0=
github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
27 changes: 27 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
Pessimistic
// SnapshotTS is defined to set snapshot ts.
SnapshotTS
// Set replica read
ReplicaRead
)

// Priority value for transaction priority.
Expand All @@ -68,6 +70,23 @@ const (
RC
)

// ReplicaReadType is the type of replica to read data from
type ReplicaReadType byte

const (
// ReplicaReadLeader stands for 'read from leader'.
ReplicaReadLeader ReplicaReadType = 1 << iota
// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadLearner stands for 'read from learner'.
ReplicaReadLearner
)

// IsFollowerRead checks if leader is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
return r == ReplicaReadFollower
}

// Those limits is enforced to make sure the transaction can be well handled by TiKV.
var (
// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
Expand Down Expand Up @@ -214,6 +233,8 @@ type Request struct {
Streaming bool
// MemTracker is used to trace and control memory usage in co-processor layer.
MemTracker *memory.Tracker
// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead ReplicaReadType
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down Expand Up @@ -245,6 +266,12 @@ type Snapshot interface {
BatchGet(keys []Key) (map[string][]byte, error)
// SetPriority snapshot set the priority
SetPriority(priority int)

// SetOption sets an option with a value, when val is nil, uses the default
// value of this option. Only ReplicaRead is supported for snapshot
SetOption(opt Option, val interface{})
// DelOption deletes an option.
DelOption(opt Option)
}

// Driver is the interface that must be implemented by a KV storage.
Expand Down
3 changes: 3 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,6 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}

func (s *mockSnapshot) SetOption(opt Option, val interface{}) {}
func (s *mockSnapshot) DelOption(opt Option) {}
13 changes: 12 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ type session struct {
statsCollector *handle.SessionStatsCollector
// ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement;
ddlOwnerChecker owner.DDLOwnerChecker

// shared coprocessor client per session
client kv.Client
}

// DDLOwnerChecker returns s.ddlOwnerChecker.
Expand Down Expand Up @@ -495,7 +498,7 @@ func (s *session) RollbackTxn(ctx context.Context) {
}

func (s *session) GetClient() kv.Client {
return s.store.GetClient()
return s.client
}

func (s *session) String() string {
Expand Down Expand Up @@ -1268,6 +1271,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
if s.sessionVars.ReplicaRead.IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
}
return &s.txn, nil
}
Expand All @@ -1291,6 +1297,9 @@ func (s *session) NewTxn(ctx context.Context) error {
}
txn.SetCap(s.getMembufCap())
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().ReplicaRead.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
Expand Down Expand Up @@ -1570,6 +1579,7 @@ func createSession(store kv.Storage) (*session, error) {
parser: parser.New(),
sessionVars: variable.NewSessionVars(),
ddlOwnerChecker: dom.DDL().OwnerManager(),
client: store.GetClient(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand All @@ -1593,6 +1603,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
store: store,
parser: parser.New(),
sessionVars: variable.NewSessionVars(),
client: store.GetClient(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand Down
12 changes: 12 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2714,3 +2714,15 @@ func (s *testSessionSuite) TestGrantViewRelated(c *C) {
tkUser.MustQuery("select current_user();").Check(testkit.Rows("u_version29@%"))
tkUser.MustExec("create view v_version29_c as select * from v_version29;")
}

func (s *testSessionSuite) TestReplicaRead(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
tk.MustExec("set @@tidb_replica_read = 'follower';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower)
tk.MustExec("set @@tidb_replica_read = 'leader';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
}
10 changes: 10 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ type SessionVars struct {

// ConnectionInfo indicates current connection info used by current session, only be lazy assigned by plugin.
ConnectionInfo *ConnectionInfo

// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead kv.ReplicaReadType
}

// ConnectionInfo present connection used by audit.
Expand Down Expand Up @@ -444,6 +447,7 @@ func NewSessionVars() *SessionVars {
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
ReplicaRead: kv.ReplicaReadLeader,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -828,6 +832,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
// It's a global variable, but it also wants to be cached in server.
case TiDBMaxDeltaSchemaCount:
SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount))
case TiDBReplicaRead:
if strings.EqualFold(val, "follower") {
s.ReplicaRead = kv.ReplicaReadFollower
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.ReplicaRead = kv.ReplicaReadLeader
}
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)},
{ScopeSession, TiDBLowResolutionTSO, "0"},
{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
{ScopeSession, TiDBReplicaRead, "leader"},
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
}

Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (*testSysVarSuite) TestSysVar(c *C) {

f = GetSysVar("tidb_low_resolution_tso")
c.Assert(f.Value, Equals, "0")

f = GetSysVar("tidb_replica_read")
c.Assert(f.Value, Equals, "leader")
}

func (*testSysVarSuite) TestTxnMode(c *C) {
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ const (

// TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds
TiDBLowResolutionTSO = "tidb_low_resolution_tso"

// TiDBReplicaRead is used for reading data from replicas, followers for example.
TiDBReplicaRead = "tidb_replica_read"
)

// TiDB system variable names that both in session and global scope.
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(TiDBTxnMode, value)
}
case TiDBReplicaRead:
if strings.EqualFold(value, "follower") {
return "follower", nil
} else if strings.EqualFold(value, "leader") || len(value) == 0 {
return "leader", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBEnableStmtSummary:
switch {
case strings.EqualFold(value, "ON") || value == "1":
Expand Down
Loading