diff --git a/server/cluster/unsafe_recovery_controller.go b/server/cluster/unsafe_recovery_controller.go
index 3b32279c785..60c7d3eeb4f 100644
--- a/server/cluster/unsafe_recovery_controller.go
+++ b/server/cluster/unsafe_recovery_controller.go
@@ -160,17 +160,22 @@ func (u *unsafeRecoveryController) reset() {
 func (u *unsafeRecoveryController) IsRunning() bool {
     u.RLock()
     defer u.RUnlock()
+    return u.isRunningLocked()
+}
+
+func (u *unsafeRecoveryController) isRunningLocked() bool {
     return u.stage != idle && u.stage != finished && u.stage != failed
 }

 // RemoveFailedStores removes failed stores from the cluster.
 func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64) error {
-    if u.IsRunning() {
-        return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
-    }
     u.Lock()
     defer u.Unlock()
+    if u.isRunningLocked() {
+        return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
+    }
+
     if len(failedStores) == 0 {
         return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no store specified")
     }
@@ -216,7 +221,9 @@ func (u *unsafeRecoveryController) Show() []StageOutput {
     if u.stage == idle {
         return []StageOutput{{Info: "No on-going recovery."}}
     }
-    u.checkTimeout()
+    if err := u.checkTimeout(); err != nil {
+        u.HandleErr(err)
+    }
     status := u.output
     if u.stage != finished && u.stage != failed {
         status = append(status, u.getReportStatus())
     }
@@ -251,17 +258,15 @@ func (u *unsafeRecoveryController) getReportStatus() StageOutput {
     return status
 }

-func (u *unsafeRecoveryController) checkTimeout() bool {
+func (u *unsafeRecoveryController) checkTimeout() error {
     if u.stage == finished || u.stage == failed {
-        return false
+        return nil
     }

     if time.Now().After(u.timeout) {
-        ret := u.HandleErr(errors.Errorf("Exceeds timeout %v", u.timeout))
-        u.timeout = time.Now().Add(storeRequestInterval * 2)
-        return ret
+        return errors.Errorf("Exceeds timeout %v", u.timeout)
     }
-    return false
+    return nil
 }

 func (u *unsafeRecoveryController) HandleErr(err error) bool {
@@ -270,128 +275,139 @@ func (u *unsafeRecoveryController) HandleErr(err error) bool {
         u.err = err
     }
     if u.stage == exitForceLeader {
+        // We already tried to exit force leader, and it still failed,
+        // so turn into the failed stage directly. TiKV will step down the
+        // force leader automatically after being in that state for a long time.
         u.changeStage(failed)
         return true
     }
+
+    // When encountering an error for the first time, try to exit force leader
+    // before turning into the failed stage, so that lingering force leaders do
+    // not block reads and writes.
+    u.storePlanExpires = make(map[uint64]time.Time)
+    u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
+    u.timeout = time.Now().Add(storeRequestInterval * 2)
+    // empty recovery plan would trigger exit force leader
+    u.changeStage(exitForceLeader)
     return false
 }

 // HandleStoreHeartbeat handles the store heartbeat requests and checks whether the stores need to
 // send detailed report back.
 func (u *unsafeRecoveryController) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest, resp *pdpb.StoreHeartbeatResponse) {
-    if !u.IsRunning() {
-        // no recovery in progress, do nothing
-        return
-    }
     u.Lock()
     defer u.Unlock()
-    if u.checkTimeout() {
+    if !u.isRunningLocked() {
+        // no recovery in progress, do nothing
         return
     }

-    allCollected := u.collectReport(heartbeat)
+    done, err := func() (bool, error) {
+        if err := u.checkTimeout(); err != nil {
+            return false, err
+        }

-    if allCollected {
-        newestRegionTree, peersMap, buildErr := u.buildUpFromReports()
-        if buildErr != nil && u.HandleErr(buildErr) {
-            return
+        allCollected, err := u.collectReport(heartbeat)
+        if err != nil {
+            return false, err
         }
-        // clean up previous plan
-        u.storePlanExpires = make(map[uint64]time.Time)
-        u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
+        if allCollected {
+            newestRegionTree, peersMap, err := u.buildUpFromReports()
+            if err != nil {
+                return false, err
+            }

-        var stage unsafeRecoveryStage
-        if u.err == nil {
-            stage = u.stage
-        } else {
-            stage = exitForceLeader
+            return u.generatePlan(newestRegionTree, peersMap)
         }
-        reCheck := false
-        hasPlan := false
-        var err error
-        for {
-            switch stage {
-            case collectReport:
-                fallthrough
-            case tombstoneTiFlashLearner:
-                if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
-                    u.changeStage(tombstoneTiFlashLearner)
-                    break
-                }
-                if err != nil {
-                    break
-                }
-                fallthrough
-            case forceLeaderForCommitMerge:
-                if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
-                    u.changeStage(forceLeaderForCommitMerge)
-                    break
-                }
-                if err != nil {
-                    break
-                }
-                fallthrough
-            case forceLeader:
-                if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
-                    u.changeStage(forceLeader)
-                    break
-                }
-                if err != nil {
-                    break
-                }
-                fallthrough
-            case demoteFailedVoter:
-                if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
-                    u.changeStage(demoteFailedVoter)
-                    break
-                } else if !reCheck {
-                    reCheck = true
-                    stage = tombstoneTiFlashLearner
-                    continue
-                }
-                fallthrough
-            case createEmptyRegion:
-                if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
-                    u.changeStage(createEmptyRegion)
-                    break
-                }
-                if err != nil {
-                    break
-                }
-                fallthrough
-            case exitForceLeader:
-                // no need to generate plan, empty recovery plan triggers exit force leader on TiKV side
-                if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
-                    u.changeStage(exitForceLeader)
-                }
-            default:
-                panic("unreachable")
-            }
+        return false, nil
+    }()
+    if done || (err != nil && u.HandleErr(err)) {
+        return
+    }
+    u.dispatchPlan(heartbeat, resp)
+}
+
+func (u *unsafeRecoveryController) generatePlan(newestRegionTree *regionTree, peersMap map[uint64][]*regionItem) (bool, error) {
+    // clean up previous plan
+    u.storePlanExpires = make(map[uint64]time.Time)
+    u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
+
+    stage := u.stage
+    reCheck := false
+    hasPlan := false
+    var err error
+    for {
+        switch stage {
+        case collectReport:
+            fallthrough
+        case tombstoneTiFlashLearner:
+            if hasPlan, err = u.generateTombstoneTiFlashLearnerPlan(newestRegionTree, peersMap); hasPlan && err == nil {
+                u.changeStage(tombstoneTiFlashLearner)
+                break
+            }
             if err != nil {
-                if u.HandleErr(err) {
-                    return
-                }
-                u.storePlanExpires = make(map[uint64]time.Time)
-                u.storeRecoveryPlans = make(map[uint64]*pdpb.RecoveryPlan)
-                // Clear the reports etc.
+                break
+            }
+            fallthrough
+        case forceLeaderForCommitMerge:
+            if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, true); hasPlan && err == nil {
+                u.changeStage(forceLeaderForCommitMerge)
+                break
+            }
+            if err != nil {
+                break
+            }
+            fallthrough
+        case forceLeader:
+            if hasPlan, err = u.generateForceLeaderPlan(newestRegionTree, peersMap, false); hasPlan && err == nil {
+                u.changeStage(forceLeader)
+                break
+            }
+            if err != nil {
+                break
+            }
+            fallthrough
+        case demoteFailedVoter:
+            if hasPlan = u.generateDemoteFailedVoterPlan(newestRegionTree, peersMap); hasPlan {
+                u.changeStage(demoteFailedVoter)
+                break
+            } else if !reCheck {
+                reCheck = true
+                stage = tombstoneTiFlashLearner
+                continue
+            }
+            fallthrough
+        case createEmptyRegion:
+            if hasPlan, err = u.generateCreateEmptyRegionPlan(newestRegionTree, peersMap); hasPlan && err == nil {
+                u.changeStage(createEmptyRegion)
+                break
+            }
+            if err != nil {
+                break
+            }
+            fallthrough
+        case exitForceLeader:
+            if hasPlan = u.generateExitForceLeaderPlan(); hasPlan {
                 u.changeStage(exitForceLeader)
-                return
-            } else if !hasPlan {
-                if u.err != nil {
-                    u.changeStage(failed)
-                } else {
-                    u.changeStage(finished)
-                }
-                return
             }
-            break
+        default:
+            panic("unreachable")
         }
+        break
     }
-    u.dispatchPlan(heartbeat, resp)
+
+    if err == nil && !hasPlan {
+        if u.err != nil {
+            u.changeStage(failed)
+        } else {
+            u.changeStage(finished)
+        }
+        return true, nil
+    }
+    return false, err
 }

 // It dispatches recovery plan if any.
