From 950b3636d9a0ac005b577cf7a19d42a455861c38 Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 17 Mar 2022 11:26:30 +0800 Subject: [PATCH] dr-autosync: do not switch to async when there are not enough peers (#4738) close tikv/pd#4737 Signed-off-by: disksing Co-authored-by: Ti Chi Robot --- server/replication/replication_mode.go | 2 +- server/replication/replication_mode_test.go | 55 +++++++++++---------- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/server/replication/replication_mode.go b/server/replication/replication_mode.go index 5dd62c18fd3..ea01e7a1713 100644 --- a/server/replication/replication_mode.go +++ b/server/replication/replication_mode.go @@ -482,7 +482,7 @@ func (m *ModeManager) tickDR() { m.drSwitchToAsync(stores[primaryUp]) } case drStateSyncRecover: - if !canSync { + if !canSync && hasMajority { m.drSwitchToAsync(stores[primaryUp]) } else { m.updateProgress() diff --git a/server/replication/replication_mode_test.go b/server/replication/replication_mode_test.go index ea7d00b3aca..74b68d89a92 100644 --- a/server/replication/replication_mode_test.go +++ b/server/replication/replication_mode_test.go @@ -233,18 +233,15 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { // sync -> async_wait rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSync) - s.setStoreState(cluster, 1, "down") + s.setStoreState(cluster, "down", "up", "up", "up", "up", "up") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSync) - s.setStoreState(cluster, 2, "down") - s.setStoreState(cluster, 3, "down") + s.setStoreState(cluster, "down", "down", "up", "up", "up", "up") + s.setStoreState(cluster, "down", "down", "down", "up", "up", "up") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSync) // cannot guarantee majority, keep sync. 
- s.setStoreState(cluster, 1, "up") - s.setStoreState(cluster, 2, "up") - s.setStoreState(cluster, 3, "up") - s.setStoreState(cluster, 5, "down") + s.setStoreState(cluster, "up", "up", "up", "up", "down", "up") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateAsyncWait) assertStateIDUpdate() @@ -257,22 +254,21 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { delete(replicator.errors, 1) // async_wait -> sync - s.setStoreState(cluster, 5, "up") + s.setStoreState(cluster, "up", "up", "up", "up", "up", "up") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSync) // async_wait -> async_wait - s.setStoreState(cluster, 5, "down") + s.setStoreState(cluster, "up", "up", "up", "up", "down", "up") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateAsyncWait) assertStateIDUpdate() c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID)) - s.setStoreState(cluster, 1, "down") + s.setStoreState(cluster, "down", "up", "up", "up", "down", "up") rep.tickDR() assertStateIDUpdate() c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID)) - s.setStoreState(cluster, 2, "down") - s.setStoreState(cluster, 1, "up") + s.setStoreState(cluster, "up", "down", "up", "up", "down", "up") rep.tickDR() assertStateIDUpdate() c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID)) @@ -289,7 +285,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID)) // async -> async - s.setStoreState(cluster, 2, "up") + s.setStoreState(cluster, "up", "up", "up", "up", "down", "up") rep.tickDR() // store 2 won't be available before it syncs status. 
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID)) @@ -299,12 +295,12 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID)) // async -> sync_recover - s.setStoreState(cluster, 5, "up") + s.setStoreState(cluster, "up", "up", "up", "up", "up", "up") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSyncRecover) assertStateIDUpdate() rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) - s.setStoreState(cluster, 1, "down") + s.setStoreState(cluster, "down", "up", "up", "up", "up", "up") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSyncRecover) assertStateIDUpdate() @@ -312,16 +308,21 @@ func (s *testReplicationMode) TestStateSwitch(c *C) { // sync_recover -> async rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSyncRecover) - s.setStoreState(cluster, 1, "up") - s.setStoreState(cluster, 5, "down") + s.setStoreState(cluster, "up", "up", "up", "up", "down", "up") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateAsync) assertStateIDUpdate() + // lost majority, does not switch to async. 
+ rep.drSwitchToSyncRecover() + assertStateIDUpdate() + s.setStoreState(cluster, "down", "down", "up", "up", "down", "up") + rep.tickDR() + c.Assert(rep.drGetState(), Equals, drStateSyncRecover) // sync_recover -> sync rep.drSwitchToSyncRecover() assertStateIDUpdate() - s.setStoreState(cluster, 5, "up") + s.setStoreState(cluster, "up", "up", "up", "up", "up", "up") cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5) region := cluster.GetRegion(1) @@ -410,7 +411,7 @@ func (s *testReplicationMode) TestAsynctimeout(c *C) { cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"}) cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone2"}) - s.setStoreState(cluster, 3, "down") + s.setStoreState(cluster, "up", "up", "down") rep.tickDR() c.Assert(rep.drGetState(), Equals, drStateSync) // cannot switch state due to recently start @@ -428,14 +429,16 @@ func (s *testReplicationMode) TestAsynctimeout(c *C) { c.Assert(rep.drGetState(), Equals, drStateAsyncWait) } -func (s *testReplicationMode) setStoreState(cluster *mockcluster.Cluster, id uint64, state string) { - store := cluster.GetStore(id) - if state == "down" { - store.GetMeta().LastHeartbeat = time.Now().Add(-time.Minute * 10).UnixNano() - } else if state == "up" { - store.GetMeta().LastHeartbeat = time.Now().UnixNano() +func (s *testReplicationMode) setStoreState(cluster *mockcluster.Cluster, states ...string) { + for i, state := range states { + store := cluster.GetStore(uint64(i + 1)) + if state == "down" { + store.GetMeta().LastHeartbeat = time.Now().Add(-time.Minute * 10).UnixNano() + } else if state == "up" { + store.GetMeta().LastHeartbeat = time.Now().UnixNano() + } + cluster.PutStore(store) } - cluster.PutStore(store) } func (s *testReplicationMode) TestRecoverProgress(c *C) {