gc: add resolve locks interface for tidb gc_worker (#945) (#964)
Signed-off-by: 3pointer <luancheng@pingcap.com>
3pointer authored Sep 18, 2023
1 parent ddb89d5 commit 0108750
Showing 2 changed files with 91 additions and 25 deletions.
114 changes: 90 additions & 24 deletions tikv/gc.go
@@ -35,6 +35,9 @@ import (
zap "go.uber.org/zap"
)

// We don't want GC to sweep out cached info belonging to other processes, such as the coprocessor.
const GCScanLockLimit = txnlock.ResolvedCacheSize / 2

// GC does garbage collection (GC) of the TiKV cluster.
// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
//
@@ -60,8 +63,9 @@ func (s *KVStore) GC(ctx context.Context, safepoint uint64) (newSafePoint uint64
}

func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
lockResolver := NewRegionLockResolver("gc-client-go-api", s)
handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
return ResolveLocksForRange(ctx, lockResolver, safePoint, r.StartKey, r.EndKey, NewGcResolveLockMaxBackoffer, GCScanLockLimit)
}

runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
@@ -73,72 +77,131 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc
return nil
}

// We don't want GC to sweep out cached info belonging to other processes, such as the coprocessor.
const gcScanLockLimit = txnlock.ResolvedCacheSize / 2
type BaseRegionLockResolver struct {
identifier string
store Storage
}

func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver {
return &BaseRegionLockResolver{
identifier: identifier,
store: store,
}
}

func (l *BaseRegionLockResolver) Identifier() string {
return l.identifier
}

func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) {
return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc)
}

func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit)
}

func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
func (l *BaseRegionLockResolver) GetStore() Storage {
return l.store
}

// RegionLockResolver is used for GCWorker and log backup advancer to resolve locks in a region.
type RegionLockResolver interface {
// Identifier represents the name of this resolver.
Identifier() string

// ResolveLocksInOneRegion tries to resolve expired locks in one region.
// 1. For the GC worker, it scans locks whose version is before *safepoint*
//    and forcibly removes them, rolling back the txn whether or not the
//    lock has expired.
// 2. For the log backup advancer, it scans all locks in a small range and
//    checks the txn status, resolving the locks only if the txn has
//    expired, and doing nothing otherwise.
//
// A non-nil regionLocation is returned when resolving succeeds. If the
// returned regionLocation is nil, not all locks were resolved, and the
// caller should scan and resolve them again.
// ** the locks are assumed to be sorted by key in ascending order **
ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error)

// ScanLocksInOneRegion returns the locks found in one region, scanning
// forward from the given start key, together with that region's location.
// The result ([]*Lock, *KeyLocation, error) will be consumed by
// ResolveLocksInOneRegion later.
ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)

// GetStore returns the Storage this lock resolver uses for GetRegionCache and SendReq.
GetStore() Storage
}

func ResolveLocksForRange(
ctx context.Context,
resolver RegionLockResolver,
maxVersion uint64,
startKey []byte,
endKey []byte,
createBackoffFn func(context.Context) *Backoffer,
scanLimit uint32,
) (rangetask.TaskStat, error) {
// For the scan-lock request we must return all locks, even those generated
// by the same transaction, because the GC worker needs to make sure all
// locks have been cleaned.

var stat rangetask.TaskStat
key := startKey
bo := NewGcResolveLockMaxBackoffer(ctx)
// create a new backoffer for every round of scanning and resolving locks
bo := createBackoffFn(ctx)
for {
select {
case <-ctx.Done():
return stat, errors.New("[gc worker] gc job canceled")
default:
}

locks, loc, err := s.scanLocksInRegionWithStartKey(bo, key, safePoint, gcScanLockLimit)
locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, maxVersion, scanLimit)
if err != nil {
return stat, err
}

resolvedLocation, err1 := s.batchResolveLocksInARegion(bo, locks, loc)
if err1 != nil {
return stat, err1
resolvedLocation, err := resolver.ResolveLocksInOneRegion(bo, locks, loc)
if err != nil {
return stat, err
}
// resolving failed because the locks are no longer all in one region; retry the scan.
if resolvedLocation == nil {
continue
}
if len(locks) < gcScanLockLimit {
if len(locks) < int(scanLimit) {
stat.CompletedRegions++
key = loc.EndKey
logutil.Logger(ctx).Info("[gc worker] one region finshed ",
logutil.Logger(ctx).Debug("resolve one region finshed ",
zap.String("identifier", resolver.Identifier()),
zap.Int("regionID", int(resolvedLocation.Region.GetID())),
zap.Int("resolvedLocksNum", len(locks)))
} else {
logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
logutil.Logger(ctx).Info("region has more than limit locks",
zap.String("identifier", resolver.Identifier()),
zap.Int("regionID", int(resolvedLocation.Region.GetID())),
zap.Int("resolvedLocksNum", len(locks)),
zap.Int("scan lock limit", gcScanLockLimit))
zap.Uint32("scan lock limit", scanLimit))
key = locks[len(locks)-1].Key
}

if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
break
}
bo = NewGcResolveLockMaxBackoffer(ctx)
bo = createBackoffFn(ctx)
}
return stat, nil
}

func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
func scanLocksInOneRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
for {
loc, err := s.GetRegionCache().LocateKey(bo, startKey)
loc, err := store.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
return nil, loc, err
}
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
MaxVersion: maxVersion,
Limit: gcScanLockLimit,
Limit: limit,
StartKey: startKey,
EndKey: loc.EndKey,
})
resp, err := s.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return nil, loc, err
}
@@ -169,15 +232,18 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []
}
}

// batchResolveLocksInARegion resolves locks in a region.
// batchResolveLocksInOneRegion resolves locks in one region.
// It returns the real location of the resolved locks if resolving succeeds.
// It returns an error when it meets an unretryable error.
// When the locks are not all in one region, resolving fails, and it returns
// a nil resolvedLocation and a nil err.
// Use it in the GC worker only!
func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
func batchResolveLocksInOneRegion(bo *Backoffer, store Storage, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
if expectedLoc == nil {
return nil, nil
}
resolvedLocation = expectedLoc
for {
ok, err := s.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
ok, err := store.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
if ok {
return resolvedLocation, nil
}
@@ -188,7 +254,7 @@ func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Loc
if err != nil {
return nil, err
}
region, err1 := s.GetRegionCache().LocateKey(bo, locks[0].Key)
region, err1 := store.GetRegionCache().LocateKey(bo, locks[0].Key)
if err1 != nil {
return nil, err1
}
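For illustration only (not part of this commit): a minimal sketch of how a caller other than the GC worker could plug into the new RegionLockResolver interface and drive ResolveLocksForRange. The customResolver type, the "log-backup" identifier, the resolveAllLocks helper, and the nil/nil full key range are assumptions made for this example; the call shape mirrors KVStore.resolveLocks above.

// customResolver is a hypothetical resolver that reuses the base
// scan/resolve behavior but logs under its own identifier.
type customResolver struct {
	*BaseRegionLockResolver
}

func resolveAllLocks(ctx context.Context, store Storage, safePoint uint64) error {
	resolver := &customResolver{NewRegionLockResolver("log-backup", store)}
	// Scan the whole key space (nil start/end keys) and resolve every lock
	// with a version below safePoint, creating a fresh backoffer per round.
	stat, err := ResolveLocksForRange(
		ctx, resolver, safePoint, nil, nil,
		NewGcResolveLockMaxBackoffer, GCScanLockLimit,
	)
	if err != nil {
		return err
	}
	logutil.Logger(ctx).Info("resolve locks finished",
		zap.String("identifier", resolver.Identifier()),
		zap.Int("completedRegions", stat.CompletedRegions))
	return nil
}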
2 changes: 1 addition & 1 deletion tikv/test_probe.go
@@ -123,7 +123,7 @@ func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxV

outerLoop:
for {
locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
locks, loc, err := scanLocksInOneRegionWithStartKey(bo, s.KVStore, startKey, maxVersion, limit)
if err != nil {
return nil, err
}
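Also illustrative (not from the diff): the region-by-region iteration that StoreProbe.ScanLocks drives with the renamed helper looks roughly like the collectLocks sketch below, written as if inside the tikv package. It is simplified; the real probe additionally honors endKey and re-scans from the last lock's key when a region returns exactly `limit` locks.

// collectLocks gathers locks across regions by following each region's
// EndKey until the end of the key space.
func collectLocks(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, error) {
	var all []*txnlock.Lock
	key := startKey
	for {
		locks, loc, err := scanLocksInOneRegionWithStartKey(bo, store, key, maxVersion, limit)
		if err != nil {
			return nil, err
		}
		all = append(all, locks...)
		if len(loc.EndKey) == 0 {
			return all, nil // last region reached
		}
		key = loc.EndKey
	}
}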
