Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Add queue size prometheus metric
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Sloka <steves@heptio.com>
  • Loading branch information
stevesloka committed Apr 20, 2018
1 parent 091704c commit cf8f22f
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 4 deletions.
4 changes: 3 additions & 1 deletion discovery/cmd/kubernetes-discoverer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ var (
discovererMetrics localmetrics.DiscovererMetrics
)

const clusterType = "kubernetes"

func init() {
flag.BoolVar(&printVersion, "version", false, "Show version and quit")
flag.IntVar(&numProcessThreads, "num-threads", 2, "Specify number of threads to use when processing queue items.")
Expand Down Expand Up @@ -105,7 +107,7 @@ func main() {

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(k8sDiscovererClient, resyncInterval)

c := k8s.NewController(log, gimbalKubeClient, kubeInformerFactory, clusterName, numProcessThreads, discovererMetrics)
c := k8s.NewController(log, gimbalKubeClient, kubeInformerFactory, clusterName, clusterType, numProcessThreads, discovererMetrics)
if err != nil {
log.Fatal("Could not init Controller! ", err)
}
Expand Down
3 changes: 3 additions & 0 deletions discovery/cmd/openstack-discoverer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ var (
discovererMetrics localmetrics.DiscovererMetrics
)

const clusterType = "openstack"

func init() {
flag.BoolVar(&printVersion, "version", false, "Show version and quit")
flag.StringVar(&gimbalKubeCfgFile, "gimbal-kubecfg-file", "", "Location of kubecfg file for access to gimbal system kubernetes api, defaults to service account tokens")
Expand Down Expand Up @@ -153,6 +155,7 @@ func main() {
log,
numProcessThreads,
discovererMetrics,
clusterType,
)
stopCh := signals.SetupSignalHandler()

Expand Down
3 changes: 2 additions & 1 deletion discovery/pkg/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Controller struct {

// NewController returns a new NewController
func NewController(log *logrus.Logger, gimbalKubeClient kubernetes.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory,
clusterName string, threadiness int, metrics localmetrics.DiscovererMetrics) *Controller {
clusterName, clusterType string, threadiness int, metrics localmetrics.DiscovererMetrics) *Controller {

// obtain references to shared index informers for the services types.
serviceInformer := kubeInformerFactory.Core().V1().Services()
Expand All @@ -61,6 +61,7 @@ func NewController(log *logrus.Logger, gimbalKubeClient kubernetes.Interface, ku
Threadiness: threadiness,
Metrics: metrics,
ClusterName: clusterName,
ClusterType: clusterType,
},
servicesSynced: serviceInformer.Informer().HasSynced,
endpointsSynced: endpointsInformer.Informer().HasSynced,
Expand Down
9 changes: 8 additions & 1 deletion discovery/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func NewMetrics() DiscovererMetrics {
func (d *DiscovererMetrics) RegisterPrometheus() {
// Register with Prometheus's default registry
for _, v := range d.metrics {

prometheus.MustRegister(v)
}
}
Expand Down Expand Up @@ -154,3 +153,11 @@ func (d *DiscovererMetrics) EndpointsEventTimestampMetric(namespace, clusterName
m.WithLabelValues(namespace, clusterName, name).Set(float64(timestamp))
}
}

// QueueSizeGaugeMetric records the queue size prometheus metric
func (d *DiscovererMetrics) QueueSizeGaugeMetric(namespace, clusterName, clusterType string, size int) {
m, ok := d.metrics[QueueSizeGauge].(*prometheus.GaugeVec)
if ok {
m.WithLabelValues(namespace, clusterName, clusterType).Set(float64(size))
}
}
4 changes: 3 additions & 1 deletion discovery/pkg/openstack/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Reconciler struct {

// NewReconciler returns an OpenStack reconciler
func NewReconciler(clusterName string, gimbalKubeClient kubernetes.Interface, syncPeriod time.Duration, lbLister LoadBalancerLister,
projectLister ProjectLister, log *logrus.Logger, queueWorkers int, metrics localmetrics.DiscovererMetrics) Reconciler {
projectLister ProjectLister, log *logrus.Logger, queueWorkers int, metrics localmetrics.DiscovererMetrics, clusterType string) Reconciler {
return Reconciler{
ClusterName: clusterName,
GimbalKubeClient: gimbalKubeClient,
Expand All @@ -76,6 +76,8 @@ func NewReconciler(clusterName string, gimbalKubeClient kubernetes.Interface, sy
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncqueue"),
Threadiness: queueWorkers,
Metrics: metrics,
ClusterName: clusterName,
ClusterType: clusterType,
},
}
}
Expand Down
5 changes: 5 additions & 0 deletions discovery/pkg/sync/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type endpointsAction struct {
endpoints *v1.Endpoints
}

// ObjectMeta returns the objectMeta piece of the Action interface object
func (action endpointsAction) ObjectMeta() *metav1.ObjectMeta {
return &action.endpoints.ObjectMeta
}

// Sync performs the action on the given Endpoints resource
func (action endpointsAction) Sync(kubeClient kubernetes.Interface, metrics localmetrics.DiscovererMetrics, clusterName string) error {

Expand Down
4 changes: 4 additions & 0 deletions discovery/pkg/sync/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

localmetrics "github.com/heptio/gimbal/discovery/pkg/metrics"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand All @@ -40,16 +41,19 @@ type Queue struct {
Threadiness int
Metrics localmetrics.DiscovererMetrics
ClusterName string
ClusterType string
}

// Action that is added to the queue for processing
type Action interface {
Sync(kube kubernetes.Interface, lm localmetrics.DiscovererMetrics, clusterName string) error
ObjectMeta() *metav1.ObjectMeta
}

// Enqueue adds a new resource action to the worker queue
func (sq *Queue) Enqueue(action Action) {
sq.Workqueue.AddRateLimited(action)
sq.Metrics.QueueSizeGaugeMetric(action.ObjectMeta().GetNamespace(), sq.ClusterName, sq.ClusterType, sq.Workqueue.Len())
}

// Run starts the queue workers. It blocks until the stopCh is closed.
Expand Down
5 changes: 5 additions & 0 deletions discovery/pkg/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type serviceAction struct {
service *v1.Service
}

// ObjectMeta returns the objectMeta piece of the Action interface object
func (action serviceAction) ObjectMeta() *metav1.ObjectMeta {
return &action.service.ObjectMeta
}

// Sync performs the action on the given service
func (action serviceAction) Sync(kubeClient kubernetes.Interface, metrics localmetrics.DiscovererMetrics, clusterName string) error {

Expand Down

0 comments on commit cf8f22f

Please sign in to comment.