store: refine the error handling and retry mechanism for stale read #24956

Merged 18 commits on Jun 3, 2021
Changes from 13 commits
1 change: 1 addition & 0 deletions executor/batch_point_get.go
@@ -124,6 +124,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
snapshot.SetOption(kv.TxnScope, e.ctx.GetSessionVars().TxnCtx.TxnScope)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
1 change: 1 addition & 0 deletions executor/point_get.go
@@ -149,6 +149,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
e.snapshot.SetOption(kv.TxnScope, e.ctx.GetSessionVars().TxnCtx.TxnScope)
isStaleness := e.ctx.GetSessionVars().TxnCtx.IsStaleness
e.snapshot.SetOption(kv.IsStalenessReadOnly, isStaleness)
if isStaleness && e.ctx.GetSessionVars().TxnCtx.TxnScope != kv.GlobalTxnScope {
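Both point-get executors now pass the session's transaction scope to the snapshot alongside the existing replica-read and staleness options; that scope is what later lets the client layer choose between leader-only and multi-peer retries. A hedged sketch of the shared wiring (the helper name and the `sessionctx/variable` import are assumptions, not code from the PR):

// applyStaleReadOptions mirrors the option wiring shown in the two executors above.
func applyStaleReadOptions(snapshot kv.Snapshot, vars *variable.SessionVars) {
	snapshot.SetOption(kv.TaskID, vars.StmtCtx.TaskID)
	snapshot.SetOption(kv.TxnScope, vars.TxnCtx.TxnScope)               // new in this PR
	snapshot.SetOption(kv.IsStalenessReadOnly, vars.TxnCtx.IsStaleness) // existing staleness flag
}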
2 changes: 1 addition & 1 deletion go.mod
@@ -43,7 +43,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c
github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
4 changes: 2 additions & 2 deletions go.sum
@@ -433,8 +433,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c h1:cy87vgUJT0U4JuxC7R14PuwBrabI9fDawYhyKTbjOBQ=
github.com/pingcap/kvproto v0.0.0-20210507054410-a8152f8a876c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb h1:6isHwZRl1fc9i1Mggiza2iQcfvVvYAAerFIs5t9msXg=
github.com/pingcap/kvproto v0.0.0-20210531063847-f42e582bf0bb/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
2 changes: 2 additions & 0 deletions kv/kv.go
@@ -283,6 +283,8 @@ type Request struct {
TaskID uint64
// TiDBServerID is the specified TiDB serverID to execute request. `0` means all TiDB instances.
TiDBServerID uint64
// TxnScope is the scope of the current txn.
TxnScope string
// IsStaleness indicates whether the request reads stale data
IsStaleness bool
// MatchStoreLabels indicates the labels that the store should match
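For context, a hedged example of how a stale-read coprocessor request might carry the new field together with the existing flags; the helper, the label values, and the "local" scope are illustrative, and the distsql builder that normally fills a kv.Request is assumed:

// buildStaleReadRequest is a hypothetical helper, not part of the PR.
func buildStaleReadRequest() *kv.Request {
	// StoreLabel comes from github.com/pingcap/kvproto/pkg/metapb.
	matchLabels := []*metapb.StoreLabel{{Key: "zone", Value: "bj"}}
	return &kv.Request{
		TxnScope:         "local",     // scope of the current txn, e.g. a local zone
		IsStaleness:      true,        // read possibly stale data from a follower
		MatchStoreLabels: matchLabels, // only meaningful when the scope is not global
	}
}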
3 changes: 2 additions & 1 deletion store/copr/coprocessor.go
@@ -713,10 +713,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
if worker.kvclient.Stats == nil {
worker.kvclient.Stats = make(map[tikvrpc.CmdType]*tikv.RPCRuntimeStats)
}
req.TxnScope = worker.req.TxnScope
if worker.req.IsStaleness {
req.EnableStaleRead()
}
var ops []tikv.StoreSelectorOption
ops := make([]tikv.StoreSelectorOption, 0, 2)
if len(worker.req.MatchStoreLabels) > 0 {
ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
}
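The switch from a nil `var ops` declaration to `make(..., 0, 2)` presumably reserves room for the at most two selector options a task can now accumulate: label matching here, and leader-only selection appended later by onRegionError for a global-scope stale read. A hedged sketch of that accumulation, condensed from this diff and the region_request.go change below:

ops := make([]tikv.StoreSelectorOption, 0, 2)
if len(worker.req.MatchStoreLabels) > 0 {
	ops = append(ops, tikv.WithMatchLabels(worker.req.MatchStoreLabels))
}
// ...later, on a retryable region error for a global-scope stale read:
ops = append(ops, tikv.WithLeaderOnly())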
5 changes: 5 additions & 0 deletions store/mockstore/unistore/tikv/server.go
@@ -618,6 +618,11 @@ func (svr *Server) BatchCoprocessor(req *coprocessor.BatchRequest, batchCopServe
return nil
}

// RawCoprocessor implements the tikvpb.TikvServer interface.
func (svr *Server) RawCoprocessor(context.Context, *kvrpcpb.RawCoprocessorRequest) (*kvrpcpb.RawCoprocessorResponse, error) {
panic("unimplemented")
}

func (mrm *MockRegionManager) removeMPPTaskHandler(taskID int64, storeID uint64) error {
set := mrm.getMPPTaskSet(storeID)
if set == nil {
26 changes: 20 additions & 6 deletions store/tikv/region_cache.go
@@ -167,14 +167,20 @@ func (r *RegionStore) follower(seed uint32, op *storeSelectorOp) AccessIndex {

// return next leader or follower store's index
func (r *RegionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
if op.leaderOnly {
return r.workTiKVIdx
}
candidates := make([]AccessIndex, 0, r.accessStoreNum(TiKVOnly))
for i := 0; i < r.accessStoreNum(TiKVOnly); i++ {
storeIdx, s := r.accessStore(TiKVOnly, AccessIndex(i))
if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(AccessIndex(i), op) {
accessIdx := AccessIndex(i)
storeIdx, s := r.accessStore(TiKVOnly, accessIdx)
if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(accessIdx, op) {
continue
}
candidates = append(candidates, AccessIndex(i))
candidates = append(candidates, accessIdx)
}
// If there are no candidates, send the request to the current workTiKVIdx, which is generally the leader.
if len(candidates) == 0 {
return r.workTiKVIdx
}
@@ -433,16 +439,24 @@ func (c *RPCContext) String() string {
}

type storeSelectorOp struct {
labels []*metapb.StoreLabel
leaderOnly bool
labels []*metapb.StoreLabel
}

// StoreSelectorOption configures storeSelectorOp.
type StoreSelectorOption func(*storeSelectorOp)

// WithMatchLabels indicates selecting stores with matched labels
// WithMatchLabels indicates selecting stores with matched labels.
func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption {
return func(op *storeSelectorOp) {
op.labels = labels
op.labels = append(op.labels, labels...)
}
}

// WithLeaderOnly indicates selecting stores with leader only.
func WithLeaderOnly() StoreSelectorOption {
return func(op *storeSelectorOp) {
op.leaderOnly = true
}
}

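Because WithMatchLabels now appends instead of overwriting, the two options compose cleanly. A hedged usage sketch inside the same package (`selectorFor` and its parameters are illustrative names, not code from the PR):

// selectorFor builds a storeSelectorOp by applying the functional options in order.
func selectorFor(matchLabels []*metapb.StoreLabel, leaderOnly bool) *storeSelectorOp {
	opts := []StoreSelectorOption{WithMatchLabels(matchLabels)}
	if leaderOnly { // e.g. retrying a global-scope stale read
		opts = append(opts, WithLeaderOnly())
	}
	op := &storeSelectorOp{}
	for _, o := range opts {
		o(op) // each option mutates the shared storeSelectorOp
	}
	return op
}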
9 changes: 5 additions & 4 deletions store/tikv/region_cache_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
)

@@ -1043,7 +1044,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash(c *C) {
r := ctxTiFlash.Meta
reqSend := NewRegionRequestSender(s.cache, nil)
regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr)
reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr, nil)

// check leader read should not go to tiflash
lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
Expand Down Expand Up @@ -1406,11 +1407,11 @@ func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch(c *C) {
c.Assert(ctxFollower1.Store.storeID, Equals, s.store2)

regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
c.Assert(followReqSeed, Equals, uint32(1))

regionErr = &errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}
reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
c.Assert(followReqSeed, Equals, uint32(2))
}

@@ -1437,7 +1438,7 @@ func (s *testRegionCacheSuite) TestMixedMeetEpochNotMatch(c *C) {
c.Assert(ctxFollower1.Store.storeID, Equals, s.store1)

regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
reqSend.onRegionError(s.bo, ctxFollower1, &followReqSeed, regionErr)
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
c.Assert(followReqSeed, Equals, uint32(1))
}

42 changes: 40 additions & 2 deletions store/tikv/region_request.go
@@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/util"
@@ -323,8 +324,14 @@ func (s *RegionRequestSender) SendReqCtx(
if err != nil {
return nil, nil, errors.Trace(err)
}
failpoint.Inject("mockDataIsNotReadyError", func(val failpoint.Value) {
regionErr = &errorpb.Error{}
if tryTimesLimit, ok := val.(int); ok && tryTimes <= tryTimesLimit {
regionErr.DataIsNotReady = &errorpb.DataIsNotReady{}
}
})
if regionErr != nil {
retry, err = s.onRegionError(bo, rpcCtx, req.ReplicaReadSeed, regionErr)
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr, &opts)
if err != nil {
return nil, nil, errors.Trace(err)
}
@@ -633,14 +640,20 @@ func regionErrorToLabel(e *errorpb.Error) string {
return "unknown"
}

func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed *uint32, regionErr *errorpb.Error) (shouldRetry bool, err error) {
func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error, opts *[]StoreSelectorOption) (shouldRetry bool, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context()))
defer span1.Finish()
// TODO(MyonKeminta): Make sure trace works without cloning the backoffer.
// bo = bo.Clone()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
// A stale read request will retry the leader or the next peer on error.
// If txnScope is global, only the leader is retried, using the WithLeaderOnly option;
// if txnScope is local, both the other peers and the leader are retried by increasing the seed.
if req != nil && req.GetStaleRead() && req.TxnScope == oracle.GlobalTxnScope {
*opts = append(*opts, WithLeaderOnly())
}

metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
if notLeader := regionErr.GetNotLeader(); notLeader != nil {
@@ -676,6 +689,7 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
return true, nil
}

seed := req.GetReplicaReadSeed()
if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
logutil.BgLogger().Debug("tikv reports `EpochNotMatch` retry later",
zap.Stringer("EpochNotMatch", epochNotMatch),
@@ -712,6 +726,30 @@ func (s *RegionRequestSender) onRegionError(bo *Backoffer, ctx *RPCContext, seed
logutil.BgLogger().Warn("tikv reports `RaftEntryTooLarge`", zap.Stringer("ctx", ctx))
return false, errors.New(regionErr.String())
}
// A stale read request may be sent to a peer whose data is not ready yet; retry in this case.
if regionErr.GetDataIsNotReady() != nil {
logutil.BgLogger().Warn("tikv reports `DataIsNotReady` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("peer-id", regionErr.GetDataIsNotReady().GetPeerId()),
zap.Uint64("region-id", regionErr.GetDataIsNotReady().GetRegionId()),
zap.Uint64("safe-ts", regionErr.GetDataIsNotReady().GetSafeTs()),
zap.Stringer("ctx", ctx))
if seed != nil {
*seed = *seed + 1
}
return true, nil
}
// A read request may be sent to a peer that has not been initialized yet; retry in this case.
if regionErr.GetRegionNotInitialized() != nil {
logutil.BgLogger().Warn("tikv reports `RegionNotInitialized` retry later",
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
zap.Stringer("ctx", ctx))
if seed != nil {
*seed = *seed + 1
}
return true, nil
}
if regionErr.GetRegionNotFound() != nil && seed != nil {
logutil.BgLogger().Debug("tikv reports `RegionNotFound` in follow-reader",
zap.Stringer("ctx", ctx), zap.Uint32("seed", *seed))
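Taken together, the new branches implement the policy described in the comment near the top of onRegionError. A condensed, hedged paraphrase follows; this helper does not exist in the PR, and the real code also bumps the seed for global scope, where the leader-only option makes the seed moot:

// On a retryable stale-read error such as DataIsNotReady: a global-scope read is
// restricted to the leader, while a local-scope read advances the replica-read
// seed so kvPeer picks a different candidate on the next attempt.
func staleReadRetryPolicy(txnScope string, seed *uint32, opts *[]StoreSelectorOption) {
	if txnScope == oracle.GlobalTxnScope {
		*opts = append(*opts, WithLeaderOnly())
		return
	}
	if seed != nil {
		*seed++
	}
}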
91 changes: 91 additions & 0 deletions store/tikv/region_request_test.go
@@ -24,6 +24,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/coprocessor_v2"
"github.com/pingcap/kvproto/pkg/errorpb"
@@ -33,6 +34,7 @@ import (
"github.com/pingcap/kvproto/pkg/tikvpb"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/retry"

"github.com/pingcap/tidb/store/tikv/config"
@@ -185,6 +187,92 @@ func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit(c *C) {
kv.StoreLimit.Store(oldStoreLimit)
}

// Test whether the Stale Read request will retry the leader or other peers on error.
func (s *testRegionRequestToThreeStoresSuite) TestStaleReadRetry(c *C) {
var seed uint32 = 0
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadMixed, &seed)
req.EnableStaleRead()

// Test whether a global Stale Read request will only retry on the leader.
req.TxnScope = oracle.GlobalTxnScope
region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 1 time.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(1)`), IsNil)
resp, ctx, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)

seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 2 times.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(2)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)

seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 3 times.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(3)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
c.Assert(ctx.Peer.GetId(), Equals, s.leaderPeer)

// Test whether a local Stale Read request will retry on the leader and other peers.
req.TxnScope = "local"
seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 1 time.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(1)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
peerID1 := ctx.Peer.GetId()

seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 2 times.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(2)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
peerID2 := ctx.Peer.GetId()
c.Assert(peerID2, Not(Equals), peerID1)

seed = 0
region, err = s.cache.LocateRegionByID(s.bo, s.regionID)
c.Assert(err, IsNil)
c.Assert(region, NotNil)
// Retry 3 times.
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError", `return(3)`), IsNil)
resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
c.Assert(err, IsNil)
c.Assert(resp.Resp, NotNil)
c.Assert(ctx, NotNil)
peerID3 := ctx.Peer.GetId()
c.Assert(peerID3, Not(Equals), peerID1)
c.Assert(peerID3, Not(Equals), peerID2)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockDataIsNotReadyError"), IsNil)
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart(c *C) {
req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
Key: []byte("key"),
@@ -435,6 +523,9 @@ func (s *mockTikvGrpcServer) Coprocessor(context.Context, *coprocessor.Request)
func (s *mockTikvGrpcServer) BatchCoprocessor(*coprocessor.BatchRequest, tikvpb.Tikv_BatchCoprocessorServer) error {
return errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawCoprocessor(context.Context, *kvrpcpb.RawCoprocessorRequest) (*kvrpcpb.RawCoprocessorResponse, error) {
return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) DispatchMPPTask(context.Context, *mpp.DispatchTaskRequest) (*mpp.DispatchTaskResponse, error) {
return nil, errors.New("unreachable")
}