@@ -417,22 +433,21 @@ func (u *unsafeRecoveryController) dispatchPlan(heartbeat *pdpb.StoreHeartbeatRe
 }

 // It collects and checks if store reports have been fully collected.
-func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) bool {
+func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatRequest) (bool, error) {
     storeID := heartbeat.Stats.StoreId
     if _, isFailedStore := u.failedStores[storeID]; isFailedStore {
-        u.HandleErr(errors.Errorf("Receive heartbeat from failed store %d", storeID))
-        return false
+        return false, errors.Errorf("Receive heartbeat from failed store %d", storeID)
     }

     if heartbeat.StoreReport == nil {
-        return false
+        return false, nil
     }

     if heartbeat.StoreReport.GetStep() != u.step {
         log.Info("Unsafe recovery receives invalid store report",
             zap.Uint64("store-id", storeID), zap.Uint64("expected-step", u.step), zap.Uint64("obtained-step", heartbeat.StoreReport.GetStep()))
         // invalid store report, ignore
-        return false
+        return false, nil
     }

     if report, exists := u.storeReports[storeID]; exists {
@@ -441,11 +456,11 @@ func (u *unsafeRecoveryController) collectReport(heartbeat *pdpb.StoreHeartbeatR
         if report == nil {
             u.numStoresReported++
             if u.numStoresReported == len(u.storeReports) {
-                return true
+                return true, nil
             }
         }
     }
-    return false
+    return false, nil
 }

 // Gets the stage of the current unsafe recovery.
@@ -1172,6 +1187,7 @@ func (u *unsafeRecoveryController) generateExitForceLeaderPlan() bool {
     for storeID, storeReport := range u.storeReports {
         for _, peerReport := range storeReport.PeerReports {
             if peerReport.IsForceLeader {
+                // empty recovery plan triggers exit force leader on TiKV side
                 _ = u.getRecoveryPlan(storeID)
                 hasPlan = true
                 break
diff --git a/server/cluster/unsafe_recovery_controller_test.go b/server/cluster/unsafe_recovery_controller_test.go
index adceeb8a1ad..d71e145509b 100644
--- a/server/cluster/unsafe_recovery_controller_test.go
+++ b/server/cluster/unsafe_recovery_controller_test.go
@@ -301,14 +301,23 @@ func (s *testUnsafeRecoverySuite) TestFailed(c *C) {
     req := newStoreHeartbeat(2, nil)
     resp := &pdpb.StoreHeartbeatResponse{}
     recoveryController.HandleStoreHeartbeat(req, resp)
-    c.Assert(resp.RecoveryPlan, IsNil)
+    c.Assert(recoveryController.GetStage(), Equals, exitForceLeader)

     for storeID, report := range reports {
         req := newStoreHeartbeat(storeID, report)
         req.StoreReport = report
         resp := &pdpb.StoreHeartbeatResponse{}
         recoveryController.HandleStoreHeartbeat(req, resp)
-        c.Assert(resp.RecoveryPlan, IsNil)
+        c.Assert(resp.RecoveryPlan, NotNil)
+        applyRecoveryPlan(c, storeID, reports, resp)
+    }
+
+    for storeID, report := range reports {
+        req := newStoreHeartbeat(storeID, report)
+        req.StoreReport = report
+        resp := &pdpb.StoreHeartbeatResponse{}
+        recoveryController.HandleStoreHeartbeat(req, resp)
+        applyRecoveryPlan(c, storeID, reports, resp)
     }
     c.Assert(recoveryController.GetStage(), Equals, failed)
 }
@@ -961,7 +970,7 @@ func (s *testUnsafeRecoverySuite) TestJointState(c *C) {
     }
 }

-func (s *testUnsafeRecoverySuite) TestTimeout(c *C) {
+func (s *testUnsafeRecoverySuite) TestExecutionTimeout(c *C) {
     _, opt, _ := newTestScheduleConfig()
     cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
     cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
@@ -977,10 +986,35 @@
     time.Sleep(time.Second)

     req := newStoreHeartbeat(1, nil)
-    req.StoreReport = &pdpb.StoreReport{Step: 1}
     resp := &pdpb.StoreHeartbeatResponse{}
     recoveryController.HandleStoreHeartbeat(req, resp)
-    c.Assert(recoveryController.GetStage(), Equals, failed)
+    c.Assert(exitForceLeader, Equals, recoveryController.GetStage())
+    req.StoreReport = &pdpb.StoreReport{Step: 2}
+    recoveryController.HandleStoreHeartbeat(req, resp)
+    c.Assert(failed, Equals, recoveryController.GetStage())
+
+    output := recoveryController.Show()
+    c.Assert(len(output), Equals, 3)
+    c.Assert(output[1].Details[0], Matches, ".*triggered by error: Exceeds timeout.*")
+}
+
+func (s *testUnsafeRecoverySuite) TestNoHeartbeatTimeout(c *C) {
+    _, opt, _ := newTestScheduleConfig()
+    cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
+    cluster.coordinator = newCoordinator(s.ctx, cluster, hbstream.NewTestHeartbeatStreams(s.ctx, cluster.meta.GetId(), cluster, true))
+    cluster.coordinator.run()
+    for _, store := range newTestStores(3, "6.0.0") {
+        c.Assert(cluster.PutStore(store.GetMeta()), IsNil)
+    }
+    recoveryController := newUnsafeRecoveryController(cluster)
+    c.Assert(recoveryController.RemoveFailedStores(map[uint64]struct{}{
+        2: {},
+        3: {},
+    }, 1), IsNil)
+
+    time.Sleep(time.Second)
+    recoveryController.Show()
+    c.Assert(exitForceLeader, Equals, recoveryController.GetStage())
 }

 func (s *testUnsafeRecoverySuite) TestExitForceLeader(c *C) {