gc: add resolve locks interface for tidb gc_worker #945

Merged · 8 commits · Aug 29, 2023
Changes from 6 commits
112 changes: 88 additions & 24 deletions tikv/gc.go
@@ -35,6 +35,9 @@ import (
zap "go.uber.org/zap"
)

// We don't want GC to sweep out the cached info belonging to other processes, like coprocessor.
const GCScanLockLimit = txnlock.ResolvedCacheSize / 2

// GC does garbage collection (GC) of the TiKV cluster.
// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
//
@@ -81,8 +84,9 @@ func WithConcurrency(concurrency int) GCOpt {
}

func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
lockResolver := NewRegionLockResolver("gc-client-go-api", s)
handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
return ResolveLocksForRange(ctx, lockResolver, safePoint, r.StartKey, r.EndKey, NewGcResolveLockMaxBackoffer, GCScanLockLimit)
}

runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
@@ -94,72 +98,129 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc
return nil
}

// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
const gcScanLockLimit = txnlock.ResolvedCacheSize / 2
type BaseRegionLockResolver struct {
identifier string
store Storage
}

func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver {
return &BaseRegionLockResolver{
identifier: identifier,
store: store,
}
}

func (l *BaseRegionLockResolver) Identifier() string {
return l.identifier
}

func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) {
return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc)
}

func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit)
}

func (l *BaseRegionLockResolver) GetStore() Storage {
return l.store
}

// RegionLockResolver is used for GCWorker and log backup advancer to resolve locks in a region.
type RegionLockResolver interface {
// Identifier represents the name of this resolver.
Identifier() string

// ResolveLocksInOneRegion tries to resolve expired locks for one region.
// 1. For GCWorker, it scans locks before *safepoint* and forcibly removes these locks,
// rolling back the txns, no matter whether the locks are expired or not.
// 2. For the log backup advancer, it scans all locks in a small range and checks the
// status of each txn: it resolves the locks if the txn is expired, or does nothing.
//
// regionLocation is returned if resolving the locks succeeds. If regionLocation is nil,
// not all locks were resolved for some reason, and the caller should retry scanning locks.
ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error)
Contributor: Please emphasize in comments that locks is assumed to be sorted.

Contributor Author: updated. PTAL


// ScanLocksInOneRegion returns the locks and the location of the region containing the given start key.
// The return result ([]*Lock, *KeyLocation, error) represents all the locks in a regionLocation,
// which will be used by ResolveLocksInOneRegion later.
ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)

// GetStore is used to get the store, which provides GetRegionCache and SendReq for this lock resolver.
GetStore() Storage
}
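
Note that every method of the interface above already has a default implementation on BaseRegionLockResolver, so a caller that only needs a different identifier or policy can embed the base type. A minimal sketch, assuming the tikv package context and the imports already present in gc.go; the advancer-style names are illustrative assumptions, not part of this PR:

// Hypothetical resolver for a log backup advancer that reuses the base
// scanning/resolving logic and only changes how it identifies itself in logs.
type advancerLockResolver struct {
	*BaseRegionLockResolver
}

func newAdvancerLockResolver(store Storage) *advancerLockResolver {
	return &advancerLockResolver{
		// The embedded base supplies ScanLocksInOneRegion, ResolveLocksInOneRegion and GetStore.
		BaseRegionLockResolver: NewRegionLockResolver("log-backup-advancer", store),
	}
}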

func ResolveLocksForRange(
ctx context.Context,
resolver RegionLockResolver,
maxVersion uint64,
startKey []byte,
endKey []byte,
createBackoffFn func(context.Context) *Backoffer,
scanLimit uint32,
) (rangetask.TaskStat, error) {
// For a scan lock request, we must return all locks even if they are generated
// by the same transaction, because the GC worker needs to make sure all locks
// have been cleaned.

var stat rangetask.TaskStat
key := startKey
bo := NewGcResolveLockMaxBackoffer(ctx)
for {
select {
case <-ctx.Done():
return stat, errors.New("[gc worker] gc job canceled")
default:
}

locks, loc, err := s.scanLocksInRegionWithStartKey(bo, key, safePoint, gcScanLockLimit)
// create a new backoffer for each round of scanning and resolving locks
bo := createBackoffFn(ctx)
locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, maxVersion, scanLimit)
if err != nil {
return stat, err
}

resolvedLocation, err1 := s.batchResolveLocksInARegion(bo, locks, loc)
if err1 != nil {
return stat, err1
resolvedLocation, err := resolver.ResolveLocksInOneRegion(bo, locks, loc)
if err != nil {
return stat, err
}
// resolving locks failed since the locks are not in one region anymore; need to retry.
if resolvedLocation == nil {
continue
}
Comment on lines 186 to 188

Contributor: Previously the backoffer was created outside the loop and recreated only after the code in the loop succeeded. Now the backoffer is recreated on every iteration, so when this continue is executed the backoff is not counted as expected. Please consider keeping the old backoff logic.

Contributor Author: fixed. good catch.

if len(locks) < gcScanLockLimit {
if len(locks) < int(scanLimit) {
stat.CompletedRegions++
key = loc.EndKey
logutil.Logger(ctx).Info("[gc worker] one region finshed ",
logutil.Logger(ctx).Debug("resolve one region finished",
zap.String("identifier", resolver.Identifier()),
zap.Int("regionID", int(resolvedLocation.Region.GetID())),
zap.Int("resolvedLocksNum", len(locks)))
} else {
logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
logutil.Logger(ctx).Info("region has more than limit locks",
zap.String("identifier", resolver.Identifier()),
zap.Int("regionID", int(resolvedLocation.Region.GetID())),
zap.Int("resolvedLocksNum", len(locks)),
zap.Int("scan lock limit", gcScanLockLimit))
zap.Uint32("scan lock limit", scanLimit))
key = locks[len(locks)-1].Key
}

if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
break
}
bo = NewGcResolveLockMaxBackoffer(ctx)
}
return stat, nil
}
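
As a usage note, this is roughly how an external component such as TiDB's gc_worker (the target of this PR) could drive the new exported API. The sketch mirrors KVStore.resolveLocks above and assumes the tikv package context and the imports already present in gc.go; the function name, task identifier and resolver identifier are illustrative assumptions:

func resolveLocksForGC(ctx context.Context, store *KVStore, safePoint uint64, concurrency int) error {
	// Identify the caller so logs emitted by ResolveLocksForRange are attributable.
	resolver := NewRegionLockResolver("gc-worker", store)
	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
		// Scan and force-resolve all locks with ts < safePoint in this key range.
		return ResolveLocksForRange(ctx, resolver, safePoint, r.StartKey, r.EndKey,
			NewGcResolveLockMaxBackoffer, GCScanLockLimit)
	}
	// Split the whole key space into range tasks and process them concurrently.
	runner := rangetask.NewRangeTaskRunner("gc-worker-resolve-locks", store, concurrency, handler)
	// RunOnRange over empty start/end keys is assumed to cover the whole key space,
	// as in the collapsed body of resolveLocks above.
	return runner.RunOnRange(ctx, []byte(""), []byte(""))
}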

func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
func scanLocksInOneRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
for {
loc, err := s.GetRegionCache().LocateKey(bo, startKey)
loc, err := store.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
return nil, loc, err
}
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
MaxVersion: maxVersion,
Limit: gcScanLockLimit,
Limit: limit,
StartKey: startKey,
EndKey: loc.EndKey,
})
resp, err := s.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return nil, loc, err
}
@@ -190,15 +251,18 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []
}
}

// batchResolveLocksInARegion resolves locks in a region.
// batchResolveLocksInOneRegion resolves locks in a region.
// It returns the real location of the resolved locks if resolving the locks succeeds.
// It returns an error when it meets an unretryable error.
// When the locks are not in one region, resolving the locks fails and it returns a nil resolvedLocation and a nil err.
// Use it in gcworker only!
func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
func batchResolveLocksInOneRegion(bo *Backoffer, store Storage, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
if expectedLoc == nil {
return nil, nil
}
resolvedLocation = expectedLoc
for {
ok, err := s.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
ok, err := store.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
if ok {
return resolvedLocation, nil
}
@@ -209,7 +273,7 @@ func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Loc
if err != nil {
return nil, err
}
region, err1 := s.GetRegionCache().LocateKey(bo, locks[0].Key)
region, err1 := store.GetRegionCache().LocateKey(bo, locks[0].Key)
if err1 != nil {
return nil, err1
}
2 changes: 1 addition & 1 deletion tikv/test_probe.go
@@ -123,7 +123,7 @@ func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxV

outerLoop:
for {
locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
locks, loc, err := scanLocksInOneRegionWithStartKey(bo, s.KVStore, startKey, maxVersion, limit)
if err != nil {
return nil, err
}