Skip to content

Commit

Permalink
Merge branch 'split_buckets' of github.com:bufferflies/pd into split_…
Browse files Browse the repository at this point in the history
…buckets
  • Loading branch information
bufferflies committed Jul 31, 2023
2 parents 3a2fe26 + e63aa66 commit 3105068
Show file tree
Hide file tree
Showing 67 changed files with 1,322 additions and 1,164 deletions.
19 changes: 13 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ func WithExcludeTombstone() GetStoreOption {

// RegionsOp represents available options when operate regions
type RegionsOp struct {
group string
retryLimit uint64
group string
retryLimit uint64
skipStoreLimit bool
}

// RegionsOption configures RegionsOp
Expand All @@ -191,6 +192,11 @@ func WithRetry(retry uint64) RegionsOption {
return func(op *RegionsOp) { op.retryLimit = retry }
}

// WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions
func WithSkipStoreLimit() RegionsOption {
return func(op *RegionsOp) { op.skipStoreLimit = true }
}

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
Expand Down Expand Up @@ -1393,10 +1399,11 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
}
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.ScatterRegionRequest{
Header: c.requestHeader(),
Group: options.group,
RegionsId: regionsID,
RetryLimit: options.retryLimit,
Header: c.requestHeader(),
Group: options.group,
RegionsId: regionsID,
RetryLimit: options.retryLimit,
SkipStoreLimit: options.skipStoreLimit,
}

ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333 h1:A6Wqgq0uMw51UiRAH27TVN0QlzVR5CVtV6fTQSAmvKM=
github.com/pingcap/kvproto v0.0.0-20230720094213-a3b4a77b4333/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
49 changes: 25 additions & 24 deletions pkg/mock/mockcluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,131 +17,132 @@ package mockcluster
import (
"time"

sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server/config"
)

// SetMaxMergeRegionSize updates the MaxMergeRegionSize configuration.
func (mc *Cluster) SetMaxMergeRegionSize(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxMergeRegionSize = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxMergeRegionSize = uint64(v) })
}

// SetMaxMergeRegionKeys updates the MaxMergeRegionKeys configuration.
func (mc *Cluster) SetMaxMergeRegionKeys(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxMergeRegionKeys = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxMergeRegionKeys = uint64(v) })
}

// SetSplitMergeInterval updates the SplitMergeInterval configuration.
func (mc *Cluster) SetSplitMergeInterval(v time.Duration) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.SplitMergeInterval = typeutil.NewDuration(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.SplitMergeInterval = typeutil.NewDuration(v) })
}

// SetEnableOneWayMerge updates the EnableOneWayMerge configuration.
func (mc *Cluster) SetEnableOneWayMerge(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableOneWayMerge = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableOneWayMerge = v })
}

// SetMaxSnapshotCount updates the MaxSnapshotCount configuration.
func (mc *Cluster) SetMaxSnapshotCount(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MaxSnapshotCount = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MaxSnapshotCount = uint64(v) })
}

// SetEnableMakeUpReplica updates the EnableMakeUpReplica configuration.
func (mc *Cluster) SetEnableMakeUpReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableMakeUpReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableMakeUpReplica = v })
}

// SetEnableRemoveExtraReplica updates the EnableRemoveExtraReplica configuration.
func (mc *Cluster) SetEnableRemoveExtraReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableRemoveExtraReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableRemoveExtraReplica = v })
}

// SetEnableLocationReplacement updates the EnableLocationReplacement configuration.
func (mc *Cluster) SetEnableLocationReplacement(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableLocationReplacement = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableLocationReplacement = v })
}

// SetEnableRemoveDownReplica updates the EnableRemoveDownReplica configuration.
func (mc *Cluster) SetEnableRemoveDownReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableRemoveDownReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableRemoveDownReplica = v })
}

// SetEnableReplaceOfflineReplica updates the EnableReplaceOfflineReplica configuration.
func (mc *Cluster) SetEnableReplaceOfflineReplica(v bool) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.EnableReplaceOfflineReplica = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.EnableReplaceOfflineReplica = v })
}

// SetLeaderSchedulePolicy updates the LeaderSchedulePolicy configuration.
func (mc *Cluster) SetLeaderSchedulePolicy(v string) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.LeaderSchedulePolicy = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.LeaderSchedulePolicy = v })
}

// SetTolerantSizeRatio updates the TolerantSizeRatio configuration.
func (mc *Cluster) SetTolerantSizeRatio(v float64) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.TolerantSizeRatio = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.TolerantSizeRatio = v })
}

// SetRegionScoreFormulaVersion updates the RegionScoreFormulaVersion configuration.
func (mc *Cluster) SetRegionScoreFormulaVersion(v string) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.RegionScoreFormulaVersion = v })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.RegionScoreFormulaVersion = v })
}

// SetLeaderScheduleLimit updates the LeaderScheduleLimit configuration.
func (mc *Cluster) SetLeaderScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.LeaderScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.LeaderScheduleLimit = uint64(v) })
}

