From 5f95924f36a65724cb532ea494ec6380094dc2cd Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Mon, 8 Nov 2021 14:25:23 +0800 Subject: [PATCH 1/3] cluster: fix the bug that region statistics are not updated after flow-round-by-digit change close #4295 Signed-off-by: HunDunDM --- server/core/region.go | 3 +- server/core/region_test.go | 75 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/server/core/region.go b/server/core/region.go index 9dc93c53a1c..ca43dc63b84 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -550,7 +550,8 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { // Once flow has changed, will update the cache. // Because keys and bytes are strongly related, only bytes are judged. if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || - region.GetRoundBytesRead() != origin.GetRoundBytesRead() { + region.GetRoundBytesRead() != origin.GetRoundBytesRead() || + region.flowRoundDivisor != origin.flowRoundDivisor { saveCache, needSync = true, true } diff --git a/server/core/region_test.go b/server/core/region_test.go index 9b8948596dc..c577f6e2982 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -204,6 +204,81 @@ func (s *testRegionInfoSuite) TestRegionWriteRate(c *C) { } } +var _ = Suite(&testRegionGuideSuite{}) + +type testRegionGuideSuite struct { + RegionGuide RegionGuideFunc +} + +func (s *testRegionGuideSuite) SetUpSuite(c *C) { + s.RegionGuide = GenerateRegionGuideFunc(false) +} + +func (s *testRegionGuideSuite) TestNeedSync(c *C) { + meta := &metapb.Region{ + Id: 1000, + StartKey: []byte("a"), + EndKey: []byte("z"), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 100, Version: 100}, + Peers: []*metapb.Peer{ + {Id: 11, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 12, StoreId: 1, Role: metapb.PeerRole_Voter}, + {Id: 13, StoreId: 1, Role: metapb.PeerRole_Voter}, + }, + } + region := NewRegionInfo(meta, meta.Peers[0]) + + testcases := []struct { + optionsA []RegionCreateOption + optionsB []RegionCreateOption + needSync bool + }{ + { + optionsA: []RegionCreateOption{WithLeader(nil)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{WithLeader(meta.Peers[1])}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(2)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(4)}, + optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(4)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(4)}, + optionsB: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(2)}, + needSync: true, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, + optionsB: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + needSync: true, + }, + } + + for _, t := range testcases { + regionA := region.Clone(t.optionsA...) + regionB := region.Clone(t.optionsB...) + _, _, _, needSync := s.RegionGuide(regionA, regionB) + c.Assert(needSync, Equals, t.needSync) + } +} + var _ = Suite(&testRegionMapSuite{}) type testRegionMapSuite struct{} From 32a8acf20fb775dcdeea7d73de8d28fc4660bd3e Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Mon, 8 Nov 2021 14:26:10 +0800 Subject: [PATCH 2/3] refine some code Signed-off-by: HunDunDM --- server/core/region_option.go | 3 ++- server/core/region_test.go | 5 +++++ server/grpc_service.go | 18 +++++++++--------- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/server/core/region_option.go b/server/core/region_option.go index a51bd562a84..1cab90444c9 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -38,8 +38,9 @@ func WithDownPeers(downPeers []*pdpb.PeerStats) RegionCreateOption { // WithFlowRoundByDigit set the digit, which use to round to the nearest number func WithFlowRoundByDigit(digit int) RegionCreateOption { + flowRoundDivisor := uint64(math.Pow10(digit)) return func(region *RegionInfo) { - region.flowRoundDivisor = uint64(math.Pow10(digit)) + region.flowRoundDivisor = flowRoundDivisor } } diff --git a/server/core/region_test.go b/server/core/region_test.go index c577f6e2982..531c4c93099 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -254,6 +254,11 @@ func (s *testRegionGuideSuite) TestNeedSync(c *C) { optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(2)}, needSync: true, }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(250), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(349), WithFlowRoundByDigit(2)}, + needSync: false, + }, { optionsA: []RegionCreateOption{SetWrittenBytes(200), WithFlowRoundByDigit(4)}, optionsB: []RegionCreateOption{SetWrittenBytes(300), WithFlowRoundByDigit(4)}, diff --git a/server/grpc_service.go b/server/grpc_service.go index edf1ff8d37a..9a9cea86c32 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -501,14 +501,14 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { // RegionHeartbeat implements gRPC PDServer. func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { - server := &heartbeatServer{stream: stream} - FlowRoundByDigit := s.persistOptions.GetPDServerConfig().FlowRoundByDigit var ( - forwardStream pdpb.PD_RegionHeartbeatClient - cancel context.CancelFunc - lastForwardedHost string - lastBind time.Time - errCh chan error + server = &heartbeatServer{stream: stream} + regionFlowRoundByDigitOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) + forwardStream pdpb.PD_RegionHeartbeatClient + cancel context.CancelFunc + lastForwardedHost string + lastBind time.Time + errCh chan error ) defer func() { // cancel the forward stream @@ -585,11 +585,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc() s.hbStreams.BindStream(storeID, server) // refresh FlowRoundByDigit - FlowRoundByDigit = s.persistOptions.GetPDServerConfig().FlowRoundByDigit + regionFlowRoundByDigitOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) lastBind = time.Now() } - region := core.RegionFromHeartbeat(request, core.WithFlowRoundByDigit(FlowRoundByDigit)) + region := core.RegionFromHeartbeat(request, regionFlowRoundByDigitOption) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc() From 8e3329e63a8c7fd49a7fa3598ddec46b786b1564 Mon Sep 17 00:00:00 2001 From: HunDunDM Date: Mon, 8 Nov 2021 15:25:20 +0800 Subject: [PATCH 3/3] address comment Signed-off-by: HunDunDM --- server/core/region.go | 2 +- server/core/region_test.go | 13 +++++++++---- server/grpc_service.go | 18 +++++++++--------- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/server/core/region.go b/server/core/region.go index ca43dc63b84..b1917d3fd58 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -551,7 +551,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { // Because keys and bytes are strongly related, only bytes are judged. if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() || region.GetRoundBytesRead() != origin.GetRoundBytesRead() || - region.flowRoundDivisor != origin.flowRoundDivisor { + region.flowRoundDivisor < origin.flowRoundDivisor { saveCache, needSync = true, true } diff --git a/server/core/region_test.go b/server/core/region_test.go index 531c4c93099..70acdfe3f41 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -234,19 +234,19 @@ func (s *testRegionGuideSuite) TestNeedSync(c *C) { needSync bool }{ { - optionsA: []RegionCreateOption{WithLeader(nil)}, + optionsB: []RegionCreateOption{WithLeader(nil)}, needSync: true, }, { - optionsA: []RegionCreateOption{WithLeader(meta.Peers[1])}, + optionsB: []RegionCreateOption{WithLeader(meta.Peers[1])}, needSync: true, }, { - optionsA: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])}, + optionsB: []RegionCreateOption{WithPendingPeers(meta.Peers[1:2])}, needSync: true, }, { - optionsA: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})}, + optionsB: []RegionCreateOption{WithDownPeers([]*pdpb.PeerStats{{Peer: meta.Peers[1], DownSeconds: 600}})}, needSync: true, }, { @@ -272,6 +272,11 @@ func (s *testRegionGuideSuite) TestNeedSync(c *C) { { optionsA: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, optionsB: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + needSync: false, + }, + { + optionsA: []RegionCreateOption{SetWrittenBytes(0), WithFlowRoundByDigit(2)}, + optionsB: []RegionCreateOption{SetWrittenBytes(100000), WithFlowRoundByDigit(127)}, needSync: true, }, } diff --git a/server/grpc_service.go b/server/grpc_service.go index 9a9cea86c32..585bd434f7a 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -502,13 +502,13 @@ func (s *heartbeatServer) Recv() (*pdpb.RegionHeartbeatRequest, error) { // RegionHeartbeat implements gRPC PDServer. func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { var ( - server = &heartbeatServer{stream: stream} - regionFlowRoundByDigitOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) - forwardStream pdpb.PD_RegionHeartbeatClient - cancel context.CancelFunc - lastForwardedHost string - lastBind time.Time - errCh chan error + server = &heartbeatServer{stream: stream} + flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) + forwardStream pdpb.PD_RegionHeartbeatClient + cancel context.CancelFunc + lastForwardedHost string + lastBind time.Time + errCh chan error ) defer func() { // cancel the forward stream @@ -585,11 +585,11 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error { regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "bind").Inc() s.hbStreams.BindStream(storeID, server) // refresh FlowRoundByDigit - regionFlowRoundByDigitOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) + flowRoundOption = core.WithFlowRoundByDigit(s.persistOptions.GetPDServerConfig().FlowRoundByDigit) lastBind = time.Now() } - region := core.RegionFromHeartbeat(request, regionFlowRoundByDigitOption) + region := core.RegionFromHeartbeat(request, flowRoundOption) if region.GetLeader() == nil { log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil)) regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "report", "invalid-leader").Inc()