*: introduce the cluster informer for decoupling the dependencies (#6489)

ref #5839, ref #6418

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored May 22, 2023
1 parent b2275f6 commit 4725351
Showing 45 changed files with 268 additions and 241 deletions.
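
In short, consumers of the scheduling cluster abstraction now depend on the new `ClusterInformer` interface in `pkg/schedule/core` rather than on `schedule.Cluster` in `pkg/schedule`, which decouples checkers and other components from the top-level schedule package. A minimal sketch of the resulting pattern (the checker below is hypothetical and not part of this diff):

```go
package checker

import (
	// The alias `sche` mirrors the one used throughout this commit.
	sche "github.com/tikv/pd/pkg/schedule/core" // was: "github.com/tikv/pd/pkg/schedule"
)

// ExampleChecker is a hypothetical checker that illustrates the new
// dependency direction: it holds the ClusterInformer interface from
// pkg/schedule/core instead of the old schedule.Cluster interface.
type ExampleChecker struct {
	cluster sche.ClusterInformer
}

// NewExampleChecker accepts any implementation of the informer, e.g. the
// real raft cluster or the mock cluster used in tests.
func NewExampleChecker(cluster sche.ClusterInformer) *ExampleChecker {
	return &ExampleChecker{cluster: cluster}
}
```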
6 changes: 3 additions & 3 deletions pkg/keyspace/keyspace.go
@@ -26,7 +26,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
@@ -71,7 +71,7 @@ type Manager struct {
// store is the storage for keyspace related information.
store endpoint.KeyspaceStorage
// rc is the raft cluster of the server.
cluster schedule.Cluster
cluster core.ClusterInformer
// ctx is the context of the manager, to be used in transaction.
ctx context.Context
// config is the configurations of the manager.
@@ -98,7 +98,7 @@ type CreateKeyspaceRequest struct {
func NewKeyspaceManager(
ctx context.Context,
store endpoint.KeyspaceStorage,
cluster schedule.Cluster,
cluster core.ClusterInformer,
idAllocator id.Allocator,
config Config,
kgm *GroupManager,
11 changes: 9 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -47,6 +47,7 @@ const (

// Cluster is used to mock a cluster for test purpose.
type Cluster struct {
ctx context.Context
*core.BasicCluster
*mockid.IDAllocator
*placement.RuleManager
@@ -57,20 +58,21 @@ type Cluster struct {
suspectRegions map[uint64]struct{}
*config.StoreConfigManager
*buckets.HotBucketCache
ctx context.Context
storage.Storage
}

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
ctx: ctx,
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx),
HotBucketCache: buckets.NewBucketsCache(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
StoreConfigManager: config.NewTestStoreConfigManager(nil),
ctx: ctx,
Storage: storage.NewStorageWithMemoryBackend(),
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
@@ -96,6 +98,11 @@ func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
}

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
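
The mock cluster gains an explicit `GetStorage` accessor so it can keep standing in for the real cluster now that `GetStorage()` is part of the new interface. A hedged, illustrative compile-time assertion of that expectation (not part of the diff; it assumes the mock implements every method of `ClusterInformer`, which the checker and scheduler tests rely on):

```go
package example

import (
	"github.com/tikv/pd/pkg/mock/mockcluster"
	sche "github.com/tikv/pd/pkg/schedule/core"
)

// Illustrative compile-time check: tests pass the mock wherever a
// ClusterInformer is expected, so the mock is assumed to satisfy the
// full interface, including the newly required GetStorage method.
var _ sche.ClusterInformer = (*mockcluster.Cluster)(nil)
```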
5 changes: 3 additions & 2 deletions pkg/schedule/checker/checker_controller.go
@@ -24,6 +24,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
@@ -37,7 +38,7 @@ var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues(

// Controller is used to manage all checkers.
type Controller struct {
cluster schedule.Cluster
cluster sche.ClusterInformer
conf config.Config
opController *schedule.OperatorController
learnerChecker *LearnerChecker
@@ -53,7 +54,7 @@ type Controller struct {
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster schedule.Cluster, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
func NewController(ctx context.Context, cluster sche.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
6 changes: 3 additions & 3 deletions pkg/schedule/checker/joint_state_checker.go
@@ -19,14 +19,14 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
)

// JointStateChecker ensures region is in joint state will leave.
type JointStateChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
}

const jointStateCheckerName = "joint_state_checker"
@@ -41,7 +41,7 @@ var (
)

// NewJointStateChecker creates a joint state checker.
func NewJointStateChecker(cluster schedule.Cluster) *JointStateChecker {
func NewJointStateChecker(cluster sche.ClusterInformer) *JointStateChecker {
return &JointStateChecker{
cluster: cluster,
}
6 changes: 3 additions & 3 deletions pkg/schedule/checker/learner_checker.go
@@ -18,14 +18,14 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
)

// LearnerChecker ensures region has a learner will be promoted.
type LearnerChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
}

var (
@@ -34,7 +34,7 @@ var (
)

// NewLearnerChecker creates a learner checker.
func NewLearnerChecker(cluster schedule.Cluster) *LearnerChecker {
func NewLearnerChecker(cluster sche.ClusterInformer) *LearnerChecker {
return &LearnerChecker{
cluster: cluster,
}
10 changes: 5 additions & 5 deletions pkg/schedule/checker/merge_checker.go
@@ -25,8 +25,8 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
@@ -76,14 +76,14 @@ var (
// MergeChecker ensures region to merge with adjacent region when size is small
type MergeChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
conf config.Config
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster schedule.Cluster, conf config.Config) *MergeChecker {
func NewMergeChecker(ctx context.Context, cluster sche.ClusterInformer, conf config.Config) *MergeChecker {
splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
return &MergeChecker{
cluster: cluster,
@@ -250,7 +250,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
}

// AllowMerge returns true if two regions can be merged according to the key type.
func AllowMerge(cluster schedule.Cluster, region, adjacent *core.RegionInfo) bool {
func AllowMerge(cluster sche.ClusterInformer, region, adjacent *core.RegionInfo) bool {
var start, end []byte
if bytes.Equal(region.GetEndKey(), adjacent.GetStartKey()) && len(region.GetEndKey()) != 0 {
start, end = region.GetStartKey(), adjacent.GetEndKey()
@@ -306,7 +306,7 @@ func isTableIDSame(region, adjacent *core.RegionInfo) bool {
// Check whether there is a peer of the adjacent region on an offline store,
// while the source region has no peer on it. This is to prevent from bringing
// any other peer into an offline store to slow down the offline process.
func checkPeerStore(cluster schedule.Cluster, region, adjacent *core.RegionInfo) bool {
func checkPeerStore(cluster sche.ClusterInformer, region, adjacent *core.RegionInfo) bool {
regionStoreIDs := region.GetStoreIDs()
for _, peer := range adjacent.GetPeers() {
storeID := peer.GetStoreId()
6 changes: 3 additions & 3 deletions pkg/schedule/checker/priority_inspector.go
@@ -19,8 +19,8 @@ import (

"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/placement"
)

@@ -29,13 +29,13 @@ const defaultPriorityQueueSize = 1280

// PriorityInspector ensures high priority region should run first
type PriorityInspector struct {
cluster schedule.Cluster
cluster sche.ClusterInformer
conf config.Config
queue *cache.PriorityQueue
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster schedule.Cluster, conf config.Config) *PriorityInspector {
func NewPriorityInspector(cluster sche.ClusterInformer, conf config.Config) *PriorityInspector {
return &PriorityInspector{
cluster: cluster,
conf: conf,
6 changes: 3 additions & 3 deletions pkg/schedule/checker/replica_checker.go
@@ -23,8 +23,8 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"go.uber.org/zap"
)
@@ -61,13 +61,13 @@ var (
// Location management, mainly used for cross data center deployment.
type ReplicaChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
conf config.Config
regionWaitingList cache.Cache
}

// NewReplicaChecker creates a replica checker.
func NewReplicaChecker(cluster schedule.Cluster, conf config.Config, regionWaitingList cache.Cache) *ReplicaChecker {
func NewReplicaChecker(cluster sche.ClusterInformer, conf config.Config, regionWaitingList cache.Cache) *ReplicaChecker {
return &ReplicaChecker{
cluster: cluster,
conf: conf,
4 changes: 2 additions & 2 deletions pkg/schedule/checker/replica_strategy.go
@@ -18,7 +18,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"go.uber.org/zap"
)
@@ -27,7 +27,7 @@ import (
// exists to allow replica_checker and rule_checker to reuse common logics.
type ReplicaStrategy struct {
checkerName string // replica-checker / rule-checker
cluster schedule.Cluster
cluster sche.ClusterInformer
locationLabels []string
isolationLevel string
region *core.RegionInfo
8 changes: 4 additions & 4 deletions pkg/schedule/checker/rule_checker.go
@@ -27,7 +27,7 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
@@ -81,7 +81,7 @@ var (
// RuleChecker fix/improve region by placement rules.
type RuleChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
ruleManager *placement.RuleManager
name string
regionWaitingList cache.Cache
@@ -91,7 +91,7 @@ type RuleChecker struct {
}

// NewRuleChecker creates a checker instance.
func NewRuleChecker(ctx context.Context, cluster schedule.Cluster, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
func NewRuleChecker(ctx context.Context, cluster sche.ClusterInformer, ruleManager *placement.RuleManager, regionWaitingList cache.Cache) *RuleChecker {
return &RuleChecker{
cluster: cluster,
ruleManager: ruleManager,
@@ -572,7 +572,7 @@ func (o *recorder) incOfflineLeaderCount(storeID uint64) {
// Offline is triggered manually and only appears when the node makes some adjustments. here is an operator timeout / 2.
var offlineCounterTTL = 5 * time.Minute

func (o *recorder) refresh(cluster schedule.Cluster) {
func (o *recorder) refresh(cluster sche.ClusterInformer) {
// re-count the offlineLeaderCounter if the store is already tombstone or store is gone.
if len(o.offlineLeaderCounter) > 0 && time.Since(o.lastUpdateTime) > offlineCounterTTL {
needClean := false
6 changes: 3 additions & 3 deletions pkg/schedule/checker/split_checker.go
@@ -19,7 +19,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
@@ -28,7 +28,7 @@ import (
// SplitChecker splits regions when the key range spans across rule/label boundary.
type SplitChecker struct {
PauseController
cluster schedule.Cluster
cluster sche.ClusterInformer
ruleManager *placement.RuleManager
labeler *labeler.RegionLabeler
}
@@ -42,7 +42,7 @@ var (
)

// NewSplitChecker creates a new SplitChecker.
func NewSplitChecker(cluster schedule.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler) *SplitChecker {
func NewSplitChecker(cluster sche.ClusterInformer, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler) *SplitChecker {
return &SplitChecker{
cluster: cluster,
ruleManager: ruleManager,
34 changes: 24 additions & 10 deletions pkg/schedule/cluster.go → pkg/schedule/core/cluster_informer.go
@@ -12,29 +12,43 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package core

import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/storage"
)

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
core.RegionSetInformer
core.StoreSetInformer
core.StoreSetController

// ClusterInformer provides the necessary information of a cluster.
type ClusterInformer interface {
RegionHealthCluster
statistics.RegionStatInformer
statistics.StoreStatInformer
buckets.BucketStatInformer

operator.ClusterInformer

GetBasicCluster() *core.BasicCluster
GetStoreConfig() config.StoreConfig
GetAllocator() id.Allocator
GetRegionLabeler() *labeler.RegionLabeler
GetStorage() storage.Storage
RemoveScheduler(name string) error
AddSuspectRegions(ids ...uint64)
SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64)
RecordOpStepWithTTL(regionID uint64)
}

// RegionHealthCluster is an aggregate interface that wraps multiple interfaces
type RegionHealthCluster interface {
core.StoreSetInformer
core.StoreSetController
core.RegionSetInformer

GetOpts() config.Config
GetRuleManager() *placement.RuleManager
}
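
Splitting out `RegionHealthCluster` lets components that only need store, region, rule, and config information depend on this narrower interface instead of the full `ClusterInformer`. A minimal sketch under that assumption (the helper function below is hypothetical, not part of this diff):

```go
package example

import (
	sche "github.com/tikv/pd/pkg/schedule/core"
)

// enoughStoresForReplicas reports whether the cluster currently has at least
// as many stores as the configured replica count. It only needs methods from
// core.StoreSetInformer and config.Config, so RegionHealthCluster suffices.
func enoughStoresForReplicas(cluster sche.RegionHealthCluster) bool {
	return len(cluster.GetStores()) >= cluster.GetOpts().GetMaxReplicas()
}
```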