diff --git a/tikv/gc.go b/tikv/gc.go
index 2b47e6bca..0ce3bbc5f 100644
--- a/tikv/gc.go
+++ b/tikv/gc.go
@@ -35,6 +35,9 @@ import (
 	zap "go.uber.org/zap"
 )
 
+// We don't want GC to sweep out cached info belonging to other processes, such as the coprocessor.
+const GCScanLockLimit = txnlock.ResolvedCacheSize / 2
+
 // GC does garbage collection (GC) of the TiKV cluster.
 // GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
 //
@@ -81,8 +84,9 @@ func WithConcurrency(concurrency int) GCOpt {
 }
 
 func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
+	lockResolver := NewRegionLockResolver("gc-client-go-api", s)
 	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
-		return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
+		return ResolveLocksForRange(ctx, lockResolver, safePoint, r.StartKey, r.EndKey, NewGcResolveLockMaxBackoffer, GCScanLockLimit)
 	}
 
 	runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
@@ -94,72 +98,131 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc
 	return nil
 }
 
-// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
-const gcScanLockLimit = txnlock.ResolvedCacheSize / 2
+type BaseRegionLockResolver struct {
+	identifier string
+	store      Storage
+}
+
+func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver {
+	return &BaseRegionLockResolver{
+		identifier: identifier,
+		store:      store,
+	}
+}
+
+func (l *BaseRegionLockResolver) Identifier() string {
+	return l.identifier
+}
 
-func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
+func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) {
+	return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc)
+}
+
+func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
+	return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit)
+}
+
+func (l *BaseRegionLockResolver) GetStore() Storage {
+	return l.store
+}
+
+// RegionLockResolver is used by the GC worker and the log backup advancer to resolve locks in a region.
+type RegionLockResolver interface {
+	// Identifier represents the name of this resolver.
+	Identifier() string
+
+	// ResolveLocksInOneRegion tries to resolve expired locks for one region.
+	// 1. For the GC worker, it scans locks before *safepoint* and force-removes them,
+	// rolling back the txns no matter whether their locks are expired or not.
+	// 2. For the log backup advancer, it scans all locks in a small range and checks
+	// the status of each txn, resolving the locks only if the txn is expired.
+	//
+	// It returns the region location when all the locks are resolved successfully. A nil
+	// location means not all the locks were resolved; the caller should rescan and retry.
+	// ** the locks are assumed sorted by key in ascending order **
+	ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error)
+
+	// ScanLocksInOneRegion returns the locks and the location of the region that contains the given start key.
+	// The result ([]*Lock, *KeyLocation, error) represents all the locks in one region,
+	// which will be used by ResolveLocksInOneRegion later.
+	ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)
+
+	// GetStore returns the Storage that this resolver uses for GetRegionCache and SendReq.
+	GetStore() Storage
+}
+
+func ResolveLocksForRange(
+	ctx context.Context,
+	resolver RegionLockResolver,
+	maxVersion uint64,
+	startKey []byte,
+	endKey []byte,
+	createBackoffFn func(context.Context) *Backoffer,
+	scanLimit uint32,
+) (rangetask.TaskStat, error) {
 	// for scan lock request, we must return all locks even if they are generated
 	// by the same transaction. because gc worker need to make sure all locks have been
 	// cleaned.
-
 	var stat rangetask.TaskStat
 	key := startKey
-	bo := NewGcResolveLockMaxBackoffer(ctx)
+	// create a new backoffer for every round of scanning and resolving locks
+	bo := createBackoffFn(ctx)
 	for {
 		select {
 		case <-ctx.Done():
 			return stat, errors.New("[gc worker] gc job canceled")
 		default:
 		}
-
-		locks, loc, err := s.scanLocksInRegionWithStartKey(bo, key, safePoint, gcScanLockLimit)
+		locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, maxVersion, scanLimit)
 		if err != nil {
 			return stat, err
 		}
 
-		resolvedLocation, err1 := s.batchResolveLocksInARegion(bo, locks, loc)
-		if err1 != nil {
-			return stat, err1
+		resolvedLocation, err := resolver.ResolveLocksInOneRegion(bo, locks, loc)
+		if err != nil {
+			return stat, err
 		}
 		// resolve locks failed since the locks are not in one region anymore, need retry.
 		if resolvedLocation == nil {
 			continue
 		}
-		if len(locks) < gcScanLockLimit {
+		if len(locks) < int(scanLimit) {
 			stat.CompletedRegions++
 			key = loc.EndKey
-			logutil.Logger(ctx).Info("[gc worker] one region finshed ",
+			logutil.Logger(ctx).Debug("resolve one region finished",
+				zap.String("identifier", resolver.Identifier()),
 				zap.Int("regionID", int(resolvedLocation.Region.GetID())),
 				zap.Int("resolvedLocksNum", len(locks)))
 		} else {
-			logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
+			logutil.Logger(ctx).Info("region has more than limit locks",
+				zap.String("identifier", resolver.Identifier()),
 				zap.Int("regionID", int(resolvedLocation.Region.GetID())),
 				zap.Int("resolvedLocksNum", len(locks)),
-				zap.Int("scan lock limit", gcScanLockLimit))
+				zap.Uint32("scan lock limit", scanLimit))
 			key = locks[len(locks)-1].Key
 		}
 
 		if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
 			break
 		}
-		bo = NewGcResolveLockMaxBackoffer(ctx)
+		bo = createBackoffFn(ctx)
 	}
 	return stat, nil
 }
 
-func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
+func scanLocksInOneRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
 	for {
-		loc, err := s.GetRegionCache().LocateKey(bo, startKey)
+		loc, err := store.GetRegionCache().LocateKey(bo, startKey)
 		if err != nil {
 			return nil, loc, err
 		}
 		req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
 			MaxVersion: maxVersion,
-			Limit:      gcScanLockLimit,
+			Limit:      limit,
 			StartKey:   startKey,
 			EndKey:     loc.EndKey,
 		})
-		resp, err := s.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
+		resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
 		if err != nil {
 			return nil, loc, err
 		}
@@ -190,15 +253,18 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []
 	}
 }
 
-// batchResolveLocksInARegion resolves locks in a region.
+// batchResolveLocksInOneRegion resolves locks in a region.
 // It returns the real location of the resolved locks if resolve locks success.
 // It returns error when meet an unretryable error.
 // When the locks are not in one region, resolve locks should be failed, it returns with nil resolveLocation and nil err.
 // Used it in gcworker only!
-func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
+func batchResolveLocksInOneRegion(bo *Backoffer, store Storage, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
+	if expectedLoc == nil {
+		return nil, nil
+	}
 	resolvedLocation = expectedLoc
 	for {
-		ok, err := s.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
+		ok, err := store.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
 		if ok {
 			return resolvedLocation, nil
 		}
@@ -209,7 +275,7 @@ func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Loc
 		if err != nil {
 			return nil, err
 		}
-		region, err1 := s.GetRegionCache().LocateKey(bo, locks[0].Key)
+		region, err1 := store.GetRegionCache().LocateKey(bo, locks[0].Key)
 		if err1 != nil {
 			return nil, err1
 		}
diff --git a/tikv/test_probe.go b/tikv/test_probe.go
index 5971480f3..234d2585f 100644
--- a/tikv/test_probe.go
+++ b/tikv/test_probe.go
@@ -123,7 +123,7 @@ func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxV
 
 outerLoop:
 	for {
-		locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
+		locks, loc, err := scanLocksInOneRegionWithStartKey(bo, s.KVStore, startKey, maxVersion, limit)
 		if err != nil {
 			return nil, err
 		}
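Taken together, the exported pieces above can be driven from outside the package. Below is a minimal sketch of how a caller might wire them up, mirroring what `(*KVStore).resolveLocks` now does internally; the `resolveAllLocks` function, its `store` and `safepoint` arguments, and the empty start/end keys (covering the whole keyspace) are illustrative assumptions, not part of this change.

```go
package example

import (
	"context"

	"github.com/tikv/client-go/v2/tikv"
)

// resolveAllLocks is a hypothetical caller: it force-resolves all locks whose
// version is below safepoint, region by region, like the GC path above.
func resolveAllLocks(ctx context.Context, store *tikv.KVStore, safepoint uint64) error {
	// The identifier only tags log lines emitted while resolving.
	resolver := tikv.NewRegionLockResolver("example-caller", store)

	stat, err := tikv.ResolveLocksForRange(
		ctx,
		resolver,
		safepoint, // maxVersion: only locks below this ts are scanned
		nil,       // startKey: from the first region
		nil,       // endKey: through the last region
		tikv.NewGcResolveLockMaxBackoffer, // fresh backoffer per scan round
		tikv.GCScanLockLimit,              // per-request ScanLock limit
	)
	if err != nil {
		return err
	}
	_ = stat.CompletedRegions // progress could be reported here
	return nil
}
```

Passing a backoffer constructor instead of a single `*Backoffer` lets `ResolveLocksForRange` start a fresh backoff budget for each region it visits, which is why the GC path hands in `NewGcResolveLockMaxBackoffer` rather than a shared instance.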
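And since `RegionLockResolver` is an interface, a consumer such as the log backup advancer can embed `BaseRegionLockResolver` and override a single method. A sketch under two assumptions: `countingResolver` is invented here for illustration, and `tikv.Backoffer` and `tikv.KeyLocation` are the package's exported aliases for the internal `retry.Backoffer` and `locate.KeyLocation` types.

```go
package example

import (
	"sync/atomic"

	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
)

// countingResolver (hypothetical) reuses the base scan/resolve machinery and
// layers a lock counter on top of the resolve step.
type countingResolver struct {
	*tikv.BaseRegionLockResolver
	resolved atomic.Int64
}

func newCountingResolver(store tikv.Storage) *countingResolver {
	return &countingResolver{
		BaseRegionLockResolver: tikv.NewRegionLockResolver("counting-resolver", store),
	}
}

// ResolveLocksInOneRegion shadows the embedded method: it records how many
// locks were handed over, then delegates to the base implementation.
func (r *countingResolver) ResolveLocksInOneRegion(
	bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation,
) (*tikv.KeyLocation, error) {
	r.resolved.Add(int64(len(locks)))
	return r.BaseRegionLockResolver.ResolveLocksInOneRegion(bo, locks, loc)
}
```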