diff --git a/server/core/region.go b/server/core/region.go index a8d53123f7f..aa403dc7667 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -567,7 +567,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { // Once flow has changed, will update the cache. // Because keys and bytes are strongly related, only bytes are judged. if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || - region.GetRoundBytesRead() != origin.GetRoundBytesRead() { + region.GetRoundBytesRead() != origin.GetRoundBytesRead() || + region.flowRoundDivisor < origin.flowRoundDivisor { saveCache, needSync = true, true } diff --git a/server/core/region_option.go b/server/core/region_option.go index 9b1331ceb4a..fca63eb4992 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -39,8 +39,9 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption { // WithFlowRoundByDigit set the digit, which use to round to the nearest number func WithFlowRoundByDigit(digit int) RegionCreateOption { + flowRoundDivisor := uint64(math.Pow10(digit)) return func(region *RegionInfo) { - region.flowRoundDivisor = uint64(math.Pow10(digit)) + region.flowRoundDivisor = flowRoundDivisor } } diff --git a/server/core/region_test.go b/server/core/region_test.go index d752d5a7691..1f0dc59d2ce 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -232,6 +232,91 @@ func (s *testRegionInfoSuite) TestRegionWriteRate(c *C) { } } +var _ = Suite(&testRegionGuideSuite{}) + +type testRegionGuideSuite struct { + RegionGuide RegionGuideFunc +} + +func (s *testRegionGuideSuite) SetUpSuite(c *C) { + s.RegionGuide = GenerateRegionGuideFunc(false) +} + +func (s *testRegionGuideSuite) TestNeedSync(c *C) { + meta := &metapb.Region{ + Id: 1000, + StartKey: []byte("a"), + EndKey: []byte("z"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 12, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 13, StoreId: 1, Role: metapb.PeerRole_Voter}, + }, + } + region := NewRegionInfo(meta, meta.Peers[0]) + + testcases := []struct { + optionsA []RegionCreateOption + optionsB []RegionCreateOption + needSync bool + }{ + { + optionsB: []RegionCreateOption{WithLeader(nil)}, + needSync: true, + }, + { + optionsB: []RegionCreateOption{WithLeader(meta.Peers[1])}, + needSync: true, + }, + { + optionsB: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])}, + needSync: true, + }, + { + optionsB: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(2)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(250), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(349), WithFlowRoundByDigit(2)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(4)}, + optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(4)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(4)}, + optionsB: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, + optionsB: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, + needSync: true, + }, + } + + for _, t := range testcases { + regionA := region.Clone(t.optionsA...) + regionB := region.Clone(t.optionsB...) + _, _, _, needSync := s.RegionGuide(regionA, regionB) + c.Assert(needSync, Equals, t.needSync) + } +} + var _ = Suite(&testRegionMapSuite{}) type testRegionMapSuite struct{} diff --git a/server/grpc_service.go b/server/grpc_service.go index 10bd59bc476..43568032ff3 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -666,9 +666,9 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { // RegionHeartbeat implements gRPC PDServer. func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { - server := &heartbeatServer{stream: stream} - FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit var ( + server = &heartbeatServer{stream: stream} + flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) forwardStream pdpb.PD_RegionHeartbeatClient cancel context.CancelFunc lastForwardedHost string @@ -750,11 +750,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc() s.hbStreams.BindStream(storeID, server) // refresh FlowRoundByDigit - FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit + flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) lastBind = time.Now() } - region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit)) + region := core.RegionFromHeartbeat(request, flowRoundOption) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()