From e5524877ab669ff6cc4f2fa84e7707a8da34fdc4 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 8 Aug 2024 10:39:03 +0800 Subject: [PATCH 1/2] use ttl cache Signed-off-by: Ryan Leung --- pkg/schedule/checker/checker_controller.go | 13 ++++++------- pkg/schedule/checker/replica_checker.go | 4 ++-- pkg/schedule/checker/replica_checker_test.go | 20 ++++++++++---------- pkg/schedule/checker/rule_checker.go | 4 ++-- pkg/schedule/checker/rule_checker_test.go | 4 ++-- 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index 4704996f7d9..da36c8db74f 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -68,7 +68,7 @@ type Controller struct { mergeChecker *MergeChecker jointStateChecker *JointStateChecker priorityInspector *PriorityInspector - pendingProcessedRegions cache.Cache + pendingProcessedRegions *cache.TTLUint64 suspectKeyRanges *cache.TTLString // suspect key-range regions that may need fix // duration is the duration of the last patrol round. @@ -88,7 +88,7 @@ type Controller struct { // NewController create a new Controller. func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller { - pendingProcessedRegions := cache.NewDefaultCache(DefaultPendingRegionCacheSize) + pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute) return &Controller{ ctx: ctx, cluster: cluster, @@ -327,16 +327,15 @@ func (c *Controller) GetRuleChecker() *RuleChecker { // GetPendingProcessedRegions returns the pending processed regions in the cache. func (c *Controller) GetPendingProcessedRegions() []uint64 { - pendingRegions := make([]uint64, 0) - for _, item := range c.pendingProcessedRegions.Elems() { - pendingRegions = append(pendingRegions, item.Key) - } - return pendingRegions + return c.pendingProcessedRegions.GetAllID() } // AddPendingProcessedRegions adds the pending processed region into the cache. func (c *Controller) AddPendingProcessedRegions(ids ...uint64) { for _, id := range ids { + if c.pendingProcessedRegions.Len() > DefaultPendingRegionCacheSize { + return + } c.pendingProcessedRegions.Put(id, nil) } } diff --git a/pkg/schedule/checker/replica_checker.go b/pkg/schedule/checker/replica_checker.go index 6be5432125b..b0c42e88258 100644 --- a/pkg/schedule/checker/replica_checker.go +++ b/pkg/schedule/checker/replica_checker.go @@ -44,11 +44,11 @@ type ReplicaChecker struct { PauseController cluster sche.CheckerCluster conf config.CheckerConfigProvider - pendingProcessedRegions cache.Cache + pendingProcessedRegions *cache.TTLUint64 } // NewReplicaChecker creates a replica checker. -func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions cache.Cache) *ReplicaChecker { +func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, pendingProcessedRegions *cache.TTLUint64) *ReplicaChecker { return &ReplicaChecker{ cluster: cluster, conf: conf, diff --git a/pkg/schedule/checker/replica_checker_test.go b/pkg/schedule/checker/replica_checker_test.go index a9139ee9804..da04fb6d768 100644 --- a/pkg/schedule/checker/replica_checker_test.go +++ b/pkg/schedule/checker/replica_checker_test.go @@ -51,7 +51,7 @@ func (suite *replicaCheckerTestSuite) SetupTest() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cluster = mockcluster.NewCluster(suite.ctx, cfg) suite.cluster.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewDefaultCache(10)) + suite.rc = NewReplicaChecker(suite.cluster, suite.cluster.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) stats := &pdpb.StoreStats{ Capacity: 100, Available: 100, @@ -213,7 +213,7 @@ func (suite *replicaCheckerTestSuite) TestBasic() { tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetMaxSnapshotCount(2) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) // Add stores 1,2,3,4. tc.AddRegionStore(1, 4) @@ -290,7 +290,7 @@ func (suite *replicaCheckerTestSuite) TestLostStore() { tc.AddRegionStore(1, 1) tc.AddRegionStore(2, 1) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) // now region peer in store 1,2,3.but we just have store 1,2 // This happens only in recovering the PD tc @@ -309,7 +309,7 @@ func (suite *replicaCheckerTestSuite) TestOffline() { tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "rack", "host"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(3, 3, map[string]string{"zone": "z3", "rack": "r1", "host": "h1"}) @@ -361,7 +361,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore() { tc.SetMaxReplicas(3) tc.SetLocationLabels([]string{"zone", "rack", "host"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 9, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(2, 8, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) @@ -441,7 +441,7 @@ func (suite *replicaCheckerTestSuite) TestDistinctScore2() { tc.SetMaxReplicas(5) tc.SetLocationLabels([]string{"zone", "host"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1", "host": "h1"}) tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1", "host": "h2"}) @@ -470,7 +470,7 @@ func (suite *replicaCheckerTestSuite) TestStorageThreshold() { tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetLocationLabels([]string{"zone"}) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) tc.UpdateStorageRatio(1, 0.5, 0.5) @@ -506,7 +506,7 @@ func (suite *replicaCheckerTestSuite) TestOpts() { opt := mockconfig.NewTestOptions() tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddRegionStore(1, 100) tc.AddRegionStore(2, 100) @@ -539,7 +539,7 @@ func (suite *replicaCheckerTestSuite) TestFixDownPeer() { tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetLocationLabels([]string{"zone"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) @@ -571,7 +571,7 @@ func (suite *replicaCheckerTestSuite) TestFixOfflinePeer() { tc := mockcluster.NewCluster(suite.ctx, opt) tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetLocationLabels([]string{"zone"}) - rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewDefaultCache(10)) + rc := NewReplicaChecker(tc, tc.GetCheckerConfig(), cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"}) tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"}) diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index a8acb002951..e29cd2bc05b 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -52,14 +52,14 @@ type RuleChecker struct { PauseController cluster sche.CheckerCluster ruleManager *placement.RuleManager - pendingProcessedRegions cache.Cache + pendingProcessedRegions *cache.TTLUint64 pendingList cache.Cache switchWitnessCache *cache.TTLUint64 record *recorder } // NewRuleChecker creates a checker instance. -func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions cache.Cache) *RuleChecker { +func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManager *placement.RuleManager, pendingProcessedRegions *cache.TTLUint64) *RuleChecker { return &RuleChecker{ cluster: cluster, ruleManager: ruleManager, diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index f99208a988b..b24a95e2ade 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -62,7 +62,7 @@ func (suite *ruleCheckerTestSuite) SetupTest() { suite.cluster.SetEnableWitness(true) suite.cluster.SetEnableUseJointConsensus(false) suite.ruleManager = suite.cluster.RuleManager - suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10)) + suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) } func (suite *ruleCheckerTestSuite) TearDownTest() { @@ -1955,7 +1955,7 @@ func (suite *ruleCheckerTestAdvancedSuite) SetupTest() { suite.cluster.SetEnableWitness(true) suite.cluster.SetEnableUseJointConsensus(true) suite.ruleManager = suite.cluster.RuleManager - suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewDefaultCache(10)) + suite.rc = NewRuleChecker(suite.ctx, suite.cluster, suite.ruleManager, cache.NewIDTTL(suite.ctx, time.Minute, 3*time.Minute)) } func (suite *ruleCheckerTestAdvancedSuite) TearDownTest() { From 779f7bc9e668250ec9d9bca40199dcd2ae588c41 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 8 Aug 2024 12:09:12 +0800 Subject: [PATCH 2/2] address the comment Signed-off-by: Ryan Leung --- pkg/mcs/scheduling/server/grpc_service.go | 2 +- pkg/mock/mockcluster/mockcluster.go | 2 +- pkg/schedule/checker/checker_controller.go | 8 ++++---- pkg/schedule/handler/handler.go | 4 ++-- pkg/schedule/scatter/region_scatterer.go | 6 +++--- pkg/schedule/splitter/region_splitter.go | 6 +++--- server/cluster/cluster_worker.go | 2 +- server/cluster/scheduling_controller.go | 4 ++-- 8 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go index d068aa5c058..7eb2554f7f2 100644 --- a/pkg/mcs/scheduling/server/grpc_service.go +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -324,7 +324,7 @@ func (s *Service) AskBatchSplit(_ context.Context, request *schedulingpb.AskBatc // If region splits during the scheduling process, regions with abnormal // status may be left, and these regions need to be checked with higher // priority. - c.GetCoordinator().GetCheckerController().AddPendingProcessedRegions(recordRegions...) + c.GetCoordinator().GetCheckerController().AddPendingProcessedRegions(false, recordRegions...) return &schedulingpb.AskBatchSplitResponse{ Header: s.header(), diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index cb1efeb544b..bbd4fbb6811 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -845,7 +845,7 @@ func (mc *Cluster) SetStoreLabel(storeID uint64, labels map[string]string) { } // AddPendingProcessedRegions mock method -func (mc *Cluster) AddPendingProcessedRegions(ids ...uint64) { +func (mc *Cluster) AddPendingProcessedRegions(_ bool, ids ...uint64) { for _, id := range ids { mc.pendingProcessedRegions[id] = struct{}{} } diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index da36c8db74f..200ab388e30 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -311,7 +311,7 @@ func (c *Controller) tryAddOperators(region *core.RegionInfo) { c.opController.AddWaitingOperator(ops...) c.RemovePendingProcessedRegion(id) } else { - c.AddPendingProcessedRegions(id) + c.AddPendingProcessedRegions(true, id) } } @@ -331,9 +331,9 @@ func (c *Controller) GetPendingProcessedRegions() []uint64 { } // AddPendingProcessedRegions adds the pending processed region into the cache. -func (c *Controller) AddPendingProcessedRegions(ids ...uint64) { +func (c *Controller) AddPendingProcessedRegions(needCheckLen bool, ids ...uint64) { for _, id := range ids { - if c.pendingProcessedRegions.Len() > DefaultPendingRegionCacheSize { + if needCheckLen && c.pendingProcessedRegions.Len() > DefaultPendingRegionCacheSize { return } c.pendingProcessedRegions.Put(id, nil) @@ -384,7 +384,7 @@ func (c *Controller) CheckSuspectRanges() { if lastRegion.GetEndKey() != nil && bytes.Compare(lastRegion.GetEndKey(), keyRange[1]) < 0 { c.AddSuspectKeyRange(lastRegion.GetEndKey(), keyRange[1]) } - c.AddPendingProcessedRegions(regionIDList...) + c.AddPendingProcessedRegions(false, regionIDList...) } } } diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index 9f9de274278..748a17b87ef 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1124,7 +1124,7 @@ func (h *Handler) AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey string for _, region := range regions { regionsIDList = append(regionsIDList, region.GetID()) } - co.GetCheckerController().AddPendingProcessedRegions(regionsIDList...) + co.GetCheckerController().AddPendingProcessedRegions(false, regionsIDList...) } return nil } @@ -1151,7 +1151,7 @@ func (h *Handler) AccelerateRegionsScheduleInRanges(startKeys [][]byte, endKeys for _, region := range regions { regionsIDList = append(regionsIDList, region.GetID()) } - co.GetCheckerController().AddPendingProcessedRegions(regionsIDList...) + co.GetCheckerController().AddPendingProcessedRegions(false, regionsIDList...) } return nil } diff --git a/pkg/schedule/scatter/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go index 100b9eb764d..efef5439fed 100644 --- a/pkg/schedule/scatter/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -125,12 +125,12 @@ type RegionScatterer struct { ordinaryEngine engineContext specialEngines sync.Map opController *operator.Controller - addSuspectRegions func(regionIDs ...uint64) + addSuspectRegions func(bool, ...uint64) } // NewRegionScatterer creates a region scatterer. // RegionScatter is used for the `Lightning`, it will scatter the specified regions before import data. -func NewRegionScatterer(ctx context.Context, cluster sche.SharedCluster, opController *operator.Controller, addSuspectRegions func(regionIDs ...uint64)) *RegionScatterer { +func NewRegionScatterer(ctx context.Context, cluster sche.SharedCluster, opController *operator.Controller, addSuspectRegions func(bool, ...uint64)) *RegionScatterer { return &RegionScatterer{ ctx: ctx, name: regionScatterName, @@ -275,7 +275,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa // in a group level instead of cluster level. func (r *RegionScatterer) Scatter(region *core.RegionInfo, group string, skipStoreLimit bool) (*operator.Operator, error) { if !filter.IsRegionReplicated(r.cluster, region) { - r.addSuspectRegions(region.GetID()) + r.addSuspectRegions(false, region.GetID()) scatterSkipNotReplicatedCounter.Inc() log.Warn("region not replicated during scatter", zap.Uint64("region-id", region.GetID())) return nil, errors.Errorf("region %d is not fully replicated", region.GetID()) diff --git a/pkg/schedule/splitter/region_splitter.go b/pkg/schedule/splitter/region_splitter.go index 124ad935655..37b33dad480 100644 --- a/pkg/schedule/splitter/region_splitter.go +++ b/pkg/schedule/splitter/region_splitter.go @@ -58,11 +58,11 @@ func NewSplitRegionsHandler(cluster sche.ClusterInformer, oc *operator.Controlle type RegionSplitter struct { cluster sche.ClusterInformer handler SplitRegionsHandler - addSuspectRegions func(ids ...uint64) + addSuspectRegions func(bool, ...uint64) } // NewRegionSplitter return a region splitter -func NewRegionSplitter(cluster sche.ClusterInformer, handler SplitRegionsHandler, addSuspectRegions func(ids ...uint64)) *RegionSplitter { +func NewRegionSplitter(cluster sche.ClusterInformer, handler SplitRegionsHandler, addSuspectRegions func(bool, ...uint64)) *RegionSplitter { return &RegionSplitter{ cluster: cluster, handler: handler, @@ -173,7 +173,7 @@ func (r *RegionSplitter) groupKeysByRegion(keys [][]byte) map[uint64]*regionGrou func (r *RegionSplitter) checkRegionValid(region *core.RegionInfo) bool { if !filter.IsRegionReplicated(r.cluster, region) { - r.addSuspectRegions(region.GetID()) + r.addSuspectRegions(false, region.GetID()) return false } if region.GetLeader() == nil { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 2d9bb411995..941282c4aca 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -165,7 +165,7 @@ func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (* // If region splits during the scheduling process, regions with abnormal // status may be left, and these regions need to be checked with higher // priority. - c.AddPendingProcessedRegions(recordRegions...) + c.AddPendingProcessedRegions(false, recordRegions...) resp := &pdpb.AskBatchSplitResponse{Ids: splitIDs} diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index bd515669670..b4c29ceed46 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -404,10 +404,10 @@ func (sc *schedulingController) PauseOrResumeChecker(name string, t int64) error } // AddPendingProcessedRegions adds regions to suspect list. -func (sc *schedulingController) AddPendingProcessedRegions(regionIDs ...uint64) { +func (sc *schedulingController) AddPendingProcessedRegions(needCheckLen bool, regionIDs ...uint64) { sc.mu.RLock() defer sc.mu.RUnlock() - sc.coordinator.GetCheckerController().AddPendingProcessedRegions(regionIDs...) + sc.coordinator.GetCheckerController().AddPendingProcessedRegions(needCheckLen, regionIDs...) } // GetPendingProcessedRegions gets all suspect regions.