Skip to content

Commit

Permalink
cluster: save to the region cache when pending-peers or down-peers ch…
Browse files Browse the repository at this point in the history
…ange (#3462)

* cluster: save to the region cache when pending-peers or down-peers change

Signed-off-by: HunDunDM <hundundm@gmail.com>

* add unit test

Signed-off-by: HunDunDM <hundundm@gmail.com>

* fix typo

Signed-off-by: HunDunDM <hundundm@gmail.com>
  • Loading branch information
HunDunDM authored Mar 12, 2021
1 parent 0b7e2e8 commit b1ba2d0
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 6 deletions.
10 changes: 6 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,13 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
saveCache, needSync = true, true
}
if len(region.GetDownPeers()) > 0 || len(region.GetPendingPeers()) > 0 {
saveCache = true
if !core.SortedPeersStatsEqual(region.GetDownPeers(), origin.GetDownPeers()) {
log.Debug("down-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if len(origin.GetDownPeers()) > 0 || len(origin.GetPendingPeers()) > 0 {
saveCache = true
if !core.SortedPeersEqual(region.GetPendingPeers(), origin.GetPendingPeers()) {
log.Debug("pending-peers changed", zap.Uint64("region-id", region.GetID()))
saveCache, needSync = true, true
}
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
Expand Down
41 changes: 41 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
replicationStatus: heartbeat.GetReplicationStatus(),
}

sort.Sort(peerStatsSlice(region.downPeers))
sort.Sort(peerSlice(region.pendingPeers))

classifyVoterAndLearner(region)
return region
}
Expand Down Expand Up @@ -729,6 +732,44 @@ func (s peerSlice) Less(i, j int) bool {
return s[i].GetId() < s[j].GetId()
}

// SortedPeersEqual judges whether two sorted `peerSlice` are equal
func SortedPeersEqual(peersA, peersB []*metapb.Peer) bool {
if len(peersA) != len(peersB) {
return false
}
for i, peer := range peersA {
if peer.GetId() != peersB[i].GetId() {
return false
}
}
return true
}

type peerStatsSlice []*pdpb.PeerStats

func (s peerStatsSlice) Len() int {
return len(s)
}
func (s peerStatsSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s peerStatsSlice) Less(i, j int) bool {
return s[i].GetPeer().GetId() < s[j].GetPeer().GetId()
}

// SortedPeersStatsEqual judges whether two sorted `peerStatsSlice` are equal
func SortedPeersStatsEqual(peersA, peersB []*pdpb.PeerStats) bool {
if len(peersA) != len(peersB) {
return false
}
for i, peerStats := range peersA {
if peerStats.GetPeer().GetId() != peersB[i].GetPeer().GetId() {
return false
}
}
return true
}

// shouldRemoveFromSubTree return true when the region leader changed, peer transferred,
// new peer was created, learners changed, pendingPeers changed, and so on.
func (r *RegionsInfo) shouldRemoveFromSubTree(region *RegionInfo, origin *RegionInfo) bool {
Expand Down
8 changes: 6 additions & 2 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package core

import (
"sort"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/replication_modepb"
Expand All @@ -28,14 +30,16 @@ type RegionCreateOption func(region *RegionInfo)
// WithDownPeers sets the down peers for the region.
func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption {
return func(region *RegionInfo) {
region.downPeers = downPeers
region.downPeers = append(downPeers[:0:0], downPeers...)
sort.Sort(peerStatsSlice(region.downPeers))
}
}

// WithPendingPeers sets the pending peers for the region.
func WithPendingPeers(pendingPeers []*metapb.Peer) RegionCreateOption {
return func(region *RegionInfo) {
region.pendingPeers = pendingPeers
region.pendingPeers = append(pendingPeers[:0:0], pendingPeers...)
sort.Sort(peerSlice(region.pendingPeers))
}
}

Expand Down
93 changes: 93 additions & 0 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/server/id"
)
Expand All @@ -30,6 +31,98 @@ func TestCore(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testRegionInfoSuite{})

type testRegionInfoSuite struct{}

func (s *testRegionInfoSuite) TestSortedEqual(c *C) {
testcases := []struct {
idsA []uint64
idsB []uint64
isEqual bool
}{
{
[]uint64{},
[]uint64{},
true,
},
{
[]uint64{},
[]uint64{1, 2},
false,
},
{
[]uint64{1, 2},
[]uint64{1, 2},
true,
},
{
[]uint64{1, 2},
[]uint64{2, 1},
true,
},
{
[]uint64{1, 2},
[]uint64{1, 2, 3},
false,
},
{
[]uint64{1, 2, 3},
[]uint64{2, 3, 1},
true,
},
{
[]uint64{1, 3},
[]uint64{1, 2},
false,
},
}

meta := &metapb.Region{
Id: 100,
Peers: []*metapb.Peer{
{
Id: 1,
StoreId: 10,
},
{
Id: 3,
StoreId: 30,
},
{
Id: 2,
StoreId: 20,
},
{
Id: 4,
StoreId: 40,
},
},
}

region := NewRegionInfo(meta, meta.Peers[0])

for _, t := range testcases {
downPeersA := make([]*pdpb.PeerStats, 0)
downPeersB := make([]*pdpb.PeerStats, 0)
pendingPeersA := make([]*metapb.Peer, 0)
pendingPeersB := make([]*metapb.Peer, 0)
for _, i := range t.idsA {
downPeersA = append(downPeersA, &pdpb.PeerStats{Peer: meta.Peers[i]})
pendingPeersA = append(pendingPeersA, meta.Peers[i])
}
for _, i := range t.idsB {
downPeersB = append(downPeersB, &pdpb.PeerStats{Peer: meta.Peers[i]})
pendingPeersB = append(pendingPeersB, meta.Peers[i])
}

regionA := region.Clone(WithDownPeers(downPeersA), WithPendingPeers(pendingPeersA))
regionB := region.Clone(WithDownPeers(downPeersB), WithPendingPeers(pendingPeersB))
c.Assert(SortedPeersStatsEqual(regionA.GetDownPeers(), regionB.GetDownPeers()), Equals, t.isEqual)
c.Assert(SortedPeersEqual(regionA.GetPendingPeers(), regionB.GetPendingPeers()), Equals, t.isEqual)
}
}

var _ = Suite(&testRegionMapSuite{})

type testRegionMapSuite struct{}
Expand Down

0 comments on commit b1ba2d0

Please sign in to comment.