Skip to content

Commit

Permalink
use individual package for splitter and scatter
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jun 2, 2023
1 parent 672f4dc commit eba0a98
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 42 deletions.
14 changes: 8 additions & 6 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -78,8 +80,8 @@ type Coordinator struct {
cluster sche.ClusterInformer
prepareChecker *prepareChecker
checkers *checker.Controller
regionScatterer *RegionScatterer
regionSplitter *RegionSplitter
regionScatterer *scatter.RegionScatterer
regionSplitter *splitter.RegionSplitter
schedulers map[string]*scheduleController
opController *operator.Controller
hbStreams *hbstream.HeartbeatStreams
Expand All @@ -98,8 +100,8 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
cluster: cluster,
prepareChecker: newPrepareChecker(),
checkers: checker.NewController(ctx, cluster, cluster.GetOpts(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController),
regionScatterer: NewRegionScatterer(ctx, cluster, opController),
regionSplitter: NewRegionSplitter(cluster, NewSplitRegionsHandler(cluster, opController)),
regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController),
regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController)),
schedulers: schedulers,
opController: opController,
hbStreams: hbStreams,
Expand Down Expand Up @@ -878,12 +880,12 @@ func (c *Coordinator) IsCheckerPaused(name string) (bool, error) {
}

// GetRegionScatterer returns the region scatterer.
func (c *Coordinator) GetRegionScatterer() *RegionScatterer {
func (c *Coordinator) GetRegionScatterer() *scatter.RegionScatterer {
return c.regionScatterer
}

// GetRegionSplitter returns the region splitter.
func (c *Coordinator) GetRegionSplitter() *RegionSplitter {
func (c *Coordinator) GetRegionSplitter() *splitter.RegionSplitter {
return c.regionSplitter
}

Expand Down
18 changes: 0 additions & 18 deletions pkg/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,6 @@ package schedule
import "github.com/prometheus/client_golang/prometheus"

var (
scatterCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "scatter_operators_count",
Help: "Counter of region scatter operators.",
}, []string{"type", "event"})

scatterDistributionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "scatter_distribution",
Help: "Counter of the distribution in scatter.",
}, []string{"store", "is_leader", "engine"})

hotSpotStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Expand Down Expand Up @@ -67,8 +51,6 @@ var (
)

func init() {
prometheus.MustRegister(scatterCounter)
prometheus.MustRegister(scatterDistributionCounter)
prometheus.MustRegister(schedulerStatusGauge)
prometheus.MustRegister(hotSpotStatusGauge)
prometheus.MustRegister(regionListGauge)
Expand Down
26 changes: 26 additions & 0 deletions pkg/schedule/scatter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package scatter

import "github.com/prometheus/client_golang/prometheus"

var (
scatterCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "scatter_operators_count",
Help: "Counter of region scatter operators.",
}, []string{"type", "event"})

scatterDistributionCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "scatter_distribution",
Help: "Counter of the distribution in scatter.",
}, []string{"store", "is_leader", "engine"})
)

func init() {
prometheus.MustRegister(scatterCounter)
prometheus.MustRegister(scatterDistributionCounter)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package scatter

import (
"context"
Expand Down Expand Up @@ -54,6 +54,12 @@ var (
scatterSuccessCounter = scatterCounter.WithLabelValues("success", "")
)

const (
maxSleepDuration = time.Minute
initialSleepDuration = 100 * time.Millisecond
maxRetryLimit = 30
)

type selectedStores struct {
mu syncutil.RWMutex
groupDistribution *cache.TTLString // value type: map[uint64]uint64, group -> StoreID -> count
Expand Down Expand Up @@ -171,10 +177,6 @@ func newEngineContext(ctx context.Context, filterFuncs ...filterFunc) engineCont
}
}

const maxSleepDuration = time.Minute
const initialSleepDuration = 100 * time.Millisecond
const maxRetryLimit = 30

// ScatterRegionsByRange directly scatter regions by ScatterRegions
func (r *RegionScatterer) ScatterRegionsByRange(startKey, endKey []byte, group string, retryLimit int) (int, map[uint64]error, error) {
regions := r.cluster.ScanRegions(startKey, endKey, -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package scatter

import (
"context"
Expand Down Expand Up @@ -439,9 +439,9 @@ func TestScatterForManyRegion(t *testing.T) {
}
failures := map[uint64]error{}
group := "group"
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain", `return(true)`))
scatterer.scatterRegions(regions, failures, group, 3)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain"))
re.Len(failures, 0)
}

Expand Down Expand Up @@ -470,7 +470,7 @@ func TestScattersGroup(t *testing.T) {
failure: false,
},
}
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain", `return(true)`))
for id, testCase := range testCases {
group := fmt.Sprintf("gourp-%d", id)
t.Log(testCase.name)
Expand All @@ -481,7 +481,7 @@ func TestScattersGroup(t *testing.T) {
}
failures := map[uint64]error{}
if testCase.failure {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatterFail", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/scatter/scatterFail", `return(true)`))
}

scatterer.scatterRegions(regions, failures, group, 3)
Expand All @@ -505,12 +505,12 @@ func TestScattersGroup(t *testing.T) {
re.Len(failures, 1)
_, ok := failures[1]
re.True(ok)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterFail"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterFail"))
} else {
re.Empty(failures)
}
}
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatterHbStreamsDrain"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/scatter/scatterHbStreamsDrain"))
}

func TestSelectedStoreGC(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package splitter

import (
"bytes"
Expand All @@ -34,8 +34,10 @@ import (
)

const (
watchInterval = 100 * time.Millisecond
timeout = time.Minute
watchInterval = 100 * time.Millisecond
timeout = time.Minute
maxSleepDuration = time.Minute
initialSleepDuration = 100 * time.Millisecond
)

// SplitRegionsHandler used to handle region splitting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package splitter

import (
"bytes"
Expand Down
6 changes: 4 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
Expand Down Expand Up @@ -656,12 +658,12 @@ func (c *RaftCluster) SetPrepared() {
}

// GetRegionScatter returns the region scatter.
func (c *RaftCluster) GetRegionScatter() *schedule.RegionScatterer {
func (c *RaftCluster) GetRegionScatter() *scatter.RegionScatterer {
return c.coordinator.GetRegionScatterer()
}

// GetRegionSplitter returns the region splitter
func (c *RaftCluster) GetRegionSplitter() *schedule.RegionSplitter {
func (c *RaftCluster) GetRegionSplitter() *splitter.RegionSplitter {
return c.coordinator.GetRegionSplitter()
}

Expand Down

0 comments on commit eba0a98

Please sign in to comment.