Skip to content

Commit

Permalink
replication mode: fix wrong available store list (#7222)
Browse files Browse the repository at this point in the history
close #7221

Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Oct 18, 2023
1 parent a85f29c commit cb9c70c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 36 deletions.
49 changes: 21 additions & 28 deletions pkg/replication/replication_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,6 @@ func (m *ModeManager) Run(ctx context.Context) {
wg.Wait()
}

// storeIDs returns the IDs of the given stores, in the same order as the input.
// The result is always a non-nil slice whose length equals len(stores).
func storeIDs(stores []*core.StoreInfo) []uint64 {
	result := make([]uint64, 0, len(stores))
	for _, store := range stores {
		result = append(result, store.GetID())
	}
	return result
}

func minimalUpVoters(rule *placement.Rule, upStores, downStores []*core.StoreInfo) int {
if rule.Role == placement.Learner {
return 0
Expand Down Expand Up @@ -411,7 +403,7 @@ func (m *ModeManager) tickUpdateState() {

drTickCounter.Inc()

stores := m.checkStoreStatus()
stores, storeIDs := m.checkStoreStatus()

var primaryHasVoter, drHasVoter bool
var totalVoter, totalUpVoter int
Expand Down Expand Up @@ -440,10 +432,10 @@ func (m *ModeManager) tickUpdateState() {
hasMajority := totalUpVoter*2 > totalVoter

log.Debug("replication store status",
zap.Uint64s("up-primary", storeIDs(stores[primaryUp])),
zap.Uint64s("up-dr", storeIDs(stores[drUp])),
zap.Uint64s("down-primary", storeIDs(stores[primaryDown])),
zap.Uint64s("down-dr", storeIDs(stores[drDown])),
zap.Uint64s("up-primary", storeIDs[primaryUp]),
zap.Uint64s("up-dr", storeIDs[drUp]),
zap.Uint64s("down-primary", storeIDs[primaryDown]),
zap.Uint64s("down-dr", storeIDs[drDown]),
zap.Bool("can-sync", canSync),
zap.Bool("has-majority", hasMajority),
)
Expand All @@ -470,31 +462,31 @@ func (m *ModeManager) tickUpdateState() {
case drStateSync:
// If hasMajority is false, the cluster is always unavailable. Switch to async won't help.
if !canSync && hasMajority {
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
m.drSwitchToAsyncWait(storeIDs[primaryUp])
}
case drStateAsyncWait:
if canSync {
m.drSwitchToSync()
break
}
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs(stores[primaryUp])) {
m.drSwitchToAsyncWait(storeIDs(stores[primaryUp]))
if oldAvailableStores := m.drGetAvailableStores(); !reflect.DeepEqual(oldAvailableStores, storeIDs[primaryUp]) {
m.drSwitchToAsyncWait(storeIDs[primaryUp])
break
}
if m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
if m.drCheckStoreStateUpdated(storeIDs[primaryUp]) {
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateAsync:
if canSync {
m.drSwitchToSyncRecover()
break
}
if !reflect.DeepEqual(m.drGetAvailableStores(), stores[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs(stores[primaryUp])) {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
if !reflect.DeepEqual(m.drGetAvailableStores(), storeIDs[primaryUp]) && m.drCheckStoreStateUpdated(storeIDs[primaryUp]) {
m.drSwitchToAsync(storeIDs[primaryUp])
}
case drStateSyncRecover:
if !canSync && hasMajority {
m.drSwitchToAsync(storeIDs(stores[primaryUp]))
m.drSwitchToAsync(storeIDs[primaryUp])
} else {
m.updateProgress()
progress := m.estimateProgress()
Expand Down Expand Up @@ -569,39 +561,40 @@ const (
storeStatusTypeCount
)

// checkStoreStatus walks every store reported by m.cluster.GetStores() while
// holding the manager's read lock and sorts each one into a status bucket
// (primaryUp, primaryDown, drUp, drDown) used as an index into the result.
//
// A store is treated as down when its DownTime() is at least
// DRAutoSync.WaitStoreTimeout. Its bucket is picked by comparing the value of
// the DRAutoSync.LabelKey label against DRAutoSync.Primary and DRAutoSync.DR;
// a store whose label value matches neither is not placed in any bucket.
// Every bucket is sorted by ascending store ID before returning.
func (m *ModeManager) checkStoreStatus() [][]*core.StoreInfo {
func (m *ModeManager) checkStoreStatus() ([][]*core.StoreInfo, [][]uint64) {
m.RLock()
defer m.RUnlock()
stores := make([][]*core.StoreInfo, storeStatusTypeCount)
stores, storeIDs := make([][]*core.StoreInfo, storeStatusTypeCount), make([][]uint64, storeStatusTypeCount)
for _, s := range m.cluster.GetStores() {
// Removed stores are ignored entirely.
if s.IsRemoved() {
continue
}
// Skip a store whose region count equals its learner count: learner peers
// do not take part in majority commit or voting, so such a store should not
// be counted in primary/dr as a normal store.
if s.GetRegionCount() == s.GetLearnerCount() {
continue
}
// A store counts as down once it has been unreachable for at least the
// configured wait timeout.
down := s.DownTime() >= m.config.DRAutoSync.WaitStoreTimeout.Duration
labelValue := s.GetLabelValue(m.config.DRAutoSync.LabelKey)
if labelValue == m.config.DRAutoSync.Primary {
if down {
stores[primaryDown] = append(stores[primaryDown], s)
storeIDs[primaryDown] = append(storeIDs[primaryDown], s.GetID())
} else {
stores[primaryUp] = append(stores[primaryUp], s)
storeIDs[primaryUp] = append(storeIDs[primaryUp], s.GetID())
}
}
if labelValue == m.config.DRAutoSync.DR {
if down {
stores[drDown] = append(stores[drDown], s)
storeIDs[drDown] = append(storeIDs[drDown], s.GetID())
} else {
stores[drUp] = append(stores[drUp], s)
storeIDs[drUp] = append(storeIDs[drUp], s.GetID())
}
}
}
// Sort each bucket by store ID so the output order is deterministic.
for i := range stores {
sort.Slice(stores[i], func(a, b int) bool { return stores[i][a].GetID() < stores[i][b].GetID() })
sort.Slice(storeIDs[i], func(a, b int) bool { return storeIDs[i][a] < storeIDs[i][b] })
}
return stores
return stores, storeIDs
}

// UpdateStoreDRStatus saves the dr-autosync status of a store.
Expand Down
59 changes: 51 additions & 8 deletions pkg/replication/replication_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ func TestStateSwitch(t *testing.T) {
rep.tickUpdateState()
re.Equal(drStateSync, rep.drGetState())

// once the voter node is down, even if the learner node is up, switch to async state.
setStoreState(cluster, "up", "up", "up", "up", "down", "up")
// once zone2 is down, switch to async state.
setStoreState(cluster, "up", "up", "up", "up", "down", "down")
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())

Expand All @@ -264,18 +264,18 @@ func TestStateSwitch(t *testing.T) {
re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit())

// async_wait -> async_wait
setStoreState(cluster, "up", "up", "up", "up", "down", "up")
setStoreState(cluster, "up", "up", "up", "up", "down", "down")
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1])
setStoreState(cluster, "down", "up", "up", "up", "down", "up")
setStoreState(cluster, "down", "up", "up", "up", "down", "down")
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1])
setStoreState(cluster, "up", "down", "up", "up", "down", "up")
setStoreState(cluster, "up", "down", "up", "up", "down", "down")
rep.tickUpdateState()
assertStateIDUpdate()
rep.tickReplicateStatus()
Expand All @@ -294,7 +294,7 @@ func TestStateSwitch(t *testing.T) {
re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1])

// async -> async
setStoreState(cluster, "up", "up", "up", "up", "down", "up")
setStoreState(cluster, "up", "up", "up", "up", "down", "down")
rep.tickUpdateState()
// store 2 won't be available before it syncs status.
rep.tickReplicateStatus()
Expand All @@ -319,14 +319,14 @@ func TestStateSwitch(t *testing.T) {
// sync_recover -> async
rep.tickUpdateState()
re.Equal(drStateSyncRecover, rep.drGetState())
setStoreState(cluster, "up", "up", "up", "up", "down", "up")
setStoreState(cluster, "up", "up", "up", "up", "down", "down")
rep.tickUpdateState()
re.Equal(drStateAsync, rep.drGetState())
assertStateIDUpdate()
// lost majority, does not switch to async.
rep.drSwitchToSyncRecover()
assertStateIDUpdate()
setStoreState(cluster, "down", "down", "up", "up", "down", "up")
setStoreState(cluster, "down", "down", "up", "up", "down", "down")
rep.tickUpdateState()
re.Equal(drStateSyncRecover, rep.drGetState())

Expand Down Expand Up @@ -636,6 +636,8 @@ func TestComplexPlacementRules(t *testing.T) {
setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "down", "up", "down")
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1])

// reset to sync
setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up")
Expand Down Expand Up @@ -695,6 +697,47 @@ func TestComplexPlacementRules2(t *testing.T) {
setStoreState(cluster, "up", "up", "up", "up", "down", "down", "up")
rep.tickUpdateState()
re.Equal(drStateAsyncWait, rep.drGetState())
rep.tickReplicateStatus()
re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1])
}

// TestComplexPlacementRules3 checks that when the DR zone (zone2) goes down,
// the manager switches from sync to async_wait and the replicated
// available-store list contains every primary-zone store, including the
// stores that match the learner rule (logic2).
func TestComplexPlacementRules3(t *testing.T) {
	assert := require.New(t)
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	backend := storage.NewStorageWithMemoryBackend()
	drConf := config.DRAutoSyncReplicationConfig{
		LabelKey:         "zone",
		Primary:          "zone1",
		DR:               "zone2",
		WaitStoreTimeout: typeutil.Duration{Duration: time.Minute},
	}
	modeConf := config.ReplicationModeConfig{ReplicationMode: modeDRAutoSync, DRAutoSync: drConf}
	cluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
	replicator := newMockReplicator([]uint64{1})
	manager, err := NewReplicationModeManager(modeConf, backend, cluster, replicator)
	assert.NoError(err)

	// Two voters in logic1, one learner in logic2, one voter in logic3.
	rules := []ruleConfig{
		{key: "logic", value: "logic1", role: placement.Voter, count: 2},
		{key: "logic", value: "logic2", role: placement.Learner, count: 1},
		{key: "logic", value: "logic3", role: placement.Voter, count: 1},
	}
	cluster.GetRuleManager().SetAllGroupBundles(genPlacementRuleConfig(rules), true)

	// Stores 1-4 live in the primary zone; store 5 is the only DR-zone store.
	layout := []struct {
		id          uint64
		zone, logic string
	}{
		{1, "zone1", "logic1"},
		{2, "zone1", "logic1"},
		{3, "zone1", "logic2"},
		{4, "zone1", "logic2"},
		{5, "zone2", "logic3"},
	}
	for _, st := range layout {
		cluster.AddLabelsStore(st.id, 1, map[string]string{"zone": st.zone, "logic": st.logic})
	}

	// initial state is sync
	assert.Equal(drStateSync, manager.drGetState())

	// zone2 down, switch state, available stores should contain logic2 (learner)
	setStoreState(cluster, "up", "up", "up", "up", "down")
	manager.tickUpdateState()
	assert.Equal(drStateAsyncWait, manager.drGetState())
	manager.tickReplicateStatus()
	// Build the expected payload only after replication so it uses the current state ID.
	want := fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, manager.drAutoSync.StateID)
	assert.Equal(want, replicator.lastData[1])
}

func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo {
Expand Down

0 comments on commit cb9c70c

Please sign in to comment.