diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index ad200cd1401..9ccf0f86ad7 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -218,7 +218,7 @@ func (c *RaftCluster) Start(s Server) error { } } - c.replicateMode, err = replicate.NewReplicateModeManager(s.GetConfig().ReplicateMode, s.GetStorage(), s.GetAllocator()) + c.replicateMode, err = replicate.NewReplicateModeManager(s.GetConfig().ReplicateMode, s.GetStorage(), s.GetAllocator(), cluster) if err != nil { return err } @@ -228,13 +228,14 @@ func (c *RaftCluster) Start(s Server) error { c.limiter = NewStoreLimiter(c.coordinator.opController) c.quit = make(chan struct{}) - c.wg.Add(3) + c.wg.Add(4) go c.runCoordinator() failpoint.Inject("highFrequencyClusterJobs", func() { backgroundJobInterval = 100 * time.Microsecond }) go c.runBackgroundJobs(backgroundJobInterval) go c.syncRegions() + go c.runReplicateMode() c.running = true return nil @@ -316,6 +317,12 @@ func (c *RaftCluster) syncRegions() { c.regionSyncer.RunServer(c.changedRegionNotifier(), c.quit) } +func (c *RaftCluster) runReplicateMode() { + defer logutil.LogPanic() + defer c.wg.Done() + c.replicateMode.Run(c.quit) +} + // Stop stops the cluster. func (c *RaftCluster) Stop() { c.Lock() diff --git a/server/replicate/replicate.go b/server/replicate/replicate.go index 78829b668ae..c4345b6ab61 100644 --- a/server/replicate/replicate.go +++ b/server/replicate/replicate.go @@ -16,11 +16,14 @@ package replicate import ( "strings" "sync" + "time" pb "github.com/pingcap/kvproto/pkg/replicate_mode" + "github.com/pingcap/log" "github.com/pingcap/pd/v4/server/config" "github.com/pingcap/pd/v4/server/core" "github.com/pingcap/pd/v4/server/id" + "go.uber.org/zap" ) const ( @@ -35,16 +38,18 @@ type ModeManager struct { config config.ReplicateModeConfig storage *core.Storage idAlloc id.Allocator + stores core.StoreSetInformer drAutosync drAutosyncStatus } // NewReplicateModeManager creates the replicate mode manager. -func NewReplicateModeManager(config config.ReplicateModeConfig, storage *core.Storage, idAlloc id.Allocator) (*ModeManager, error) { +func NewReplicateModeManager(config config.ReplicateModeConfig, storage *core.Storage, idAlloc id.Allocator, stores core.StoreSetInformer) (*ModeManager, error) { m := &ModeManager{ config: config, storage: storage, idAlloc: idAlloc, + stores: stores, } switch config.ReplicateMode { case modeMajority: @@ -79,6 +84,12 @@ func (m *ModeManager) GetReplicateStatus() *pb.ReplicateStatus { return p } +func (m *ModeManager) getModeName() string { + m.RLock() + defer m.RUnlock() + return m.config.ReplicateMode +} + const ( drStateSync = "sync" drStateAsync = "async" @@ -86,8 +97,9 @@ const ( ) type drAutosyncStatus struct { - State string `json:"state,omitempty"` - RecoverID uint64 `json:"recover_id,omitempty"` + State string `json:"state,omitempty"` + RecoverID uint64 `json:"recover_id,omitempty"` + RecoverStartTime time.Time `json:"recover_start,omitempty"` } func (m *ModeManager) loadDRAutosync() error { @@ -107,9 +119,11 @@ func (m *ModeManager) drSwitchToAsync() error { defer m.Unlock() dr := drAutosyncStatus{State: drStateAsync} if err := m.storage.SaveReplicateStatus(modeDRAutosync, dr); err != nil { + log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutosync), zap.Error(err)) return err } m.drAutosync = dr + log.Info("switched to async state", zap.String("replicate-mode", modeDRAutosync)) return nil } @@ -118,13 +132,16 @@ func (m *ModeManager) drSwitchToSyncRecover() error { defer m.Unlock() id, err := m.idAlloc.Alloc() if err != nil { + log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutosync), zap.Error(err)) return err } - dr := drAutosyncStatus{State: drStateSyncRecover, RecoverID: id} + dr := drAutosyncStatus{State: drStateSyncRecover, RecoverID: id, RecoverStartTime: time.Now()} if err = m.storage.SaveReplicateStatus(modeDRAutosync, dr); err != nil { + log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutosync), zap.Error(err)) return err } m.drAutosync = dr + log.Info("switched to sync_recover state", zap.String("replicate-mode", modeDRAutosync)) return nil } @@ -133,8 +150,90 @@ func (m *ModeManager) drSwitchToSync() error { defer m.Unlock() dr := drAutosyncStatus{State: drStateSync} if err := m.storage.SaveReplicateStatus(modeDRAutosync, dr); err != nil { + log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutosync), zap.Error(err)) return err } m.drAutosync = dr + log.Info("switched to sync state", zap.String("replicate-mode", modeDRAutosync)) return nil } + +func (m *ModeManager) drGetState() string { + m.RLock() + defer m.RUnlock() + return m.drAutosync.State +} + +const ( + idleTimeout = time.Minute + tickInterval = time.Second * 10 +) + +// Run starts the background job. +func (m *ModeManager) Run(quit chan struct{}) { + select { + case <-time.After(idleTimeout): + case <-quit: + return + } + for { + select { + case <-time.After(tickInterval): + case <-quit: + return + } + m.tickDR() + } +} + +func (m *ModeManager) tickDR() { + if m.getModeName() != modeDRAutosync { + return + } + + canSync := m.checkCanSync() + + if !canSync && m.drGetState() != drStateAsync { + m.drSwitchToAsync() + } + + if canSync && m.drGetState() == drStateAsync { + m.drSwitchToSyncRecover() + } + + if m.drGetState() == drStateSyncRecover { + if current, total := m.recoverProgress(); current >= total { + m.drSwitchToSync() + } + } +} + +func (m *ModeManager) checkCanSync() bool { + m.RLock() + defer m.RUnlock() + var countPrimary, countDR int + for _, s := range m.stores.GetStores() { + if !s.IsTombstone() && s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration { + labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey) + if labelValue == m.config.DRAutoSync.Primary { + countPrimary++ + } + if labelValue == m.config.DRAutoSync.DR { + countDR++ + } + } + } + return countPrimary < m.config.DRAutoSync.PrimaryReplicas && countDR < m.config.DRAutoSync.DRReplicas +} + +func (m *ModeManager) recoverProgress() (current, total int) { + // FIXME: only a placeholder now. (done in 30s) + m.RLock() + defer m.RUnlock() + total = 300 + current = int(time.Since(m.drAutosync.RecoverStartTime).Seconds() * 10) + if current > total { + current = total + } + return +} diff --git a/server/replicate/replicate_test.go b/server/replicate/replicate_test.go index 765dbec7df6..f1a326cf127 100644 --- a/server/replicate/replicate_test.go +++ b/server/replicate/replicate_test.go @@ -19,7 +19,9 @@ import ( . "github.com/pingcap/check" pb "github.com/pingcap/kvproto/pkg/replicate_mode" + "github.com/pingcap/pd/v4/pkg/mock/mockcluster" "github.com/pingcap/pd/v4/pkg/mock/mockid" + "github.com/pingcap/pd/v4/pkg/mock/mockoption" "github.com/pingcap/pd/v4/pkg/typeutil" "github.com/pingcap/pd/v4/server/config" "github.com/pingcap/pd/v4/server/core" @@ -38,7 +40,8 @@ func (s *testReplicateMode) TestInitial(c *C) { store := core.NewStorage(kv.NewMemoryKV()) id := mockid.NewIDAllocator() conf := config.ReplicateModeConfig{ReplicateMode: modeMajority} - rep, err := NewReplicateModeManager(conf, store, id) + cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions()) + rep, err := NewReplicateModeManager(conf, store, id, cluster) c.Assert(err, IsNil) c.Assert(rep.GetReplicateStatus(), DeepEquals, &pb.ReplicateStatus{Mode: pb.ReplicateStatus_MAJORITY}) @@ -51,7 +54,7 @@ func (s *testReplicateMode) TestInitial(c *C) { WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, }} - rep, err = NewReplicateModeManager(conf, store, id) + rep, err = NewReplicateModeManager(conf, store, id, cluster) c.Assert(err, IsNil) c.Assert(rep.GetReplicateStatus(), DeepEquals, &pb.ReplicateStatus{ Mode: pb.ReplicateStatus_DR_AUTOSYNC, @@ -69,7 +72,8 @@ func (s *testReplicateMode) TestStatus(c *C) { LabelKey: "dr-label", WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, }} - rep, err := NewReplicateModeManager(conf, store, id) + cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions()) + rep, err := NewReplicateModeManager(conf, store, id, cluster) c.Assert(err, IsNil) c.Assert(rep.GetReplicateStatus(), DeepEquals, &pb.ReplicateStatus{ Mode: pb.ReplicateStatus_DR_AUTOSYNC, @@ -103,7 +107,7 @@ func (s *testReplicateMode) TestStatus(c *C) { }) // test reload - rep, err = NewReplicateModeManager(conf, store, id) + rep, err = NewReplicateModeManager(conf, store, id, cluster) c.Assert(err, IsNil) c.Assert(rep.drAutosync.State, Equals, drStateSyncRecover) @@ -117,3 +121,78 @@ func (s *testReplicateMode) TestStatus(c *C) { }, }) } + +func (s *testReplicateMode) TestStateSwitch(c *C) { + store := core.NewStorage(kv.NewMemoryKV()) + id := mockid.NewIDAllocator() + conf := config.ReplicateModeConfig{ReplicateMode: modeDRAutosync, DRAutoSync: config.DRAutoSyncReplicateConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + PrimaryReplicas: 2, + DRReplicas: 1, + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions()) + rep, err := NewReplicateModeManager(conf, store, id, cluster) + c.Assert(err, IsNil) + + cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"}) + cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"}) + cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1"}) + cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"}) + cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) + + // initial state is sync + c.Assert(rep.drGetState(), Equals, drStateSync) + + // sync -> async + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateSync) + s.setStoreState(cluster, 1, "down") + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateSync) + s.setStoreState(cluster, 2, "down") + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateAsync) + rep.drSwitchToSync() + s.setStoreState(cluster, 1, "up") + s.setStoreState(cluster, 2, "up") + s.setStoreState(cluster, 5, "down") + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateAsync) + + // async -> sync_recover + s.setStoreState(cluster, 5, "up") + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateSyncRecover) + rep.drSwitchToAsync() + s.setStoreState(cluster, 1, "down") + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateSyncRecover) + + // sync_recover -> async + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateSyncRecover) + s.setStoreState(cluster, 4, "down") + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateAsync) + + // sync_recover -> sync + rep.drSwitchToSyncRecover() + s.setStoreState(cluster, 4, "up") + rep.drAutosync.RecoverStartTime = time.Now().Add(-time.Hour) + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateSync) +} + +func (s *testReplicateMode) setStoreState(cluster *mockcluster.Cluster, id uint64, state string) { + store := cluster.GetStore(id) + if state == "down" { + store.GetMeta().LastHeartbeat = time.Now().Add(-time.Minute * 10).UnixNano() + } else if state == "up" { + store.GetMeta().LastHeartbeat = time.Now().UnixNano() + } + cluster.PutStore(store) +}