Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gc: add resolve locks interface for tidb gc_worker #945

Merged
merged 8 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 79 additions & 23 deletions tikv/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func WithConcurrency(concurrency int) GCOpt {
}

func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
lockResolver := NewBaseLockResolver("gc-client-go-api", s)
handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
return ResolveLocksForRange(ctx, lockResolver, safePoint, r.StartKey, r.EndKey, NewGcResolveLockMaxBackoffer, GCScanLockLimit)
}

runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
Expand All @@ -94,72 +95,124 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc
return nil
}

// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
const gcScanLockLimit = txnlock.ResolvedCacheSize / 2
// BaseLockResolver is the default GCLockResolver implementation. It scans
// locks via scanLocksInRegionWithStartKey and resolves them via
// batchResolveLocksInARegion, using the wrapped store for region lookup
// and RPC.
type BaseLockResolver struct {
// identifier tags this resolver in log output (e.g. "gc-client-go-api").
identifier string
store Storage
}

// NewBaseLockResolver creates a BaseLockResolver with the given identifier
// (used to tag log output) and backing store.
func NewBaseLockResolver(identifier string, store Storage) *BaseLockResolver {
return &BaseLockResolver{
identifier: identifier,
store: store,
}
}

// Identifier returns the name used to tag this resolver in log output.
func (l *BaseLockResolver) Identifier() string {
return l.identifier
}

func (l *BaseLockResolver) ResolveLocks(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) {
return batchResolveLocksInARegion(bo, l.GetStore(), locks, loc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function batchResolveLocksInARegion only resolves locks in the single region specified by loc. If an error happens, it may reload the region info, but it still only handles the first region containing the key locks[0]. If the locks are located in more than one region, the remaining ones will be ignored. This works in the GC logic (or the scan-locks/resolve-locks-for-range logic) because it guarantees the locks passed to this function are sorted, and the returned KeyLocation indicates the next key at which to scan the next batch, so no lock will be missed. However, if you make this function public and accessible everywhere, it is very odd that it does not promise to completely handle the given locks and returns a KeyLocation, and it is very likely to cause misuse. Please consider adjusting the implementation to suit the interface's semantics, or find some other way to avoid the problem.

}

// ScanLocks returns up to scanLimit locks with version <= maxVersion in the
// region containing key, together with that region's location.
func (l *BaseLockResolver) ScanLocks(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
return scanLocksInRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit)
}

func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
// GetStore returns the backing store, used for region-cache lookups and
// sending requests.
func (l *BaseLockResolver) GetStore() Storage {
return l.store
}

// GCLockResolver is used by the GC worker and the log backup advancer to resolve locks.
type GCLockResolver interface {
// Identifier returns a name for this resolver, used to tag log output.
Identifier() string
// ResolveLocks tries to resolve expired locks.
// 1. For the GC worker it will scan locks for all regions before *safepoint*,
// and force-remove the locks and roll back the txn, no matter whether the lock is expired or not.
// 2. For the log backup advancer, it will scan all locks for a small range,
// and it will check the status of the txn: resolve the locks if the txn is expired, or do nothing.
// A nil location with a nil error means the locks were no longer in one
// region and the caller should rescan.
ResolveLocks(*Backoffer, []*txnlock.Lock, *locate.KeyLocation) (*locate.KeyLocation, error)

// ScanLocks returns locks and the region location for the given start key.
ScanLocks(*Backoffer, []byte, uint64, uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)

// GetStore is used to get the store for GetRegionCache and SendReq for this lock resolver.
GetStore() Storage
}

// GCScanLockLimit is the default number of locks fetched per scan. It is half
// of the resolved-txn cache size because we don't want GC to sweep out cached
// info belonging to other processes, like the coprocessor.
const GCScanLockLimit = txnlock.ResolvedCacheSize / 2

