dr-autosync: retry replicate state file (#4403)
* dr-autosync: retry replicate state file

close #4327

Signed-off-by: disksing <i@disksing.com>

* fix ci

Signed-off-by: disksing <i@disksing.com>

* fix test

Signed-off-by: disksing <i@disksing.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
disksing and ti-chi-bot authored Dec 15, 2021
1 parent d13fc44 commit e7c795f
Showing 5 changed files with 141 additions and 97 deletions.
3 changes: 2 additions & 1 deletion server/cluster/cluster.go
@@ -73,7 +73,8 @@ type Server interface {
GetHBStreams() *hbstream.HeartbeatStreams
GetRaftCluster() *RaftCluster
GetBasicCluster() *core.BasicCluster
ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
}

// RaftCluster is used for cluster config management.
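The `Server` interface (and the matching `FileReplicater` interface in the next file) now exposes member enumeration plus per-member replication instead of a single all-members call, so the caller decides how to react when an individual member cannot be reached. As a rough sketch of that composition — `replicateToAll` and the local `fileReplicator` interface are illustrative, not part of this commit — the removed all-or-nothing behaviour can be rebuilt on top of the two new methods:

package example

import (
	"context"

	"github.com/pingcap/kvproto/pkg/pdpb"
)

// fileReplicator mirrors the two methods this commit adds to the
// Server/FileReplicater interfaces.
type fileReplicator interface {
	GetMembers() ([]*pdpb.Member, error)
	ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
}

// replicateToAll is a hypothetical helper that rebuilds the removed
// ReplicateFileToAllMembers semantics: enumerate the members and push
// the file to each one, stopping at the first failure.
func replicateToAll(ctx context.Context, r fileReplicator, name string, data []byte) error {
	members, err := r.GetMembers()
	if err != nil {
		return err
	}
	for _, member := range members {
		if err := r.ReplicateFileToMember(ctx, member, name, data); err != nil {
			return err
		}
	}
	return nil
}

The replication mode manager deliberately does not use these semantics: as the diff below shows, it tolerates per-member failures and retries them on later ticks instead.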
84 changes: 60 additions & 24 deletions server/replication/replication_mode.go
@@ -22,10 +22,12 @@ import (
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
pb "github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/opt"
@@ -50,7 +52,8 @@ func modeToPB(m string) pb.ReplicationMode {
// FileReplicater is the interface that can save important data to all cluster
// nodes.
type FileReplicater interface {
ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
}

const drStatusFile = "DR_STATE"
@@ -62,10 +65,11 @@ type ModeManager struct {
initTime time.Time

sync.RWMutex
config config.ReplicationModeConfig
storage *core.Storage
cluster opt.Cluster
fileReplicater FileReplicater
config config.ReplicationModeConfig
storage *core.Storage
cluster opt.Cluster
fileReplicater FileReplicater
replicatedMembers []uint64

drAutoSync drAutoSyncStatus
// intermediate states of the recovery process
@@ -256,9 +260,7 @@ func (m *ModeManager) drSwitchToAsyncWithLock() error {
return err
}
dr := drAutoSyncStatus{State: drStateAsync, StateID: id}
if err := m.drPersistStatus(dr); err != nil {
return err
}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -282,9 +284,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
}
now := time.Now()
dr := drAutoSyncStatus{State: drStateSyncRecover, StateID: id, RecoverStartTime: &now}
if err := m.drPersistStatus(dr); err != nil {
return err
}
m.drPersistStatusWithLock(dr)
if err = m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -304,9 +304,7 @@ func (m *ModeManager) drSwitchToSync() error {
return err
}
dr := drAutoSyncStatus{State: drStateSync, StateID: id}
if err := m.drPersistStatus(dr); err != nil {
return err
}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -316,23 +314,48 @@ func (m *ModeManager) drSwitchToSync() error {
return nil
}

func (m *ModeManager) drPersistStatus(status drAutoSyncStatus) error {
if m.fileReplicater != nil {
ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout)
defer cancel()
data, _ := json.Marshal(status)
if err := m.fileReplicater.ReplicateFileToAllMembers(ctx, drStatusFile, data); err != nil {
func (m *ModeManager) drPersistStatusWithLock(status drAutoSyncStatus) {
ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout)
defer cancel()

members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}

data, _ := json.Marshal(status)

m.replicatedMembers = m.replicatedMembers[:0]
for _, member := range members {
if err := m.fileReplicater.ReplicateFileToMember(ctx, member, drStatusFile, data); err != nil {
log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", status.State), errs.ZapError(err))
// Throw away the error to make it possible to switch to async when
// primary and dr DC are disconnected. This will result in the
// inability to accurately determine whether data is fully
// synchronized when using the DR DC for disaster recovery.
// TODO: introduce PD's leader-follower connection timeout to solve
// this issue. More details: https://github.com/tikv/pd/issues/2490
return nil
// Since the member will not be in the `replicatedMembers` list, PD
// will try to replicate the state file to it later.
} else {
m.replicatedMembers = append(m.replicatedMembers, member.GetMemberId())
}
}
return nil
}

func (m *ModeManager) drCheckNeedPersistStatus(members []*pdpb.Member) bool {
m.RLock()
defer m.RUnlock()
return slice.AnyOf(members, func(i int) bool { // if there is any member in the new list
return slice.NoneOf(m.replicatedMembers, func(j int) bool { // not replicated
return m.replicatedMembers[j] == members[i].GetMemberId()
})
})
}

func (m *ModeManager) drPersistStatus() {
m.Lock()
defer m.Unlock()
m.drPersistStatusWithLock(drAutoSyncStatus{State: m.drAutoSync.State, StateID: m.drAutoSync.StateID})
}

func (m *ModeManager) drGetState() string {
@@ -418,6 +441,8 @@ func (m *ModeManager) tickDR() {
m.updateRecoverProgress(progress)
}
}

m.checkReplicateFile()
}

func (m *ModeManager) checkStoreStatus() (primaryDownCount, drDownCount, primaryUpCount, drUpCount int) {
@@ -444,6 +469,17 @@ func (m *ModeManager) checkStoreStatus() (primaryDownCount, drDownCount, primary
return
}

func (m *ModeManager) checkReplicateFile() {
members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}
if m.drCheckNeedPersistStatus(members) {
m.drPersistStatus()
}
}

var (
regionScanBatchSize = 1024
regionMinSampleSize = 512
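Taken together, the replication_mode.go changes implement the retry: each successful ReplicateFileToMember appends the member ID to replicatedMembers, drCheckNeedPersistStatus asks whether the current membership contains anyone who has not yet received the latest state file, and tickDR calls checkReplicateFile so a member that failed a replication or joined the cluster later gets the file on a subsequent tick. A standalone sketch of that membership check, with the pkg/slice AnyOf/NoneOf helpers unrolled into plain loops (the function name and signature are illustrative, not from the repository):

package example

// needsPersist reports whether any current member is missing from the
// set of members that already received the latest DR state file. It is
// a simplified, loop-based equivalent of drCheckNeedPersistStatus above.
func needsPersist(memberIDs, replicatedMembers []uint64) bool {
	replicated := make(map[uint64]struct{}, len(replicatedMembers))
	for _, id := range replicatedMembers {
		replicated[id] = struct{}{}
	}
	for _, id := range memberIDs {
		if _, ok := replicated[id]; !ok {
			return true // at least one member still needs the state file
		}
	}
	return false
}

Because a failed replication leaves the member out of replicatedMembers, and drPersistStatusWithLock rebuilds the list from scratch on every state change, the check stays cheap and self-correcting.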
93 changes: 77 additions & 16 deletions server/replication/replication_mode_test.go
@@ -22,6 +22,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/pdpb"
pb "github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/typeutil"
@@ -53,7 +54,7 @@ func (s *testReplicationMode) TestInitial(c *C) {
store := core.NewStorage(kv.NewMemoryKV())
conf := config.ReplicationModeConfig{ReplicationMode: modeMajority}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)
c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{Mode: pb.ReplicationMode_MAJORITY})

@@ -66,7 +67,7 @@ func (s *testReplicationMode) TestInitial(c *C) {
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
rep, err = NewReplicationModeManager(conf, store, cluster, nil)
rep, err = NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)
c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{
Mode: pb.ReplicationMode_DR_AUTO_SYNC,
@@ -86,7 +87,7 @@ func (s *testReplicationMode) TestStatus(c *C) {
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)
c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{
Mode: pb.ReplicationMode_DR_AUTO_SYNC,
@@ -124,7 +125,7 @@ func (s *testReplicationMode) TestStatus(c *C) {
})

// test reload
rep, err = NewReplicationModeManager(conf, store, cluster, nil)
rep, err = NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)
c.Assert(rep.drAutoSync.State, Equals, drStateSyncRecover)

Expand All @@ -142,13 +143,33 @@ func (s *testReplicationMode) TestStatus(c *C) {
}

type mockFileReplicator struct {
lastData string
err error
memberIDs []uint64
lastData map[uint64]string
errors map[uint64]error
}

func (rep *mockFileReplicator) ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error {
rep.lastData = string(data)
return rep.err
func (rep *mockFileReplicator) GetMembers() ([]*pdpb.Member, error) {
var members []*pdpb.Member
for _, id := range rep.memberIDs {
members = append(members, &pdpb.Member{MemberId: id})
}
return members, nil
}

func (rep *mockFileReplicator) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
if err := rep.errors[member.GetMemberId()]; err != nil {
return err
}
rep.lastData[member.GetMemberId()] = string(data)
return nil
}

func newMockReplicator(ids []uint64) *mockFileReplicator {
return &mockFileReplicator{
memberIDs: ids,
lastData: make(map[uint64]string),
errors: make(map[uint64]error),
}
}

func (s *testReplicationMode) TestStateSwitch(c *C) {
Expand All @@ -163,8 +184,8 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
var replicator mockFileReplicator
rep, err := NewReplicationModeManager(conf, store, cluster, &replicator)
replicator := newMockReplicator([]uint64{1})
rep, err := NewReplicationModeManager(conf, store, cluster, replicator)
c.Assert(err, IsNil)

cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"})
Expand All @@ -175,7 +196,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
c.Assert(rep.drGetState(), Equals, drStateSync)
stateID := rep.drAutoSync.StateID
c.Assert(stateID, Not(Equals), uint64(0))
c.Assert(replicator.lastData, Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))
assertStateIDUpdate := func() {
c.Assert(rep.drAutoSync.StateID, Not(Equals), stateID)
stateID = rep.drAutoSync.StateID
@@ -185,7 +206,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateAsync)
assertStateIDUpdate()
c.Assert(replicator.lastData, Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, stateID))
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, stateID))

// add new store in dr zone.
cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"})
@@ -213,7 +234,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
c.Assert(rep.drGetState(), Equals, drStateAsync)
assertStateIDUpdate()
rep.drSwitchToSync()
replicator.err = errors.New("fail to replicate")
replicator.errors[1] = errors.New("fail to replicate")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateAsync)
assertStateIDUpdate()
@@ -269,6 +290,46 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
assertStateIDUpdate()
}

func (s *testReplicationMode) TestReplicateState(c *C) {
store := core.NewStorage(kv.NewMemoryKV())
conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{
LabelKey: "zone",
Primary: "zone1",
DR: "zone2",
PrimaryReplicas: 2,
DRReplicas: 1,
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
replicator := newMockReplicator([]uint64{1})
rep, err := NewReplicationModeManager(conf, store, cluster, replicator)
c.Assert(err, IsNil)

stateID := rep.drAutoSync.StateID
// replicate after initialization
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))

// replicate state to new members
replicator.memberIDs = append(replicator.memberIDs, 2, 3)
rep.checkReplicateFile()
c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))
c.Assert(replicator.lastData[3], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))

// inject error
replicator.errors[2] = errors.New("failed to persist")
rep.tickDR() // switch to async since there is only one zone
newStateID := rep.drAutoSync.StateID
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID))
c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))
c.Assert(replicator.lastData[3], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID))

// clear error, replicate to node 2 next time
delete(replicator.errors, 2)
rep.checkReplicateFile()
c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID))
}

func (s *testReplicationMode) TestAsynctimeout(c *C) {
store := core.NewStorage(kv.NewMemoryKV())
conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{
@@ -334,7 +395,7 @@ func (s *testReplicationMode) TestRecoverProgress(c *C) {
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
cluster.AddLabelsStore(1, 1, map[string]string{})
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)

prepare := func(n int, asyncRegions []int) {
@@ -394,7 +455,7 @@ func (s *testReplicationMode) TestRecoverProgressWithSplitAndMerge(c *C) {
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
cluster.AddLabelsStore(1, 1, map[string]string{})
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)

prepare := func(n int, asyncRegions []int) {
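The reworked mockFileReplicator keeps the last payload per member (lastData) and lets a test inject a per-member error (errors), which is what TestReplicateState uses to simulate a member that misses one update and catches up later. A compressed, hypothetical test showing just the mock's behaviour (this test is not part of the commit):

func (s *testReplicationMode) TestMockPerMemberErrors(c *C) {
	rep := newMockReplicator([]uint64{1, 2})
	rep.errors[2] = errors.New("member 2 is unreachable")

	members, _ := rep.GetMembers()
	for _, m := range members {
		// Member 1 records the payload in lastData; member 2 returns the
		// injected error and records nothing.
		_ = rep.ReplicateFileToMember(context.Background(), m, "DR_STATE", []byte(`{"state":"sync","state_id":1}`))
	}
	c.Assert(rep.lastData[1], Equals, `{"state":"sync","state_id":1}`)
	c.Assert(rep.lastData[2], Equals, "")

	// Clearing the injected error lets the next attempt succeed, which is the
	// path checkReplicateFile exercises on the following tick.
	delete(rep.errors, 2)
}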
25 changes: 2 additions & 23 deletions server/server.go
@@ -1359,31 +1359,10 @@ func (s *Server) reloadConfigFromKV() error {
return nil
}

// ReplicateFileToAllMembers is used to synchronize state among all members.
// ReplicateFileToMember is used to synchronize state to a member.
// Each member will write `data` to a local file named `name`.
// For security reasons, data should be in JSON format.
func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error {
members, err := s.GetMembers()
if err != nil {
return err
}
var errs []error
for _, member := range members {
if err := s.replicateFileToMember(ctx, member, name, data); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
// join all error messages
for _, e := range errs[1:] {
errs[0] = errors.Wrap(errs[0], e.Error())
}
return errs[0]
}

func (s *Server) replicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
func (s *Server) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
clientUrls := member.GetClientUrls()
if len(clientUrls) == 0 {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()))