Skip to content

Commit

Permalink
cherry pick #16413 to release-4.0 (#16949)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored May 6, 2020
1 parent 0dd4a08 commit d551807
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 58 deletions.
135 changes: 79 additions & 56 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,6 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurren

// First try resolve locks with physical scan
err := w.resolveLocksPhysical(ctx, safePoint)

if err == nil {
return nil
}
Expand Down Expand Up @@ -1071,27 +1070,31 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
zap.Uint64("safePoint", safePoint))
startTime := time.Now()

stores, err := w.getUpStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}

defer func() {
w.removeLockObservers(ctx, safePoint, stores)
}()
registeredStores := make(map[uint64]*metapb.Store)
defer w.removeLockObservers(ctx, safePoint, registeredStores)

err = w.registerLockObservers(ctx, safePoint, stores)
dirtyStores, err := w.getUpStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}

for retry := 0; retry < 3; retry++ {
resolvedStores, err := w.physicalScanAndResolveLocks(ctx, safePoint, stores)
err = w.registerLockObservers(ctx, safePoint, dirtyStores)
if err != nil {
return errors.Trace(err)
}
for id, store := range dirtyStores {
registeredStores[id] = store
}

resolvedStores, err := w.physicalScanAndResolveLocks(ctx, safePoint, dirtyStores)
if err != nil {
return errors.Trace(err)
}

stores, err = w.getUpStoresMapForGC(ctx)
failpoint.Inject("beforeCheckLockObservers", func() {})

stores, err := w.getUpStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1101,22 +1104,38 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
return errors.Trace(err)
}

// Remove clean stores from the set
for resolvedStore := range resolvedStores {
// Only stores that are both resolved and checked is clean.
// For each clean store, remove it from the stores set.
if _, ok := checkedStores[resolvedStore]; ok {
delete(stores, resolvedStore)
for store := range stores {
if _, ok := checkedStores[store]; ok {
// The store is resolved and checked.
if _, ok := resolvedStores[store]; ok {
delete(stores, store)
}
// The store is checked and has been resolved before.
if _, ok := dirtyStores[store]; !ok {
delete(stores, store)
}
// If the store is checked and not resolved, we can retry to resolve it again, so leave it in dirtyStores.
} else if _, ok := registeredStores[store]; ok {
// The store has been registered and it's dirty due to too many collected locks. Fall back to legacy mode.
// We can't remove the lock observer from the store and retry the whole procedure because if the store
// receives duplicated remove and register requests during resolving locks, the store will be cleaned
// when checking but the lock observer drops some locks. It may results in missing locks.
return errors.Errorf("store %v is dirty", store)
}
}
dirtyStores = stores

// If there are still dirty stores, continue the loop to clean them again.
// Only dirty stores will be scanned in the next loop.
if len(stores) == 0 {
if len(dirtyStores) == 0 {
break
}
}

if len(dirtyStores) != 0 {
return errors.Errorf("still has %d dirty stores after physical resolve locks", len(dirtyStores))
}

logutil.Logger(ctx).Info("[gc worker] finish resolve locks with physical scan locks",
zap.String("uuid", w.uuid),
zap.Uint64("safePoint", safePoint),
Expand All @@ -1141,7 +1160,9 @@ func (w *GCWorker) registerLockObservers(ctx context.Context, safePoint uint64,
if err != nil {
return errors.Trace(err)
}

if resp.Resp == nil {
return errors.Trace(tikv.ErrBodyMissing)
}
errStr := resp.Resp.(*kvrpcpb.RegisterLockObserverResponse).Error
if len(errStr) > 0 {
return errors.Errorf("register lock observer on store %v returns error: %v", store.Id, errStr)
Expand All @@ -1161,31 +1182,41 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto
req := tikvrpc.NewRequest(tikvrpc.CmdCheckLockObserver, &kvrpcpb.CheckLockObserverRequest{
MaxTs: safePoint,
})

cleanStores := make(map[uint64]interface{}, len(stores))

logError := func(store *metapb.Store, err error) {
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
}

// When error occurs, this function doesn't fail immediately, but continues without adding the failed store to
// cleanStores set.

for _, store := range stores {
address := store.Address

resp, err := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
logError(store, err)
continue
}
if resp.Resp == nil {
logError(store, tikv.ErrBodyMissing)
continue
}

respInner := resp.Resp.(*kvrpcpb.CheckLockObserverResponse)
if len(respInner.Error) > 0 {
err = errors.Errorf("check lock observer on store %v returns error: %v", store.Id, respInner.Error)
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
logError(store, err)
continue
}

// No need to resolve observed locks on uncleaned stores.
if !respInner.IsClean {
logutil.Logger(ctx).Warn("[gc worker] check lock observer: store is not clean",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
zap.Any("store", store))
continue
}

Expand All @@ -1202,21 +1233,11 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto

if err != nil {
err = errors.Errorf("check lock observer on store %v returns error: %v", store.Id, respInner.Error)
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
logError(store, err)
continue
}
}

if respInner.IsClean {
cleanStores[store.Id] = nil
} else {
logutil.Logger(ctx).Warn("[gc worker] check lock observer: store is not clean",
zap.String("uuid", w.uuid),
zap.Any("store", store))
}
cleanStores[store.Id] = nil
}

return cleanStores, nil
Expand All @@ -1231,25 +1252,29 @@ func (w *GCWorker) removeLockObservers(ctx context.Context, safePoint uint64, st
MaxTs: safePoint,
})

logError := func(store *metapb.Store, err error) {
logutil.Logger(ctx).Warn("[gc worker] failed to remove lock observer from store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
}

for _, store := range stores {
address := store.Address

resp, err := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
if err != nil {
logutil.Logger(ctx).Warn("[gc worker] failed to remove lock observer from store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
logError(store, err)
continue
}
if resp.Resp == nil {
logError(store, tikv.ErrBodyMissing)
continue
}

errStr := resp.Resp.(*kvrpcpb.RemoveLockObserverResponse).Error
if len(errStr) > 0 {
err = errors.Errorf("remove lock observer on store %v returns error: %v", store.Id, errStr)
logutil.Logger(ctx).Error("[gc worker] failed to remove lock observer from store",
zap.String("uuid", w.uuid),
zap.Any("store", store),
zap.Error(err))
logError(store, err)
}
}
}
Expand Down Expand Up @@ -1982,12 +2007,10 @@ func (s *mergeLockScanner) physicalScanLocksForStore(ctx context.Context, safePo
if err != nil {
return errors.Trace(err)
}

resp := response.Resp.(*kvrpcpb.PhysicalScanLockResponse)
if resp == nil {
return errors.Errorf("physical scan lock response is nil")
if response.Resp == nil {
return errors.Trace(tikv.ErrBodyMissing)
}

resp := response.Resp.(*kvrpcpb.PhysicalScanLockResponse)
if len(resp.Error) > 0 {
return errors.Errorf("physical scan lock received error from store: %v", resp.Error)
}
Expand Down
Loading

0 comments on commit d551807

Please sign in to comment.