diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go
index 48b33e7c1bef3..35ccecec65670 100644
--- a/executor/batch_point_get.go
+++ b/executor/batch_point_get.go
@@ -124,6 +124,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
 		snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
 	}
 	snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
+	snapshot.SetOption(kv.TxnScope, e.ctx.GetSessionVars().TxnCtx.TxnScope)
 	isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
 	snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
 	if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
diff --git a/executor/point_get.go b/executor/point_get.go
index 78f5553069063..9d09a6acf6fb0 100644
--- a/executor/point_get.go
+++ b/executor/point_get.go
@@ -149,6 +149,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
 		e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
 	}
 	e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
+	e.snapshot.SetOption(kv.TxnScope, e.ctx.GetSessionVars().TxnCtx.TxnScope)
 	isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
 	e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
 	if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
diff --git a/go.mod b/go.mod
index ad115a92b4be3..74ef9b0bb07d6 100644
--- a/go.mod
+++ b/go.mod
@@ -43,7 +43,7 @@ require (
 	github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
 	github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
 	github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
-	github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c
+	github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb
 	github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
 	github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307
 	github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
diff --git a/go.sum b/go.sum
index db9370b27cdac..b2583c595388f 100644
--- a/go.sum
+++ b/go.sum
@@ -433,8 +433,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
-github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c h1:cy87vgUJT0U4JuxC7R14PuwBrabI9fDawYhyKTbjOBQ=
-github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
+github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb h1:6isHwZRl1fc9i1Mggiza2iQcfvVvYAAerFIs5t9msXg=
+github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
diff --git a/kv/kv.go b/kv/kv.go
index d65258131d184..283bc40078247 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -283,6 +283,8 @@ type Request struct {
 	TaskID uint64
 	// TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances.
 	TiDBServerID uint64
+	// TxnScope is the scope of the current txn.
+	TxnScope string
 	// IsStaleness indicates whether the request read staleness data
 	IsStaleness bool
 	// MatchStoreLabels indicates the labels the store should be matched
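Before the client-side plumbing, here is a minimal, self-contained sketch of what the executor and kv.Request changes above aim at: the snapshot remembers the session's transaction scope and stamps it onto every request built from it. The types below are toy stand-ins, not the real TiDB structs.

```go
package main

import "fmt"

// Toy stand-ins for the real snapshot and kv.Request types.
type option int

const optTxnScope option = iota

type snapshot struct {
	txnScope string
}

func (s *snapshot) SetOption(opt option, val interface{}) {
	if opt == optTxnScope {
		s.txnScope = val.(string)
	}
}

// request mirrors kv.Request after this change: it carries the scope of the
// transaction that issued it.
type request struct {
	TxnScope string
}

func (s *snapshot) buildRequest() request {
	return request{TxnScope: s.txnScope}
}

func main() {
	snap := &snapshot{}
	// What BatchPointGetExec.Open / PointGetExecutor.Open now do:
	snap.SetOption(optTxnScope, "global")
	fmt.Println(snap.buildRequest().TxnScope) // global
}
```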
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index 50d7d6c0a1546..d8ebf02d48136 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -713,10 +713,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 	if worker.kvclient.Stats == nil {
 		worker.kvclient.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
 	}
+	req.TxnScope = worker.req.TxnScope
 	if worker.req.IsStaleness {
 		req.EnableStaleRead()
 	}
-	var ops []tikv.StoreSelectorOption
+	ops := make([]tikv.StoreSelectorOption, 0, 2)
 	if len(worker.req.MatchStoreLabels) > 0 {
 		ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
 	}
diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go
index 036d824a39ff9..1b8bfa163ec85 100644
--- a/store/mockstore/unistore/tikv/server.go
+++ b/store/mockstore/unistore/tikv/server.go
@@ -618,6 +618,11 @@ func (svr *Server) BatchCoprocessor(req *coprocessor.BatchRequest, batchCopServe
 	return nil
 }
 
+// RawCoprocessor implements the tikvpb.TikvServer interface.
+func (svr *Server) RawCoprocessor(context.Context, *kvrpcpb.RawCoprocessorRequest) (*kvrpcpb.RawCoprocessorResponse, error) {
+	panic("unimplemented")
+}
+
 func (mrm *MockRegionManager) removeMPPTaskHandler(taskID int64, storeID uint64) error {
 	set := mrm.getMPPTaskSet(storeID)
 	if set == nil {
diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go
index 50eb437545024..76da1a2ef8927 100644
--- a/store/tikv/region_cache.go
+++ b/store/tikv/region_cache.go
@@ -172,14 +172,19 @@ func (r *RegionStore) follower(seed uint32, op *storeSelectorOp) AccessIndex {
 
 // return next leader or follower store's index
 func (r *RegionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
+	if op.leaderOnly {
+		return r.workTiKVIdx
+	}
 	candidates := make([]AccessIndex, 0, r.accessStoreNum(TiKVOnly))
 	for i := 0; i < r.accessStoreNum(TiKVOnly); i++ {
-		storeIdx, s := r.accessStore(TiKVOnly, AccessIndex(i))
-		if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(AccessIndex(i), op) {
+		accessIdx := AccessIndex(i)
+		storeIdx, s := r.accessStore(TiKVOnly, accessIdx)
+		if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(accessIdx, op) {
 			continue
 		}
-		candidates = append(candidates, AccessIndex(i))
+		candidates = append(candidates, accessIdx)
 	}
+	// If there are no candidates, send to the current workTiKVIdx, which is generally the leader.
 	if len(candidates) == 0 {
 		return r.workTiKVIdx
 	}
@@ -422,6 +427,8 @@ type RPCContext struct {
 	ProxyAccessIdx AccessIndex // valid when ProxyStore is not nil
 	ProxyAddr      string      // valid when ProxyStore is not nil
 	TiKVNum        int         // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
+
+	tryTimes int
 }
 
 func (c *RPCContext) String() string {
@@ -438,16 +445,24 @@
 }
 
 type storeSelectorOp struct {
-	labels []*metapb.StoreLabel
+	leaderOnly bool
+	labels     []*metapb.StoreLabel
 }
 
 // StoreSelectorOption configures storeSelectorOp.
 type StoreSelectorOption func(*storeSelectorOp)
 
-// WithMatchLabels indicates selecting stores with matched labels
+// WithMatchLabels indicates selecting stores with matched labels.
 func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption {
 	return func(op *storeSelectorOp) {
-		op.labels = labels
+		op.labels = append(op.labels, labels...)
+	}
+}
+
+// WithLeaderOnly indicates selecting the leader store only.
+func WithLeaderOnly() StoreSelectorOption {
+	return func(op *storeSelectorOp) {
+		op.leaderOnly = true
 	}
 }
 
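For readers skimming the region-cache changes above, here is a small, self-contained sketch of how the new functional option and the fallback rule in kvPeer are meant to combine: a leader-only option short-circuits peer selection to the working (leader) index, and an empty candidate list falls back to the same index. The types are toy stand-ins, not the real RegionStore or storeSelectorOp.

```go
package main

import "fmt"

// Toy stand-ins for storeSelectorOp and StoreSelectorOption.
type selectorOp struct {
	leaderOnly bool
	labels     []string
}

type selectorOption func(*selectorOp)

func withLeaderOnly() selectorOption {
	return func(op *selectorOp) { op.leaderOnly = true }
}

func withMatchLabels(labels []string) selectorOption {
	return func(op *selectorOp) { op.labels = append(op.labels, labels...) }
}

// pickPeer mirrors the shape of kvPeer after this change: leader-only
// requests and an empty candidate list both fall back to the working index.
func pickPeer(workIdx int, healthy []bool, seed uint32, opts ...selectorOption) int {
	var op selectorOp
	for _, o := range opts {
		o(&op)
	}
	if op.leaderOnly {
		return workIdx
	}
	var candidates []int
	for i, ok := range healthy {
		if ok {
			candidates = append(candidates, i)
		}
	}
	if len(candidates) == 0 {
		return workIdx
	}
	return candidates[int(seed)%len(candidates)]
}

func main() {
	fmt.Println(pickPeer(0, []bool{true, false, true}, 1))                   // 2: follower picked by seed
	fmt.Println(pickPeer(0, []bool{true, false, true}, 1, withLeaderOnly())) // 0: forced to the leader
	fmt.Println(pickPeer(0, []bool{false, false, false}, 1))                 // 0: no candidate, fall back
}
```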
diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go
index ed19243a40a6a..c7fe03638ee5a 100644
--- a/store/tikv/region_cache_test.go
+++ b/store/tikv/region_cache_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/tidb/store/tikv/kv"
 	"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
 	"github.com/pingcap/tidb/store/tikv/retry"
+	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 	pd "github.com/tikv/pd/client"
 )
 
@@ -1043,7 +1044,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) {
 	r := ctxTiFlash.Meta
 	reqSend := NewRegionRequestSender(s.cache, nil)
 	regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
-	reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr)
+	reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr, nil)
 
 	// check leader read should not go to tiflash
 	lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
@@ -1406,11 +1407,11 @@ func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch(c *C) {
 	c.Assert(ctxFollower1.Store.storeID, Equals, s.store2)
 
 	regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
-	reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
+	reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
 	c.Assert(followReqSeed, Equals, uint32(1))
 
 	regionErr = &errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}
-	reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
+	reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
 	c.Assert(followReqSeed, Equals, uint32(2))
 }
 
@@ -1437,7 +1438,7 @@ func (s *testRegionCacheSuite) TestMixedMeetEpochNotMatch(c *C) {
 	c.Assert(ctxFollower1.Store.storeID, Equals, s.store1)
 
 	regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
-	reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
+	reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
 	c.Assert(followReqSeed, Equals, uint32(1))
 }
 
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index a90d638233578..9f180c26269f2 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -36,6 +36,7 @@ import (
 	"github.com/pingcap/tidb/store/tikv/kv"
 	"github.com/pingcap/tidb/store/tikv/logutil"
 	"github.com/pingcap/tidb/store/tikv/metrics"
+	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/store/tikv/retry"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 	"github.com/pingcap/tidb/store/tikv/util"
@@ -286,6 +287,9 @@ func (s *RegionRequestSender) SendReqCtx(
 		if err != nil {
 			return nil, nil, err
 		}
+		if rpcCtx != nil {
+			rpcCtx.tryTimes = tryTimes
+		}
 
 		failpoint.Inject("invalidCacheAndRetry", func() {
 			// cooperate with github.com/pingcap/tidb/store/gcworker/setGcResolveMaxBackoff
@@ -334,8 +338,14 @@
 		if err != nil {
 			return nil, nil, errors.Trace(err)
 		}
+		failpoint.Inject("mockDataIsNotReadyError", func(val failpoint.Value) {
+			regionErr = &errorpb.Error{}
+			if tryTimesLimit, ok := val.(int); ok && tryTimes <= tryTimesLimit {
+				regionErr.DataIsNotReady = &errorpb.DataIsNotReady{}
+			}
+		})
 		if regionErr != nil {
-			retry, err = s.onRegionError(bo, rpcCtx, req.ReplicaReadSeed, regionErr)
+			retry, err = s.onRegionError(bo, rpcCtx, req, regionErr, &opts)
 			if err != nil {
 				return nil, nil, errors.Trace(err)
 			}
@@ -644,7 +654,7 @@ func regionErrorToLabel(e *errorpb.Error) string {
 	return "unknown"
 }
 
-func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed *uint32, regionErr *errorpb.Error) (shouldRetry bool, err error) {
+func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error, opts *[]StoreSelectorOption) (shouldRetry bool, err error) {
 	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
@@ -652,6 +662,12 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
 		// bo = bo.Clone()
 		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
 	}
+	// A stale read request will retry the leader or the next peer on error.
+	// If txnScope is global, we only retry the leader, by using the WithLeaderOnly option;
+	// if txnScope is local, we retry both the other peers and the leader by increasing the seed.
+	if ctx.tryTimes < 1 && req != nil && req.TxnScope == oracle.GlobalTxnScope && req.GetStaleRead() {
+		*opts = append(*opts, WithLeaderOnly())
+	}
 	metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
 
 	if notLeader := regionErr.GetNotLeader(); notLeader != nil {
@@ -687,6 +703,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
 		return true, nil
 	}
 
+	seed := req.GetReplicaReadSeed()
 	if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
 		logutil.BgLogger().Debug("tikv reports `EpochNotMatch` retry later",
 			zap.Stringer("EpochNotMatch", epochNotMatch),
@@ -723,6 +740,30 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
 		logutil.BgLogger().Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx))
 		return false, errors.New(regionErr.String())
 	}
+	// A stale read request may be sent to a peer on which the data is not ready yet; we should retry in this case.
+	if regionErr.GetDataIsNotReady() != nil {
+		logutil.BgLogger().Warn("tikv reports `DataIsNotReady` retry later",
+			zap.Uint64("store-id", ctx.Store.storeID),
+			zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
+			zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
+			zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
+			zap.Stringer("ctx", ctx))
+		if seed != nil {
+			*seed = *seed + 1
+		}
+		return true, nil
+	}
+	// A read request may be sent to a peer which has not been initialized yet; we should retry in this case.
+	if regionErr.GetRegionNotInitialized() != nil {
+		logutil.BgLogger().Warn("tikv reports `RegionNotInitialized` retry later",
+			zap.Uint64("store-id", ctx.Store.storeID),
+			zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
+			zap.Stringer("ctx", ctx))
+		if seed != nil {
+			*seed = *seed + 1
+		}
+		return true, nil
+	}
 	if regionErr.GetRegionNotFound() != nil && seed != nil {
 		logutil.BgLogger().Debug("tikv reports `RegionNotFound` in follow-reader",
 			zap.Stringer("ctx", ctx), zap.Uint32("seed", *seed))
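The retry policy that the new onRegionError branches implement can be summarised in a few lines. Below is a self-contained sketch of that decision (toy types; "global" stands in for oracle.GlobalTxnScope, and the seed stands in for ReplicaReadSeed): on DataIsNotReady the replica seed is rotated for the next attempt, and the first retry of a global-scope stale read is additionally pinned to the leader, whose data is always fresh.

```go
package main

import "fmt"

// Toy model of the retry decision taken for a DataIsNotReady region error.
type retryPlan struct {
	leaderOnly bool   // pin the next attempt to the leader
	seed       uint32 // replica-selection seed for the next attempt
}

func onDataIsNotReady(txnScope string, staleRead bool, tryTimes int, seed uint32) retryPlan {
	plan := retryPlan{seed: seed + 1} // rotate the replica seed for the next attempt
	// The first retry of a global-scope stale read is restricted to the leader;
	// leaderOnly overrides the seed-based peer choice.
	if tryTimes < 1 && staleRead && txnScope == "global" {
		plan.leaderOnly = true
	}
	return plan
}

func main() {
	fmt.Println(onDataIsNotReady("global", true, 0, 0)) // {true 1}: retry on the leader
	fmt.Println(onDataIsNotReady("local", true, 0, 0))  // {false 1}: try the next peer
}
```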
diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go
index d323f9f2f7caa..35bb0c185cdb5 100644
--- a/store/tikv/region_request_test.go
+++ b/store/tikv/region_request_test.go
@@ -24,6 +24,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/coprocessor_v2"
 	"github.com/pingcap/kvproto/pkg/errorpb"
@@ -33,6 +34,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/tikvpb"
 	tikverr "github.com/pingcap/tidb/store/tikv/error"
 	"github.com/pingcap/tidb/store/tikv/kv"
+	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/store/tikv/retry"
 
 	"github.com/pingcap/tidb/store/tikv/config"
@@ -185,6 +187,92 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit(c *C) {
 	kv.StoreLimit.Store(oldStoreLimit)
 }
 
+// Test whether the Stale Read request will retry the leader or other peers on error.
+func (s *testRegionRequestToThreeStoresSuite) TestStaleReadRetry(c *C) {
+	var seed uint32 = 0
+	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadMixed, &seed)
+	req.EnableStaleRead()
+
+	// Test whether a global Stale Read request will only retry on the leader.
+	req.TxnScope = oracle.GlobalTxnScope
+	region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
+	c.Assert(err, IsNil)
+	c.Assert(region, NotNil)
+	// Retry 1 time.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(1)`), IsNil)
+	resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
+	c.Assert(err, IsNil)
+	c.Assert(resp.Resp, NotNil)
+	c.Assert(ctx, NotNil)
+	c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)
+
+	seed = 0
+	region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
+	c.Assert(err, IsNil)
+	c.Assert(region, NotNil)
+	// Retry 2 times.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(2)`), IsNil)
+	resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
+	c.Assert(err, IsNil)
+	c.Assert(resp.Resp, NotNil)
+	c.Assert(ctx, NotNil)
+	c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)
+
+	seed = 0
+	region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
+	c.Assert(err, IsNil)
+	c.Assert(region, NotNil)
+	// Retry 3 times.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(3)`), IsNil)
+	resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
+	c.Assert(err, IsNil)
+	c.Assert(resp.Resp, NotNil)
+	c.Assert(ctx, NotNil)
+	c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)
+
+	// Test whether a local Stale Read request will retry on the leader and other peers.
+	req.TxnScope = "local"
+	seed = 0
+	region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
+	c.Assert(err, IsNil)
+	c.Assert(region, NotNil)
+	// Retry 1 time.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(1)`), IsNil)
+	resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
+	c.Assert(err, IsNil)
+	c.Assert(resp.Resp, NotNil)
+	c.Assert(ctx, NotNil)
+	peerID1 := ctx.Peer.GetId()
+
+	seed = 0
+	region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
+	c.Assert(err, IsNil)
+	c.Assert(region, NotNil)
+	// Retry 2 times.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(2)`), IsNil)
+	resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
+	c.Assert(err, IsNil)
+	c.Assert(resp.Resp, NotNil)
+	c.Assert(ctx, NotNil)
+	peerID2 := ctx.Peer.GetId()
+	c.Assert(peerID2, Not(Equals), peerID1)
+
+	seed = 0
+	region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
+	c.Assert(err, IsNil)
+	c.Assert(region, NotNil)
+	// Retry 3 times.
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(3)`), IsNil)
+	resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
+	c.Assert(err, IsNil)
+	c.Assert(resp.Resp, NotNil)
+	c.Assert(ctx, NotNil)
+	peerID3 := ctx.Peer.GetId()
+	c.Assert(peerID3, Not(Equals), peerID1)
+	c.Assert(peerID3, Not(Equals), peerID2)
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError"), IsNil)
+}
+
 func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart(c *C) {
 	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
 		Key:   []byte("key"),
@@ -435,6 +523,9 @@ func (s *mockTikvGrpcServer) Coprocessor(context.Context, *coprocessor.Request)
 func (s *mockTikvGrpcServer) BatchCoprocessor(*coprocessor.BatchRequest, tikvpb.Tikv_BatchCoprocessorServer) error {
 	return errors.New("unreachable")
 }
+func (s *mockTikvGrpcServer) RawCoprocessor(context.Context, *kvrpcpb.RawCoprocessorRequest) (*kvrpcpb.RawCoprocessorResponse, error) {
+	return nil, errors.New("unreachable")
+}
 func (s *mockTikvGrpcServer) DispatchMPPTask(context.Context, *mpp.DispatchTaskRequest) (*mpp.DispatchTaskResponse, error) {
 	return nil, errors.New("unreachable")
 }
diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go
index 743111827e85d..e9398e336ce86 100644
--- a/store/tikv/snapshot.go
+++ b/store/tikv/snapshot.go
@@ -105,6 +105,7 @@ type KVSnapshot struct {
 	replicaRead kv.ReplicaReadType
 	taskID      uint64
 	isStaleness bool
+	txnScope    string
 	// MatchStoreLabels indicates the labels the store should be matched
 	matchStoreLabels []*metapb.StoreLabel
 }
@@ -306,8 +307,6 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec
 
 	pending := batch.keys
 	for {
-		isStaleness := false
-		var matchStoreLabels []*metapb.StoreLabel
 		s.mu.RLock()
 		req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &pb.BatchGetRequest{
 			Keys: pending,
@@ -318,13 +317,15 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec
 			TaskId:           s.mu.taskID,
 			ResourceGroupTag: s.resourceGroupTag,
 		})
-		isStaleness = s.mu.isStaleness
-		matchStoreLabels = s.mu.matchStoreLabels
+		txnScope := s.mu.txnScope
+		isStaleness := s.mu.isStaleness
+		matchStoreLabels := s.mu.matchStoreLabels
 		s.mu.RUnlock()
-		var ops []StoreSelectorOption
+		req.TxnScope = txnScope
 		if isStaleness {
 			req.EnableStaleRead()
 		}
+		ops := make([]StoreSelectorOption, 0, 2)
 		if len(matchStoreLabels) > 0 {
 			ops = append(ops, WithMatchLabels(matchStoreLabels))
 		}
@@ -452,8 +453,6 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
 
 	cli := NewClientHelper(s.store, s.resolvedLocks)
 
-	isStaleness := false
-	var matchStoreLabels []*metapb.StoreLabel
 	s.mu.RLock()
 	if s.mu.stats != nil {
 		cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats)
@@ -471,8 +470,8 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte,
 		TaskId:           s.mu.taskID,
 		ResourceGroupTag: s.resourceGroupTag,
 	})
-	isStaleness = s.mu.isStaleness
-	matchStoreLabels = s.mu.matchStoreLabels
+	isStaleness := s.mu.isStaleness
+	matchStoreLabels := s.mu.matchStoreLabels
 	s.mu.RUnlock()
 	var ops []StoreSelectorOption
 	if isStaleness {
@@ -620,6 +619,13 @@ func (s *KVSnapshot) SetRuntimeStats(stats *SnapshotRuntimeStats) {
 	s.mu.stats = stats
 }
 
+// SetTxnScope sets up the txn scope.
+func (s *KVSnapshot) SetTxnScope(txnScope string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.mu.txnScope = txnScope
+}
+
 // SetIsStatenessReadOnly indicates whether the transaction is staleness read only transaction
 func (s *KVSnapshot) SetIsStatenessReadOnly(b bool) {
 	s.mu.Lock()
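The two KVSnapshot hunks above settle on a pattern worth calling out: every mutable per-snapshot field is copied into locals while s.mu is held for reading, and only the copies are used after RUnlock. Here is a stripped-down, self-contained sketch of that pattern; the snapshot and request types are toy stand-ins, not the real KVSnapshot.

```go
package main

import (
	"fmt"
	"sync"
)

// Toy stand-in for KVSnapshot: mutable options guarded by an RWMutex.
type snapshot struct {
	mu struct {
		sync.RWMutex
		txnScope    string
		isStaleness bool
	}
}

type request struct {
	txnScope  string
	staleRead bool
}

// buildRequest copies the guarded fields into locals under RLock and only
// touches the copies afterwards, mirroring batchGetSingleRegion above.
func (s *snapshot) buildRequest() request {
	s.mu.RLock()
	txnScope := s.mu.txnScope
	isStaleness := s.mu.isStaleness
	s.mu.RUnlock()

	req := request{txnScope: txnScope}
	if isStaleness {
		req.staleRead = true
	}
	return req
}

func main() {
	s := &snapshot{}
	s.mu.Lock()
	s.mu.txnScope = "global"
	s.mu.isStaleness = true
	s.mu.Unlock()
	fmt.Printf("%+v\n", s.buildRequest()) // {txnScope:global staleRead:true}
}
```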
diff --git a/store/tikv/tikv_test.go b/store/tikv/tikv_test.go
index a5b703ee3ff7a..391af8e2663f9 100644
--- a/store/tikv/tikv_test.go
+++ b/store/tikv/tikv_test.go
@@ -15,9 +15,12 @@ package tikv
 
 import (
 	"flag"
+	"os"
 	"sync"
+	"testing"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb/util/logutil"
 )
 
 var (
@@ -48,6 +51,13 @@ type testTiKVSuite struct {
 	OneByOneSuite
 }
 
+func TestT(t *testing.T) {
+	CustomVerboseFlag = true
+	logLevel := os.Getenv("log_level")
+	logutil.InitLogger(logutil.NewLogConfig(logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false))
+	TestingT(t)
+}
+
 var _ = Suite(&testTiKVSuite{})
 
 func (s *testTiKVSuite) TestBasicFunc(c *C) {
diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go
index 9f1b4fc0806aa..ae450b6019799 100644
--- a/store/tikv/tikvrpc/tikvrpc.go
+++ b/store/tikv/tikvrpc/tikvrpc.go
@@ -180,6 +180,7 @@ type Request struct {
 	Type CmdType
 	Req  interface{}
 	kvrpcpb.Context
+	TxnScope        string
 	ReplicaReadType kv.ReplicaReadType // different from `kvrpcpb.Context.ReplicaRead`
 	ReplicaReadSeed *uint32            // pointer to follower read seed in snapshot/coprocessor
 	StoreTp         EndpointType
@@ -214,6 +215,14 @@ func NewReplicaReadRequest(typ CmdType, pointer interface{}, replicaReadType kv.
 	return req
 }
 
+// GetReplicaReadSeed returns ReplicaReadSeed pointer.
+func (req *Request) GetReplicaReadSeed() *uint32 {
+	if req != nil {
+		return req.ReplicaReadSeed
+	}
+	return nil
+}
+
 // EnableStaleRead enables stale read
 func (req *Request) EnableStaleRead() {
 	req.StaleRead = true
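The final hunk's GetReplicaReadSeed guards against a nil receiver, which is what lets onRegionError call req.GetReplicaReadSeed() without first checking req for nil. A self-contained illustration of that Go idiom, using a toy request type rather than the real tikvrpc.Request:

```go
package main

import "fmt"

type request struct {
	replicaReadSeed *uint32
}

// getReplicaReadSeed is safe to call on a nil *request: a method with a
// pointer receiver may run with a nil receiver as long as it never
// dereferences it.
func (r *request) getReplicaReadSeed() *uint32 {
	if r != nil {
		return r.replicaReadSeed
	}
	return nil
}

func main() {
	var r *request // nil
	fmt.Println(r.getReplicaReadSeed()) // <nil>, no panic

	seed := uint32(7)
	r = &request{replicaReadSeed: &seed}
	fmt.Println(*r.getReplicaReadSeed()) // 7
}
```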