replicate: add state switch (#2313)
Signed-off-by: disksing <i@disksing.com>
disksing authored Apr 2, 2020
1 parent d654fd8 commit fccb3d4
Showing 3 changed files with 195 additions and 10 deletions.
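
In short: this commit gives the DR auto-sync replicate mode an actual state switch. RaftCluster starts a fourth background goroutine that drives ModeManager.Run; after an initial one-minute idle period it ticks every ten seconds, counting down stores in the primary and DR zones and moving the mode between sync, async, and sync_recover.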
server/cluster/cluster.go (11 changes: 9 additions & 2 deletions)
@@ -218,7 +218,7 @@ func (c *RaftCluster) Start(s Server) error {
         }
     }
 
-    c.replicateMode, err = replicate.NewReplicateModeManager(s.GetConfig().ReplicateMode, s.GetStorage(), s.GetAllocator())
+    c.replicateMode, err = replicate.NewReplicateModeManager(s.GetConfig().ReplicateMode, s.GetStorage(), s.GetAllocator(), cluster)
     if err != nil {
         return err
     }
@@ -228,13 +228,14 @@ func (c *RaftCluster) Start(s Server) error {
     c.limiter = NewStoreLimiter(c.coordinator.opController)
     c.quit = make(chan struct{})
 
-    c.wg.Add(3)
+    c.wg.Add(4)
     go c.runCoordinator()
     failpoint.Inject("highFrequencyClusterJobs", func() {
         backgroundJobInterval = 100 * time.Microsecond
     })
     go c.runBackgroundJobs(backgroundJobInterval)
     go c.syncRegions()
+    go c.runReplicateMode()
     c.running = true
 
     return nil
@@ -316,6 +317,12 @@ func (c *RaftCluster) syncRegions() {
     c.regionSyncer.RunServer(c.changedRegionNotifier(), c.quit)
 }
 
+func (c *RaftCluster) runReplicateMode() {
+    defer logutil.LogPanic()
+    defer c.wg.Done()
+    c.replicateMode.Run(c.quit)
+}
+
 // Stop stops the cluster.
 func (c *RaftCluster) Stop() {
     c.Lock()
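The lifecycle wiring follows PD's usual quit-channel pattern: wg.Add(4) now matches the four goroutines started in Start, and each goroutine returns once Stop closes c.quit. A minimal, self-contained sketch of the same pattern, with illustrative names rather than PD's actual types:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // runJob plays the role of runReplicateMode/ModeManager.Run: do periodic
    // work until the shared quit channel is closed.
    func runJob(quit chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            select {
            case <-time.After(50 * time.Millisecond):
                // periodic work, e.g. tickDR()
            case <-quit:
                fmt.Println("background job stopped")
                return
            }
        }
    }

    func main() {
        var wg sync.WaitGroup
        quit := make(chan struct{})
        wg.Add(1) // the cluster calls wg.Add(4), one per background goroutine
        go runJob(quit, &wg)
        time.Sleep(120 * time.Millisecond)
        close(quit) // Stop() closes c.quit, unblocking every pending select
        wg.Wait()
    }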
server/replicate/replicate.go (107 changes: 103 additions & 4 deletions)
@@ -16,11 +16,14 @@ package replicate
 import (
     "strings"
     "sync"
+    "time"
 
     pb "github.com/pingcap/kvproto/pkg/replicate_mode"
+    "github.com/pingcap/log"
     "github.com/pingcap/pd/v4/server/config"
     "github.com/pingcap/pd/v4/server/core"
     "github.com/pingcap/pd/v4/server/id"
+    "go.uber.org/zap"
 )

 const (
@@ -35,16 +38,18 @@ type ModeManager struct {
     config  config.ReplicateModeConfig
     storage *core.Storage
     idAlloc id.Allocator
+    stores  core.StoreSetInformer
 
     drAutosync drAutosyncStatus
 }
 
 // NewReplicateModeManager creates the replicate mode manager.
-func NewReplicateModeManager(config config.ReplicateModeConfig, storage *core.Storage, idAlloc id.Allocator) (*ModeManager, error) {
+func NewReplicateModeManager(config config.ReplicateModeConfig, storage *core.Storage, idAlloc id.Allocator, stores core.StoreSetInformer) (*ModeManager, error) {
     m := &ModeManager{
         config:  config,
         storage: storage,
         idAlloc: idAlloc,
+        stores:  stores,
     }
     switch config.ReplicateMode {
     case modeMajority:
@@ -79,15 +84,22 @@ func (m *ModeManager) GetReplicateStatus() *pb.ReplicateStatus {
     return p
 }
 
+func (m *ModeManager) getModeName() string {
+    m.RLock()
+    defer m.RUnlock()
+    return m.config.ReplicateMode
+}
+
 const (
     drStateSync        = "sync"
     drStateAsync       = "async"
     drStateSyncRecover = "sync_recover"
 )
 
 type drAutosyncStatus struct {
-    State     string `json:"state,omitempty"`
-    RecoverID uint64 `json:"recover_id,omitempty"`
+    State            string    `json:"state,omitempty"`
+    RecoverID        uint64    `json:"recover_id,omitempty"`
+    RecoverStartTime time.Time `json:"recover_start,omitempty"`
 }

 func (m *ModeManager) loadDRAutosync() error {
@@ -107,9 +119,11 @@ func (m *ModeManager) drSwitchToAsync() error {
     defer m.Unlock()
     dr := drAutosyncStatus{State: drStateAsync}
     if err := m.storage.SaveReplicateStatus(modeDRAutosync, dr); err != nil {
+        log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutosync), zap.Error(err))
        return err
     }
     m.drAutosync = dr
+    log.Info("switched to async state", zap.String("replicate-mode", modeDRAutosync))
     return nil
 }

@@ -118,13 +132,16 @@ func (m *ModeManager) drSwitchToSyncRecover() error {
     defer m.Unlock()
     id, err := m.idAlloc.Alloc()
     if err != nil {
+        log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutosync), zap.Error(err))
         return err
     }
-    dr := drAutosyncStatus{State: drStateSyncRecover, RecoverID: id}
+    dr := drAutosyncStatus{State: drStateSyncRecover, RecoverID: id, RecoverStartTime: time.Now()}
     if err = m.storage.SaveReplicateStatus(modeDRAutosync, dr); err != nil {
+        log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutosync), zap.Error(err))
         return err
     }
     m.drAutosync = dr
+    log.Info("switched to sync_recover state", zap.String("replicate-mode", modeDRAutosync))
     return nil
 }
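
Note the ordering in each switch: the new status is persisted via SaveReplicateStatus before the in-memory m.drAutosync is updated, so a PD restart can reload the correct state through loadDRAutosync (the "test reload" case in the tests below exercises this). sync_recover additionally allocates a fresh RecoverID and stamps RecoverStartTime so recovery progress can be measured.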

@@ -133,8 +150,90 @@ func (m *ModeManager) drSwitchToSync() error {
     defer m.Unlock()
     dr := drAutosyncStatus{State: drStateSync}
     if err := m.storage.SaveReplicateStatus(modeDRAutosync, dr); err != nil {
+        log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutosync), zap.Error(err))
         return err
     }
     m.drAutosync = dr
+    log.Info("switched to sync state", zap.String("replicate-mode", modeDRAutosync))
     return nil
 }
 
+func (m *ModeManager) drGetState() string {
+    m.RLock()
+    defer m.RUnlock()
+    return m.drAutosync.State
+}
+
+const (
+    idleTimeout  = time.Minute
+    tickInterval = time.Second * 10
+)
+
+// Run starts the background job.
+func (m *ModeManager) Run(quit chan struct{}) {
+    select {
+    case <-time.After(idleTimeout):
+    case <-quit:
+        return
+    }
+    for {
+        select {
+        case <-time.After(tickInterval):
+        case <-quit:
+            return
+        }
+        m.tickDR()
+    }
+}
+
+func (m *ModeManager) tickDR() {
+    if m.getModeName() != modeDRAutosync {
+        return
+    }
+
+    canSync := m.checkCanSync()
+
+    if !canSync && m.drGetState() != drStateAsync {
+        m.drSwitchToAsync()
+    }
+
+    if canSync && m.drGetState() == drStateAsync {
+        m.drSwitchToSyncRecover()
+    }
+
+    if m.drGetState() == drStateSyncRecover {
+        if current, total := m.recoverProgress(); current >= total {
+            m.drSwitchToSync()
+        }
+    }
+}
+
+func (m *ModeManager) checkCanSync() bool {
+    m.RLock()
+    defer m.RUnlock()
+    var countPrimary, countDR int
+    for _, s := range m.stores.GetStores() {
+        if !s.IsTombstone() && s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration {
+            labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
+            if labelValue == m.config.DRAutoSync.Primary {
+                countPrimary++
+            }
+            if labelValue == m.config.DRAutoSync.DR {
+                countDR++
+            }
+        }
+    }
+    return countPrimary < m.config.DRAutoSync.PrimaryReplicas && countDR < m.config.DRAutoSync.DRReplicas
+}
+
+func (m *ModeManager) recoverProgress() (current, total int) {
+    // FIXME: only a placeholder now. (done in 30s)
+    m.RLock()
+    defer m.RUnlock()
+    total = 300
+    current = int(time.Since(m.drAutosync.RecoverStartTime).Seconds() * 10)
+    if current > total {
+        current = total
+    }
+    return
+}
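
recoverProgress is an acknowledged placeholder: it reports 10 units of progress per second since RecoverStartTime against a fixed total of 300, so recovery always "completes" in 30 seconds. The transitions tickDR drives can be summarized with this minimal, runnable model; canSync stands in for checkCanSync (true while fewer primary stores are down than PrimaryReplicas and fewer DR stores are down than DRReplicas), recovered stands in for recoverProgress reaching its total, and the model applies one transition per tick for clarity:

    package main

    import "fmt"

    const (
        stateSync        = "sync"
        stateAsync       = "async"
        stateSyncRecover = "sync_recover"
    )

    // tick mirrors tickDR: fall back to async replication when the cluster
    // cannot keep both zones in sync, start catching up once it can again,
    // and return to full sync when recovery completes.
    func tick(state string, canSync, recovered bool) string {
        switch {
        case !canSync && state != stateAsync:
            return stateAsync
        case canSync && state == stateAsync:
            return stateSyncRecover
        case state == stateSyncRecover && recovered:
            return stateSync
        }
        return state
    }

    func main() {
        state := stateSync
        for _, step := range []struct{ canSync, recovered bool }{
            {false, false}, // too many stores down in one zone
            {true, false},  // stores are back: begin sync_recover
            {true, true},   // caught up: fully synchronous again
        } {
            state = tick(state, step.canSync, step.recovered)
            fmt.Println(state) // async, sync_recover, sync
        }
    }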
server/replicate/replicate_test.go (87 changes: 83 additions & 4 deletions)
@@ -19,7 +19,9 @@ import (
 
     . "github.com/pingcap/check"
     pb "github.com/pingcap/kvproto/pkg/replicate_mode"
+    "github.com/pingcap/pd/v4/pkg/mock/mockcluster"
     "github.com/pingcap/pd/v4/pkg/mock/mockid"
+    "github.com/pingcap/pd/v4/pkg/mock/mockoption"
     "github.com/pingcap/pd/v4/pkg/typeutil"
     "github.com/pingcap/pd/v4/server/config"
     "github.com/pingcap/pd/v4/server/core"
@@ -38,7 +40,8 @@ func (s *testReplicateMode) TestInitial(c *C) {
     store := core.NewStorage(kv.NewMemoryKV())
     id := mockid.NewIDAllocator()
     conf := config.ReplicateModeConfig{ReplicateMode: modeMajority}
-    rep, err := NewReplicateModeManager(conf, store, id)
+    cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions())
+    rep, err := NewReplicateModeManager(conf, store, id, cluster)
     c.Assert(err, IsNil)
     c.Assert(rep.GetReplicateStatus(), DeepEquals, &pb.ReplicateStatus{Mode: pb.ReplicateStatus_MAJORITY})

@@ -51,7 +54,7 @@ func (s *testReplicateMode) TestInitial(c *C) {
         WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
         WaitSyncTimeout:  typeutil.Duration{Duration: time.Minute},
     }}
-    rep, err = NewReplicateModeManager(conf, store, id)
+    rep, err = NewReplicateModeManager(conf, store, id, cluster)
     c.Assert(err, IsNil)
     c.Assert(rep.GetReplicateStatus(), DeepEquals, &pb.ReplicateStatus{
         Mode: pb.ReplicateStatus_DR_AUTOSYNC,
@@ -69,7 +72,8 @@ func (s *testReplicateMode) TestStatus(c *C) {
         LabelKey:        "dr-label",
         WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
     }}
-    rep, err := NewReplicateModeManager(conf, store, id)
+    cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions())
+    rep, err := NewReplicateModeManager(conf, store, id, cluster)
     c.Assert(err, IsNil)
     c.Assert(rep.GetReplicateStatus(), DeepEquals, &pb.ReplicateStatus{
         Mode: pb.ReplicateStatus_DR_AUTOSYNC,
@@ -103,7 +107,7 @@ func (s *testReplicateMode) TestStatus(c *C) {
     })
 
     // test reload
-    rep, err = NewReplicateModeManager(conf, store, id)
+    rep, err = NewReplicateModeManager(conf, store, id, cluster)
     c.Assert(err, IsNil)
     c.Assert(rep.drAutosync.State, Equals, drStateSyncRecover)

@@ -117,3 +121,78 @@ func (s *testReplicateMode) TestStatus(c *C) {
         },
     })
 }
+
+func (s *testReplicateMode) TestStateSwitch(c *C) {
+    store := core.NewStorage(kv.NewMemoryKV())
+    id := mockid.NewIDAllocator()
+    conf := config.ReplicateModeConfig{ReplicateMode: modeDRAutosync, DRAutoSync: config.DRAutoSyncReplicateConfig{
+        LabelKey:         "zone",
+        Primary:          "zone1",
+        DR:               "zone2",
+        PrimaryReplicas:  2,
+        DRReplicas:       1,
+        WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
+        WaitSyncTimeout:  typeutil.Duration{Duration: time.Minute},
+    }}
+    cluster := mockcluster.NewCluster(mockoption.NewScheduleOptions())
+    rep, err := NewReplicateModeManager(conf, store, id, cluster)
+    c.Assert(err, IsNil)
+
+    cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"})
+    cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"})
+    cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone1"})
+    cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"})
+    cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"})
+
+    // initial state is sync
+    c.Assert(rep.drGetState(), Equals, drStateSync)
+
+    // sync -> async
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateSync)
+    s.setStoreState(cluster, 1, "down")
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateSync)
+    s.setStoreState(cluster, 2, "down")
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateAsync)
+    rep.drSwitchToSync()
+    s.setStoreState(cluster, 1, "up")
+    s.setStoreState(cluster, 2, "up")
+    s.setStoreState(cluster, 5, "down")
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateAsync)
+
+    // async -> sync_recover
+    s.setStoreState(cluster, 5, "up")
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateSyncRecover)
+    rep.drSwitchToAsync()
+    s.setStoreState(cluster, 1, "down")
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateSyncRecover)
+
+    // sync_recover -> async
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateSyncRecover)
+    s.setStoreState(cluster, 4, "down")
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateAsync)
+
+    // sync_recover -> sync
+    rep.drSwitchToSyncRecover()
+    s.setStoreState(cluster, 4, "up")
+    rep.drAutosync.RecoverStartTime = time.Now().Add(-time.Hour)
+    rep.tickDR()
+    c.Assert(rep.drGetState(), Equals, drStateSync)
+}
+
+func (s *testReplicateMode) setStoreState(cluster *mockcluster.Cluster, id uint64, state string) {
+    store := cluster.GetStore(id)
+    if state == "down" {
+        store.GetMeta().LastHeartbeat = time.Now().Add(-time.Minute * 10).UnixNano()
+    } else if state == "up" {
+        store.GetMeta().LastHeartbeat = time.Now().UnixNano()
+    }
+    cluster.PutStore(store)
+}
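
The setStoreState helper works because checkCanSync treats a store as down once DownTime() reaches WaitStoreTimeout, which this test configures as one minute; backdating LastHeartbeat by ten minutes is comfortably past that threshold, and resetting it to now brings the store back.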
