add metrics for scheduler #747

Merged: 1 commit, Sep 29, 2021
7 changes: 5 additions & 2 deletions cmd/scheduler/app/scheduler.go
@@ -8,6 +8,7 @@ import (
"os"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
@@ -47,7 +48,7 @@ func NewSchedulerCommand(stopChan <-chan struct{}) *cobra.Command {

func run(opts *options.Options, stopChan <-chan struct{}) error {
klog.Infof("karmada-scheduler version: %s", version.Get())
- go serveHealthz(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort))
+ go serveHealthzAndMetrics(fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort))

restConfig, err := clientcmd.BuildConfigFromFlags(opts.Master, opts.KubeConfig)
if err != nil {
@@ -111,11 +112,13 @@ func run(opts *options.Options, stopChan <-chan struct{}) error {
return nil
}

- func serveHealthz(address string) {
+ func serveHealthzAndMetrics(address string) {
http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})

http.Handle("/metrics", promhttp.Handler())

klog.Fatal(http.ListenAndServe(address, nil))
}
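
A quick way to sanity-check the combined endpoint after this change is to hit /healthz and /metrics on the scheduler's bind address. A minimal sketch, assuming a locally reachable scheduler; the port below is an illustrative placeholder, the real value comes from --bind-address and --secure-port:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "http://127.0.0.1:10351" // placeholder address, not taken from this PR
	for _, path := range []string{"/healthz", "/metrics"} {
		resp, err := http.Get(base + path)
		if err != nil {
			fmt.Println(path, "error:", err)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Printf("%s -> %d (%d bytes)\n", path, resp.StatusCode, len(body))
	}
}

With the server wired up as above, /metrics should return the Prometheus text exposition alongside the existing "ok" from /healthz.
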
1 change: 1 addition & 0 deletions go.mod
@@ -10,6 +10,7 @@ require (
github.com/kr/pretty v0.3.0
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.14.0
github.com/prometheus/client_golang v1.11.0
github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
golang.org/x/tools v0.1.2
9 changes: 9 additions & 0 deletions pkg/scheduler/core/generic_scheduler.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math"
"sort"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
@@ -17,6 +18,7 @@ import (
"github.com/karmada-io/karmada/pkg/scheduler/cache"
"github.com/karmada-io/karmada/pkg/scheduler/framework"
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
)
@@ -92,6 +94,8 @@ func (g *genericScheduler) findClustersThatFit(
placement *policyv1alpha1.Placement,
resource *workv1alpha2.ObjectReference,
clusterInfo *cache.Snapshot) ([]*clusterv1alpha1.Cluster, error) {
defer metrics.ScheduleStep(metrics.ScheduleStepFilter, time.Now())

var out []*clusterv1alpha1.Cluster
clusters := clusterInfo.GetReadyClusters()
for _, c := range clusters {
@@ -113,6 +117,8 @@ func (g *genericScheduler) prioritizeClusters(
fwk framework.Framework,
placement *policyv1alpha1.Placement,
clusters []*clusterv1alpha1.Cluster) (result framework.ClusterScoreList, err error) {
defer metrics.ScheduleStep(metrics.ScheduleStepScore, time.Now())

scoresMap, err := fwk.RunScorePlugins(ctx, placement, clusters)
if err != nil {
return result, err
@@ -130,6 +136,8 @@
}

func (g *genericScheduler) selectClusters(clustersScore framework.ClusterScoreList, spreadConstraints []policyv1alpha1.SpreadConstraint, clusters []*clusterv1alpha1.Cluster) []*clusterv1alpha1.Cluster {
defer metrics.ScheduleStep(metrics.ScheduleStepSelect, time.Now())

if len(spreadConstraints) != 0 {
return g.matchSpreadConstraints(clusters, spreadConstraints)
}
@@ -200,6 +208,7 @@ func (g *genericScheduler) chooseSpreadGroup(spreadGroup *util.SpreadGroup) []*clusterv1alpha1.Cluster {
}

func (g *genericScheduler) assignReplicas(clusters []*clusterv1alpha1.Cluster, replicaSchedulingStrategy *policyv1alpha1.ReplicaSchedulingStrategy, object *workv1alpha2.ResourceBindingSpec) ([]workv1alpha2.TargetCluster, error) {
defer metrics.ScheduleStep(metrics.ScheduleStepAssignReplicas, time.Now())
if len(clusters) == 0 {
return nil, fmt.Errorf("no clusters available to schedule")
}
73 changes: 73 additions & 0 deletions pkg/scheduler/metrics/metrics.go
@@ -0,0 +1,73 @@
package metrics

import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

// SchedulerSubsystem - subsystem name used by scheduler
const SchedulerSubsystem = "karmada_scheduler"

var (
scheduleAttempts = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "schedule_attempts_total",
Help: "Number of attempts to schedule resourceBinding",
}, []string{"result", "scheduleType"})

e2eSchedulingLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "e2e_scheduling_duration_seconds",
Help: "E2e scheduling latency in seconds",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"result", "scheduleType"})

schedulingAlgorithmLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: SchedulerSubsystem,
Name: "scheduling_algorithm_duration_seconds",
Help: "Scheduling algorithm latency in seconds(exclude scale scheduler)",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"scheduleStep"})

schedulerQueueIncomingBindings = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "queue_incoming_bindings_total",
Help: "Number of bindings added to scheduling queues by event type.",
}, []string{"event"})

metricsList = []prometheus.Collector{
scheduleAttempts,
e2eSchedulingLatency,
schedulingAlgorithmLatency,
schedulerQueueIncomingBindings,
}
)

var registerMetrics sync.Once

// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
RegisterMetrics(metricsList...)
})
}

// RegisterMetrics registers a list of metrics.
// This function is exported because it is intended to be used by out-of-tree plugins to register their custom metrics.
func RegisterMetrics(extraMetrics ...prometheus.Collector) {
for _, metric := range extraMetrics {
prometheus.MustRegister(metric)
}
}

// SinceInSeconds gets the time since the specified start in seconds.
func SinceInSeconds(start time.Time) float64 {
return time.Since(start).Seconds()
}
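
For reference, prometheus.ExponentialBuckets(0.001, 2, 15) produces 15 upper bounds doubling from 1ms to roughly 16.4s, so the two histograms cover everything from sub-millisecond steps to multi-second outliers. A minimal sketch of the same registration-plus-observation flow; the metric name and observed value are hypothetical, for illustration only:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same bucket shape as the PR: 0.001, 0.002, 0.004, ..., 16.384 seconds.
	buckets := prometheus.ExponentialBuckets(0.001, 2, 15)
	fmt.Println(buckets)

	hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: "karmada_scheduler",
		Name:      "example_duration_seconds", // hypothetical metric name
		Help:      "Example latency histogram.",
		Buckets:   buckets,
	}, []string{"scheduleStep"})
	prometheus.MustRegister(hist)

	hist.WithLabelValues("Filter").Observe(0.0042) // hypothetical observation
}
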
57 changes: 57 additions & 0 deletions pkg/scheduler/metrics/metrics_record.go
@@ -0,0 +1,57 @@
package metrics

import "time"

const (
scheduledResult = "scheduled"
errorResult = "error"
)

const (
// BindingAdd is the event when a new binding is added to the API server.
BindingAdd = "BindingAdd"
// BindingUpdate is the event when an existing binding is updated in the API server.
BindingUpdate = "BindingUpdate"
// ScheduleAttemptFailure is the event when a schedule attempt fails.
ScheduleAttemptFailure = "ScheduleAttemptFailure"
// PolicyChanged means the binding needs to be rescheduled because its policy changed.
PolicyChanged = "PolicyChanged"
// ClusterNotReady means the binding needs to be rescheduled because a cluster is not ready.
ClusterNotReady = "ClusterNotReady"
)

const (
// ScheduleStepFilter means the step in generic scheduler to filter clusters
ScheduleStepFilter = "Filter"
// ScheduleStepScore means the step in generic scheduler to score clusters
ScheduleStepScore = "Score"
// ScheduleStepSelect means the step in generic scheduler to select clusters
ScheduleStepSelect = "Select"
// ScheduleStepAssignReplicas means the step in generic scheduler to assign replicas
ScheduleStepAssignReplicas = "AssignReplicas"
)

// BindingSchedule records a scheduling attempt and its duration,
// labeled by result and schedule type.
func BindingSchedule(scheduleType string, duration float64, err error) {
if err != nil {
observeScheduleAttemptAndLatency(errorResult, scheduleType, duration)
} else {
observeScheduleAttemptAndLatency(scheduledResult, scheduleType, duration)
}
}

func observeScheduleAttemptAndLatency(result, scheduleType string, duration float64) {
e2eSchedulingLatency.WithLabelValues(result, scheduleType).Observe(duration)
scheduleAttempts.WithLabelValues(result, scheduleType).Inc()
}

// ScheduleStep records the duration of a scheduling step since `startTime`.
func ScheduleStep(action string, startTime time.Time) {
schedulingAlgorithmLatency.WithLabelValues(action).Observe(SinceInSeconds(startTime))
}

// CountSchedulerBindings records the number of bindings added to scheduling queues by event type.
func CountSchedulerBindings(event string) {
schedulerQueueIncomingBindings.WithLabelValues(event).Inc()
}
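
One subtlety at the call sites above: ScheduleStep is invoked as defer metrics.ScheduleStep(step, time.Now()), and Go evaluates deferred arguments immediately, so time.Now() is captured at function entry while the observation fires at return, timing the whole function. A standalone sketch of that idiom; the function and step names are illustrative, not from this PR:

package main

import (
	"fmt"
	"time"
)

// scheduleStep mirrors the shape of metrics.ScheduleStep.
func scheduleStep(action string, startTime time.Time) {
	fmt.Printf("%s took %.3fs\n", action, time.Since(startTime).Seconds())
}

func filterClusters() {
	// time.Now() is evaluated here, when the defer statement runs...
	defer scheduleStep("Filter", time.Now())
	// ...so the work below is included in the measured duration.
	time.Sleep(50 * time.Millisecond)
}

func main() {
	filterClusters() // prints roughly: Filter took 0.050s
}
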
24 changes: 23 additions & 1 deletion pkg/scheduler/scheduler.go
@@ -34,6 +34,7 @@ import (
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/apiinstalled"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/clusteraffinity"
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
"github.com/karmada-io/karmada/pkg/util"
)

@@ -135,6 +136,8 @@ func NewScheduler(dynamicClient dynamic.Interface, karmadaClient karmadaclientse
estimatorclient.RegisterSchedulerEstimator(schedulerEstimator)
}

metrics.Register()

bindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sched.onResourceBindingAdd,
UpdateFunc: sched.onResourceBindingUpdate,
@@ -195,10 +198,18 @@ func (s *Scheduler) onResourceBindingAdd(obj interface{}) {
}

s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.BindingAdd)
}

func (s *Scheduler) onResourceBindingUpdate(old, cur interface{}) {
- s.onResourceBindingAdd(cur)
+ key, err := cache.MetaNamespaceKeyFunc(cur)
+ if err != nil {
+ klog.Errorf("couldn't get key for object %#v: %v", cur, err)
+ return
+ }
+
+ s.queue.Add(key)
+ metrics.CountSchedulerBindings(metrics.BindingUpdate)
}

func (s *Scheduler) onPropagationPolicyUpdate(old, cur interface{}) {
Expand Down Expand Up @@ -265,6 +276,7 @@ func (s *Scheduler) requeueResourceBindings(selector labels.Selector) error {
}
klog.Infof("Requeue ResourceBinding(%s/%s) as placement changed.", binding.Namespace, binding.Name)
s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.PolicyChanged)
}
return nil
}
Expand All @@ -285,6 +297,7 @@ func (s *Scheduler) requeueClusterResourceBindings(selector labels.Selector) err
}
klog.Infof("Requeue ClusterResourceBinding(%s) as placement changed.", clusterResourceBinding.Name)
s.queue.Add(key)
metrics.CountSchedulerBindings(metrics.PolicyChanged)
}
return nil
}
@@ -373,21 +386,27 @@ func (s *Scheduler) scheduleNext() bool {
}
defer s.queue.Done(key)

start := time.Now()

var err error
switch s.getScheduleType(key.(string)) {
case FirstSchedule:
err = s.scheduleOne(key.(string))
klog.Infof("Start scheduling binding(%s)", key.(string))
metrics.BindingSchedule(string(FirstSchedule), metrics.SinceInSeconds(start), err)
case ReconcileSchedule: // share same logic with first schedule
err = s.scheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as placement changed", key.(string))
metrics.BindingSchedule(string(ReconcileSchedule), metrics.SinceInSeconds(start), err)
case ScaleSchedule:
err = s.scaleScheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as replicas scaled down or scaled up", key.(string))
metrics.BindingSchedule(string(ScaleSchedule), metrics.SinceInSeconds(start), err)
case FailoverSchedule:
if Failover {
err = s.rescheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as cluster failure", key.(string))
metrics.BindingSchedule(string(FailoverSchedule), metrics.SinceInSeconds(start), err)
}
case AvoidSchedule:
klog.Infof("Don't need to schedule binding(%s)", key.(string))
@@ -496,6 +515,7 @@ func (s *Scheduler) handleErr(err error, key interface{}) {
}

s.queue.AddRateLimited(key)
metrics.CountSchedulerBindings(metrics.ScheduleAttemptFailure)
}

func (s *Scheduler) addCluster(obj interface{}) {
@@ -572,6 +592,7 @@ func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) {
return
}
s.queue.Add(rescheduleKey)
metrics.CountSchedulerBindings(metrics.ClusterNotReady)
klog.Infof("Add expired ResourceBinding in queue successfully")
}
}
@@ -592,6 +613,7 @@ func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) {
return
}
s.queue.Add(rescheduleKey)
metrics.CountSchedulerBindings(metrics.ClusterNotReady)
klog.Infof("Add expired ClusterResourceBinding in queue successfully")
}
}
1 change: 1 addition & 0 deletions vendor/modules.txt
@@ -190,6 +190,7 @@ github.com/pelletier/go-toml
# github.com/pkg/errors v0.9.1
github.com/pkg/errors
# github.com/prometheus/client_golang v1.11.0
## explicit
github.com/prometheus/client_golang/prometheus
github.com/prometheus/client_golang/prometheus/collectors
github.com/prometheus/client_golang/prometheus/internal