diff --git a/server/core/region.go b/server/core/region.go index 9dc93c53a1c..b1917d3fd58 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -550,7 +550,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { // Once flow has changed, will update the cache. // Because keys and bytes are strongly related, only bytes are judged. if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || - region.GetRoundBytesRead() != origin.GetRoundBytesRead() { + region.GetRoundBytesRead() != origin.GetRoundBytesRead() || + region.flowRoundDivisor < origin.flowRoundDivisor { saveCache, needSync = true, true } diff --git a/server/core/region_option.go b/server/core/region_option.go index a51bd562a84..1cab90444c9 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -38,8 +38,9 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption { // WithFlowRoundByDigit set the digit, which use to round to the nearest number func WithFlowRoundByDigit(digit int) RegionCreateOption { + flowRoundDivisor := uint64(math.Pow10(digit)) return func(region *RegionInfo) { - region.flowRoundDivisor = uint64(math.Pow10(digit)) + region.flowRoundDivisor = flowRoundDivisor } } diff --git a/server/core/region_test.go b/server/core/region_test.go index 9b8948596dc..70acdfe3f41 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -204,6 +204,91 @@ func (s *testRegionInfoSuite) TestRegionWriteRate(c *C) { } } +var _ = Suite(&testRegionGuideSuite{}) + +type testRegionGuideSuite struct { + RegionGuide RegionGuideFunc +} + +func (s *testRegionGuideSuite) SetUpSuite(c *C) { + s.RegionGuide = GenerateRegionGuideFunc(false) +} + +func (s *testRegionGuideSuite) TestNeedSync(c *C) { + meta := &metapb.Region{ + Id: 1000, + StartKey: []byte("a"), + EndKey: []byte("z"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 12, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 13, StoreId: 1, Role: metapb.PeerRole_Voter}, + }, + } + region := NewRegionInfo(meta, meta.Peers[0]) + + testcases := []struct { + optionsA []RegionCreateOption + optionsB []RegionCreateOption + needSync bool + }{ + { + optionsB: []RegionCreateOption{WithLeader(nil)}, + needSync: true, + }, + { + optionsB: []RegionCreateOption{WithLeader(meta.Peers[1])}, + needSync: true, + }, + { + optionsB: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])}, + needSync: true, + }, + { + optionsB: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(2)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(250), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(349), WithFlowRoundByDigit(2)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(4)}, + optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(4)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(4)}, + optionsB: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, + optionsB: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, + needSync: true, + }, + } + + for _, t := range testcases { + regionA := region.Clone(t.optionsA...) + regionB := region.Clone(t.optionsB...) + _, _, _, needSync := s.RegionGuide(regionA, regionB) + c.Assert(needSync, Equals, t.needSync) + } +} + var _ = Suite(&testRegionMapSuite{}) type testRegionMapSuite struct{} diff --git a/server/grpc_service.go b/server/grpc_service.go index edf1ff8d37a..585bd434f7a 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -501,9 +501,9 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { // RegionHeartbeat implements gRPC PDServer. func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { - server := &heartbeatServer{stream: stream} - FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit var ( + server = &heartbeatServer{stream: stream} + flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) forwardStream pdpb.PD_RegionHeartbeatClient cancel context.CancelFunc lastForwardedHost string @@ -585,11 +585,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc() s.hbStreams.BindStream(storeID, server) // refresh FlowRoundByDigit - FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit + flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) lastBind = time.Now() } - region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit)) + region := core.RegionFromHeartbeat(request, flowRoundOption) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()