Commit
close tikv#6107 fix online recovery timeout mechanism
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
Connor1996 authored and ti-chi-bot committed Mar 8, 2023
1 parent 51f382c commit 6325590
Showing 2 changed files with 170 additions and 116 deletions.
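The heart of the fix is that the timeout check and report collection no longer call HandleErr themselves; they return an error, and HandleStoreHeartbeat funnels every error through a single HandleErr call that decides whether the current heartbeat round is aborted. A minimal sketch of that pattern follows, using simplified placeholder types rather than the real PD controller:

// Sketch of the error-funneling pattern this commit adopts: helpers report
// failures as errors instead of handling them in place, and the caller decides
// once whether the heartbeat round is aborted. All names are illustrative.
package main

import (
    "fmt"
    "time"
)

type controller struct {
    deadline time.Time
    failed   bool
}

// checkTimeout only reports the timeout; it no longer mutates state itself.
func (c *controller) checkTimeout() error {
    if time.Now().After(c.deadline) {
        return fmt.Errorf("exceeds timeout %v", c.deadline)
    }
    return nil
}

// handleErr is the single place that reacts to a failure.
func (c *controller) handleErr(err error) bool {
    fmt.Println("recovery error:", err)
    c.failed = true
    return true
}

func (c *controller) handleHeartbeat() {
    done, err := func() (bool, error) {
        if err := c.checkTimeout(); err != nil {
            return false, err
        }
        // ... collect reports and generate plans here ...
        return false, nil
    }()
    if done || (err != nil && c.handleErr(err)) {
        return
    }
    // ... dispatch the current plan to the store ...
}

func main() {
    c := &controller{deadline: time.Now().Add(-time.Second)}
    c.handleHeartbeat() // deadline already passed, so handleErr fires
    fmt.Println("failed:", c.failed)
}

The diff below applies the same shape to checkTimeout, collectReport, and the new generatePlan helper.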
server/cluster/unsafe_recovery_controller.go (240 changes: 128 additions, 112 deletions)
@@ -161,17 +161,22 @@ func (u *unsafeRecoveryController) reset() {
func (u *unsafeRecoveryController) IsRunning() bool {
u.RLock()
defer u.RUnlock()
return u.isRunningLocked()
}

func (u *unsafeRecoveryController) isRunningLocked() bool {
return u.stage != idle && u.stage != finished && u.stage != failed
}

// RemoveFailedStores removes failed stores from the cluster.
func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64, autoDetect bool) error {
if u.IsRunning() {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
u.Lock()
defer u.Unlock()

if u.isRunningLocked() {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}

if !autoDetect {
if len(failedStores) == 0 {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
@@ -220,7 +225,9 @@ func (u *unsafeRecoveryController) Show() []StageOutput {
if u.stage == idle {
return []StageOutput{{Info: "No on-going recovery."}}
}
u.checkTimeout()
if err := u.checkTimeout(); err != nil {
u.HandleErr(err)
}
status := u.output
if u.stage != finished && u.stage != failed {
status = append(status, u.getReportStatus())
@@ -255,17 +262,15 @@ func (u *unsafeRecoveryController) getReportStatus() StageOutput {
return status
}

func (u *unsafeRecoveryController) checkTimeout() bool {
func (u *unsafeRecoveryController) checkTimeout() error {
if u.stage == finished || u.stage == failed {
return false
return nil
}

if time.Now().After(u.timeout) {
ret := u.HandleErr(errors.Errorf("Exceeds timeout %v", u.timeout))
u.timeout = time.Now().Add(storeRequestInterval * 2)
return ret
return errors.Errorf("Exceeds timeout %v", u.timeout)
}
return false
return nil
}

func (u *unsafeRecoveryController) HandleErr(err error) bool {
@@ -274,128 +279,139 @@ func (u *unsafeRecoveryController) HandleErr(err error) bool {
u.err = err
}
if u.stage == exitForceLeader {
// We already tried to exit force leader, and it still failed,
// so we turn into the failed stage directly. TiKV will step down the
// force leader automatically after it has held it for a long time.
u.changeStage(failed)
return true
}
// When encountering an error for the first time, we try to exit force
// leader before turning into the failed stage, to avoid leaked force leaders
// blocking reads and writes.
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
u.timeout = time.Now().Add(storeRequestInterval)
// empty recovery plan would trigger exit force leader
u.changeStage(exitForceLeader)
return false
}

// HandleStoreHeartbeat handles the store heartbeat requests and checks whether the stores need to
// send detailed report back.
func (u *unsafeRecoveryController) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) {
if !u.IsRunning() {
// no recovery in progress, do nothing
return
}
u.Lock()
defer u.Unlock()

if u.checkTimeout() {
if !u.isRunningLocked() {
// no recovery in progress, do nothing
return
}

allCollected := u.collectReport(heartbeat)
done, err := func() (bool, error) {
if err := u.checkTimeout(); err != nil {
return false, err
}

if allCollected {
newestRegionTree, peersMap, buildErr := u.buildUpFromReports()
if buildErr != nil && u.HandleErr(buildErr) {
return
allCollected, err := u.collectReport(heartbeat)
if err != nil {
return false, err
}

// clean up previous plan
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
if allCollected {
newestRegionTree, peersMap, err := u.buildUpFromReports()
if err != nil {
return false, err
}

var stage unsafeRecoveryStage
if u.err == nil {
stage = u.stage
} else {
stage = exitForceLeader
return u.generatePlan(newestRegionTree, peersMap)
}
reCheck := false
hasPlan := false
var err error
for {
switch stage {
case collectReport:
fallthrough
case tombstoneTiFlashLearner:
if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(tombstoneTiFlashLearner)
break
}
if err != nil {
break
}
fallthrough
case forceLeaderForCommitMerge:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
u.changeStage(forceLeaderForCommitMerge)
break
}
if err != nil {
break
}
fallthrough
case forceLeader:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
u.changeStage(forceLeader)
break
}
if err != nil {
break
}
fallthrough
case demoteFailedVoter:
if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
u.changeStage(demoteFailedVoter)
break
} else if !reCheck {
reCheck = true
stage = tombstoneTiFlashLearner
continue
}
fallthrough
case createEmptyRegion:
if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(createEmptyRegion)
break
}
if err != nil {
break
}
fallthrough
case exitForceLeader:
// no need to generate plan, empty recovery plan triggers exit force leader on TiKV side
if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
u.changeStage(exitForceLeader)
}
default:
panic("unreachable")
}
return false, nil
}()

if done || (err != nil && u.HandleErr(err)) {
return
}
u.dispatchPlan(heartbeat, resp)
}

func (u *unsafeRecoveryController) generatePlan(newestRegionTree *regionTree, peersMap map[uint64][]*regionItem) (bool, error) {
// clean up previous plan
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)

stage := u.stage
reCheck := false
hasPlan := false
var err error
for {
switch stage {
case collectReport:
fallthrough
case tombstoneTiFlashLearner:
if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(tombstoneTiFlashLearner)
break
}
if err != nil {
if u.HandleErr(err) {
return
}
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
// Clear the reports etc.
break
}
fallthrough
case forceLeaderForCommitMerge:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
u.changeStage(forceLeaderForCommitMerge)
break
}
if err != nil {
break
}
fallthrough
case forceLeader:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
u.changeStage(forceLeader)
break
}
if err != nil {
break
}
fallthrough
case demoteFailedVoter:
if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
u.changeStage(demoteFailedVoter)
break
} else if !reCheck {
reCheck = true
stage = tombstoneTiFlashLearner
continue
}
fallthrough
case createEmptyRegion:
if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(createEmptyRegion)
break
}
if err != nil {
break
}
fallthrough
case exitForceLeader:
if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
u.changeStage(exitForceLeader)
return
} else if !hasPlan {
if u.err != nil {
u.changeStage(failed)
} else {
u.changeStage(finished)
}
return
}
break
default:
panic("unreachable")
}
break
}

u.dispatchPlan(heartbeat, resp)
if err == nil && !hasPlan {
if u.err != nil {
u.changeStage(failed)
} else {
u.changeStage(finished)
}
return true, nil
}
return false, err
}

// It dispatches recovery plan if any.
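The generatePlan helper above walks the recovery stages with a switch whose cases fall through, so resuming from a later stage skips the earlier planners and the first planner that produces a plan determines the next stage. A stripped-down illustration of that control flow, with hypothetical stage names and planner stubs rather than the real PD ones:

// Illustration of fallthrough-driven stage selection: starting from the
// current stage, try each planner in order; the first one that yields a plan
// sets the next stage. Stage names and planner stubs are hypothetical.
package main

import "fmt"

type stage int

const (
    collect stage = iota
    forceLeader
    demoteVoter
    exitForce
)

func tryForceLeader() bool { return false }
func tryDemote() bool      { return true } // pretend a voter must be demoted
func tryExit() bool        { return false }

func plan(current stage) (stage, bool) {
    hasPlan := false
    next := current
    switch current {
    case collect:
        fallthrough
    case forceLeader:
        if hasPlan = tryForceLeader(); hasPlan {
            next = forceLeader
            break
        }
        fallthrough
    case demoteVoter:
        if hasPlan = tryDemote(); hasPlan {
            next = demoteVoter
            break
        }
        fallthrough
    case exitForce:
        if hasPlan = tryExit(); hasPlan {
            next = exitForce
        }
    }
    return next, hasPlan
}

func main() {
    next, ok := plan(collect)
    fmt.Println(next == demoteVoter, ok) // true true
}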
Expand All @@ -421,22 +437,21 @@ func (u *unsafeRecoveryController) dispatchPlan(heartbeat *pdpb.StoreHeartbeatRe
}

// It collects and checks if store reports have been fully collected.
func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) bool {
func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) (bool, error) {
storeID := heartbeat.Stats.StoreId
if _, isFailedStore := u.failedStores[storeID]; isFailedStore {
u.HandleErr(errors.Errorf("Receive heartbeat from failed store %d", storeID))
return false
return false, errors.Errorf("Receive heartbeat from failed store %d", storeID)
}

if heartbeat.StoreReport == nil {
return false
return false, nil
}

if heartbeat.StoreReport.GetStep() != u.step {
log.Info("Unsafe recovery receives invalid store report",
zap.Uint64("store-id", storeID), zap.Uint64("expected-step", u.step), zap.Uint64("obtained-step", heartbeat.StoreReport.GetStep()))
// invalid store report, ignore
return false
return false, nil
}

if report, exists := u.storeReports[storeID]; exists {
@@ -445,11 +460,11 @@ func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatR
if report == nil {
u.numStoresReported++
if u.numStoresReported == len(u.storeReports) {
return true
return true, nil
}
}
}
return false
return false, nil
}

// Gets the stage of the current unsafe recovery.
@@ -1204,6 +1219,7 @@ func (u *unsafeRecoveryController) generateExitForceLeaderPlan() bool {
for storeID, storeReport := range u.storeReports {
for _, peerReport := range storeReport.PeerReports {
if peerReport.IsForceLeader {
// empty recovery plan triggers exit force leader on TiKV side
_ = u.getRecoveryPlan(storeID)
hasPlan = true
break
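Taken together, the error path after this change has two steps: the first timeout or error moves the controller into the exitForceLeader stage with a fresh deadline (the empty recovery plan tells TiKV to step down force leaders), and a second failure while already in that stage marks the recovery as failed. A self-contained sketch of that transition, with simplified stand-in names and an illustrative interval constant:

// Sketch of the two-step degradation implemented above: the first failure
// switches to an exit-force-leader stage with a fresh deadline, and a failure
// in that stage ends the recovery. Names and the interval are stand-ins.
package main

import (
    "fmt"
    "time"
)

type stage string

const (
    collectReport   stage = "collectReport"
    exitForceLeader stage = "exitForceLeader"
    failed          stage = "failed"
)

const storeRequestInterval = 40 * time.Second

type recovery struct {
    stage    stage
    deadline time.Time
    err      error
}

func (r *recovery) handleErr(err error) bool {
    if r.err == nil {
        r.err = err
    }
    if r.stage == exitForceLeader {
        // Exiting force leader already failed once; give up.
        r.stage = failed
        return true
    }
    // First failure: try to exit force leader before failing, with a new
    // deadline, so leaked force leaders do not keep blocking reads and writes.
    r.deadline = time.Now().Add(storeRequestInterval)
    r.stage = exitForceLeader
    return false
}

func main() {
    r := &recovery{stage: collectReport, deadline: time.Now().Add(-time.Minute)}
    fmt.Println(r.handleErr(fmt.Errorf("exceeds timeout %v", r.deadline)), r.stage)
    fmt.Println(r.handleErr(fmt.Errorf("exceeds timeout %v", r.deadline)), r.stage)
    // Output: false exitForceLeader, then true failed.
}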
