From 0a39425e8f9c1c2acc1054b9c0789340d6bab433 Mon Sep 17 00:00:00 2001 From: Manuel Alejandro de Brito Fontes Date: Thu, 7 Mar 2019 21:20:34 -0300 Subject: [PATCH 1/6] Refactor status update --- internal/ingress/controller/nginx.go | 36 +++++-- internal/ingress/controller/status.go | 133 ++++++++++++++++++++++++++ internal/ingress/status/status.go | 133 ++++---------------------- 3 files changed, 180 insertions(+), 122 deletions(-) create mode 100644 internal/ingress/controller/status.go diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 234f776400..00f83c4952 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -49,7 +49,6 @@ import ( "k8s.io/ingress-nginx/internal/file" "k8s.io/ingress-nginx/internal/ingress" - "k8s.io/ingress-nginx/internal/ingress/annotations/class" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" "k8s.io/ingress-nginx/internal/ingress/controller/process" "k8s.io/ingress-nginx/internal/ingress/controller/store" @@ -115,6 +114,7 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File if err != nil { klog.Fatalf("unexpected error obtaining pod information: %v", err) } + n.podInfo = pod n.store = store.New( config.EnableSSLChainCompletion, @@ -132,15 +132,13 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File config.DisableCatchAll) n.syncQueue = task.NewTaskQueue(n.syncIngress) + if config.UpdateStatus { - n.syncStatus = status.NewStatusSyncer(status.Config{ + n.syncStatus = status.NewStatusSyncer(pod, status.Config{ Client: config.Client, PublishService: config.PublishService, PublishStatusAddress: config.PublishStatusAddress, IngressLister: n.store, - ElectionID: config.ElectionID, - IngressClass: class.IngressClass, - DefaultIngressClass: class.DefaultClass, UpdateStatusOnShutdown: config.UpdateStatusOnShutdown, UseNodeInternalIP: config.UseNodeInternalIP, }) @@ -215,13 +213,15 @@ Error loading new template: %v // NGINXController describes a NGINX Ingress controller. type NGINXController struct { + podInfo *k8s.PodInfo + cfg *Configuration recorder record.EventRecorder syncQueue *task.Queue - syncStatus status.Sync + syncStatus status.Syncer syncRateLimiter flowcontrol.RateLimiter @@ -262,9 +262,27 @@ func (n *NGINXController) Start() { n.store.Run(n.stopCh) - if n.syncStatus != nil { - go n.syncStatus.Run() - } + setupLeaderElection(&leaderElectionConfig{ + Client: n.cfg.Client, + ElectionID: n.cfg.ElectionID, + OnStartedLeading: func(stopCh chan struct{}) { + if n.syncStatus != nil { + go n.syncStatus.Run(stopCh) + } + }, + OnStoppedLeading: func() { + // Remove prometheus metrics related to SSL certificates + srvs := sets.NewString() + for _, s := range n.runningConfig.Servers { + if !srvs.Has(s.Hostname) { + srvs.Insert(s.Hostname) + } + } + n.metricCollector.RemoveMetrics(nil, srvs.List()) + }, + PodName: n.podInfo.Name, + PodNamespace: n.podInfo.Namespace, + }) cmd := nginxExecCommand() diff --git a/internal/ingress/controller/status.go b/internal/ingress/controller/status.go new file mode 100644 index 0000000000..187de3e4e5 --- /dev/null +++ b/internal/ingress/controller/status.go @@ -0,0 +1,133 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "os" + "time" + + "k8s.io/klog" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" + + "k8s.io/ingress-nginx/internal/ingress/annotations/class" +) + +type leaderElectionConfig struct { + PodName string + PodNamespace string + + Client clientset.Interface + + ElectionID string + + OnStartedLeading func(chan struct{}) + OnStoppedLeading func() +} + +func setupLeaderElection(config *leaderElectionConfig) { + // we need to use the defined ingress class to allow multiple leaders + // in order to update information about ingress status + electionID := fmt.Sprintf("%v-%v", config.ElectionID, class.DefaultClass) + if class.IngressClass != "" { + electionID = fmt.Sprintf("%v-%v", config.ElectionID, class.IngressClass) + } + + var elector *leaderelection.LeaderElector + + // start a new context + ctx := context.Background() + + var cancelContext context.CancelFunc + + var newLeaderCtx = func(ctx context.Context) context.CancelFunc { + // allow to cancel the context in case we stop being the leader + leaderCtx, cancel := context.WithCancel(ctx) + go elector.Run(leaderCtx) + return cancel + } + + var stopCh chan struct{} + callbacks := leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.V(2).Infof("I am the new leader") + stopCh = make(chan struct{}) + + if config.OnStartedLeading != nil { + config.OnStartedLeading(stopCh) + } + }, + OnStoppedLeading: func() { + klog.V(2).Info("I am not leader anymore") + close(stopCh) + + // cancel the context + cancelContext() + + cancelContext = newLeaderCtx(ctx) + + if config.OnStoppedLeading != nil { + config.OnStoppedLeading() + } + }, + OnNewLeader: func(identity string) { + klog.Infof("new leader elected: %v", identity) + }, + } + + broadcaster := record.NewBroadcaster() + hostname, _ := os.Hostname() + + recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ + Component: "ingress-leader-elector", + Host: hostname, + }) + + lock := resourcelock.ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{Namespace: config.PodNamespace, Name: electionID}, + Client: config.Client.CoreV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: config.PodName, + EventRecorder: recorder, + }, + } + + ttl := 30 * time.Second + var err error + + elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ + Lock: &lock, + LeaseDuration: ttl, + RenewDeadline: ttl / 2, + RetryPeriod: ttl / 4, + + Callbacks: callbacks, + }) + if err != nil { + klog.Fatalf("unexpected error starting leader election: %v", err) + } + + cancelContext = newLeaderCtx(ctx) +} diff --git a/internal/ingress/status/status.go b/internal/ingress/status/status.go index 635bb5e6ac..399e3e39a9 100644 --- a/internal/ingress/status/status.go +++ b/internal/ingress/status/status.go @@ -17,10 +17,8 @@ limitations under the License. package status import ( - "context" "fmt" "net" - "os" "sort" "strings" "time" @@ -34,10 +32,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/ingress-nginx/internal/ingress" @@ -49,9 +43,10 @@ const ( updateInterval = 60 * time.Second ) -// Sync ... -type Sync interface { - Run() +// Syncer ... +type Syncer interface { + Run(chan struct{}) + Shutdown() } @@ -68,16 +63,11 @@ type Config struct { PublishStatusAddress string - ElectionID string - UpdateStatusOnShutdown bool UseNodeInternalIP bool IngressLister ingressLister - - DefaultIngressClass string - IngressClass string } // statusSync keeps the status IP in each Ingress rule updated executing a periodic check @@ -89,109 +79,35 @@ type Config struct { // two flags are set, the source is the IP/s of the node/s type statusSync struct { Config + // pod contains runtime information about this pod pod *k8s.PodInfo - elector *leaderelection.LeaderElector - // workqueue used to keep in sync the status IP/s // in the Ingress rules syncQueue *task.Queue } -// Run starts the loop to keep the status in sync -func (s statusSync) Run() { - // we need to use the defined ingress class to allow multiple leaders - // in order to update information about ingress status - electionID := fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.DefaultIngressClass) - if s.Config.IngressClass != "" { - electionID = fmt.Sprintf("%v-%v", s.Config.ElectionID, s.Config.IngressClass) - } - - // start a new context - ctx := context.Background() - - var cancelContext context.CancelFunc - - var newLeaderCtx = func(ctx context.Context) context.CancelFunc { - // allow to cancel the context in case we stop being the leader - leaderCtx, cancel := context.WithCancel(ctx) - go s.elector.Run(leaderCtx) - return cancel - } - - var stopCh chan struct{} - callbacks := leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - klog.V(2).Infof("I am the new status update leader") - stopCh = make(chan struct{}) - go s.syncQueue.Run(time.Second, stopCh) - // trigger initial sync - s.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) - // when this instance is the leader we need to enqueue - // an item to trigger the update of the Ingress status. - wait.PollUntil(updateInterval, func() (bool, error) { - s.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) - return false, nil - }, stopCh) - }, - OnStoppedLeading: func() { - klog.V(2).Info("I am not status update leader anymore") - close(stopCh) - - // cancel the context - cancelContext() - - cancelContext = newLeaderCtx(ctx) - }, - OnNewLeader: func(identity string) { - klog.Infof("new leader elected: %v", identity) - }, - } - - broadcaster := record.NewBroadcaster() - hostname, _ := os.Hostname() - - recorder := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ - Component: "ingress-leader-elector", - Host: hostname, - }) +// Start starts the loop to keep the status in sync +func (s statusSync) Run(stopCh chan struct{}) { + go s.syncQueue.Run(time.Second, stopCh) - lock := resourcelock.ConfigMapLock{ - ConfigMapMeta: metav1.ObjectMeta{Namespace: s.pod.Namespace, Name: electionID}, - Client: s.Config.Client.CoreV1(), - LockConfig: resourcelock.ResourceLockConfig{ - Identity: s.pod.Name, - EventRecorder: recorder, - }, - } + // trigger initial sync + s.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) - ttl := 30 * time.Second - var err error - s.elector, err = leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: &lock, - LeaseDuration: ttl, - RenewDeadline: ttl / 2, - RetryPeriod: ttl / 4, - Callbacks: callbacks, - }) - if err != nil { - klog.Fatalf("unexpected error starting leader election: %v", err) - } - - cancelContext = newLeaderCtx(ctx) + // when this instance is the leader we need to enqueue + // an item to trigger the update of the Ingress status. + wait.PollUntil(updateInterval, func() (bool, error) { + s.syncQueue.EnqueueTask(task.GetDummyObject("sync status")) + return false, nil + }, stopCh) } -// Shutdown stop the sync. In case the instance is the leader it will remove the current IP +// Shutdown stops the sync. In case the instance is the leader it will remove the current IP // if there is no other instances running. func (s statusSync) Shutdown() { go s.syncQueue.Shutdown() - // remove IP from Ingress - if s.elector != nil && !s.elector.IsLeader() { - return - } - if !s.UpdateStatusOnShutdown { klog.Warningf("skipping update of status of Ingress rules") return @@ -226,10 +142,6 @@ func (s *statusSync) sync(key interface{}) error { return nil } - if s.elector != nil && !s.elector.IsLeader() { - return fmt.Errorf("i am not the current leader. Skiping status update") - } - addrs, err := s.runningAddresses() if err != nil { return err @@ -243,15 +155,10 @@ func (s statusSync) keyfunc(input interface{}) (interface{}, error) { return input, nil } -// NewStatusSyncer returns a new Sync instance -func NewStatusSyncer(config Config) Sync { - pod, err := k8s.GetPodDetails(config.Client) - if err != nil { - klog.Fatalf("unexpected error obtaining pod information: %v", err) - } - +// NewStatusSyncer returns a new Syncer instance +func NewStatusSyncer(podInfo *k8s.PodInfo, config Config) Syncer { st := statusSync{ - pod: pod, + pod: podInfo, Config: config, } From 476d0106d6d61709e377a01644e7943a87d2607e Mon Sep 17 00:00:00 2001 From: Manuel Alejandro de Brito Fontes Date: Sun, 10 Mar 2019 13:46:44 -0300 Subject: [PATCH 2/6] Fix status tests --- internal/ingress/status/status_test.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/internal/ingress/status/status_test.go b/internal/ingress/status/status_test.go index 463f830b98..c070dc21f3 100644 --- a/internal/ingress/status/status_test.go +++ b/internal/ingress/status/status_test.go @@ -287,12 +287,16 @@ func TestStatusActions(t *testing.T) { Client: buildSimpleClientSet(), PublishService: "", IngressLister: buildIngressLister(), - DefaultIngressClass: "nginx", - IngressClass: "", UpdateStatusOnShutdown: true, } // create object - fkSync := NewStatusSyncer(c) + fkSync := NewStatusSyncer(&k8s.PodInfo{ + Name: "foo_base_pod", + Namespace: apiv1.NamespaceDefault, + Labels: map[string]string{ + "lable_sig": "foo_pod", + }, + }, c) if fkSync == nil { t.Fatalf("expected a valid Sync") } @@ -300,7 +304,10 @@ func TestStatusActions(t *testing.T) { fk := fkSync.(statusSync) // start it and wait for the election and syn actions - go fk.Run() + stopCh := make(chan struct{}) + defer close(stopCh) + + go fk.Run(stopCh) // wait for the election time.Sleep(100 * time.Millisecond) // execute sync From 7c717cabcf7d1b3c30ef5e5474f88d0f5a9dc737 Mon Sep 17 00:00:00 2001 From: Manuel Alejandro de Brito Fontes Date: Sun, 10 Mar 2019 19:12:33 -0300 Subject: [PATCH 3/6] Add promehteus metric about leader election status --- internal/ingress/controller/nginx.go | 4 ++++ .../ingress/metric/collectors/controller.go | 19 +++++++++++++++++++ internal/ingress/metric/dummy.go | 6 ++++++ internal/ingress/metric/main.go | 13 +++++++++++++ 4 files changed, 42 insertions(+) diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 00f83c4952..72b0da1b58 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -269,8 +269,12 @@ func (n *NGINXController) Start() { if n.syncStatus != nil { go n.syncStatus.Run(stopCh) } + + n.metricCollector.OnStartedLeading(n.cfg.ElectionID) }, OnStoppedLeading: func() { + n.metricCollector.OnStoppedLeading(n.cfg.ElectionID) + // Remove prometheus metrics related to SSL certificates srvs := sets.NewString() for _, s := range n.runningConfig.Servers { diff --git a/internal/ingress/metric/collectors/controller.go b/internal/ingress/metric/collectors/controller.go index c24caaa45e..8aeeaffe97 100644 --- a/internal/ingress/metric/collectors/controller.go +++ b/internal/ingress/metric/collectors/controller.go @@ -46,6 +46,8 @@ type Controller struct { constLabels prometheus.Labels labels prometheus.Labels + + leaderElection *prometheus.GaugeVec } // NewController creates a new prometheus collector for the @@ -112,6 +114,13 @@ func NewController(pod, namespace, class string) *Controller { }, sslLabelHost, ), + leaderElection: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "leader_election_status", + Help: "Gauge reporting status of the leader election, 0 indicates follower, 1 indicates leader. 'name' is the string used to identify the lease", + }, + []string{"name"}, + ), } return cm @@ -127,6 +136,16 @@ func (cm *Controller) IncReloadErrorCount() { cm.reloadOperationErrors.With(cm.constLabels).Inc() } +// OnStartedLeading indicates the pod is not the current leader +func (cm *Controller) OnStartedLeading(electionID string) { + cm.leaderElection.WithLabelValues(electionID).Set(0) +} + +// OnStoppedLeading indicates the pod is not the current leader +func (cm *Controller) OnStoppedLeading(electionID string) { + cm.leaderElection.WithLabelValues(electionID).Set(1.0) +} + // ConfigSuccess set a boolean flag according to the output of the controller configuration reload func (cm *Controller) ConfigSuccess(hash uint64, success bool) { if success { diff --git a/internal/ingress/metric/dummy.go b/internal/ingress/metric/dummy.go index 4c6872195a..46ce3f7a8e 100644 --- a/internal/ingress/metric/dummy.go +++ b/internal/ingress/metric/dummy.go @@ -52,3 +52,9 @@ func (dc DummyCollector) SetSSLExpireTime([]*ingress.Server) {} // SetHosts ... func (dc DummyCollector) SetHosts(hosts sets.String) {} + +// OnStartedLeading indicates the pod is not the current leader +func (dc DummyCollector) OnStartedLeading(electionID string) {} + +// OnStoppedLeading indicates the pod is not the current leader +func (dc DummyCollector) OnStoppedLeading(electionID string) {} diff --git a/internal/ingress/metric/main.go b/internal/ingress/metric/main.go index 4950a24976..fc161c4fa8 100644 --- a/internal/ingress/metric/main.go +++ b/internal/ingress/metric/main.go @@ -36,6 +36,9 @@ type Collector interface { IncReloadCount() IncReloadErrorCount() + OnStartedLeading(string) + OnStoppedLeading(string) + RemoveMetrics(ingresses, endpoints []string) SetSSLExpireTime([]*ingress.Server) @@ -147,3 +150,13 @@ func (c *collector) SetSSLExpireTime(servers []*ingress.Server) { func (c *collector) SetHosts(hosts sets.String) { c.socket.SetHosts(hosts) } + +// OnStartedLeading indicates the pod is not the current leader +func (c *collector) OnStartedLeading(electionID string) { + c.ingressController.OnStartedLeading(electionID) +} + +// OnStoppedLeading indicates the pod is not the current leader +func (c *collector) OnStoppedLeading(electionID string) { + c.ingressController.OnStoppedLeading(electionID) +} From 20a89480f0ca0f8fbe1ce7fc3888b4e6344003c0 Mon Sep 17 00:00:00 2001 From: Manuel Alejandro de Brito Fontes Date: Mon, 11 Mar 2019 12:57:28 -0300 Subject: [PATCH 4/6] Use full election leader ID --- internal/ingress/controller/nginx.go | 14 +++++++++++--- internal/ingress/controller/status.go | 12 +----------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 72b0da1b58..3da8336353 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -49,6 +49,7 @@ import ( "k8s.io/ingress-nginx/internal/file" "k8s.io/ingress-nginx/internal/ingress" + "k8s.io/ingress-nginx/internal/ingress/annotations/class" ngx_config "k8s.io/ingress-nginx/internal/ingress/controller/config" "k8s.io/ingress-nginx/internal/ingress/controller/process" "k8s.io/ingress-nginx/internal/ingress/controller/store" @@ -262,18 +263,25 @@ func (n *NGINXController) Start() { n.store.Run(n.stopCh) + // we need to use the defined ingress class to allow multiple leaders + // in order to update information about ingress status + electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass) + if class.IngressClass != "" { + electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass) + } + setupLeaderElection(&leaderElectionConfig{ Client: n.cfg.Client, - ElectionID: n.cfg.ElectionID, + ElectionID: electionID, OnStartedLeading: func(stopCh chan struct{}) { if n.syncStatus != nil { go n.syncStatus.Run(stopCh) } - n.metricCollector.OnStartedLeading(n.cfg.ElectionID) + n.metricCollector.OnStartedLeading(electionID) }, OnStoppedLeading: func() { - n.metricCollector.OnStoppedLeading(n.cfg.ElectionID) + n.metricCollector.OnStoppedLeading(electionID) // Remove prometheus metrics related to SSL certificates srvs := sets.NewString() diff --git a/internal/ingress/controller/status.go b/internal/ingress/controller/status.go index 187de3e4e5..f6f562b919 100644 --- a/internal/ingress/controller/status.go +++ b/internal/ingress/controller/status.go @@ -18,7 +18,6 @@ package controller import ( "context" - "fmt" "os" "time" @@ -31,8 +30,6 @@ import ( "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" - - "k8s.io/ingress-nginx/internal/ingress/annotations/class" ) type leaderElectionConfig struct { @@ -48,13 +45,6 @@ type leaderElectionConfig struct { } func setupLeaderElection(config *leaderElectionConfig) { - // we need to use the defined ingress class to allow multiple leaders - // in order to update information about ingress status - electionID := fmt.Sprintf("%v-%v", config.ElectionID, class.DefaultClass) - if class.IngressClass != "" { - electionID = fmt.Sprintf("%v-%v", config.ElectionID, class.IngressClass) - } - var elector *leaderelection.LeaderElector // start a new context @@ -106,7 +96,7 @@ func setupLeaderElection(config *leaderElectionConfig) { }) lock := resourcelock.ConfigMapLock{ - ConfigMapMeta: metav1.ObjectMeta{Namespace: config.PodNamespace, Name: electionID}, + ConfigMapMeta: metav1.ObjectMeta{Namespace: config.PodNamespace, Name: config.ElectionID}, Client: config.Client.CoreV1(), LockConfig: resourcelock.ResourceLockConfig{ Identity: config.PodName, From 870b89c72be46f551849dd0ba1676e57fb055936 Mon Sep 17 00:00:00 2001 From: Manuel Alejandro de Brito Fontes Date: Mon, 11 Mar 2019 13:20:41 -0300 Subject: [PATCH 5/6] Fix documentation --- internal/ingress/metric/collectors/controller.go | 4 ++-- internal/ingress/metric/main.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/ingress/metric/collectors/controller.go b/internal/ingress/metric/collectors/controller.go index 8aeeaffe97..5c281b988c 100644 --- a/internal/ingress/metric/collectors/controller.go +++ b/internal/ingress/metric/collectors/controller.go @@ -136,12 +136,12 @@ func (cm *Controller) IncReloadErrorCount() { cm.reloadOperationErrors.With(cm.constLabels).Inc() } -// OnStartedLeading indicates the pod is not the current leader +// OnStartedLeading indicates the pod was elected as the leader func (cm *Controller) OnStartedLeading(electionID string) { cm.leaderElection.WithLabelValues(electionID).Set(0) } -// OnStoppedLeading indicates the pod is not the current leader +// OnStoppedLeading indicates the pod stopped being the leader func (cm *Controller) OnStoppedLeading(electionID string) { cm.leaderElection.WithLabelValues(electionID).Set(1.0) } diff --git a/internal/ingress/metric/main.go b/internal/ingress/metric/main.go index fc161c4fa8..d604b7fb04 100644 --- a/internal/ingress/metric/main.go +++ b/internal/ingress/metric/main.go @@ -151,12 +151,12 @@ func (c *collector) SetHosts(hosts sets.String) { c.socket.SetHosts(hosts) } -// OnStartedLeading indicates the pod is not the current leader +// OnStartedLeading indicates the pod was elected as the leader func (c *collector) OnStartedLeading(electionID string) { c.ingressController.OnStartedLeading(electionID) } -// OnStoppedLeading indicates the pod is not the current leader +// OnStoppedLeading indicates the pod stopped being the leader func (c *collector) OnStoppedLeading(electionID string) { c.ingressController.OnStoppedLeading(electionID) } From f4e4335d8c3f6744e53a29388610ddad596a41b5 Mon Sep 17 00:00:00 2001 From: Manuel Alejandro de Brito Fontes Date: Mon, 11 Mar 2019 13:31:38 -0300 Subject: [PATCH 6/6] Only the leader updates metrics for SSL certificate expiration --- internal/ingress/controller/controller.go | 6 +++- internal/ingress/controller/nginx.go | 29 +++++++++++++------ .../ingress/metric/collectors/controller.go | 24 +++++++++++---- internal/ingress/metric/main.go | 1 + 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 1878a31723..7de36425b3 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -190,7 +190,11 @@ func (n *NGINXController) syncIngress(interface{}) error { klog.Infof("Backend successfully reloaded.") n.metricCollector.ConfigSuccess(hash, true) n.metricCollector.IncReloadCount() - n.metricCollector.SetSSLExpireTime(servers) + + if n.isLeader() { + klog.V(2).Infof("Updating ssl expiration metrics.") + n.metricCollector.SetSSLExpireTime(servers) + } } isFirstSync := n.runningConfig.Equal(&ingress.Configuration{}) diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 3da8336353..0a29f3d082 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -31,6 +31,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "text/template" "time" @@ -255,6 +256,8 @@ type NGINXController struct { fileSystem filesystem.Filesystem metricCollector metric.Collector + + currentLeader uint32 } // Start starts a new NGINX master process running in the foreground. @@ -278,19 +281,15 @@ func (n *NGINXController) Start() { go n.syncStatus.Run(stopCh) } + n.setLeader(true) n.metricCollector.OnStartedLeading(electionID) + // manually update SSL expiration metrics + // (to not wait for a reload) + n.metricCollector.SetSSLExpireTime(n.runningConfig.Servers) }, OnStoppedLeading: func() { + n.setLeader(false) n.metricCollector.OnStoppedLeading(electionID) - - // Remove prometheus metrics related to SSL certificates - srvs := sets.NewString() - for _, s := range n.runningConfig.Servers { - if !srvs.Has(s.Hostname) { - srvs.Insert(s.Hostname) - } - } - n.metricCollector.RemoveMetrics(nil, srvs.List()) }, PodName: n.podInfo.Name, PodNamespace: n.podInfo.Namespace, @@ -1129,3 +1128,15 @@ func buildRedirects(servers []*ingress.Server) []*redirect { return redirectServers } + +func (n *NGINXController) setLeader(leader bool) { + var i uint32 + if leader { + i = 1 + } + atomic.StoreUint32(&n.currentLeader, i) +} + +func (n *NGINXController) isLeader() bool { + return atomic.LoadUint32(&n.currentLeader) != 0 +} diff --git a/internal/ingress/metric/collectors/controller.go b/internal/ingress/metric/collectors/controller.go index 5c281b988c..3df99bafda 100644 --- a/internal/ingress/metric/collectors/controller.go +++ b/internal/ingress/metric/collectors/controller.go @@ -116,8 +116,10 @@ func NewController(pod, namespace, class string) *Controller { ), leaderElection: prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "leader_election_status", - Help: "Gauge reporting status of the leader election, 0 indicates follower, 1 indicates leader. 'name' is the string used to identify the lease", + Namespace: PrometheusNamespace, + Name: "leader_election_status", + Help: "Gauge reporting status of the leader election, 0 indicates follower, 1 indicates leader. 'name' is the string used to identify the lease", + ConstLabels: constLabels, }, []string{"name"}, ), @@ -138,12 +140,12 @@ func (cm *Controller) IncReloadErrorCount() { // OnStartedLeading indicates the pod was elected as the leader func (cm *Controller) OnStartedLeading(electionID string) { - cm.leaderElection.WithLabelValues(electionID).Set(0) + cm.leaderElection.WithLabelValues(electionID).Set(1.0) } // OnStoppedLeading indicates the pod stopped being the leader func (cm *Controller) OnStoppedLeading(electionID string) { - cm.leaderElection.WithLabelValues(electionID).Set(1.0) + cm.leaderElection.WithLabelValues(electionID).Set(0) } // ConfigSuccess set a boolean flag according to the output of the controller configuration reload @@ -169,6 +171,7 @@ func (cm Controller) Describe(ch chan<- *prometheus.Desc) { cm.reloadOperation.Describe(ch) cm.reloadOperationErrors.Describe(ch) cm.sslExpireTime.Describe(ch) + cm.leaderElection.Describe(ch) } // Collect implements the prometheus.Collector interface. @@ -179,6 +182,7 @@ func (cm Controller) Collect(ch chan<- prometheus.Metric) { cm.reloadOperation.Collect(ch) cm.reloadOperationErrors.Collect(ch) cm.sslExpireTime.Collect(ch) + cm.leaderElection.Collect(ch) } // SetSSLExpireTime sets the expiration time of SSL Certificates @@ -198,13 +202,21 @@ func (cm *Controller) SetSSLExpireTime(servers []*ingress.Server) { // RemoveMetrics removes metrics for hostnames not available anymore func (cm *Controller) RemoveMetrics(hosts []string, registry prometheus.Gatherer) { + cm.removeSSLExpireMetrics(true, hosts, registry) +} + +// RemoveAllSSLExpireMetrics removes metrics for expiration of SSL Certificates +func (cm *Controller) RemoveAllSSLExpireMetrics(registry prometheus.Gatherer) { + cm.removeSSLExpireMetrics(false, []string{}, registry) +} + +func (cm *Controller) removeSSLExpireMetrics(onlyDefinedHosts bool, hosts []string, registry prometheus.Gatherer) { mfs, err := registry.Gather() if err != nil { klog.Errorf("Error gathering metrics: %v", err) return } - klog.V(2).Infof("removing SSL certificate metrics for %v hosts", hosts) toRemove := sets.NewString(hosts...) for _, mf := range mfs { @@ -227,7 +239,7 @@ func (cm *Controller) RemoveMetrics(hosts []string, registry prometheus.Gatherer continue } - if !toRemove.Has(host) { + if onlyDefinedHosts && !toRemove.Has(host) { continue } diff --git a/internal/ingress/metric/main.go b/internal/ingress/metric/main.go index d604b7fb04..8039c2d743 100644 --- a/internal/ingress/metric/main.go +++ b/internal/ingress/metric/main.go @@ -159,4 +159,5 @@ func (c *collector) OnStartedLeading(electionID string) { // OnStoppedLeading indicates the pod stopped being the leader func (c *collector) OnStoppedLeading(electionID string) { c.ingressController.OnStoppedLeading(electionID) + c.ingressController.RemoveAllSSLExpireMetrics(c.registry) }