Skip to content

Commit

Permalink
*: put the operator related logic together (#6490)
Browse files Browse the repository at this point in the history
ref #5839, ref #6418, ref #6489

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed May 23, 2023
1 parent 4725351 commit a14f38f
Show file tree
Hide file tree
Showing 47 changed files with 450 additions and 453 deletions.
4 changes: 2 additions & 2 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues(
type Controller struct {
cluster sche.ClusterInformer
conf config.Config
opController *schedule.OperatorController
opController *operator.Controller
learnerChecker *LearnerChecker
replicaChecker *ReplicaChecker
ruleChecker *RuleChecker
Expand All @@ -54,7 +54,7 @@ type Controller struct {
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
func NewController(ctx context.Context, cluster sche.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
Expand Down
3 changes: 1 addition & 2 deletions pkg/schedule/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/pkg/mock/mockconfig"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
Expand Down Expand Up @@ -464,7 +463,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {

mc := NewMergeChecker(suite.ctx, tc, tc.GetOpts())
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */)
oc := schedule.NewOperatorController(suite.ctx, tc, stream)
oc := operator.NewController(suite.ctx, tc, stream)

regions[2] = regions[2].Clone(
core.SetPeers([]*metapb.Peer{
Expand Down
58 changes: 0 additions & 58 deletions pkg/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,6 @@ package schedule
import "github.com/prometheus/client_golang/prometheus"

var (
// TODO: pre-allocate gauge metrics
operatorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "operators_count",
Help: "Counter of schedule operators.",
}, []string{"type", "event"})

operatorDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "finish_operators_duration_seconds",
Help: "Bucketed histogram of processing time (s) of finished operator.",
Buckets: []float64{0.5, 1, 2, 4, 8, 16, 20, 40, 60, 90, 120, 180, 240, 300, 480, 600, 720, 900, 1200, 1800, 3600},
}, []string{"type"})

operatorSizeHist = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "operator_region_size",
Help: "Bucketed histogram of the operator region size.",
Buckets: prometheus.ExponentialBuckets(1, 2, 20), // 1MB~1TB
}, []string{"type"})

operatorWaitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "operators_waiting_count",
Help: "Counter of schedule waiting operators.",
}, []string{"type", "event"})

operatorWaitDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "waiting_operators_duration_seconds",
Help: "Bucketed histogram of waiting time (s) of operator for being promoted.",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 16),
}, []string{"type"})

storeLimitCostCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "store_limit_cost",
Help: "limit rate cost of store.",
}, []string{"store", "limit_type"})

scatterCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Expand Down Expand Up @@ -96,13 +44,7 @@ var (
)

func init() {
prometheus.MustRegister(operatorCounter)
prometheus.MustRegister(operatorDuration)
prometheus.MustRegister(operatorWaitDuration)
prometheus.MustRegister(storeLimitCostCounter)
prometheus.MustRegister(operatorWaitCounter)
prometheus.MustRegister(scatterCounter)
prometheus.MustRegister(scatterDistributionCounter)
prometheus.MustRegister(operatorSizeHist)
prometheus.MustRegister(LabelerEventCounter)
}
58 changes: 58 additions & 0 deletions pkg/schedule/operator/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ var (
Help: "Counter of operator meeting limit",
}, []string{"type", "name"})

storeLimitCostCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "store_limit_cost",
Help: "limit rate cost of store.",
}, []string{"store", "limit_type"})

// OperatorExceededStoreLimitCounter exposes the counter when operator meet exceeded store limit.
OperatorExceededStoreLimitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand All @@ -43,10 +51,60 @@ var (
Name: "operator_exceeded_store_limit",
Help: "Counter of operator meeting store limit",
}, []string{"desc"})

// TODO: pre-allocate gauge metrics
operatorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "operators_count",
Help: "Counter of schedule operators.",
}, []string{"type", "event"})

operatorDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "finish_operators_duration_seconds",
Help: "Bucketed histogram of processing time (s) of finished operator.",
Buckets: []float64{0.5, 1, 2, 4, 8, 16, 20, 40, 60, 90, 120, 180, 240, 300, 480, 600, 720, 900, 1200, 1800, 3600},
}, []string{"type"})

operatorSizeHist = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "operator_region_size",
Help: "Bucketed histogram of the operator region size.",
Buckets: prometheus.ExponentialBuckets(1, 2, 20), // 1MB~1TB
}, []string{"type"})

operatorWaitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "operators_waiting_count",
Help: "Counter of schedule waiting operators.",
}, []string{"type", "event"})

operatorWaitDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "waiting_operators_duration_seconds",
Help: "Bucketed histogram of waiting time (s) of operator for being promoted.",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 16),
}, []string{"type"})
)

func init() {
prometheus.MustRegister(operatorStepDuration)
prometheus.MustRegister(OperatorLimitCounter)
prometheus.MustRegister(OperatorExceededStoreLimitCounter)
prometheus.MustRegister(operatorCounter)
prometheus.MustRegister(operatorDuration)
prometheus.MustRegister(operatorWaitDuration)
prometheus.MustRegister(operatorWaitCounter)
prometheus.MustRegister(operatorSizeHist)
prometheus.MustRegister(storeLimitCostCounter)
}
Loading

0 comments on commit a14f38f

Please sign in to comment.