From c269b117054ad73feb642033e2e2d1f713c70bc9 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Thu, 6 May 2021 13:15:52 +0800 Subject: [PATCH] store/copr: invalidate stale regions for Mpp query. (#24410) --- go.mod | 2 +- go.sum | 4 ++-- store/copr/mpp.go | 7 ++++++ store/mockstore/unistore/tikv/server.go | 30 ------------------------- store/tikv/region_cache.go | 5 +++++ store/tikv/region_request_test.go | 24 -------------------- 6 files changed, 15 insertions(+), 57 deletions(-) diff --git a/go.mod b/go.mod index 4ad29e29e6273..bf927f9cc55ce 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20210402093459-65aa336ccbbf + github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 diff --git a/go.sum b/go.sum index d4863b5d8b369..74b4f623789b8 100644 --- a/go.sum +++ b/go.sum @@ -436,8 +436,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20210402093459-65aa336ccbbf h1:y8ZVU2X20+3XZW2M0/B8YAZ8RhsTnOuneXr1UfIKeNU= -github.com/pingcap/kvproto v0.0.0-20210402093459-65aa336ccbbf/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e h1:oUMZ6X/Kpaoxfejh9/jQ+4UZ5xk9MRYcouWJ0oXRKNE= +github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 74fe82627a036..b170e9fee7f02 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -245,6 +245,13 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer, m.sendError(errors.New(realResp.Error.Msg)) return } + if len(realResp.RetryRegions) > 0 { + for _, retry := range realResp.RetryRegions { + id := tikv.NewRegionVerID(retry.Id, retry.RegionEpoch.ConfVer, retry.RegionEpoch.Version) + logutil.BgLogger().Info("invalid region because tiflash detected stale region", zap.String("region id", id.String())) + m.store.GetRegionCache().InvalidateCachedRegionWithReason(id, tikv.EpochNotMatch) + } + } failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) { if val.(bool) && !req.IsRoot { time.Sleep(1 * time.Second) diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index 478d11d16e131..f571ff4fe963f 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -951,36 +951,6 @@ func (svr *Server) RemoveLockObserver(context.Context, *kvrpcpb.RemoveLockObserv return &kvrpcpb.RemoveLockObserverResponse{}, nil } -// VerGet implements implements the tikvpb.TikvServer interface. -func (svr *Server) VerGet(context.Context, *kvrpcpb.VerGetRequest) (*kvrpcpb.VerGetResponse, error) { - panic("unimplemented") -} - -// VerBatchGet implements implements the tikvpb.TikvServer interface. -func (svr *Server) VerBatchGet(context.Context, *kvrpcpb.VerBatchGetRequest) (*kvrpcpb.VerBatchGetResponse, error) { - panic("unimplemented") -} - -// VerMut implements implements the tikvpb.TikvServer interface. -func (svr *Server) VerMut(context.Context, *kvrpcpb.VerMutRequest) (*kvrpcpb.VerMutResponse, error) { - panic("unimplemented") -} - -// VerBatchMut implements implements the tikvpb.TikvServer interface. -func (svr *Server) VerBatchMut(context.Context, *kvrpcpb.VerBatchMutRequest) (*kvrpcpb.VerBatchMutResponse, error) { - panic("unimplemented") -} - -// VerScan implements implements the tikvpb.TikvServer interface. -func (svr *Server) VerScan(context.Context, *kvrpcpb.VerScanRequest) (*kvrpcpb.VerScanResponse, error) { - panic("unimplemented") -} - -// VerDeleteRange implements implements the tikvpb.TikvServer interface. -func (svr *Server) VerDeleteRange(context.Context, *kvrpcpb.VerDeleteRangeRequest) (*kvrpcpb.VerDeleteRangeResponse, error) { - panic("unimplemented") -} - // CheckLeader implements implements the tikvpb.TikvServer interface. func (svr *Server) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) { panic("unimplemented") diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 9adfa65dd316a..c06394c6b166c 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -1545,6 +1545,11 @@ type RegionVerID struct { ver uint64 } +// NewRegionVerID creates a region ver id, which used for invalidating regions. +func NewRegionVerID(id, confVer, ver uint64) RegionVerID { + return RegionVerID{id, confVer, ver} +} + // GetID returns the id of the region func (r *RegionVerID) GetID() uint64 { return r.id diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index dab843013314b..81e9cc4498a07 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -471,30 +471,6 @@ func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexReques return nil, errors.New("unreachable") } -func (s *mockTikvGrpcServer) VerGet(context.Context, *kvrpcpb.VerGetRequest) (*kvrpcpb.VerGetResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerBatchGet(context.Context, *kvrpcpb.VerBatchGetRequest) (*kvrpcpb.VerBatchGetResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerMut(context.Context, *kvrpcpb.VerMutRequest) (*kvrpcpb.VerMutResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerBatchMut(context.Context, *kvrpcpb.VerBatchMutRequest) (*kvrpcpb.VerBatchMutResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerScan(context.Context, *kvrpcpb.VerScanRequest) (*kvrpcpb.VerScanResponse, error) { - return nil, errors.New("unreachable") -} - -func (s *mockTikvGrpcServer) VerDeleteRange(context.Context, *kvrpcpb.VerDeleteRangeRequest) (*kvrpcpb.VerDeleteRangeResponse, error) { - return nil, errors.New("unreachable") -} - func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) { return nil, errors.New("unreachable") }