Skip to content

Commit

Permalink
add metrics for scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: junqian <junqian@tencent.com>
  • Loading branch information
qianjun1993 committed Sep 28, 2021
1 parent fd89085 commit 72ef4cc
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 3 deletions.
7 changes: 5 additions & 2 deletions cmd/scheduler/app/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -47,7 +48,7 @@ func NewSchedulerCommand(stopChan <-chan struct{}) *cobra.Command {

func run(opts *options.Options, stopChan <-chan struct{}) error {
klog.Infof("karmada-scheduler version: %s", version.Get())
go serveHealthz(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort))
go serveHealthzAndMetrics(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort))

restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
if err != nil {
Expand Down Expand Up @@ -111,11 +112,13 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
return nil
}

func serveHealthz(address string) {
// serveHealthzAndMetrics exposes two endpoints on the default HTTP mux and
// then serves them on address:
//
//   - /healthz replies 200 with the body "ok".
//   - /metrics is served by the Prometheus handler.
//
// The call blocks while the server is running; whatever error
// http.ListenAndServe returns is handed to klog.Fatal.
func serveHealthzAndMetrics(address string) {
	healthz := func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		// Best-effort write: nothing useful can be done if the client went away.
		_, _ = w.Write([]byte("ok"))
	}
	http.HandleFunc("/healthz", healthz)
	http.Handle("/metrics", promhttp.Handler())

	serveErr := http.ListenAndServe(address, nil)
	klog.Fatal(serveErr)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/kr/pretty v0.3.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.14.0
github.com/prometheus/client_golang v1.11.0
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
golang.org/x/tools v0.1.2
Expand Down
10 changes: 10 additions & 0 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"
"sort"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
)
Expand Down Expand Up @@ -92,6 +94,8 @@ func (g *genericScheduler) findClustersThatFit(
placement *policyv1alpha1.Placement,
resource *workv1alpha1.ObjectReference,
clusterInfo *cache.Snapshot) ([]*clusterv1alpha1.Cluster, error) {
defer metrics.ScheduleStep(metrics.ScheduleStepFilter, time.Now())

var out []*clusterv1alpha1.Cluster
clusters := clusterInfo.GetReadyClusters()
for _, c := range clusters {
Expand All @@ -113,6 +117,8 @@ func (g *genericScheduler) prioritizeClusters(
fwk framework.Framework,
placement *policyv1alpha1.Placement,
clusters []*clusterv1alpha1.Cluster) (result framework.ClusterScoreList, err error) {
defer metrics.ScheduleStep(metrics.ScheduleStepScore, time.Now())

scoresMap, err := fwk.RunScorePlugins(ctx, placement, clusters)
if err != nil {
return result, err
Expand All @@ -130,6 +136,8 @@ func (g *genericScheduler) prioritizeClusters(
}

func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList, spreadConstraints []policyv1alpha1.SpreadConstraint, clusters []*clusterv1alpha1.Cluster) []*clusterv1alpha1.Cluster {
defer metrics.ScheduleStep(metrics.ScheduleStepSelect, time.Now())

if len(spreadConstraints) != 0 {
return g.matchSpreadConstraints(clusters, spreadConstraints)
}
Expand Down Expand Up @@ -200,6 +208,8 @@ func (g *genericScheduler) chooseSpreadGroup(spreadGroup *util.SpreadGroup) []*c
}

func (g *genericScheduler) assignReplicas(clusters []*clusterv1alpha1.Cluster, replicaSchedulingStrategy *policyv1alpha1.ReplicaSchedulingStrategy, object *workv1alpha1.ResourceBindingSpec) ([]workv1alpha1.TargetCluster, error) {
defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, time.Now())

if len(clusters) == 0 {
return nil, fmt.Errorf("no clusters available to schedule")
}
Expand Down
73 changes: 73 additions & 0 deletions pkg/scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package metrics

import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

// SchedulerSubsystem is the Prometheus subsystem name set on every scheduler
// metric defined in this package.
const SchedulerSubsystem = "karmada_scheduler"

// Scheduler collectors. Each one uses SchedulerSubsystem as its subsystem and
// is listed in metricsList so that Register can register them together.
//
// NOTE(review): the label names "scheduleType" and "scheduleStep" are
// camelCase, whereas Prometheus naming guidance recommends snake_case.
// Renaming them would change the exposed series, so confirm before changing.
var (
// scheduleAttempts counts scheduling attempts, partitioned by outcome
// ("result") and by the kind of scheduling ("scheduleType").
scheduleAttempts = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "schedule_attempts_total",
Help: "Number of attempts to schedule resourceBinding",
}, []string{"result", "scheduleType"})

// e2eSchedulingLatency is a histogram of whole-attempt scheduling durations
// in seconds, using the same labels as scheduleAttempts.
// Buckets: 15 exponential buckets starting at 1ms and doubling each step.
e2eSchedulingLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "e2e_scheduling_duration_seconds",
Help: "E2e scheduling latency in seconds",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"result", "scheduleType"})

