dr-autosync: retry replicate state file #4403

Merged 6 commits on Dec 15, 2021

Changes from 5 commits
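This change replaces the single ReplicateFileToAllMembers call with per-member replication plus retry: ModeManager records which members received the latest DR_STATE file in replicatedMembers, and on every tick checkReplicateFile re-persists the file if any current member is missing from that list. Below is a minimal sketch of that retry loop, using simplified stand-ins for pdpb.Member and the FileReplicater interface; the names outside the diff (member, fileReplicater, modeManager) are illustrative only, not part of the PR.

```go
package replication

import "context"

// member and fileReplicater are simplified stand-ins for pdpb.Member and the
// FileReplicater interface in the diff below; they are illustrative only.
type member struct{ ID uint64 }

type fileReplicater interface {
	GetMembers() ([]member, error)
	ReplicateFileToMember(ctx context.Context, m member, name string, data []byte) error
}

// modeManager mirrors the part of ModeManager relevant to the retry: it keeps
// the IDs of the members that already received the latest state file.
type modeManager struct {
	rep               fileReplicater
	replicatedMembers []uint64
}

// persistStatus pushes the state file to every reachable member and records
// the ones that succeeded instead of failing the whole operation.
func (m *modeManager) persistStatus(ctx context.Context, data []byte) {
	members, err := m.rep.GetMembers()
	if err != nil {
		return // give up for now; the next tick retries
	}
	m.replicatedMembers = m.replicatedMembers[:0]
	for _, mem := range members {
		if err := m.rep.ReplicateFileToMember(ctx, mem, "DR_STATE", data); err != nil {
			// The failed member stays out of replicatedMembers, so a later
			// tick will notice it and replicate the file again.
			continue
		}
		m.replicatedMembers = append(m.replicatedMembers, mem.ID)
	}
}

// checkReplicateFile runs on every tick: if any current member has not yet
// received the latest state file, re-run persistStatus.
func (m *modeManager) checkReplicateFile(ctx context.Context, data []byte) {
	members, err := m.rep.GetMembers()
	if err != nil {
		return
	}
	for _, mem := range members {
		replicated := false
		for _, id := range m.replicatedMembers {
			if id == mem.ID {
				replicated = true
				break
			}
		}
		if !replicated {
			m.persistStatus(ctx, data)
			return
		}
	}
}
```

As in the PR, the persist path deliberately swallows replication errors so a state switch still succeeds while the primary and dr DCs are disconnected; the per-member bookkeeping only narrows the window in which a member can keep serving a stale state file.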
3 changes: 2 additions & 1 deletion server/cluster/cluster.go
@@ -73,7 +73,8 @@ type Server interface {
GetHBStreams() *hbstream.HeartbeatStreams
GetRaftCluster() *RaftCluster
GetBasicCluster() *core.BasicCluster
ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
}

// RaftCluster is used for cluster config management.
84 changes: 60 additions & 24 deletions server/replication/replication_mode.go
@@ -22,10 +22,12 @@ import (
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
pb "github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/opt"
@@ -50,7 +52,8 @@ func modeToPB(m string) pb.ReplicationMode {
// FileReplicater is the interface that can save important data to all cluster
// nodes.
type FileReplicater interface {
ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
}

const drStatusFile = "DR_STATE"
@@ -62,10 +65,11 @@ type ModeManager struct {
initTime time.Time

sync.RWMutex
config config.ReplicationModeConfig
storage *core.Storage
cluster opt.Cluster
fileReplicater FileReplicater
config config.ReplicationModeConfig
storage *core.Storage
cluster opt.Cluster
fileReplicater FileReplicater
replicatedMembers []uint64

drAutoSync drAutoSyncStatus
// intermediate states of the recovery process
@@ -256,9 +260,7 @@ func (m *ModeManager) drSwitchToAsyncWithLock() error {
return err
}
dr := drAutoSyncStatus{State: drStateAsync, StateID: id}
if err := m.drPersistStatus(dr); err != nil {
return err
}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -282,9 +284,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
}
now := time.Now()
dr := drAutoSyncStatus{State: drStateSyncRecover, StateID: id, RecoverStartTime: &now}
if err := m.drPersistStatus(dr); err != nil {
return err
}
m.drPersistStatusWithLock(dr)
if err = m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -304,9 +304,7 @@ func (m *ModeManager) drSwitchToSync() error {
return err
}
dr := drAutoSyncStatus{State: drStateSync, StateID: id}
if err := m.drPersistStatus(dr); err != nil {
return err
}
m.drPersistStatusWithLock(dr)
if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil {
log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -316,23 +314,48 @@
return nil
}

func (m *ModeManager) drPersistStatus(status drAutoSyncStatus) error {
if m.fileReplicater != nil {
ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout)
defer cancel()
data, _ := json.Marshal(status)
if err := m.fileReplicater.ReplicateFileToAllMembers(ctx, drStatusFile, data); err != nil {
func (m *ModeManager) drPersistStatusWithLock(status drAutoSyncStatus) {
ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout)
defer cancel()

members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}

data, _ := json.Marshal(status)

m.replicatedMembers = m.replicatedMembers[:0]
for _, member := range members {
if err := m.fileReplicater.ReplicateFileToMember(ctx, member, drStatusFile, data); err != nil {
log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", status.State), errs.ZapError(err))
// Throw away the error to make it possible to switch to async when
// primary and dr DC are disconnected. This will result in the
// inability to accurately determine whether data is fully
// synchronized when using the dr DC for disaster recovery.
// TODO: introduce PD's leader-follower connection timeout to solve
// this issue. More details: https://github.com/tikv/pd/issues/2490
return nil
// Since the member will not be in `replicatedMembers` list, PD will
// try to replicate state file later.
} else {
m.replicatedMembers = append(m.replicatedMembers, member.GetMemberId())
}
}
return nil
}

func (m *ModeManager) drCheckNeedPersistStatus(members []*pdpb.Member) bool {
m.RLock()
defer m.RUnlock()
return slice.AnyOf(members, func(i int) bool { // if there is any member in the new list
return slice.NoneOf(m.replicatedMembers, func(j int) bool { // not replicated
return m.replicatedMembers[j] == members[i].GetMemberId()
})
})
}

func (m *ModeManager) drPersistStatus() {
m.Lock()
defer m.Unlock()
m.drPersistStatusWithLock(drAutoSyncStatus{State: m.drAutoSync.State, StateID: m.drAutoSync.StateID})
}

func (m *ModeManager) drGetState() string {
@@ -418,6 +441,8 @@ func (m *ModeManager) tickDR() {
m.updateRecoverProgress(progress)
}
}

m.checkReplicateFile()
}

func (m *ModeManager) checkStoreStatus() (primaryDownCount, drDownCount, primaryUpCount, drUpCount int) {
@@ -444,6 +469,17 @@ func (m *ModeManager) checkStoreStatus() (primaryDownCount, drDownCount, primary
return
}

func (m *ModeManager) checkReplicateFile() {
members, err := m.fileReplicater.GetMembers()
if err != nil {
log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync))
return
}
if m.drCheckNeedPersistStatus(members) {
m.drPersistStatus()
}
}

var (
regionScanBatchSize = 1024
regionMinSampleSize = 512
93 changes: 77 additions & 16 deletions server/replication/replication_mode_test.go
@@ -22,6 +22,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/pdpb"
pb "github.com/pingcap/kvproto/pkg/replication_modepb"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/typeutil"
@@ -53,7 +54,7 @@ func (s *testReplicationMode) TestInitial(c *C) {
store := core.NewStorage(kv.NewMemoryKV())
conf := config.ReplicationModeConfig{ReplicationMode: modeMajority}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)
c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{Mode: pb.ReplicationMode_MAJORITY})

@@ -66,7 +67,7 @@
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
rep, err = NewReplicationModeManager(conf, store, cluster, nil)
rep, err = NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)
c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{
Mode: pb.ReplicationMode_DR_AUTO_SYNC,
@@ -86,7 +87,7 @@ func (s *testReplicationMode) TestStatus(c *C) {
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)
c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{
Mode: pb.ReplicationMode_DR_AUTO_SYNC,
@@ -124,7 +125,7 @@
})

// test reload
rep, err = NewReplicationModeManager(conf, store, cluster, nil)
rep, err = NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)
c.Assert(rep.drAutoSync.State, Equals, drStateSyncRecover)

@@ -142,13 +143,33 @@
}

type mockFileReplicator struct {
lastData string
err error
memberIDs []uint64
lastData map[uint64]string
errors map[uint64]error
}

func (rep *mockFileReplicator) ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error {
rep.lastData = string(data)
return rep.err
func (rep *mockFileReplicator) GetMembers() ([]*pdpb.Member, error) {
var members []*pdpb.Member
for _, id := range rep.memberIDs {
members = append(members, &pdpb.Member{MemberId: id})
}
return members, nil
}

func (rep *mockFileReplicator) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
if err := rep.errors[member.GetMemberId()]; err != nil {
return err
}
rep.lastData[member.GetMemberId()] = string(data)
return nil
}

func newMockReplicator(ids []uint64) *mockFileReplicator {
return &mockFileReplicator{
memberIDs: ids,
lastData: make(map[uint64]string),
errors: make(map[uint64]error),
}
}

func (s *testReplicationMode) TestStateSwitch(c *C) {
@@ -163,8 +184,8 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
var replicator mockFileReplicator
rep, err := NewReplicationModeManager(conf, store, cluster, &replicator)
replicator := newMockReplicator([]uint64{1})
rep, err := NewReplicationModeManager(conf, store, cluster, replicator)
c.Assert(err, IsNil)

cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"})
@@ -175,7 +196,7 @@
c.Assert(rep.drGetState(), Equals, drStateSync)
stateID := rep.drAutoSync.StateID
c.Assert(stateID, Not(Equals), uint64(0))
c.Assert(replicator.lastData, Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))
assertStateIDUpdate := func() {
c.Assert(rep.drAutoSync.StateID, Not(Equals), stateID)
stateID = rep.drAutoSync.StateID
@@ -185,7 +206,7 @@
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateAsync)
assertStateIDUpdate()
c.Assert(replicator.lastData, Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, stateID))
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, stateID))

// add new store in dr zone.
cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"})
@@ -213,7 +234,7 @@
c.Assert(rep.drGetState(), Equals, drStateAsync)
assertStateIDUpdate()
rep.drSwitchToSync()
replicator.err = errors.New("fail to replicate")
replicator.errors[1] = errors.New("fail to replicate")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateAsync)
assertStateIDUpdate()
@@ -269,6 +290,46 @@
assertStateIDUpdate()
}

func (s *testReplicationMode) TestReplicateState(c *C) {
store := core.NewStorage(kv.NewMemoryKV())
conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{
LabelKey: "zone",
Primary: "zone1",
DR: "zone2",
PrimaryReplicas: 2,
DRReplicas: 1,
WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
WaitSyncTimeout: typeutil.Duration{Duration: time.Minute},
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
replicator := newMockReplicator([]uint64{1})
rep, err := NewReplicationModeManager(conf, store, cluster, replicator)
c.Assert(err, IsNil)

stateID := rep.drAutoSync.StateID
// replicate after initialized
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))

// replicate state to new member
replicator.memberIDs = append(replicator.memberIDs, 2, 3)
rep.checkReplicateFile()
c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))
c.Assert(replicator.lastData[3], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))

// inject error
replicator.errors[2] = errors.New("failed to persist")
rep.tickDR() // switch async since there is only one zone
newStateID := rep.drAutoSync.StateID
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID))
c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID))
c.Assert(replicator.lastData[3], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID))

// clear error, replicate to node 2 next time
delete(replicator.errors, 2)
rep.checkReplicateFile()
c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID))
}

func (s *testReplicationMode) TestAsynctimeout(c *C) {
store := core.NewStorage(kv.NewMemoryKV())
conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{
@@ -334,7 +395,7 @@ func (s *testReplicationMode) TestRecoverProgress(c *C) {
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
cluster.AddLabelsStore(1, 1, map[string]string{})
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)

prepare := func(n int, asyncRegions []int) {
@@ -394,7 +455,7 @@ func (s *testReplicationMode) TestRecoverProgressWithSplitAndMerge(c *C) {
}}
cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions())
cluster.AddLabelsStore(1, 1, map[string]string{})
rep, err := NewReplicationModeManager(conf, store, cluster, nil)
rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1}))
c.Assert(err, IsNil)

prepare := func(n int, asyncRegions []int) {
25 changes: 2 additions & 23 deletions server/server.go
@@ -1359,31 +1359,10 @@ func (s *Server) reloadConfigFromKV() error {
return nil
}

// ReplicateFileToAllMembers is used to synchronize state among all members.
// ReplicateFileToMember is used to synchronize state to a member.
// Each member will write `data` to a local file named `name`.
// For security reason, data should be in JSON format.
func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error {
members, err := s.GetMembers()
if err != nil {
return err
}
var errs []error
for _, member := range members {
if err := s.replicateFileToMember(ctx, member, name, data); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
// join all error messages
for _, e := range errs[1:] {
errs[0] = errors.Wrap(errs[0], e.Error())
}
return errs[0]
}

func (s *Server) replicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
func (s *Server) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
clientUrls := member.GetClientUrls()
if len(clientUrls) == 0 {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()))
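As the doc comment notes, the replicated data should be JSON; for dr-autosync it is the serialized drAutoSyncStatus, which the tests above assert as {"state":"sync","state_id":<id>}. A minimal sketch of that encoding, with field tags assumed from the test expectations rather than copied from the PR:

```go
package replication

import "encoding/json"

// drStatus mirrors the JSON shape asserted in the tests:
// {"state":"sync","state_id":123}. The field names and tags are assumptions
// based on those expectations, not the actual drAutoSyncStatus definition.
type drStatus struct {
	State   string `json:"state"`
	StateID uint64 `json:"state_id"`
}

// encodeStatus produces the payload that would be written to the DR_STATE file.
func encodeStatus(state string, id uint64) ([]byte, error) {
	return json.Marshal(drStatus{State: state, StateID: id})
}
```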