diff --git a/internal/client/mock_server/mock_tikv_service.go b/internal/client/mock_server/mock_tikv_service.go index 392d3a5fb..a6d97b488 100644 --- a/internal/client/mock_server/mock_tikv_service.go +++ b/internal/client/mock_server/mock_tikv_service.go @@ -36,7 +36,7 @@ import ( "google.golang.org/grpc" ) -type MockServer struct { +type mockServer struct { tikvpb.TikvServer grpcServer *grpc.Server addr string @@ -49,28 +49,28 @@ type MockServer struct { } } -func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { +func (s *mockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { if err := s.checkMetadata(ctx); err != nil { return nil, err } return &kvrpcpb.GetResponse{}, nil } -func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { +func (s *mockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { if err := s.checkMetadata(ctx); err != nil { return nil, err } return &kvrpcpb.PrewriteResponse{}, nil } -func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error { +func (s *mockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err } return ss.Send(&coprocessor.Response{}) } -func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { +func (s *mockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err } @@ -101,13 +101,13 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { } } -func (s *MockServer) SetMetaChecker(check func(context.Context) error) { +func (s *mockServer) SetMetaChecker(check func(context.Context) error) { s.metaChecker.Lock() s.metaChecker.check = check s.metaChecker.Unlock() } -func (s *MockServer) checkMetadata(ctx context.Context) error { +func (s *mockServer) checkMetadata(ctx context.Context) error { s.metaChecker.Lock() defer s.metaChecker.Unlock() if s.metaChecker.check != nil { @@ -116,20 +116,20 @@ func (s *MockServer) checkMetadata(ctx context.Context) error { return nil } -func (s *MockServer) IsRunning() bool { +func (s *mockServer) IsRunning() bool { return atomic.LoadInt64(&s.running) == 1 } -func (s *MockServer) Addr() string { +func (s *mockServer) Addr() string { return s.addr } -func (s *MockServer) Stop() { +func (s *mockServer) Stop() { s.grpcServer.Stop() atomic.StoreInt64(&s.running, 0) } -func (s *MockServer) Start(addr string) int { +func (s *mockServer) Start(addr string) int { if addr == "" { addr = fmt.Sprintf("%s:%d", "127.0.0.1", 0) } @@ -160,8 +160,8 @@ func (s *MockServer) Start(addr string) int { } // StartMockTikvService try to start a gRPC server and retrun the server instance and binded port. -func StartMockTikvService() (*MockServer, int) { - server := &MockServer{} +func StartMockTikvService() (*mockServer, int) { + server := &mockServer{} port := server.Start("") return server, port } diff --git a/tikv/gc.go b/tikv/gc.go index a44ccf48c..e3ebcd373 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -35,7 +35,7 @@ import ( zap "go.uber.org/zap" ) -// We don't want gc to sweep out the cached info belong to other processes, like coprocessor. +// GCScanLockLimit We don't want gc to sweep out the cached info belong to other processes, like coprocessor. const GCScanLockLimit = txnlock.ResolvedCacheSize / 2 // GC does garbage collection (GC) of the TiKV cluster. @@ -77,11 +77,13 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc return nil } +// BaseRegionLockResolver is a base implementation of RegionLockResolver. type BaseRegionLockResolver struct { identifier string store Storage } +// NewRegionLockResolver creates a new BaseRegionLockResolver. func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver { return &BaseRegionLockResolver{ identifier: identifier, @@ -89,18 +91,22 @@ func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockReso } } +// Identifier represents the name of this resolver. func (l *BaseRegionLockResolver) Identifier() string { return l.identifier } +// ResolveLocksInOneRegion tries to resolve expired locks for one region. func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) { return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc) } +// ScanLocksInOneRegion return locks and location with given start key in a region. func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) { return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit) } +// GetStore is used to get store to GetRegionCache and SendReq for this lock resolver. func (l *BaseRegionLockResolver) GetStore() Storage { return l.store } @@ -130,6 +136,7 @@ type RegionLockResolver interface { GetStore() Storage } +// ResolveLocksForRange resolves locks in a range. func ResolveLocksForRange( ctx context.Context, resolver RegionLockResolver,