Skip to content

Commit

Permalink
dr-autosync: do not switch to async when there is not enough peers (#…
Browse files Browse the repository at this point in the history
…4738)

close #4737

Signed-off-by: disksing <i@disksing.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
disksing and ti-chi-bot committed Mar 17, 2022
1 parent 766e568 commit 950b363
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 27 deletions.
2 changes: 1 addition & 1 deletion server/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (m *ModeManager) tickDR() {
m.drSwitchToAsync(stores[primaryUp])
}
case drStateSyncRecover:
if !canSync {
if !canSync && hasMajority {
m.drSwitchToAsync(stores[primaryUp])
} else {
m.updateProgress()
Expand Down
55 changes: 29 additions & 26 deletions server/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,15 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
// sync -> async_wait
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSync)
s.setStoreState(cluster, 1, "down")
s.setStoreState(cluster, "down", "up", "up", "up", "up", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSync)
s.setStoreState(cluster, 2, "down")
s.setStoreState(cluster, 3, "down")
s.setStoreState(cluster, "down", "down", "up", "up", "up", "up")
s.setStoreState(cluster, "down", "down", "down", "up", "up", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSync) // cannot guarantee majority, keep sync.

s.setStoreState(cluster, 1, "up")
s.setStoreState(cluster, 2, "up")
s.setStoreState(cluster, 3, "up")
s.setStoreState(cluster, 5, "down")
s.setStoreState(cluster, "up", "up", "up", "up", "down", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateAsyncWait)
assertStateIDUpdate()
Expand All @@ -257,22 +254,21 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
delete(replicator.errors, 1)

// async_wait -> sync
s.setStoreState(cluster, 5, "up")
s.setStoreState(cluster, "up", "up", "up", "up", "up", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSync)

// async_wait -> async_wait
s.setStoreState(cluster, 5, "down")
s.setStoreState(cluster, "up", "up", "up", "up", "down", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateAsyncWait)
assertStateIDUpdate()
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID))
s.setStoreState(cluster, 1, "down")
s.setStoreState(cluster, "down", "up", "up", "up", "down", "up")
rep.tickDR()
assertStateIDUpdate()
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID))
s.setStoreState(cluster, 2, "down")
s.setStoreState(cluster, 1, "up")
s.setStoreState(cluster, "up", "down", "up", "up", "down", "up")
rep.tickDR()
assertStateIDUpdate()
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID))
Expand All @@ -289,7 +285,7 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID))

// async -> async
s.setStoreState(cluster, 2, "up")
s.setStoreState(cluster, "up", "up", "up", "up", "down", "up")
rep.tickDR()
// store 2 won't be available before it syncs status.
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID))
Expand All @@ -299,29 +295,34 @@ func (s *testReplicationMode) TestStateSwitch(c *C) {
c.Assert(replicator.lastData[1], Equals, fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID))

// async -> sync_recover
s.setStoreState(cluster, 5, "up")
s.setStoreState(cluster, "up", "up", "up", "up", "up", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSyncRecover)
assertStateIDUpdate()
rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5})
s.setStoreState(cluster, 1, "down")
s.setStoreState(cluster, "down", "up", "up", "up", "up", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSyncRecover)
assertStateIDUpdate()

// sync_recover -> async
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSyncRecover)
s.setStoreState(cluster, 1, "up")
s.setStoreState(cluster, 5, "down")
s.setStoreState(cluster, "up", "up", "up", "up", "down", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateAsync)
assertStateIDUpdate()
// lost majority, does not switch to async.
rep.drSwitchToSyncRecover()
assertStateIDUpdate()
s.setStoreState(cluster, "down", "down", "up", "up", "down", "up")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSyncRecover)

// sync_recover -> sync
rep.drSwitchToSyncRecover()
assertStateIDUpdate()
s.setStoreState(cluster, 5, "up")
s.setStoreState(cluster, "up", "up", "up", "up", "up", "up")
cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5)
region := cluster.GetRegion(1)

Expand Down Expand Up @@ -410,7 +411,7 @@ func (s *testReplicationMode) TestAsynctimeout(c *C) {
cluster.AddLabelsStore(2, 1, map[string]string{"zone": "zone1"})
cluster.AddLabelsStore(3, 1, map[string]string{"zone": "zone2"})

s.setStoreState(cluster, 3, "down")
s.setStoreState(cluster, "up", "up", "down")
rep.tickDR()
c.Assert(rep.drGetState(), Equals, drStateSync) // cannot switch state due to recently start

Expand All @@ -428,14 +429,16 @@ func (s *testReplicationMode) TestAsynctimeout(c *C) {
c.Assert(rep.drGetState(), Equals, drStateAsyncWait)
}

func (s *testReplicationMode) setStoreState(cluster *mockcluster.Cluster, id uint64, state string) {
store := cluster.GetStore(id)
if state == "down" {
store.GetMeta().LastHeartbeat = time.Now().Add(-time.Minute * 10).UnixNano()
} else if state == "up" {
store.GetMeta().LastHeartbeat = time.Now().UnixNano()
func (s *testReplicationMode) setStoreState(cluster *mockcluster.Cluster, states ...string) {
for i, state := range states {
store := cluster.GetStore(uint64(i + 1))
if state == "down" {
store.GetMeta().LastHeartbeat = time.Now().Add(-time.Minute * 10).UnixNano()
} else if state == "up" {
store.GetMeta().LastHeartbeat = time.Now().UnixNano()
}
cluster.PutStore(store)
}
cluster.PutStore(store)
}

func (s *testReplicationMode) TestRecoverProgress(c *C) {
Expand Down

0 comments on commit 950b363

Please sign in to comment.