online recovery: fix online recovery timeout mechanism (tikv#6108)
close tikv#6107

fix online recovery timeout mechanism

Signed-off-by: Connor1996 <zbk602423539@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
2 people authored and v01dstar committed Mar 29, 2023
1 parent d75733f commit ba782a5
Showing 2 changed files with 167 additions and 117 deletions.
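
For orientation before the diff: the change turns checkTimeout into a pure check that returns an error, and leaves it to the callers (Show and HandleStoreHeartbeat) to route that error through HandleErr, which either fails the recovery or falls back to the exit-force-leader stage. The snippet below is a simplified, self-contained sketch of that pattern, not the PD code itself; the stage set, field names, and the two-second deadline extension are trimmed-down stand-ins for the real unsafeRecoveryController fields and storeRequestInterval * 2.

```go
package main

import (
	"fmt"
	"time"
)

type stage int

const (
	collectReport stage = iota
	exitForceLeader
	failed
	finished
)

// controller is a stripped-down stand-in for unsafeRecoveryController,
// keeping only the fields the timeout mechanism touches.
type controller struct {
	stage   stage
	err     error
	timeout time.Time
}

// checkTimeout only reports that the deadline has passed; it no longer
// mutates state or extends the deadline itself.
func (c *controller) checkTimeout() error {
	if c.stage == finished || c.stage == failed {
		return nil
	}
	if time.Now().After(c.timeout) {
		return fmt.Errorf("exceeds timeout %v", c.timeout)
	}
	return nil
}

// handleErr decides what to do with an error: fail outright if the
// controller was already exiting force leader, otherwise fall back to the
// exit-force-leader stage and push the deadline out so that step gets its
// own time budget.
func (c *controller) handleErr(err error) bool {
	if c.err == nil {
		c.err = err
	}
	if c.stage == exitForceLeader {
		c.stage = failed
		return true
	}
	c.timeout = time.Now().Add(2 * time.Second) // stand-in for storeRequestInterval * 2
	c.stage = exitForceLeader
	return false
}

func main() {
	// Start with an already-expired deadline to exercise the timeout path.
	c := &controller{stage: collectReport, timeout: time.Now().Add(-time.Second)}

	// Mirrors the pattern now used in HandleStoreHeartbeat: do the work inside
	// a closure that returns (done, error), then decide the outcome once.
	done, err := func() (bool, error) {
		if err := c.checkTimeout(); err != nil {
			return false, err
		}
		// ... collect reports and generate a plan here ...
		return false, nil
	}()
	if done || (err != nil && c.handleErr(err)) {
		fmt.Println("recovery finished or failed, stage:", c.stage)
		return
	}
	fmt.Println("continuing, stage:", c.stage, "recorded err:", c.err)
}
```

Run as-is, the sketch takes the fallback path because the deadline is already in the past; the point of the change is that a timeout now surfaces as an ordinary error and is handled in exactly one place per caller.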
server/cluster/unsafe_recovery_controller.go: 240 changes (128 additions, 112 deletions)
@@ -160,17 +160,22 @@ func (u *unsafeRecoveryController) reset() {
func (u *unsafeRecoveryController) IsRunning() bool {
u.RLock()
defer u.RUnlock()
return u.isRunningLocked()
}

func (u *unsafeRecoveryController) isRunningLocked() bool {
return u.stage != idle && u.stage != finished && u.stage != failed
}

// RemoveFailedStores removes failed stores from the cluster.
func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64) error {
if u.IsRunning() {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
u.Lock()
defer u.Unlock()

if u.isRunningLocked() {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}

if len(failedStores) == 0 {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
}
@@ -216,7 +221,9 @@ func (u *unsafeRecoveryController) Show() []StageOutput {
if u.stage == idle {
return []StageOutput{{Info: "No on-going recovery."}}
}
u.checkTimeout()
if err := u.checkTimeout(); err != nil {
u.HandleErr(err)
}
status := u.output
if u.stage != finished && u.stage != failed {
status = append(status, u.getReportStatus())
@@ -251,17 +258,15 @@ func (u *unsafeRecoveryController) getReportStatus() StageOutput {
return status
}

func (u *unsafeRecoveryController) checkTimeout() bool {
func (u *unsafeRecoveryController) checkTimeout() error {
if u.stage == finished || u.stage == failed {
return false
return nil
}

if time.Now().After(u.timeout) {
ret := u.HandleErr(errors.Errorf("Exceeds timeout %v", u.timeout))
u.timeout = time.Now().Add(storeRequestInterval * 2)
return ret
return errors.Errorf("Exceeds timeout %v", u.timeout)
}
return false
return nil
}

func (u *unsafeRecoveryController) HandleErr(err error) bool {
@@ -270,128 +275,139 @@ func (u *unsafeRecoveryController) HandleErr(err error) bool {
u.err = err
}
if u.stage == exitForceLeader {
// We already tried to exit force leader, and it still failed.
// Move to the failed stage directly. TiKV steps down the force leader
// automatically after it has been in that state for long enough.
u.changeStage(failed)
return true
}
// When encountering an error for the first time, try to exit force leader
// before turning into the failed stage, to avoid leaked force leaders
// blocking reads and writes.
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
u.timeout = time.Now().Add(storeRequestInterval * 2)
// empty recovery plan would trigger exit force leader
u.changeStage(exitForceLeader)
return false
}

// HandleStoreHeartbeat handles store heartbeat requests and checks whether the stores need to
// send a detailed report back.
func (u *unsafeRecoveryController) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) {
if !u.IsRunning() {
// no recovery in progress, do nothing
return
}
u.Lock()
defer u.Unlock()

if u.checkTimeout() {
if !u.isRunningLocked() {
// no recovery in progress, do nothing
return
}

allCollected := u.collectReport(heartbeat)
done, err := func() (bool, error) {
if err := u.checkTimeout(); err != nil {
return false, err
}

if allCollected {
newestRegionTree, peersMap, buildErr := u.buildUpFromReports()
if buildErr != nil && u.HandleErr(buildErr) {
return
allCollected, err := u.collectReport(heartbeat)
if err != nil {
return false, err
}

// clean up previous plan
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
if allCollected {
newestRegionTree, peersMap, err := u.buildUpFromReports()
if err != nil {
return false, err
}

var stage unsafeRecoveryStage
if u.err == nil {
stage = u.stage
} else {
stage = exitForceLeader
return u.generatePlan(newestRegionTree, peersMap)
}
reCheck := false
hasPlan := false
var err error
for {
switch stage {
case collectReport:
fallthrough
case tombstoneTiFlashLearner:
if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(tombstoneTiFlashLearner)
break
}
if err != nil {
break
}
fallthrough
case forceLeaderForCommitMerge:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
u.changeStage(forceLeaderForCommitMerge)
break
}
if err != nil {
break
}
fallthrough
case forceLeader:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
u.changeStage(forceLeader)
break
}
if err != nil {
break
}
fallthrough
case demoteFailedVoter:
if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
u.changeStage(demoteFailedVoter)
break
} else if !reCheck {
reCheck = true
stage = tombstoneTiFlashLearner
continue
}
fallthrough
case createEmptyRegion:
if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(createEmptyRegion)
break
}
if err != nil {
break
}
fallthrough
case exitForceLeader:
// no need to generate plan, empty recovery plan triggers exit force leader on TiKV side
if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
u.changeStage(exitForceLeader)
}
default:
panic("unreachable")
}
return false, nil
}()

if done || (err != nil && u.HandleErr(err)) {
return
}
u.dispatchPlan(heartbeat, resp)
}

func (u *unsafeRecoveryController) generatePlan(newestRegionTree *regionTree, peersMap map[uint64][]*regionItem) (bool, error) {
// clean up previous plan
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)

stage := u.stage
reCheck := false
hasPlan := false
var err error
for {
switch stage {
case collectReport:
fallthrough
case tombstoneTiFlashLearner:
if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(tombstoneTiFlashLearner)
break
}
if err != nil {
if u.HandleErr(err) {
return
}
u.storePlanExpires = make(map[uint64]time.Time)
u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
// Clear the reports etc.
break
}
fallthrough
case forceLeaderForCommitMerge:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
u.changeStage(forceLeaderForCommitMerge)
break
}
if err != nil {
break
}
fallthrough
case forceLeader:
if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
u.changeStage(forceLeader)
break
}
if err != nil {
break
}
fallthrough
case demoteFailedVoter:
if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
u.changeStage(demoteFailedVoter)
break
} else if !reCheck {
reCheck = true
stage = tombstoneTiFlashLearner
continue
}
fallthrough
case createEmptyRegion:
if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
u.changeStage(createEmptyRegion)
break
}
if err != nil {
break
}
fallthrough
case exitForceLeader:
if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
u.changeStage(exitForceLeader)
return
} else if !hasPlan {
if u.err != nil {
u.changeStage(failed)
} else {
u.changeStage(finished)
}
return
}
break
default:
panic("unreachable")
}
break
}

u.dispatchPlan(heartbeat, resp)
if err == nil && !hasPlan {
if u.err != nil {
u.changeStage(failed)
} else {
u.changeStage(finished)
}
return true, nil
}
return false, err
}

// It dispatches the recovery plan, if any.
@@ -417,22 +433,21 @@ func (u *unsafeRecoveryController) dispatchPlan(heartbeat *pdpb.StoreHeartbeatRe
}

// It collects a store report and checks whether reports from all stores have been collected.
func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) bool {
func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) (bool, error) {
storeID := heartbeat.Stats.StoreId
if _, isFailedStore := u.failedStores[storeID]; isFailedStore {
u.HandleErr(errors.Errorf("Receive heartbeat from failed store %d", storeID))
return false
return false, errors.Errorf("Receive heartbeat from failed store %d", storeID)
}

if heartbeat.StoreReport == nil {
return false
return false, nil
}

if heartbeat.StoreReport.GetStep() != u.step {
log.Info("Unsafe recovery receives invalid store report",
zap.Uint64("store-id", storeID), zap.Uint64("expected-step", u.step), zap.Uint64("obtained-step", heartbeat.StoreReport.GetStep()))
// invalid store report, ignore
return false
return false, nil
}

if report, exists := u.storeReports[storeID]; exists {
@@ -441,11 +456,11 @@ func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatR
if report == nil {
u.numStoresReported++
if u.numStoresReported == len(u.storeReports) {
return true
return true, nil
}
}
}
return false
return false, nil
}

// Gets the stage of the current unsafe recovery.
@@ -1172,6 +1187,7 @@ func (u *unsafeRecoveryController) generateExitForceLeaderPlan() bool {
for storeID, storeReport := range u.storeReports {
for _, peerReport := range storeReport.PeerReports {
if peerReport.IsForceLeader {
// empty recovery plan triggers exit force leader on TiKV side
_ = u.getRecoveryPlan(storeID)
hasPlan = true
break
