diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go index 90d1cfa4a09..dd11b9a3e37 100644 --- a/pkg/schedule/coordinator.go +++ b/pkg/schedule/coordinator.go @@ -35,7 +35,9 @@ import ( "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/plan" + "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/logutil" @@ -78,8 +80,8 @@ type Coordinator struct { cluster sche.ClusterInformer prepareChecker *prepareChecker checkers *checker.Controller - regionScatterer *RegionScatterer - regionSplitter *RegionSplitter + regionScatterer *scatter.RegionScatterer + regionSplitter *splitter.RegionSplitter schedulers map[string]*scheduleController opController *operator.Controller hbStreams *hbstream.HeartbeatStreams @@ -98,8 +100,8 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams cluster: cluster, prepareChecker: newPrepareChecker(), checkers: checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController), - regionScatterer: NewRegionScatterer(ctx, cluster, opController), - regionSplitter: NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)), + regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController), + regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController)), schedulers: schedulers, opController: opController, hbStreams: hbStreams, @@ -897,12 +899,12 @@ func (c *Coordinator) IsCheckerPaused(name string) (bool, error) { } // GetRegionScatterer returns the region scatterer. -func (c *Coordinator) GetRegionScatterer() *RegionScatterer { +func (c *Coordinator) GetRegionScatterer() *scatter.RegionScatterer { return c.regionScatterer } // GetRegionSplitter returns the region splitter. -func (c *Coordinator) GetRegionSplitter() *RegionSplitter { +func (c *Coordinator) GetRegionSplitter() *splitter.RegionSplitter { return c.regionSplitter } diff --git a/pkg/schedule/metrics.go b/pkg/schedule/metrics.go index 9f45554ccac..dd85156fb09 100644 --- a/pkg/schedule/metrics.go +++ b/pkg/schedule/metrics.go @@ -17,22 +17,6 @@ package schedule import "github.com/prometheus/client_golang/prometheus" var ( - scatterCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "scatter_operators_count", - Help: "Counter of region scatter operators.", - }, []string{"type", "event"}) - - scatterDistributionCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "pd", - Subsystem: "schedule", - Name: "scatter_distribution", - Help: "Counter of the distribution in scatter.", - }, []string{"store", "is_leader", "engine"}) - hotSpotStatusGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", @@ -67,8 +51,6 @@ var ( ) func init() { - prometheus.MustRegister(scatterCounter) - prometheus.MustRegister(scatterDistributionCounter) prometheus.MustRegister(schedulerStatusGauge) prometheus.MustRegister(hotSpotStatusGauge) prometheus.MustRegister(regionListGauge) diff --git a/pkg/schedule/scatter/metrics.go b/pkg/schedule/scatter/metrics.go new file mode 100644 index 00000000000..5fe41550764 --- /dev/null +++ b/pkg/schedule/scatter/metrics.go @@ -0,0 +1,40 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scatter + +import "github.com/prometheus/client_golang/prometheus" + +var ( + scatterCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "scatter_operators_count", + Help: "Counter of region scatter operators.", + }, []string{"type", "event"}) + + scatterDistributionCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "scatter_distribution", + Help: "Counter of the distribution in scatter.", + }, []string{"store", "is_leader", "engine"}) +) + +func init() { + prometheus.MustRegister(scatterCounter) + prometheus.MustRegister(scatterDistributionCounter) +} diff --git a/pkg/schedule/region_scatterer.go b/pkg/schedule/scatter/region_scatterer.go similarity index 99% rename from pkg/schedule/region_scatterer.go rename to pkg/schedule/scatter/region_scatterer.go index ed6acb3f478..54fc291b363 100644 --- a/pkg/schedule/region_scatterer.go +++ b/pkg/schedule/scatter/region_scatterer.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package scatter import ( "context" @@ -54,6 +54,12 @@ var ( scatterSuccessCounter = scatterCounter.WithLabelValues("success", "") ) +const ( + maxSleepDuration = time.Minute + initialSleepDuration = 100 * time.Millisecond + maxRetryLimit = 30 +) + type selectedStores struct { mu syncutil.RWMutex groupDistribution *cache.TTLString // value type: map[uint64]uint64, group -> StoreID -> count @@ -171,10 +177,6 @@ func newEngineContext(ctx context.Context, filterFuncs ...filterFunc) engineCont } } -const maxSleepDuration = time.Minute -const initialSleepDuration = 100 * time.Millisecond -const maxRetryLimit = 30 - // ScatterRegionsByRange directly scatter regions by ScatterRegions func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group string, retryLimit int) (int, map[uint64]error, error) { regions := r.cluster.ScanRegions(startKey, endKey, -1) diff --git a/pkg/schedule/region_scatterer_test.go b/pkg/schedule/scatter/region_scatterer_test.go similarity index 99% rename from pkg/schedule/region_scatterer_test.go rename to pkg/schedule/scatter/region_scatterer_test.go index ec2ab77fbe3..519630f4276 100644 --- a/pkg/schedule/region_scatterer_test.go +++ b/pkg/schedule/scatter/region_scatterer_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package scatter import ( "context" @@ -439,9 +439,9 @@ func TestScatterForManyRegion(t *testing.T) { } failures := map[uint64]error{} group := "group" - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain", `return(true)`)) scatterer.scatterRegions(regions, failures, group, 3) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain")) re.Len(failures, 0) } @@ -470,7 +470,7 @@ func TestScattersGroup(t *testing.T) { failure: false, }, } - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain", `return(true)`)) for id, testCase := range testCases { group := fmt.Sprintf("gourp-%d", id) t.Log(testCase.name) @@ -481,7 +481,7 @@ func TestScattersGroup(t *testing.T) { } failures := map[uint64]error{} if testCase.failure { - re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterFail", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterFail", `return(true)`)) } scatterer.scatterRegions(regions, failures, group, 3) @@ -505,12 +505,12 @@ func TestScattersGroup(t *testing.T) { re.Len(failures, 1) _, ok := failures[1] re.True(ok) - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterFail")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterFail")) } else { re.Empty(failures) } } - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain")) } func TestSelectedStoreGC(t *testing.T) { diff --git a/pkg/schedule/region_splitter.go b/pkg/schedule/splitter/region_splitter.go similarity index 97% rename from pkg/schedule/region_splitter.go rename to pkg/schedule/splitter/region_splitter.go index 9f8dcb6db76..a8ea58ee12c 100644 --- a/pkg/schedule/region_splitter.go +++ b/pkg/schedule/splitter/region_splitter.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package splitter import ( "bytes" @@ -34,8 +34,10 @@ import ( ) const ( - watchInterval = 100 * time.Millisecond - timeout = time.Minute + watchInterval = 100 * time.Millisecond + timeout = time.Minute + maxSleepDuration = time.Minute + initialSleepDuration = 100 * time.Millisecond ) // SplitRegionsHandler used to handle region splitting diff --git a/pkg/schedule/region_splitter_test.go b/pkg/schedule/splitter/region_splitter_test.go similarity index 99% rename from pkg/schedule/region_splitter_test.go rename to pkg/schedule/splitter/region_splitter_test.go index fad3576e221..f293446e6cd 100644 --- a/pkg/schedule/region_splitter_test.go +++ b/pkg/schedule/splitter/region_splitter_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package schedule +package splitter import ( "bytes" diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 42e169b93ed..4ac6a13fd70 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -47,7 +47,9 @@ import ( "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/schedule/scatter" "github.com/tikv/pd/pkg/schedule/schedulers" + "github.com/tikv/pd/pkg/schedule/splitter" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/statistics/buckets" @@ -658,12 +660,12 @@ func (c *RaftCluster) SetPrepared() { } // GetRegionScatter returns the region scatter. -func (c *RaftCluster) GetRegionScatter() *schedule.RegionScatterer { +func (c *RaftCluster) GetRegionScatter() *scatter.RegionScatterer { return c.coordinator.GetRegionScatterer() } // GetRegionSplitter returns the region splitter -func (c *RaftCluster) GetRegionSplitter() *schedule.RegionSplitter { +func (c *RaftCluster) GetRegionSplitter() *splitter.RegionSplitter { return c.coordinator.GetRegionSplitter() }