From 4e2dc59f3bdf7826b769882047a53c734d726a02 Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Mon, 8 Nov 2021 14:25:23 +0800 Subject: [PATCH 1/3] cluster: fix the bug that region statistics are not updated after flow-round-by-digit change close #4295 Signed-off-by: HunDunDM --- server/core/region.go | 3 +- server/core/region_test.go | 75 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/server/core/region.go b/server/core/region.go index a8d53123f7f..ed8197902fd 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -567,7 +567,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { // Once flow has changed, will update the cache. // Because keys and bytes are strongly related, only bytes are judged. if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || - region.GetRoundBytesRead() != origin.GetRoundBytesRead() { + region.GetRoundBytesRead() != origin.GetRoundBytesRead() || + region.flowRoundDivisor != origin.flowRoundDivisor { saveCache, needSync = true, true } diff --git a/server/core/region_test.go b/server/core/region_test.go index d752d5a7691..d422d34faee 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -232,6 +232,81 @@ func (s *testRegionInfoSuite) TestRegionWriteRate(c *C) { } } +var _ = Suite(&testRegionGuideSuite{}) + +type testRegionGuideSuite struct { + RegionGuide RegionGuideFunc +} + +func (s *testRegionGuideSuite) SetUpSuite(c *C) { + s.RegionGuide = GenerateRegionGuideFunc(false) +} + +func (s *testRegionGuideSuite) TestNeedSync(c *C) { + meta := &metapb.Region{ + Id: 1000, + StartKey: []byte("a"), + EndKey: []byte("z"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 12, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 13, StoreId: 1, Role: metapb.PeerRole_Voter}, + }, + } + region := NewRegionInfo(meta, meta.Peers[0]) + + testcases := []struct { + optionsA []RegionCreateOption + optionsB []RegionCreateOption + needSync bool + }{ + { + optionsA: []RegionCreateOption{WithLeader(nil)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{WithLeader(meta.Peers[1])}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(2)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(4)}, + optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(4)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(4)}, + optionsB: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, + optionsB: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + needSync: true, + }, + } + + for _, t := range testcases { + regionA := region.Clone(t.optionsA...) + regionB := region.Clone(t.optionsB...) + _, _, _, needSync := s.RegionGuide(regionA, regionB) + c.Assert(needSync, Equals, t.needSync) + } +} + var _ = Suite(&testRegionMapSuite{}) type testRegionMapSuite struct{} From 3662a3aab4380b678209cc3465a2dc70230bd2fb Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Mon, 8 Nov 2021 14:26:10 +0800 Subject: [PATCH 2/3] refine some code Signed-off-by: HunDunDM --- server/core/region_option.go | 3 ++- server/core/region_test.go | 5 +++++ server/grpc_service.go | 18 +++++++++--------- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/server/core/region_option.go b/server/core/region_option.go index 9b1331ceb4a..fca63eb4992 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -39,8 +39,9 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption { // WithFlowRoundByDigit set the digit, which use to round to the nearest number func WithFlowRoundByDigit(digit int) RegionCreateOption { + flowRoundDivisor := uint64(math.Pow10(digit)) return func(region *RegionInfo) { - region.flowRoundDivisor = uint64(math.Pow10(digit)) + region.flowRoundDivisor = flowRoundDivisor } } diff --git a/server/core/region_test.go b/server/core/region_test.go index d422d34faee..92dfba88ec8 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -282,6 +282,11 @@ func (s *testRegionGuideSuite) TestNeedSync(c *C) { optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(2)}, needSync: true, }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(250), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(349), WithFlowRoundByDigit(2)}, + needSync: false, + }, { optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(4)}, optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(4)}, diff --git a/server/grpc_service.go b/server/grpc_service.go index 10bd59bc476..02c5d7db6ea 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -666,14 +666,14 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { // RegionHeartbeat implements gRPC PDServer. func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { - server := &heartbeatServer{stream: stream} - FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit var ( - forwardStream pdpb.PD_RegionHeartbeatClient - cancel context.CancelFunc - lastForwardedHost string - lastBind time.Time - errCh chan error + server = &heartbeatServer{stream: stream} + regionFlowRoundByDigitOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) + forwardStream pdpb.PD_RegionHeartbeatClient + cancel context.CancelFunc + lastForwardedHost string + lastBind time.Time + errCh chan error ) defer func() { // cancel the forward stream @@ -750,11 +750,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc() s.hbStreams.BindStream(storeID, server) // refresh FlowRoundByDigit - FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit + regionFlowRoundByDigitOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) lastBind = time.Now() } - region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit)) + region := core.RegionFromHeartbeat(request, regionFlowRoundByDigitOption) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc() From 32652d7cb53688e84edad968a8c82c219f1dcc9c Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Mon, 8 Nov 2021 15:25:20 +0800 Subject: [PATCH 3/3] address comment Signed-off-by: HunDunDM --- server/core/region.go | 2 +- server/core/region_test.go | 13 +++++++++---- server/grpc_service.go | 18 +++++++++--------- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/server/core/region.go b/server/core/region.go index ed8197902fd..aa403dc7667 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -568,7 +568,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { // Because keys and bytes are strongly related, only bytes are judged. if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || region.GetRoundBytesRead() != origin.GetRoundBytesRead() || - region.flowRoundDivisor != origin.flowRoundDivisor { + region.flowRoundDivisor < origin.flowRoundDivisor { saveCache, needSync = true, true } diff --git a/server/core/region_test.go b/server/core/region_test.go index 92dfba88ec8..1f0dc59d2ce 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -262,19 +262,19 @@ func (s *testRegionGuideSuite) TestNeedSync(c *C) { needSync bool }{ { - optionsA: []RegionCreateOption{WithLeader(nil)}, + optionsB: []RegionCreateOption{WithLeader(nil)}, needSync: true, }, { - optionsA: []RegionCreateOption{WithLeader(meta.Peers[1])}, + optionsB: []RegionCreateOption{WithLeader(meta.Peers[1])}, needSync: true, }, { - optionsA: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])}, + optionsB: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])}, needSync: true, }, { - optionsA: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})}, + optionsB: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})}, needSync: true, }, { @@ -300,6 +300,11 @@ func (s *testRegionGuideSuite) TestNeedSync(c *C) { { optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, optionsB: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, needSync: true, }, } diff --git a/server/grpc_service.go b/server/grpc_service.go index 02c5d7db6ea..43568032ff3 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -667,13 +667,13 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { // RegionHeartbeat implements gRPC PDServer. func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { var ( - server = &heartbeatServer{stream: stream} - regionFlowRoundByDigitOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) - forwardStream pdpb.PD_RegionHeartbeatClient - cancel context.CancelFunc - lastForwardedHost string - lastBind time.Time - errCh chan error + server = &heartbeatServer{stream: stream} + flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) + forwardStream pdpb.PD_RegionHeartbeatClient + cancel context.CancelFunc + lastForwardedHost string + lastBind time.Time + errCh chan error ) defer func() { // cancel the forward stream @@ -750,11 +750,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc() s.hbStreams.BindStream(storeID, server) // refresh FlowRoundByDigit - regionFlowRoundByDigitOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) + flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) lastBind = time.Now() } - region := core.RegionFromHeartbeat(request, regionFlowRoundByDigitOption) + region := core.RegionFromHeartbeat(request, flowRoundOption) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()