Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: introduce the cluster informer for decoupling the dependencies #6489

Merged
merged 3 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -71,7 +71,7 @@ type Manager struct {
// store is the storage for keyspace related information.
store endpoint.KeyspaceStorage
// rc is the raft cluster of the server.
cluster schedule.Cluster
cluster core.ClusterInformer
// ctx is the context of the manager, to be used in transaction.
ctx context.Context
// config is the configurations of the manager.
Expand All @@ -98,7 +98,7 @@ type CreateKeyspaceRequest struct {
func NewKeyspaceManager(
ctx context.Context,
store endpoint.KeyspaceStorage,
cluster schedule.Cluster,
cluster core.ClusterInformer,
idAllocator id.Allocator,
config Config,
kgm *GroupManager,
Expand Down
11 changes: 9 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (

// Cluster is used to mock a cluster for test purpose.
type Cluster struct {
ctx context.Context
*core.BasicCluster
*mockid.IDAllocator
*placement.RuleManager
Expand All @@ -57,20 +58,21 @@ type Cluster struct {
suspectRegions map[uint64]struct{}
*config.StoreConfigManager
*buckets.HotBucketCache
ctx context.Context
storage.Storage
}

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
ctx: ctx,
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx),
HotBucketCache: buckets.NewBucketsCache(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
StoreConfigManager: config.NewTestStoreConfigManager(nil),
ctx: ctx,
Storage: storage.NewStorageWithMemoryBackend(),
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
Expand All @@ -96,6 +98,11 @@ func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
}

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
Expand Down
5 changes: 3 additions & 2 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
Expand All @@ -37,7 +38,7 @@ var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues(

// Controller is used to manage all checkers.
type Controller struct {
cluster schedule.Cluster
cluster sche.ClusterInformer
conf config.Config
opController *schedule.OperatorController
learnerChecker *LearnerChecker
Expand All @@ -53,7 +54,7 @@ type Controller struct {
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster schedule.Cluster, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
func NewController(ctx context.Context, cluster sche.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/checker/joint_state_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
)

// JointStateChecker ensures region is in joint state will leave.
type JointStateChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
}

const jointStateCheckerName = "joint_state_checker"
Expand All @@ -41,7 +41,7 @@ var (
)

// NewJointStateChecker creates a joint state checker.
func NewJointStateChecker(cluster schedule.Cluster) *JointStateChecker {
func NewJointStateChecker(cluster sche.ClusterInformer) *JointStateChecker {
return &JointStateChecker{
cluster: cluster,
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/checker/learner_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
)

// LearnerChecker ensures region has a learner will be promoted.
type LearnerChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
}

var (
Expand All @@ -34,7 +34,7 @@ var (
)

// NewLearnerChecker creates a learner checker.
func NewLearnerChecker(cluster schedule.Cluster) *LearnerChecker {
func NewLearnerChecker(cluster sche.ClusterInformer) *LearnerChecker {
return &LearnerChecker{
cluster: cluster,
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
Expand Down Expand Up @@ -76,14 +76,14 @@ var (
// MergeChecker ensures region to merge with adjacent region when size is small
type MergeChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
conf config.Config
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster schedule.Cluster, conf config.Config) *MergeChecker {
func NewMergeChecker(ctx context.Context, cluster sche.ClusterInformer, conf config.Config) *MergeChecker {
splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
return &MergeChecker{
cluster: cluster,
Expand Down Expand Up @@ -250,7 +250,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
}

// AllowMerge returns true if two regions can be merged according to the key type.
func AllowMerge(cluster schedule.Cluster, region, adjacent *core.RegionInfo) bool {
func AllowMerge(cluster sche.ClusterInformer, region, adjacent *core.RegionInfo) bool {
var start, end []byte
if bytes.Equal(region.GetEndKey(), adjacent.GetStartKey()) && len(region.GetEndKey()) != 0 {
start, end = region.GetStartKey(), adjacent.GetEndKey()
Expand Down Expand Up @@ -306,7 +306,7 @@ func isTableIDSame(region, adjacent *core.RegionInfo) bool {
// Check whether there is a peer of the adjacent region on an offline store,
// while the source region has no peer on it. This is to prevent from bringing
// any other peer into an offline store to slow down the offline process.
func checkPeerStore(cluster schedule.Cluster, region, adjacent *core.RegionInfo) bool {
func checkPeerStore(cluster sche.ClusterInformer, region, adjacent *core.RegionInfo) bool {
regionStoreIDs := region.GetStoreIDs()
for _, peer := range adjacent.GetPeers() {
storeID := peer.GetStoreId()
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/checker/priority_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/placement"
)

Expand All @@ -29,13 +29,13 @@ const defaultPriorityQueueSize = 1280

// PriorityInspector ensures high priority region should run first
type PriorityInspector struct {
cluster schedule.Cluster
cluster sche.ClusterInformer
conf config.Config
queue *cache.PriorityQueue
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster schedule.Cluster, conf config.Config) *PriorityInspector {
func NewPriorityInspector(cluster sche.ClusterInformer, conf config.Config) *PriorityInspector {
return &PriorityInspector{
cluster: cluster,
conf: conf,
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -61,13 +61,13 @@ var (
// Location management, mainly used for cross data center deployment.
type ReplicaChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
conf config.Config
regionWaitingList cache.Cache
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster schedule.Cluster, conf config.Config, regionWaitingList cache.Cache) *ReplicaChecker {
func NewReplicaChecker(cluster sche.ClusterInformer, conf config.Config, regionWaitingList cache.Cache) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"go.uber.org/zap"
)
Expand All @@ -27,7 +27,7 @@ import (
// exists to allow replica_checker and rule_checker to reuse common logics.
type ReplicaStrategy struct {
checkerName string // replica-checker / rule-checker
cluster schedule.Cluster
cluster sche.ClusterInformer
locationLabels []string
isolationLevel string
region *core.RegionInfo
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
Expand Down Expand Up @@ -81,7 +81,7 @@ var (
// RuleChecker fix/improve region by placement rules.
type RuleChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
ruleManager *placement.RuleManager
name string
regionWaitingList cache.Cache
Expand All @@ -91,7 +91,7 @@ type RuleChecker struct {
}

// NewRuleChecker creates a checker instance.
func NewRuleChecker(ctx context.Context, cluster schedule.Cluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
func NewRuleChecker(ctx context.Context, cluster sche.ClusterInformer, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
return &RuleChecker{
cluster: cluster,
ruleManager: ruleManager,
Expand Down Expand Up @@ -572,7 +572,7 @@ func (o *recorder) incOfflineLeaderCount(storeID uint64) {
// Offline is triggered manually and only appears when the node makes some adjustments. here is an operator timeout / 2.
var offlineCounterTTL = 5 * time.Minute

func (o *recorder) refresh(cluster schedule.Cluster) {
func (o *recorder) refresh(cluster sche.ClusterInformer) {
// re-count the offlineLeaderCounter if the store is already tombstone or store is gone.
if len(o.offlineLeaderCounter) > 0 && time.Since(o.lastUpdateTime) > offlineCounterTTL {
needClean := false
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/checker/split_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
Expand All @@ -28,7 +28,7 @@ import (
// SplitChecker splits regions when the key range spans across rule/label boundary.
type SplitChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
ruleManager *placement.RuleManager
labeler *labeler.RegionLabeler
}
Expand All @@ -42,7 +42,7 @@ var (
)

// NewSplitChecker creates a new SplitChecker.
func NewSplitChecker(cluster schedule.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler) *SplitChecker {
func NewSplitChecker(cluster sche.ClusterInformer, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler) *SplitChecker {
return &SplitChecker{
cluster: cluster,
ruleManager: ruleManager,
Expand Down
34 changes: 24 additions & 10 deletions pkg/schedule/cluster.go → pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,43 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package core

import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/storage"
)

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
core.RegionSetInformer
core.StoreSetInformer
core.StoreSetController

// ClusterInformer provides the necessary information of a cluster.
type ClusterInformer interface {
RegionHealthCluster
statistics.RegionStatInformer
statistics.StoreStatInformer
buckets.BucketStatInformer

operator.ClusterInformer

GetBasicCluster() *core.BasicCluster
GetStoreConfig() config.StoreConfig
GetAllocator() id.Allocator
GetRegionLabeler() *labeler.RegionLabeler
GetStorage() storage.Storage
RemoveScheduler(name string) error
AddSuspectRegions(ids ...uint64)
SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64)
RecordOpStepWithTTL(regionID uint64)
}

// RegionHealthCluster is an aggregate interface that wraps multiple interfaces
type RegionHealthCluster interface {
core.StoreSetInformer
core.StoreSetController
core.RegionSetInformer

GetOpts() config.Config
GetRuleManager() *placement.RuleManager
}
Loading