Skip to content

Commit

Permalink
store/copr: invalidate stale regions for Mpp query. (#24410)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored May 6, 2021
1 parent 0d4e66e commit c269b11
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 57 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210402093459-65aa336ccbbf
github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210402093459-65aa336ccbbf h1:y8ZVU2X20+3XZW2M0/B8YAZ8RhsTnOuneXr1UfIKeNU=
github.com/pingcap/kvproto v0.0.0-20210402093459-65aa336ccbbf/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e h1:oUMZ6X/Kpaoxfejh9/jQ+4UZ5xk9MRYcouWJ0oXRKNE=
github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand Down
7 changes: 7 additions & 0 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
m.sendError(errors.New(realResp.Error.Msg))
return
}
if len(realResp.RetryRegions) > 0 {
for _, retry := range realResp.RetryRegions {
id := tikv.NewRegionVerID(retry.Id, retry.RegionEpoch.ConfVer, retry.RegionEpoch.Version)
logutil.BgLogger().Info("invalid region because tiflash detected stale region", zap.String("region id", id.String()))
m.store.GetRegionCache().InvalidateCachedRegionWithReason(id, tikv.EpochNotMatch)
}
}
failpoint.Inject("mppNonRootTaskError", func(val failpoint.Value) {
if val.(bool) && !req.IsRoot {
time.Sleep(1 * time.Second)
Expand Down
30 changes: 0 additions & 30 deletions store/mockstore/unistore/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,36 +951,6 @@ func (svr *Server) RemoveLockObserver(context.Context, *kvrpcpb.RemoveLockObserv
return &kvrpcpb.RemoveLockObserverResponse{}, nil
}

// VerGet implements implements the tikvpb.TikvServer interface.
func (svr *Server) VerGet(context.Context, *kvrpcpb.VerGetRequest) (*kvrpcpb.VerGetResponse, error) {
panic("unimplemented")
}

// VerBatchGet implements implements the tikvpb.TikvServer interface.
func (svr *Server) VerBatchGet(context.Context, *kvrpcpb.VerBatchGetRequest) (*kvrpcpb.VerBatchGetResponse, error) {
panic("unimplemented")
}

// VerMut implements implements the tikvpb.TikvServer interface.
func (svr *Server) VerMut(context.Context, *kvrpcpb.VerMutRequest) (*kvrpcpb.VerMutResponse, error) {
panic("unimplemented")
}

// VerBatchMut implements implements the tikvpb.TikvServer interface.
func (svr *Server) VerBatchMut(context.Context, *kvrpcpb.VerBatchMutRequest) (*kvrpcpb.VerBatchMutResponse, error) {
panic("unimplemented")
}

// VerScan implements implements the tikvpb.TikvServer interface.
func (svr *Server) VerScan(context.Context, *kvrpcpb.VerScanRequest) (*kvrpcpb.VerScanResponse, error) {
panic("unimplemented")
}

// VerDeleteRange implements implements the tikvpb.TikvServer interface.
func (svr *Server) VerDeleteRange(context.Context, *kvrpcpb.VerDeleteRangeRequest) (*kvrpcpb.VerDeleteRangeResponse, error) {
panic("unimplemented")
}

// CheckLeader implements implements the tikvpb.TikvServer interface.
func (svr *Server) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) {
panic("unimplemented")
Expand Down
5 changes: 5 additions & 0 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,11 @@ type RegionVerID struct {
ver uint64
}

// NewRegionVerID creates a region ver id, which used for invalidating regions.
func NewRegionVerID(id, confVer, ver uint64) RegionVerID {
return RegionVerID{id, confVer, ver}
}

// GetID returns the id of the region
func (r *RegionVerID) GetID() uint64 {
return r.id
Expand Down
24 changes: 0 additions & 24 deletions store/tikv/region_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,30 +471,6 @@ func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexReques
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) VerGet(context.Context, *kvrpcpb.VerGetRequest) (*kvrpcpb.VerGetResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) VerBatchGet(context.Context, *kvrpcpb.VerBatchGetRequest) (*kvrpcpb.VerBatchGetResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) VerMut(context.Context, *kvrpcpb.VerMutRequest) (*kvrpcpb.VerMutResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) VerBatchMut(context.Context, *kvrpcpb.VerBatchMutRequest) (*kvrpcpb.VerBatchMutResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) VerScan(context.Context, *kvrpcpb.VerScanRequest) (*kvrpcpb.VerScanResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) VerDeleteRange(context.Context, *kvrpcpb.VerDeleteRangeRequest) (*kvrpcpb.VerDeleteRangeResponse, error) {
return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) {
return nil, errors.New("unreachable")
}
Expand Down

0 comments on commit c269b11

Please sign in to comment.