// schedulingAlgorithmLatency is a histogram of per-step durations in seconds,
// partitioned by the "scheduleStep" label (see the ScheduleStep* constants).
// Buckets: 15 exponential buckets starting at 1ms and doubling each step.
schedulingAlgorithmLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "scheduling_algorithm_duration_seconds",
Help: "Scheduling algorithm latency in seconds(exclude scale scheduler)",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"scheduleStep"})

// schedulerQueueIncomingBindings counts bindings added to the scheduling
// queue, partitioned by the "event" label that caused the enqueue.
schedulerQueueIncomingBindings = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "queue_incoming_bindings_total",
Help: "Number of bindings added to scheduling queues by event type.",
}, []string{"event"})

// metricsList holds every collector that Register passes to RegisterMetrics.
metricsList = []prometheus.Collector{
scheduleAttempts,
e2eSchedulingLatency,
schedulingAlgorithmLatency,
schedulerQueueIncomingBindings,
}
)

// registerMetrics makes sure the built-in scheduler metrics are registered at
// most once, no matter how many times Register is called.
var registerMetrics sync.Once

// Register registers every collector in metricsList. Repeated calls are
// no-ops after the first one.
func Register() {
	registerBuiltins := func() {
		RegisterMetrics(metricsList...)
	}
	registerMetrics.Do(registerBuiltins)
}

// RegisterMetrics registers the given collectors, in order, through
// prometheus.MustRegister.
// It is exported so that out-of-tree plugins can register their own custom
// metrics.
func RegisterMetrics(extraMetrics ...prometheus.Collector) {
	// MustRegister is variadic and handles the collectors one by one in the
	// order given, so a single call is equivalent to looping over them.
	prometheus.MustRegister(extraMetrics...)
}

// SinceInSeconds reports how much time has elapsed since start, expressed as
// a floating-point number of seconds.
func SinceInSeconds(start time.Time) float64 {
	elapsed := time.Since(start)
	return elapsed.Seconds()
}
57 changes: 57 additions & 0 deletions pkg/scheduler/metrics/metrics_record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package metrics

import "time"

// Values of the "result" label; BindingSchedule picks one of them depending on
// whether the scheduling attempt returned an error.
const (
// scheduledResult marks an attempt that finished without an error.
scheduledResult = "scheduled"
// errorResult marks an attempt that finished with an error.
errorResult = "error"
)

// Event names intended to be passed to CountSchedulerBindings; each one
// becomes the value of the "event" label on the incoming-bindings counter.
const (
// BindingAdd is the event when a new binding is added to the API server.
BindingAdd = "BindingAdd"
// BindingUpdate is the event when an existing binding is updated in the API server.
BindingUpdate = "BindingUpdate"
// ScheduleAttemptFailure is the event when a schedule attempt fails.
ScheduleAttemptFailure = "ScheduleAttemptFailure"
// PolicyChanged means the binding needs to be rescheduled because its policy changed.
PolicyChanged = "PolicyChanged"
// ClusterNotReady means the binding needs to be rescheduled because a cluster is not ready.
ClusterNotReady = "ClusterNotReady"
)

// Step names intended to be passed to ScheduleStep; each one becomes the value
// of the "scheduleStep" label on the scheduling-algorithm latency histogram.
const (
// ScheduleStepFilter means the step in generic scheduler to filter clusters
ScheduleStepFilter = "Filter"
// ScheduleStepScore means the step in generic scheduler to score clusters
ScheduleStepScore = "Score"
// ScheduleStepSelect means the step in generic scheduler to select clusters
ScheduleStepSelect = "Select"
// ScheduleStepAssignReplicas means the step in generic scheduler to assign replicas
ScheduleStepAssignReplicas = "AssignReplicas"
)

// BindingSchedule can record a scheduling attempt and the duration
// since `start`.
func BindingSchedule(scheduleType string, duration float64, err error) {
if err != nil {
observeScheduleAttemptAndLatency(errorResult, scheduleType, duration)
} else {
observeScheduleAttemptAndLatency(scheduledResult, scheduleType, duration)
}
}