// SetRegionScheduleLimit updates the RegionScheduleLimit configuration.
func (mc *Cluster) SetRegionScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.RegionScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.RegionScheduleLimit = uint64(v) })
}

// SetMergeScheduleLimit updates the MergeScheduleLimit configuration.
func (mc *Cluster) SetMergeScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.MergeScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.MergeScheduleLimit = uint64(v) })
}

// SetHotRegionScheduleLimit updates the HotRegionScheduleLimit configuration.
func (mc *Cluster) SetHotRegionScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.HotRegionScheduleLimit = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.HotRegionScheduleLimit = uint64(v) })
}

// SetHotRegionCacheHitsThreshold updates the HotRegionCacheHitsThreshold configuration.
func (mc *Cluster) SetHotRegionCacheHitsThreshold(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.HotRegionCacheHitsThreshold = uint64(v) })
mc.updateScheduleConfig(func(s *sc.ScheduleConfig) { s.HotRegionCacheHitsThreshold = uint64(v) })
}

// SetEnablePlacementRules updates the EnablePlacementRules configuration.
func (mc *Cluster) SetEnablePlacementRules(v bool) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.EnablePlacementRules = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.EnablePlacementRules = v })
if v {
mc.initRuleManager()
}
}

// SetMaxReplicas updates the maxReplicas configuration.
func (mc *Cluster) SetMaxReplicas(v int) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.MaxReplicas = uint64(v) })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.MaxReplicas = uint64(v) })
}

// SetLocationLabels updates the LocationLabels configuration.
func (mc *Cluster) SetLocationLabels(v []string) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.LocationLabels = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.LocationLabels = v })
}

// SetIsolationLevel updates the IsolationLevel configuration.
func (mc *Cluster) SetIsolationLevel(v string) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.IsolationLevel = v })
mc.updateReplicationConfig(func(r *sc.ReplicationConfig) { r.IsolationLevel = v })
}

func (mc *Cluster) updateScheduleConfig(f func(*config.ScheduleConfig)) {
func (mc *Cluster) updateScheduleConfig(f func(*sc.ScheduleConfig)) {
s := mc.GetScheduleConfig().Clone()
f(s)
mc.SetScheduleConfig(s)
}

func (mc *Cluster) updateReplicationConfig(f func(*config.ReplicationConfig)) {
func (mc *Cluster) updateReplicationConfig(f func(*sc.ReplicationConfig)) {
r := mc.GetReplicationConfig().Clone()
f(r)
mc.SetReplicationConfig(r)
Expand Down
8 changes: 4 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,22 +84,22 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
}

// GetStoreConfig returns the store config.
func (mc *Cluster) GetStoreConfig() sc.StoreConfig {
func (mc *Cluster) GetStoreConfig() sc.StoreConfigProvider {
return mc.StoreConfigManager.GetStoreConfig()
}

// GetCheckerConfig returns the checker config.
func (mc *Cluster) GetCheckerConfig() sc.CheckerConfig {
func (mc *Cluster) GetCheckerConfig() sc.CheckerConfigProvider {
return mc
}

// GetSchedulerConfig returns the scheduler config.
func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfig {
func (mc *Cluster) GetSchedulerConfig() sc.SchedulerConfigProvider {
return mc
}

// GetSharedConfig returns the shared config.
func (mc *Cluster) GetSharedConfig() sc.SharedConfig {
func (mc *Cluster) GetSharedConfig() sc.SharedConfigProvider {
return mc
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mockconfig/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// NewTestOptions creates default options for testing.
func NewTestOptions() *config.PersistOptions {
// register default schedulers in case config check fail.
for _, d := range config.DefaultSchedulers {
for _, d := range sc.DefaultSchedulers {
sc.RegisterScheduler(d.Type)
}
c := config.NewConfig()
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("
// Controller is used to manage all checkers.
type Controller struct {
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
opController *operator.Controller
learnerChecker *LearnerChecker
replicaChecker *ReplicaChecker
Expand All @@ -53,7 +53,7 @@ type Controller struct {
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfig, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ var (
type MergeChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfig) *MergeChecker {
func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *MergeChecker {
splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
return &MergeChecker{
cluster: cluster,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/priority_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ const defaultPriorityQueueSize = 1280
// PriorityInspector ensures high priority region should run first
type PriorityInspector struct {
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
queue *cache.PriorityQueue
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfig) *PriorityInspector {
func NewPriorityInspector(cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *PriorityInspector {
return &PriorityInspector{
cluster: cluster,
conf: conf,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ var (
type ReplicaChecker struct {
PauseController
cluster sche.CheckerCluster
conf config.CheckerConfig
conf config.CheckerConfigProvider
regionWaitingList cache.Cache
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfig, regionWaitingList cache.Cache) *ReplicaChecker {
func NewReplicaChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider, regionWaitingList cache.Cache) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
Expand Down
Loading

0 comments on commit 3105068

Please sign in to comment.