From 6bc0b3930989861b9de7f83f3b3ef872daf6ee63 Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 30 Nov 2021 11:26:17 +0800 Subject: [PATCH 1/3] dr-autosync: retry replicate state file close #4327 Signed-off-by: disksing --- server/cluster/cluster.go | 3 +- server/replication/replication_mode.go | 84 +++++++++++++++------ server/replication/replication_mode_test.go | 81 +++++++++++++++++--- server/server.go | 23 +----- tests/server/server_test.go | 33 -------- 5 files changed, 134 insertions(+), 90 deletions(-) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b8eda111ae3..7227b67de15 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -73,7 +73,8 @@ type Server interface { GetHBStreams() *hbstream.HeartbeatStreams GetRaftCluster() *RaftCluster GetBasicCluster() *core.BasicCluster - ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error + GetMembers() ([]*pdpb.Member, error) + ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error } // RaftCluster is used for cluster config management. diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go index 182128daa38..ecbe6a9d05b 100644 --- a/server/replication/replication_mode.go +++ b/server/replication/replication_mode.go @@ -22,10 +22,12 @@ import ( "sync" "time" + "github.com/pingcap/kvproto/pkg/pdpb" pb "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/logutil" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule/opt" @@ -50,7 +52,8 @@ func modeToPB(m string) pb.ReplicationMode { // FileReplicater is the interface that can save important data to all cluster // nodes. 
type FileReplicater interface { - ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error + GetMembers() ([]*pdpb.Member, error) + ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error } const drStatusFile = "DR_STATE" @@ -62,10 +65,11 @@ type ModeManager struct { initTime time.Time sync.RWMutex - config config.ReplicationModeConfig - storage *core.Storage - cluster opt.Cluster - fileReplicater FileReplicater + config config.ReplicationModeConfig + storage *core.Storage + cluster opt.Cluster + fileReplicater FileReplicater + replicatedMembers []uint64 drAutoSync drAutoSyncStatus // intermediate states of the recovery process @@ -256,9 +260,7 @@ func (m *ModeManager) drSwitchToAsyncWithLock() error { return err } dr := drAutoSyncStatus{State: drStateAsync, StateID: id} - if err := m.drPersistStatus(dr); err != nil { - return err - } + m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -282,9 +284,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error { } now := time.Now() dr := drAutoSyncStatus{State: drStateSyncRecover, StateID: id, RecoverStartTime: &now} - if err := m.drPersistStatus(dr); err != nil { - return err - } + m.drPersistStatusWithLock(dr) if err = m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -304,9 +304,7 @@ func (m *ModeManager) drSwitchToSync() error { return err } dr := drAutoSyncStatus{State: drStateSync, StateID: id} - if err := m.drPersistStatus(dr); err != nil { - return err - } + m.drPersistStatusWithLock(dr) if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to sync state", 
zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -316,23 +314,48 @@ func (m *ModeManager) drSwitchToSync() error { return nil } -func (m *ModeManager) drPersistStatus(status drAutoSyncStatus) error { - if m.fileReplicater != nil { - ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout) - defer cancel() - data, _ := json.Marshal(status) - if err := m.fileReplicater.ReplicateFileToAllMembers(ctx, drStatusFile, data); err != nil { +func (m *ModeManager) drPersistStatusWithLock(status drAutoSyncStatus) { + ctx, cancel := context.WithTimeout(context.Background(), persistFileTimeout) + defer cancel() + + members, err := m.fileReplicater.GetMembers() + if err != nil { + log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync)) + return + } + + data, _ := json.Marshal(status) + + m.replicatedMembers = m.replicatedMembers[:0] + for _, member := range members { + if err := m.fileReplicater.ReplicateFileToMember(ctx, member, drStatusFile, data); err != nil { log.Warn("failed to switch state", zap.String("replicate-mode", modeDRAutoSync), zap.String("new-state", status.State), errs.ZapError(err)) // Throw away the error to make it possible to switch to async when // primary and dr DC are disconnected. This will result in the // inability to accurately determine whether data is fully // synchronized when using dr DC to disaster recovery. - // TODO: introduce PD's leader-follower connection timeout to solve - // this issue. More details: https://github.com/tikv/pd/issues/2490 - return nil + // Since the member will not be in `replicatedMembers` list, PD will + // try to replicate state file later. 
+ } else { + m.replicatedMembers = append(m.replicatedMembers, member.GetMemberId()) } } - return nil +} + +func (m *ModeManager) drCheckNeedPersistStatus(members []*pdpb.Member) bool { + m.RLock() + defer m.RUnlock() + return slice.AnyOf(members, func(i int) bool { // if there is any member in the new list + return slice.NoneOf(m.replicatedMembers, func(j int) bool { // not replicated + return m.replicatedMembers[j] == members[i].GetMemberId() + }) + }) +} + +func (m *ModeManager) drPersistStatus() { + m.Lock() + defer m.Unlock() + m.drPersistStatusWithLock(drAutoSyncStatus{State: m.drAutoSync.State, StateID: m.drAutoSync.StateID}) } func (m *ModeManager) drGetState() string { @@ -418,6 +441,8 @@ func (m *ModeManager) tickDR() { m.updateRecoverProgress(progress) } } + + m.checkReplicateFile() } func (m *ModeManager) checkStoreStatus() (primaryDownCount, drDownCount, primaryUpCount, drUpCount int) { @@ -444,6 +469,17 @@ func (m *ModeManager) checkStoreStatus() (primaryDownCount, drDownCount, primary return } +func (m *ModeManager) checkReplicateFile() { + members, err := m.fileReplicater.GetMembers() + if err != nil { + log.Warn("failed to get members", zap.String("replicate-mode", modeDRAutoSync)) + return + } + if m.drCheckNeedPersistStatus(members) { + m.drPersistStatus() + } +} + var ( regionScanBatchSize = 1024 regionMinSampleSize = 512 diff --git a/server/replication/replication_mode_test.go b/server/replication/replication_mode_test.go index 4e087d447b5..55a12d818b4 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -22,6 +22,7 @@ import ( "time" . 
"github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/pdpb" pb "github.com/pingcap/kvproto/pkg/replication_modepb" "github.com/tikv/pd/pkg/mock/mockcluster" "github.com/tikv/pd/pkg/typeutil" @@ -142,13 +143,33 @@ func (s *testReplicationMode) TestStatus(c *C) { } type mockFileReplicator struct { - lastData string - err error + memberIDs []uint64 + lastData map[uint64]string + errors map[uint64]error } -func (rep *mockFileReplicator) ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error { - rep.lastData = string(data) - return rep.err +func (rep *mockFileReplicator) GetMembers() ([]*pdpb.Member, error) { + var members []*pdpb.Member + for _, id := range rep.memberIDs { + members = append(members, &pdpb.Member{MemberId: id}) + } + return members, nil +} + +func (rep *mockFileReplicator) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error { + if err := rep.errors[member.GetMemberId()]; err != nil { + return err + } + rep.lastData[member.GetMemberId()] = string(data) + return nil +} + +func newMockReplicator(ids []uint64) *mockFileReplicator { + return &mockFileReplicator{ + memberIDs: ids, + lastData: make(map[uint64]string), + errors: make(map[uint64]error), + } } func (s *testReplicationMode) TestStateSwitch(c *C) { @@ -163,8 +184,8 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, }} cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) - var replicator mockFileReplicator - rep, err := NewReplicationModeManager(conf, store, cluster, &replicator) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) c.Assert(err, IsNil) cluster.AddLabelsStore(1, 1, map[string]string{"zone": "zone1"}) @@ -175,7 +196,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { c.Assert(rep.drGetState(), Equals, drStateSync) stateID := rep.drAutoSync.StateID 
c.Assert(stateID, Not(Equals), uint64(0)) - c.Assert(replicator.lastData, Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID)) + c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID)) assertStateIDUpdate := func() { c.Assert(rep.drAutoSync.StateID, Not(Equals), stateID) stateID = rep.drAutoSync.StateID @@ -185,7 +206,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateAsync) assertStateIDUpdate() - c.Assert(replicator.lastData, Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, stateID)) + c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, stateID)) // add new store in dr zone. cluster.AddLabelsStore(4, 1, map[string]string{"zone": "zone2"}) @@ -213,7 +234,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { c.Assert(rep.drGetState(), Equals, drStateAsync) assertStateIDUpdate() rep.drSwitchToSync() - replicator.err = errors.New("fail to replicate") + replicator.errors[1] = errors.New("fail to replicate") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateAsync) assertStateIDUpdate() @@ -269,6 +290,46 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { assertStateIDUpdate() } +func (s *testReplicationMode) TestReplicateState(c *C) { + store := core.NewStorage(kv.NewMemoryKV()) + conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ + LabelKey: "zone", + Primary: "zone1", + DR: "zone2", + PrimaryReplicas: 2, + DRReplicas: 1, + WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, + WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, + }} + cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) + replicator := newMockReplicator([]uint64{1}) + rep, err := NewReplicationModeManager(conf, store, cluster, replicator) + c.Assert(err, IsNil) + + stateID := rep.drAutoSync.StateID + // replicate after initialized + 
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID)) + + // replicate state to new member + replicator.memberIDs = append(replicator.memberIDs, 2, 3) + rep.checkReplicateFile() + c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID)) + c.Assert(replicator.lastData[3], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID)) + + // inject error + replicator.errors[2] = errors.New("failed to persist") + rep.tickDR() // switch async since there is only one zone + newStateID := rep.drAutoSync.StateID + c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID)) + c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID)) + c.Assert(replicator.lastData[3], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID)) + + // clear error, replicate to node 2 next time + delete(replicator.errors, 2) + rep.checkReplicateFile() + c.Assert(replicator.lastData[2], Equals, fmt.Sprintf(`{"state":"async","state_id":%d}`, newStateID)) +} + func (s *testReplicationMode) TestAsynctimeout(c *C) { store := core.NewStorage(kv.NewMemoryKV()) conf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: config.DRAutoSyncReplicationConfig{ diff --git a/server/server.go b/server/server.go index c3c39281f89..74f237b3b97 100644 --- a/server/server.go +++ b/server/server.go @@ -1362,28 +1362,7 @@ func (s *Server) reloadConfigFromKV() error { // ReplicateFileToAllMembers is used to synchronize state among all members. // Each member will write `data` to a local file named `name`. // For security reason, data should be in JSON format. 
-func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, data []byte) error { - members, err := s.GetMembers() - if err != nil { - return err - } - var errs []error - for _, member := range members { - if err := s.replicateFileToMember(ctx, member, name, data); err != nil { - errs = append(errs, err) - } - } - if len(errs) == 0 { - return nil - } - // join all error messages - for _, e := range errs[1:] { - errs[0] = errors.Wrap(errs[0], e.Error()) - } - return errs[0] -} - -func (s *Server) replicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error { +func (s *Server) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error { clientUrls := member.GetClientUrls() if len(clientUrls) == 0 { log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName())) diff --git a/tests/server/server_test.go b/tests/server/server_test.go index 37dc0f18693..e0e2dff7b8e 100644 --- a/tests/server/server_test.go +++ b/tests/server/server_test.go @@ -16,8 +16,6 @@ package server_test import ( "context" - "io/ioutil" - "path/filepath" "testing" . "github.com/pingcap/check" @@ -145,34 +143,3 @@ func (s *serverTestSuite) TestLeader(c *C) { return leader != leader1 }) } - -func (s *serverTestSuite) TestPersistFile(c *C) { - cluster, err := tests.NewTestCluster(s.ctx, 3) - defer cluster.Destroy() - c.Assert(err, IsNil) - - err = cluster.RunInitialServers() - c.Assert(err, IsNil) - - leader := cluster.WaitLeader() - c.Assert(leader, Not(Equals), "") - - follower := cluster.GetFollower() - err = cluster.GetServer(follower).Stop() - c.Assert(err, IsNil) - - content := `{"bar": 42}` - err = cluster.GetServer(leader).GetServer().ReplicateFileToAllMembers(context.Background(), "foo", []byte(content)) - c.Assert(err, NotNil) - - // check files are persisted except for the killed one. 
- for _, conf := range cluster.GetConfig().InitialServers { - data, err := ioutil.ReadFile(filepath.Join(conf.DataDir, "foo")) - if conf.Name == follower { - c.Assert(err, NotNil) - } else { - c.Assert(err, IsNil) - c.Assert(data, BytesEquals, []byte(content)) - } - } -} From c235a449a292a068092c194734af98ac0e445a62 Mon Sep 17 00:00:00 2001 From: disksing Date: Tue, 30 Nov 2021 20:16:33 +0800 Subject: [PATCH 2/3] fix ci Signed-off-by: disksing --- server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index 74f237b3b97..b1013de5b23 100644 --- a/server/server.go +++ b/server/server.go @@ -1359,7 +1359,7 @@ func (s *Server) reloadConfigFromKV() error { return nil } -// ReplicateFileToAllMembers is used to synchronize state among all members. +// ReplicateFileToMember is used to synchronize state to a member. // Each member will write `data` to a local file named `name`. // For security reason, data should be in JSON format. func (s *Server) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error { From c90e2b9336e9e14b278b49071a962db65b496dc6 Mon Sep 17 00:00:00 2001 From: disksing Date: Wed, 1 Dec 2021 14:22:44 +0800 Subject: [PATCH 3/3] fix test Signed-off-by: disksing --- server/replication/replication_mode_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/replication/replication_mode_test.go b/server/replication/replication_mode_test.go index 55a12d818b4..84dffef17f8 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -54,7 +54,7 @@ func (s *testReplicationMode) TestInitial(c *C) { store := core.NewStorage(kv.NewMemoryKV()) conf := config.ReplicationModeConfig{ReplicationMode: modeMajority} cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) - rep, err := NewReplicationModeManager(conf, store, cluster, nil) + rep, err := 
NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) c.Assert(err, IsNil) c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{Mode: pb.ReplicationMode_MAJORITY}) @@ -67,7 +67,7 @@ func (s *testReplicationMode) TestInitial(c *C) { WaitStoreTimeout: typeutil.Duration{Duration: time.Minute}, WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, }} - rep, err = NewReplicationModeManager(conf, store, cluster, nil) + rep, err = NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) c.Assert(err, IsNil) c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{ Mode: pb.ReplicationMode_DR_AUTO_SYNC, @@ -87,7 +87,7 @@ func (s *testReplicationMode) TestStatus(c *C) { WaitSyncTimeout: typeutil.Duration{Duration: time.Minute}, }} cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) - rep, err := NewReplicationModeManager(conf, store, cluster, nil) + rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) c.Assert(err, IsNil) c.Assert(rep.GetReplicationStatus(), DeepEquals, &pb.ReplicationStatus{ Mode: pb.ReplicationMode_DR_AUTO_SYNC, @@ -125,7 +125,7 @@ func (s *testReplicationMode) TestStatus(c *C) { }) // test reload - rep, err = NewReplicationModeManager(conf, store, cluster, nil) + rep, err = NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) c.Assert(err, IsNil) c.Assert(rep.drAutoSync.State, Equals, drStateSyncRecover) @@ -395,7 +395,7 @@ func (s *testReplicationMode) TestRecoverProgress(c *C) { }} cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) cluster.AddLabelsStore(1, 1, map[string]string{}) - rep, err := NewReplicationModeManager(conf, store, cluster, nil) + rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) c.Assert(err, IsNil) prepare := func(n int, asyncRegions []int) { @@ -455,7 +455,7 @@ func (s *testReplicationMode) 
TestRecoverProgressWithSplitAndMerge(c *C) { }} cluster := mockcluster.NewCluster(s.ctx, config.NewTestOptions()) cluster.AddLabelsStore(1, 1, map[string]string{}) - rep, err := NewReplicationModeManager(conf, store, cluster, nil) + rep, err := NewReplicationModeManager(conf, store, cluster, newMockReplicator([]uint64{1})) c.Assert(err, IsNil) prepare := func(n int, asyncRegions []int) {