// observeScheduleAttemptAndLatency records duration in the end-to-end latency
// histogram and then increments the attempts counter, both under the same
// (result, scheduleType) label pair.
func observeScheduleAttemptAndLatency(result, scheduleType string, duration float64) {
	latency := e2eSchedulingLatency.WithLabelValues(result, scheduleType)
	latency.Observe(duration)

	attempts := scheduleAttempts.WithLabelValues(result, scheduleType)
	attempts.Inc()
}

// ScheduleStep records how long a scheduling step took. action becomes the
// "scheduleStep" label value, and the observed value is the number of seconds
// elapsed since startTime.
func ScheduleStep(action string, startTime time.Time) {
	stepLatency := schedulingAlgorithmLatency.WithLabelValues(action)
	stepLatency.Observe(SinceInSeconds(startTime))
}

// CountSchedulerBindings increments the incoming-bindings counter for the
// given event, i.e. it counts one more binding added to the scheduling queue
// because of that event type.
func CountSchedulerBindings(event string) {
	incoming := schedulerQueueIncomingBindings.WithLabelValues(event)
	incoming.Inc()
}
24 changes: 23 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
)

Expand Down Expand Up @@ -135,6 +136,8 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
}

metrics.Register()

bindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sched.onResourceBindingAdd,
UpdateFunc: sched.onResourceBindingUpdate,
Expand Down Expand Up @@ -195,10 +198,18 @@ func (s *Scheduler) onResourceBindingAdd(obj interface{}) {
}

s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.BindingAdd)
}

// onResourceBindingUpdate handles a binding update event: it derives the
// queue key from the current object, enqueues it and records a BindingUpdate
// event in the incoming-bindings metric.
//
// It intentionally does not delegate to onResourceBindingAdd. That handler
// enqueues the key and records BindingAdd, so calling it here would enqueue
// the same key twice and count every update as an add as well.
//
// If the key cannot be computed the error is logged and nothing is enqueued
// or counted.
func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(cur)
	if err != nil {
		klog.Errorf("couldn't get key for object %#v: %v", cur, err)
		return
	}

	s.queue.Add(key)
	metrics.CountSchedulerBindings(metrics.BindingUpdate)
}

func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
Expand Down Expand Up @@ -265,6 +276,7 @@ func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
}
klog.Infof("Requeue ResourceBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name)
s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.PolicyChanged)
}
return nil
}
Expand All @@ -285,6 +297,7 @@ func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) err
}
klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", clusterResourceBinding.Name)
s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.PolicyChanged)
}
return nil
}
Expand Down Expand Up @@ -373,21 +386,27 @@ func (s *Scheduler) scheduleNext() bool {
}
defer s.queue.Done(key)

start := time.Now()

var err error
switch s.getScheduleType(key.(string)) {
case FirstSchedule:
err = s.scheduleOne(key.(string))
klog.Infof("Start scheduling binding(%s)", key.(string))
metrics.BindingSchedule(string(FirstSchedule), metrics.SinceInSeconds(start), err)
case ReconcileSchedule: // share same logic with first schedule
err = s.scheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as placement changed", key.(string))
metrics.BindingSchedule(string(ReconcileSchedule), metrics.SinceInSeconds(start), err)
case ScaleSchedule:
err = s.scaleScheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as replicas scaled down or scaled up", key.(string))
metrics.BindingSchedule(string(ScaleSchedule), metrics.SinceInSeconds(start), err)
case FailoverSchedule:
if Failover {
err = s.rescheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as cluster failure", key.(string))
metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err)
}
case AvoidSchedule:
klog.Infof("Don't need to schedule binding(%s)", key.(string))
Expand Down Expand Up @@ -496,6 +515,7 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
}

s.queue.AddRateLimited(key)
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
}

func (s *Scheduler) addCluster(obj interface{}) {
Expand Down Expand Up @@ -572,6 +592,7 @@ func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) {
return
}
s.queue.Add(rescheduleKey)
metrics.CountSchedulerBindings(metrics.ClusterNotReady)
klog.Infof("Add expired ResourceBinding in queue successfully")
}
}
Expand All @@ -592,6 +613,7 @@ func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) {
return
}
s.queue.Add(rescheduleKey)
metrics.CountSchedulerBindings(metrics.ClusterNotReady)
klog.Infof("Add expired ClusterResourceBinding in queue successfully")
}
}
Expand Down
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ github.com/pelletier/go-toml
# github.com/pkg/errors v0.9.1
github.com/pkg/errors
# github.com/prometheus/client_golang v1.11.0
## explicit
github.com/prometheus/client_golang/prometheus
github.com/prometheus/client_golang/prometheus/collectors
github.com/prometheus/client_golang/prometheus/internal
Expand Down

0 comments on commit 72ef4cc

Please sign in to comment.