From eba0a984b8c82faf799f7f8e73953bf789315da5 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 2 Jun 2023 14:02:06 +0800 Subject: [PATCH] use individual package for splitter and scatter Signed-off-by: Ryan Leung --- pkg/schedule/coordinator.go | 14 +++++----- pkg/schedule/metrics.go | 18 ------------- pkg/schedule/scatter/metrics.go | 26 +++++++++++++++++++ .../{ => scatter}/region_scatterer.go | 12 +++++---- .../{ => scatter}/region_scatterer_test.go | 14 +++++----- .../{ => splitter}/region_splitter.go | 8 +++--- .../{ => splitter}/region_splitter_test.go | 2 +- server/cluster/cluster.go | 6 +++-- 8 files changed, 58 insertions(+), 42 deletions(-) create mode 100644 pkg/schedule/scatter/metrics.go rename pkg/schedule/{ => scatter}/region_scatterer.go (99%) rename pkg/schedule/{ => scatter}/region_scatterer_test.go (99%) rename pkg/schedule/{ => splitter}/region_splitter.go (97%) rename pkg/schedule/{ => splitter}/region_splitter_test.go (99%) diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index c1f36bdf7d89..c0a1d3a9004d 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -35,7 +35,9 @@ import ( "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/logutil" @@ -78,8 +80,8 @@ type Coordinator struct { cluster sche.ClusterInformer prepareChecker *prepareChecker checkers *checker.Controller - regionScatterer *RegionScatterer - regionSplitter *RegionSplitter + regionScatterer *scatter.RegionScatterer + regionSplitter *splitter.RegionSplitter schedulers map[string]*scheduleController opController *operator.Controller hbStreams *hbstream.HeartbeatStreams @@ -98,8 +100,8 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams cluster: cluster, prepareChecker: newPrepareChecker(), checkers: checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController), - regionScatterer: NewRegionScatterer(ctx, cluster, opController), - regionSplitter: NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)), + regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController), + regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController)), schedulers: schedulers, opController: opController, hbStreams: hbStreams, @@ -878,12 +880,12 @@ func (c *Coordinator) IsCheckerPaused(name string) (bool, error) { } // GetRegionScatterer returns the region scatterer. -func (c *Coordinator) GetRegionScatterer() *RegionScatterer { +func (c *Coordinator) GetRegionScatterer() *scatter.RegionScatterer { return c.regionScatterer } // GetRegionSplitter returns the region splitter. -func (c *Coordinator) GetRegionSplitter() *RegionSplitter { +func (c *Coordinator) GetRegionSplitter() *splitter.RegionSplitter { return c.regionSplitter } diff --git a/pkg/schedule/metrics.go b/pkg/schedule/metrics.go index 9f45554ccac2..dd85156fb090 100644 --- a/pkg/schedule/metrics.go +++ b/pkg/schedule/metrics.go @@ -17,22 +17,6 @@ package schedule import "github.com/prometheus/client_golang/prometheus" var ( - scatterCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "scatter_operators_count", - Help: "Counter of region scatter operators.", - }, []string{"type", "event"}) - - scatterDistributionCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "scatter_distribution", - Help: "Counter of the distribution in scatter.", - }, []string{"store", "is_leader", "engine"}) - hotSpotStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", @@ -67,8 +51,6 @@ var ( ) func init() { - prometheus.MustRegister(scatterCounter) - prometheus.MustRegister(scatterDistributionCounter) prometheus.MustRegister(schedulerStatusGauge) prometheus.MustRegister(hotSpotStatusGauge) prometheus.MustRegister(regionListGauge) diff --git a/pkg/schedule/scatter/metrics.go b/pkg/schedule/scatter/metrics.go new file mode 100644 index 000000000000..68f2a952e4f2 --- /dev/null +++ b/pkg/schedule/scatter/metrics.go @@ -0,0 +1,26 @@ +package scatter + +import "github.com/prometheus/client_golang/prometheus" + +var ( + scatterCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "scatter_operators_count", + Help: "Counter of region scatter operators.", + }, []string{"type", "event"}) + + scatterDistributionCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "scatter_distribution", + Help: "Counter of the distribution in scatter.", + }, []string{"store", "is_leader", "engine"}) +) + +func init() { + prometheus.MustRegister(scatterCounter) + prometheus.MustRegister(scatterDistributionCounter) +} diff --git a/pkg/schedule/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go similarity index 99% rename from pkg/schedule/region_scatterer.go rename to pkg/schedule/scatter/region_scatterer.go index ed6acb3f478f..54fc291b363f 100644 --- a/pkg/schedule/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package scatter import ( "context" @@ -54,6 +54,12 @@ var ( scatterSuccessCounter = scatterCounter.WithLabelValues("success", "") ) +const ( + maxSleepDuration = time.Minute + initialSleepDuration = 100 * time.Millisecond + maxRetryLimit = 30 +) + type selectedStores struct { mu syncutil.RWMutex groupDistribution *cache.TTLString // value type: map[uint64]uint64, group -> StoreID -> count @@ -171,10 +177,6 @@ func newEngineContext(ctx context.Context, filterFuncs ...filterFunc) engineCont } } -const maxSleepDuration = time.Minute -const initialSleepDuration = 100 * time.Millisecond -const maxRetryLimit = 30 - // ScatterRegionsByRange directly scatter regions by ScatterRegions func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group string, retryLimit int) (int, map[uint64]error, error) { regions := r.cluster.ScanRegions(startKey, endKey, -1) diff --git a/pkg/schedule/region_scatterer_test.go b/pkg/schedule/scatter/region_scatterer_test.go similarity index 99% rename from pkg/schedule/region_scatterer_test.go rename to pkg/schedule/scatter/region_scatterer_test.go index ad0031f9cdf1..a895f5377dab 100644 --- a/pkg/schedule/region_scatterer_test.go +++ b/pkg/schedule/scatter/region_scatterer_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package scatter import ( "context" @@ -439,9 +439,9 @@ func TestScatterForManyRegion(t *testing.T) { } failures := map[uint64]error{} group := "group" - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain", `return(true)`)) scatterer.scatterRegions(regions, failures, group, 3) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain")) re.Len(failures, 0) } @@ -470,7 +470,7 @@ func TestScattersGroup(t *testing.T) { failure: false, }, } - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain", `return(true)`)) for id, testCase := range testCases { group := fmt.Sprintf("gourp-%d", id) t.Log(testCase.name) @@ -481,7 +481,7 @@ func TestScattersGroup(t *testing.T) { } failures := map[uint64]error{} if testCase.failure { - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterFail", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterFail", `return(true)`)) } scatterer.scatterRegions(regions, failures, group, 3) @@ -505,12 +505,12 @@ func TestScattersGroup(t *testing.T) { re.Len(failures, 1) _, ok := failures[1] re.True(ok) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterFail")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterFail")) } else { re.Empty(failures) } } - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain")) } func TestSelectedStoreGC(t *testing.T) { diff --git a/pkg/schedule/region_splitter.go b/pkg/schedule/splitter/region_splitter.go similarity index 97% rename from pkg/schedule/region_splitter.go rename to pkg/schedule/splitter/region_splitter.go index 6af588ee4c0e..835113933f94 100644 --- a/pkg/schedule/region_splitter.go +++ b/pkg/schedule/splitter/region_splitter.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package splitter import ( "bytes" @@ -34,8 +34,10 @@ import ( ) const ( - watchInterval = 100 * time.Millisecond - timeout = time.Minute + watchInterval = 100 * time.Millisecond + timeout = time.Minute + maxSleepDuration = time.Minute + initialSleepDuration = 100 * time.Millisecond ) // SplitRegionsHandler used to handle region splitting diff --git a/pkg/schedule/region_splitter_test.go b/pkg/schedule/splitter/region_splitter_test.go similarity index 99% rename from pkg/schedule/region_splitter_test.go rename to pkg/schedule/splitter/region_splitter_test.go index eb91421d3426..7173d4e2d369 100644 --- a/pkg/schedule/region_splitter_test.go +++ b/pkg/schedule/splitter/region_splitter_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package splitter import ( "bytes" diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index ab66d4ba0fdd..b54aeefb1977 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -47,7 +47,9 @@ import ( "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/buckets" @@ -656,12 +658,12 @@ func (c *RaftCluster) SetPrepared() { } // GetRegionScatter returns the region scatter. -func (c *RaftCluster) GetRegionScatter() *schedule.RegionScatterer { +func (c *RaftCluster) GetRegionScatter() *scatter.RegionScatterer { return c.coordinator.GetRegionScatterer() } // GetRegionSplitter returns the region splitter -func (c *RaftCluster) GetRegionSplitter() *schedule.RegionSplitter { +func (c *RaftCluster) GetRegionSplitter() *splitter.RegionSplitter { return c.coordinator.GetRegionSplitter() }