Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Add queue size prometheus metric #49

Merged
merged 2 commits into from
Apr 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion discovery/pkg/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
const (
kubesystemNamespace = "kube-system"
kubesystemService = "kubernetes"
clusterType = "kubernetes"
)

// Controller receives notifications from the Kubernetes API and translates those
Expand All @@ -53,7 +54,7 @@ func NewController(log *logrus.Logger, gimbalKubeClient kubernetes.Interface, ku

c := &Controller{
Logger: log,
syncqueue: sync.NewQueue(log, clusterName, gimbalKubeClient, threadiness, metrics),
syncqueue: sync.NewQueue(log, clusterName, clusterType, gimbalKubeClient, threadiness, metrics),
servicesSynced: serviceInformer.Informer().HasSynced,
endpointsSynced: endpointsInformer.Informer().HasSynced,
clusterName: clusterName,
Expand Down
9 changes: 8 additions & 1 deletion discovery/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func NewMetrics() DiscovererMetrics {
func (d *DiscovererMetrics) RegisterPrometheus() {
// Register with Prometheus's default registry
for _, v := range d.metrics {

prometheus.MustRegister(v)
}
}
Expand Down Expand Up @@ -154,3 +153,11 @@ func (d *DiscovererMetrics) EndpointsEventTimestampMetric(namespace, clusterName
m.WithLabelValues(namespace, clusterName, name).Set(float64(timestamp))
}
}

// QueueSizeGaugeMetric records the queue size prometheus metric
func (d *DiscovererMetrics) QueueSizeGaugeMetric(namespace, clusterName, clusterType string, size int) {
m, ok := d.metrics[QueueSizeGauge].(*prometheus.GaugeVec)
if ok {
m.WithLabelValues(namespace, clusterName, clusterType).Set(float64(size))
}
}
4 changes: 3 additions & 1 deletion discovery/pkg/openstack/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"k8s.io/client-go/kubernetes"
)

const clusterType = "openstack"

type ProjectLister interface {
ListProjects() ([]projects.Project, error)
}
Expand Down Expand Up @@ -69,7 +71,7 @@ func NewReconciler(clusterName string, gimbalKubeClient kubernetes.Interface, sy
ProjectLister: projectLister,
Logger: log,
Metrics: metrics,
syncqueue: sync.NewQueue(log, clusterName, gimbalKubeClient, queueWorkers, metrics),
syncqueue: sync.NewQueue(log, clusterName, clusterType, gimbalKubeClient, queueWorkers, metrics),
}
}

Expand Down
5 changes: 5 additions & 0 deletions discovery/pkg/sync/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type endpointsAction struct {
endpoints *v1.Endpoints
}

// ObjectMeta returns the objectMeta piece of the Action interface object
func (action endpointsAction) ObjectMeta() *metav1.ObjectMeta {
return &action.endpoints.ObjectMeta
}

// Sync performs the action on the given Endpoints resource
func (action endpointsAction) Sync(kubeClient kubernetes.Interface, metrics localmetrics.DiscovererMetrics, clusterName string) error {

Expand Down
7 changes: 6 additions & 1 deletion discovery/pkg/sync/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

localmetrics "github.com/heptio/gimbal/discovery/pkg/metrics"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand All @@ -40,10 +41,11 @@ type Queue struct {
Threadiness int
Metrics localmetrics.DiscovererMetrics
ClusterName string
ClusterType string
}

// NewQueue returns an initialized sync.Queue for syncing resources with a Gimbal cluster.
func NewQueue(logger *logrus.Logger, clusterName string, kubeClient kubernetes.Interface,
func NewQueue(logger *logrus.Logger, clusterName, clusterType string, kubeClient kubernetes.Interface,
threadiness int, metrics localmetrics.DiscovererMetrics) Queue {
return Queue{
KubeClient: kubeClient,
Expand All @@ -52,17 +54,20 @@ func NewQueue(logger *logrus.Logger, clusterName string, kubeClient kubernetes.I
Threadiness: threadiness,
Metrics: metrics,
ClusterName: clusterName,
ClusterType: clusterType,
}
}

// Action that is added to the queue for processing
type Action interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if this should be called "Resource" or something... Not this PRs concern though

Sync(kube kubernetes.Interface, lm localmetrics.DiscovererMetrics, clusterName string) error
ObjectMeta() *metav1.ObjectMeta
}

// Enqueue adds a new resource action to the worker queue
func (sq *Queue) Enqueue(action Action) {
sq.Workqueue.AddRateLimited(action)
sq.Metrics.QueueSizeGaugeMetric(action.ObjectMeta().GetNamespace(), sq.ClusterName, sq.ClusterType, sq.Workqueue.Len())
}

// Run starts the queue workers. It blocks until the stopCh is closed.
Expand Down
5 changes: 5 additions & 0 deletions discovery/pkg/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ type serviceAction struct {
service *v1.Service
}

// ObjectMeta returns the objectMeta piece of the Action interface object
func (action serviceAction) ObjectMeta() *metav1.ObjectMeta {
return &action.service.ObjectMeta
}

// Sync performs the action on the given service
func (action serviceAction) Sync(kubeClient kubernetes.Interface, metrics localmetrics.DiscovererMetrics, clusterName string) error {

Expand Down