func ResolveLocksForRange(
ctx context.Context,
resolver GCLockResolver,
maxVersion uint64,
startKey []byte,
endKey []byte,
createBackoffFn func(context.Context) *Backoffer,
scanLimit uint32,
) (rangetask.TaskStat, error) {
// For a scan-lock request, we must return all locks, even those generated
// by the same transaction, because the GC worker needs to make sure all
// locks have been cleaned.

var stat rangetask.TaskStat
key := startKey
bo := NewGcResolveLockMaxBackoffer(ctx)
for {
select {
case <-ctx.Done():
return stat, errors.New("[gc worker] gc job canceled")
default:
}

locks, loc, err := s.scanLocksInRegionWithStartKey(bo, key, safePoint, gcScanLockLimit)
// create new backoffer for every scan and resolve locks
bo := createBackoffFn(ctx)
locks, loc, err := resolver.ScanLocks(bo, key, maxVersion, scanLimit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TiDB has this test code:

		if w.testingKnobs.scanLocks != nil {
			locks = append(locks, w.testingKnobs.scanLocks(key, loc.Region.GetID(), tryResolveLocksTS)...)
		}

This is used to inject some simulated locks for tests. Did you notice it, and do you have any idea how to support that kind of test after the change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. In the TiDB tests, a new test struct will override the ScanLock method to generate the expected locks.

if err != nil {
return stat, err
}

resolvedLocation, err1 := s.batchResolveLocksInARegion(bo, locks, loc)
if err1 != nil {
return stat, err1
resolvedLocation, err := resolver.ResolveLocks(bo, locks, loc)
if err != nil {
return stat, err
}
// resolve locks failed since the locks are not in one region anymore, need retry.
if resolvedLocation == nil {
continue
}
Comment on lines 186 to 188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously the backoffer was created outside the loop and recreated only after a loop iteration completed successfully. Now you have made the backoffer recreated on every iteration, and as a result, when this continue is executed, the backoff is not accumulated as expected. Please consider keeping the old backoff logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. good catch.

if len(locks) < gcScanLockLimit {
if len(locks) < int(scanLimit) {
stat.CompletedRegions++
key = loc.EndKey
logutil.Logger(ctx).Info("[gc worker] one region finshed ",
logutil.Logger(ctx).Debug("[gc worker] one region finshed ",
zap.Int("regionID", int(resolvedLocation.Region.GetID())),
zap.Int("resolvedLocksNum", len(locks)))
} else {
logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
logutil.Logger(ctx).Info("region has more than limit locks",
zap.String("identifier", resolver.Identifier()),
zap.Int("regionID", int(resolvedLocation.Region.GetID())),
zap.Int("resolvedLocksNum", len(locks)),
zap.Int("scan lock limit", gcScanLockLimit))
zap.Uint32("scan lock limit", scanLimit))
key = locks[len(locks)-1].Key
}

if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
break
}
bo = NewGcResolveLockMaxBackoffer(ctx)
}
return stat, nil
}

func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
func scanLocksInRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
for {
loc, err := s.GetRegionCache().LocateKey(bo, startKey)
loc, err := store.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
return nil, loc, err
}
req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
MaxVersion: maxVersion,
Limit: gcScanLockLimit,
Limit: limit,
StartKey: startKey,
EndKey: loc.EndKey,
})
resp, err := s.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
if err != nil {
return nil, loc, err
}
Expand Down Expand Up @@ -195,10 +248,13 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []
// It returns error when meet an unretryable error.
// When the locks are not in one region, resolve locks should be failed, it returns with nil resolveLocation and nil err.
// Used it in gcworker only!
func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
func batchResolveLocksInARegion(bo *Backoffer, store Storage, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
if expectedLoc == nil {
return nil, nil
}
resolvedLocation = expectedLoc
for {
ok, err := s.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
ok, err := store.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
if ok {
return resolvedLocation, nil
}
Expand All @@ -209,7 +265,7 @@ func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Loc
if err != nil {
return nil, err
}
region, err1 := s.GetRegionCache().LocateKey(bo, locks[0].Key)
region, err1 := store.GetRegionCache().LocateKey(bo, locks[0].Key)
if err1 != nil {
return nil, err1
}
Expand Down
2 changes: 1 addition & 1 deletion tikv/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s StoreProbe) ScanLocks(ctx context.Context, startKey, endKey []byte, maxV

outerLoop:
for {
locks, loc, err := s.KVStore.scanLocksInRegionWithStartKey(bo, startKey, maxVersion, limit)
locks, loc, err := scanLocksInRegionWithStartKey(bo, s.KVStore, startKey, maxVersion, limit)
if err != nil {
return nil, err
}
Expand Down
Loading