gc: add resolve locks interface for tidb gc_worker #945
Changes from all commits
```diff
@@ -35,6 +35,9 @@ import (
	zap "go.uber.org/zap"
)

+// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
+const GCScanLockLimit = txnlock.ResolvedCacheSize / 2
+
// GC does garbage collection (GC) of the TiKV cluster.
// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
//
```
```diff
@@ -81,8 +84,9 @@ func WithConcurrency(concurrency int) GCOpt {
}

func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error {
+	lockResolver := NewRegionLockResolver("gc-client-go-api", s)
	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
-		return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)
+		return ResolveLocksForRange(ctx, lockResolver, safePoint, r.StartKey, r.EndKey, NewGcResolveLockMaxBackoffer, GCScanLockLimit)
	}

	runner := rangetask.NewRangeTaskRunner("resolve-locks-runner", s, concurrency, handler)
```
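For context, this is roughly how an external caller such as TiDB's gc_worker is expected to drive the pieces this PR exports (NewRegionLockResolver, ResolveLocksForRange, NewGcResolveLockMaxBackoffer, GCScanLockLimit), mirroring the resolveLocks wrapper above. The wrapper function name, identifier string, concurrency, and key range below are illustrative only and are not taken from TiDB:

```go
package gcexample

import (
	"context"

	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
)

// resolveLocksBeforeSafePoint is a hypothetical external GC worker routine.
func resolveLocksBeforeSafePoint(ctx context.Context, store *tikv.KVStore, safePoint uint64) error {
	// The identifier only tags log lines so different resolvers can be told apart.
	resolver := tikv.NewRegionLockResolver("example-gc-worker", store)

	handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
		// Scan locks with version <= safePoint in [r.StartKey, r.EndKey) and batch-resolve them.
		return tikv.ResolveLocksForRange(
			ctx, resolver, safePoint, r.StartKey, r.EndKey,
			tikv.NewGcResolveLockMaxBackoffer, tikv.GCScanLockLimit,
		)
	}

	// Split the whole key space into region-sized tasks and run the handler concurrently.
	runner := rangetask.NewRangeTaskRunner("example-resolve-locks", store, 4, handler)
	return runner.RunOnRange(ctx, []byte(""), []byte(""))
}
```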
```diff
@@ -94,72 +98,131 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc
	return nil
}

-// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
-const gcScanLockLimit = txnlock.ResolvedCacheSize / 2
+type BaseRegionLockResolver struct {
+	identifier string
+	store      Storage
+}

+func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver {
+	return &BaseRegionLockResolver{
+		identifier: identifier,
+		store:      store,
+	}
+}

+func (l *BaseRegionLockResolver) Identifier() string {
+	return l.identifier
+}

-func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
+func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) {
+	return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc)
+}

+func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) {
+	return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit)
+}

+func (l *BaseRegionLockResolver) GetStore() Storage {
+	return l.store
+}

+// RegionLockResolver is used for GCWorker and log backup advancer to resolve locks in a region.
+type RegionLockResolver interface {
+	// Identifier represents the name of this resolver.
+	Identifier() string

+	// ResolveLocksInOneRegion tries to resolve expired locks for one region.
+	// 1. For GCWorker it will scan locks before *safepoint*,
+	// and force remove these locks. rollback the txn, no matter the lock is expired of not.
+	// 2. For log backup advancer, it will scan all locks for a small range.
+	// and it will check status of the txn. resolve the locks if txn is expired, Or do nothing.
+	//
+	// regionLocation should return if resolve locks succeed. if regionLocation return nil,
+	// which means not all locks are resolved in someway. the caller should retry scan locks.
+	// ** the locks are assumed sorted by key in ascending order **
+	ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, regionLocation *locate.KeyLocation) (*locate.KeyLocation, error)

+	// ScanLocksInOneRegion return locks and location with given start key in a region.
+	// The return result ([]*Lock, *KeyLocation, error) represents the all locks in a regionLocation.
+	// which will used by ResolveLocksInOneRegion later.
+	ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error)

+	// GetStore is used to get store to GetRegionCache and SendReq for this lock resolver.
+	GetStore() Storage
+}

+func ResolveLocksForRange(
+	ctx context.Context,
+	resolver RegionLockResolver,
+	maxVersion uint64,
+	startKey []byte,
+	endKey []byte,
+	createBackoffFn func(context.Context) *Backoffer,
+	scanLimit uint32,
+) (rangetask.TaskStat, error) {
	// for scan lock request, we must return all locks even if they are generated
	// by the same transaction. because gc worker need to make sure all locks have been
	// cleaned.

	var stat rangetask.TaskStat
	key := startKey
-	bo := NewGcResolveLockMaxBackoffer(ctx)
+	// create new backoffer for every scan and resolve locks
+	bo := createBackoffFn(ctx)
	for {
		select {
		case <-ctx.Done():
			return stat, errors.New("[gc worker] gc job canceled")
		default:
		}

-		locks, loc, err := s.scanLocksInRegionWithStartKey(bo, key, safePoint, gcScanLockLimit)
+		locks, loc, err := resolver.ScanLocksInOneRegion(bo, key, maxVersion, scanLimit)
		if err != nil {
			return stat, err
		}

-		resolvedLocation, err1 := s.batchResolveLocksInARegion(bo, locks, loc)
-		if err1 != nil {
-			return stat, err1
+		resolvedLocation, err := resolver.ResolveLocksInOneRegion(bo, locks, loc)
+		if err != nil {
+			return stat, err
		}
		// resolve locks failed since the locks are not in one region anymore, need retry.
		if resolvedLocation == nil {
			continue
		}
```
Review comment on lines 186 to 188 of the new file:

> Previously the backoffer was created outside the loop and recreated only when an iteration of the loop succeeded. Now you made the backoffer recreated on every iteration, and as a result when this …

Reply: fixed. good catch.
```diff
-		if len(locks) < gcScanLockLimit {
+		if len(locks) < int(scanLimit) {
			stat.CompletedRegions++
			key = loc.EndKey
-			logutil.Logger(ctx).Info("[gc worker] one region finshed ",
+			logutil.Logger(ctx).Debug("resolve one region finshed ",
+				zap.String("identifier", resolver.Identifier()),
				zap.Int("regionID", int(resolvedLocation.Region.GetID())),
				zap.Int("resolvedLocksNum", len(locks)))
		} else {
-			logutil.Logger(ctx).Info("[gc worker] region has more than limit locks",
+			logutil.Logger(ctx).Info("region has more than limit locks",
+				zap.String("identifier", resolver.Identifier()),
				zap.Int("regionID", int(resolvedLocation.Region.GetID())),
				zap.Int("resolvedLocksNum", len(locks)),
-				zap.Int("scan lock limit", gcScanLockLimit))
+				zap.Uint32("scan lock limit", scanLimit))
			key = locks[len(locks)-1].Key
		}

		if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
			break
		}
-		bo = NewGcResolveLockMaxBackoffer(ctx)
+		bo = createBackoffFn(ctx)
	}
	return stat, nil
}

-func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
+func scanLocksInOneRegionWithStartKey(bo *retry.Backoffer, store Storage, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
	for {
-		loc, err := s.GetRegionCache().LocateKey(bo, startKey)
+		loc, err := store.GetRegionCache().LocateKey(bo, startKey)
		if err != nil {
			return nil, loc, err
		}
		req := tikvrpc.NewRequest(tikvrpc.CmdScanLock, &kvrpcpb.ScanLockRequest{
			MaxVersion: maxVersion,
-			Limit:      gcScanLockLimit,
+			Limit:      limit,
			StartKey:   startKey,
			EndKey:     loc.EndKey,
		})
-		resp, err := s.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
+		resp, err := store.SendReq(bo, req, loc.Region, ReadTimeoutMedium)
		if err != nil {
			return nil, loc, err
		}
```
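The BaseRegionLockResolver above covers the GC case (force-resolve everything below the safepoint). For a caller that needs a different policy, such as the log backup advancer mentioned in the interface comment, the intended pattern appears to be embedding BaseRegionLockResolver and overriding ResolveLocksInOneRegion. Below is a rough sketch written as if it sat in the same package as the interface (the signatures use client-go's internal locate and txnlock types); the type name and filtering policy are made up for illustration and are not how the advancer actually behaves:

```go
// customResolver reuses BaseRegionLockResolver for scanning and store access,
// but applies its own policy before resolving locks in one region.
type customResolver struct {
	*BaseRegionLockResolver
	// minTxnID is a made-up threshold: locks from older transactions are skipped.
	minTxnID uint64
}

func newCustomResolver(store Storage, minTxnID uint64) *customResolver {
	return &customResolver{
		BaseRegionLockResolver: NewRegionLockResolver("custom-resolver", store),
		minTxnID:               minTxnID,
	}
}

func (r *customResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) {
	// Keep only the locks this resolver is allowed to touch. Filtering preserves
	// the key order, so the "locks are sorted by key" assumption still holds.
	filtered := make([]*txnlock.Lock, 0, len(locks))
	for _, l := range locks {
		if l.TxnID >= r.minTxnID {
			filtered = append(filtered, l)
		}
	}
	// Delegate the actual batch resolution to the embedded base resolver.
	return r.BaseRegionLockResolver.ResolveLocksInOneRegion(bo, filtered, loc)
}
```

Since ResolveLocksForRange only depends on the RegionLockResolver interface, such a type can be passed wherever the base resolver is used.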
```diff
@@ -190,15 +253,18 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []
	}
}

-// batchResolveLocksInARegion resolves locks in a region.
+// batchResolveLocksInOneRegion resolves locks in a region.
// It returns the real location of the resolved locks if resolve locks success.
// It returns error when meet an unretryable error.
+// When the locks are not in one region, resolve locks should be failed, it returns with nil resolveLocation and nil err.
+// Used it in gcworker only!
-func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
+func batchResolveLocksInOneRegion(bo *Backoffer, store Storage, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
	if expectedLoc == nil {
		return nil, nil
	}
	resolvedLocation = expectedLoc
	for {
-		ok, err := s.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
+		ok, err := store.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
		if ok {
			return resolvedLocation, nil
		}

@@ -209,7 +275,7 @@ func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Loc
		if err != nil {
			return nil, err
		}
-		region, err1 := s.GetRegionCache().LocateKey(bo, locks[0].Key)
+		region, err1 := store.GetRegionCache().LocateKey(bo, locks[0].Key)
		if err1 != nil {
			return nil, err1
		}
```
Review comment:

> Please emphasize in the comments that `locks` is assumed to be sorted.

Reply: updated. PTAL
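The sortedness requirement matters because ResolveLocksForRange advances its scan cursor with `key = locks[len(locks)-1].Key`, which only works if the slice is in ascending key order. ScanLocksInOneRegion should already return locks in that order, so this mostly concerns callers that assemble a lock slice themselves. A small, hypothetical helper (the function and package names are made up) that enforces the documented assumption:

```go
package gcexample

import (
	"bytes"
	"sort"

	"github.com/tikv/client-go/v2/txnkv/txnlock"
)

// sortLocksByKey enforces the ordering ResolveLocksInOneRegion assumes:
// locks sorted by key in ascending order.
func sortLocksByKey(locks []*txnlock.Lock) {
	sort.Slice(locks, func(i, j int) bool {
		return bytes.Compare(locks[i].Key, locks[j].Key) < 0
	})
}
```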