Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Additional metrics (upstream-services, replicated-services, invalid-s…
Browse files Browse the repository at this point in the history
…ervices, upstream-endpoints, replicated-endpoints, invalid-endpoints)

Signed-off-by: Steve Sloka <steves@heptio.com>
  • Loading branch information
stevesloka committed Jun 7, 2018
1 parent cc9ce58 commit faa5221
Show file tree
Hide file tree
Showing 18 changed files with 575 additions and 186 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.DS_Store
.DS_Store
.vscode/
5 changes: 4 additions & 1 deletion discovery/cmd/kubernetes-discoverer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ func main() {
log.Info("Gimbal Kubernetes Discoverer Starting up...")

// Init prometheus metrics
discovererMetrics = localmetrics.NewMetrics()
discovererMetrics = localmetrics.NewMetrics("kubernetes", backendName)
discovererMetrics.RegisterPrometheus()

// Log info metric
discovererMetrics.DiscovererInfoMetric(buildinfo.Version)

if debug {
log.Level = logrus.DebugLevel
}
Expand Down
5 changes: 4 additions & 1 deletion discovery/cmd/openstack-discoverer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ func main() {
log.Info("Gimbal OpenStack Discoverer Starting up...")

// Init prometheus metrics
discovererMetrics = localmetrics.NewMetrics()
discovererMetrics = localmetrics.NewMetrics("openstack", backendName)
discovererMetrics.RegisterPrometheus()

// Log info metric
discovererMetrics.DiscovererInfoMetric(buildinfo.Version)

// Validate cluster name
if util.IsInvalidBackendName(backendName) {
log.Fatalf("The Kubernetes cluster name must be provided using the `--backend-name` flag or the one passed is invalid")
Expand Down
38 changes: 37 additions & 1 deletion discovery/pkg/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
localmetrics "github.com/heptio/gimbal/discovery/pkg/metrics"
"github.com/heptio/gimbal/discovery/pkg/sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

Expand All @@ -40,6 +42,9 @@ type Controller struct {
syncqueue sync.Queue
servicesSynced cache.InformerSynced
endpointsSynced cache.InformerSynced
serviceLister listers.ServiceLister
endpointsLister listers.EndpointsLister
metrics localmetrics.DiscovererMetrics

backendName string
}
Expand All @@ -54,10 +59,13 @@ func NewController(log *logrus.Logger, gimbalKubeClient kubernetes.Interface, ku

c := &Controller{
Logger: log,
syncqueue: sync.NewQueue(log, backendName, clusterType, gimbalKubeClient, threadiness, metrics),
syncqueue: sync.NewQueue(log, gimbalKubeClient, threadiness, metrics),
servicesSynced: serviceInformer.Informer().HasSynced,
endpointsSynced: endpointsInformer.Informer().HasSynced,
backendName: backendName,
serviceLister: serviceInformer.Lister(),
endpointsLister: endpointsInformer.Lister(),
metrics: metrics,
}

// Set up an event handler for when Service resources change.
Expand Down Expand Up @@ -93,41 +101,47 @@ func (c *Controller) addService(service *v1.Service) {
if !skipProcessing(service.GetName(), service.GetNamespace()) {
svc := translateService(service, c.backendName)
c.syncqueue.Enqueue(sync.AddServiceAction(svc))
c.writeServiceMetrics(service)
}
}

func (c *Controller) updateService(service *v1.Service) {
if !skipProcessing(service.GetName(), service.GetNamespace()) {
svc := translateService(service, c.backendName)
c.syncqueue.Enqueue(sync.UpdateServiceAction(svc))
c.writeServiceMetrics(service)
}
}

func (c *Controller) deleteService(service *v1.Service) {
if !skipProcessing(service.GetName(), service.GetNamespace()) {
svc := translateService(service, c.backendName)
c.syncqueue.Enqueue(sync.DeleteServiceAction(svc))
c.writeServiceMetrics(service)
}
}

func (c *Controller) addEndpoints(endpoints *v1.Endpoints) {
if !skipProcessing(endpoints.GetName(), endpoints.GetNamespace()) {
svc := translateEndpoints(endpoints, c.backendName)
c.syncqueue.Enqueue(sync.AddEndpointsAction(svc))
c.writeEndpointsMetrics(endpoints)
}
}

func (c *Controller) updateEndpoints(endpoints *v1.Endpoints) {
if !skipProcessing(endpoints.GetName(), endpoints.GetNamespace()) {
svc := translateEndpoints(endpoints, c.backendName)
c.syncqueue.Enqueue(sync.UpdateEndpointsAction(svc))
c.writeEndpointsMetrics(endpoints)
}
}

func (c *Controller) deleteEndpoints(endpoints *v1.Endpoints) {
if !skipProcessing(endpoints.GetName(), endpoints.GetNamespace()) {
svc := translateEndpoints(endpoints, c.backendName)
c.syncqueue.Enqueue(sync.DeleteEndpointsAction(svc))
c.writeEndpointsMetrics(endpoints)
}
}

Expand All @@ -139,6 +153,28 @@ func skipProcessing(name, namespace string) bool {
return false
}

func (c *Controller) writeServiceMetrics(svc *v1.Service) error {
upstreamServices, err := c.serviceLister.Services(svc.GetNamespace()).List(labels.Everything())
if err != nil {
return err
}
c.metrics.DiscovererUpstreamServicesMetric(svc.GetNamespace(), len(upstreamServices))
return nil
}

func (c *Controller) writeEndpointsMetrics(ep *v1.Endpoints) error {
upstreamEndpoints, err := c.endpointsLister.Endpoints(ep.GetNamespace()).Get(ep.GetName())
if err != nil {
return err
}
total := 0
for _, sub := range upstreamEndpoints.Subsets {
total += len(sub.Addresses)
}
c.metrics.DiscovererUpstreamEndpointsMetric(ep.GetNamespace(), ep.GetName(), total)
return nil
}

// Run gets the party started
func (c *Controller) Run(stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
Expand Down
89 changes: 23 additions & 66 deletions discovery/pkg/k8s/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/heptio/gimbal/discovery/pkg/sync"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
)

Expand Down Expand Up @@ -164,17 +166,7 @@ var endpointTests = []struct {
func TestAddServiceQueue(t *testing.T) {
for _, tc := range serviceTests {
t.Run(tc.name, func(t *testing.T) {
c := &Controller{
Logger: logrus.New(),
syncqueue: sync.Queue{
Logger: logrus.New(),
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncqueue"),
Threadiness: 1,
BackendName: "test",
},
backendName: "test",
}

c := getDefaultController()
c.addService(tc.service)
time.Sleep(100 * time.Millisecond) // Give queue time to process (huh?)
got := c.syncqueue.Workqueue.Len()
Expand All @@ -185,17 +177,7 @@ func TestAddServiceQueue(t *testing.T) {
func TestUpdateServiceQueue(t *testing.T) {
for _, tc := range serviceTests {
t.Run(tc.name, func(t *testing.T) {
c := &Controller{
Logger: logrus.New(),
syncqueue: sync.Queue{
Logger: logrus.New(),
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncqueue"),
Threadiness: 1,
BackendName: "test",
},
backendName: "test",
}

c := getDefaultController()
c.updateService(tc.service)
time.Sleep(100 * time.Millisecond) // Give queue time to process (huh?)
got := c.syncqueue.Workqueue.Len()
Expand All @@ -206,17 +188,7 @@ func TestUpdateServiceQueue(t *testing.T) {
func TestDeleteServiceQueue(t *testing.T) {
for _, tc := range serviceTests {
t.Run(tc.name, func(t *testing.T) {
c := &Controller{
Logger: logrus.New(),
syncqueue: sync.Queue{
Logger: logrus.New(),
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncqueue"),
Threadiness: 1,
BackendName: "test",
},
backendName: "test",
}

c := getDefaultController()
c.deleteService(tc.service)
time.Sleep(100 * time.Millisecond) // Give queue time to process (huh?)
got := c.syncqueue.Workqueue.Len()
Expand All @@ -227,17 +199,7 @@ func TestDeleteServiceQueue(t *testing.T) {
func TestAddEndpointsQueue(t *testing.T) {
for _, tc := range endpointTests {
t.Run(tc.name, func(t *testing.T) {
c := &Controller{
Logger: logrus.New(),
syncqueue: sync.Queue{
Logger: logrus.New(),
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncqueue"),
Threadiness: 1,
BackendName: "test",
},
backendName: "test",
}

c := getDefaultController()
c.addEndpoints(tc.endpoint)
time.Sleep(100 * time.Millisecond) // Give queue time to process (huh?)
got := c.syncqueue.Workqueue.Len()
Expand All @@ -248,17 +210,7 @@ func TestAddEndpointsQueue(t *testing.T) {
func TestUpdateEndpointsQueue(t *testing.T) {
for _, tc := range endpointTests {
t.Run(tc.name, func(t *testing.T) {
c := &Controller{
Logger: logrus.New(),
syncqueue: sync.Queue{
Logger: logrus.New(),
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncqueue"),
Threadiness: 1,
BackendName: "test",
},
backendName: "test",
}

c := getDefaultController()
c.updateEndpoints(tc.endpoint)
time.Sleep(100 * time.Millisecond) // Give queue time to process (huh?)
got := c.syncqueue.Workqueue.Len()
Expand All @@ -269,21 +221,26 @@ func TestUpdateEndpointsQueue(t *testing.T) {
func TestDeleteEndpointsQueue(t *testing.T) {
for _, tc := range endpointTests {
t.Run(tc.name, func(t *testing.T) {
c := &Controller{
Logger: logrus.New(),
syncqueue: sync.Queue{
Logger: logrus.New(),
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncqueue"),
Threadiness: 1,
BackendName: "test",
},
backendName: "test",
}

c := getDefaultController()
c.deleteEndpoints(tc.endpoint)
time.Sleep(100 * time.Millisecond) // Give queue time to process (huh?)
got := c.syncqueue.Workqueue.Len()
assert.Equal(t, tc.expected, got)
})
}
}

func getDefaultController() *Controller {
client := fake.NewSimpleClientset()
informer := kubeinformers.NewSharedInformerFactory(client, time.Second*0)
return &Controller{
Logger: logrus.New(),
syncqueue: sync.Queue{
Logger: logrus.New(),
Workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "syncqueue"),
Threadiness: 1,
},
serviceLister: informer.Core().V1().Services().Lister(),
endpointsLister: informer.Core().V1().Endpoints().Lister(),
}
}
Loading

0 comments on commit faa5221

Please sign in to comment.