diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index 391c4eea63..b97e45889e 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -56,6 +56,7 @@ import ( ngx_template "k8s.io/ingress-nginx/internal/ingress/controller/template" "k8s.io/ingress-nginx/internal/ingress/metric" "k8s.io/ingress-nginx/internal/ingress/status" + "k8s.io/ingress-nginx/internal/k8s" ing_net "k8s.io/ingress-nginx/internal/net" "k8s.io/ingress-nginx/internal/net/dns" "k8s.io/ingress-nginx/internal/net/ssl" @@ -110,6 +111,11 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File metricCollector: mc, } + pod, err := k8s.GetPodDetails(config.Client) + if err != nil { + glog.Fatalf("unexpected error obtaining pod information: %v", err) + } + n.store = store.New( config.EnableSSLChainCompletion, config.Namespace, @@ -121,7 +127,8 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File config.Client, fs, n.updateCh, - config.DynamicCertificatesEnabled) + config.DynamicCertificatesEnabled, + pod) n.syncQueue = task.NewTaskQueue(n.syncIngress) diff --git a/internal/ingress/controller/store/pod.go b/internal/ingress/controller/store/pod.go new file mode 100644 index 0000000000..1a437d7692 --- /dev/null +++ b/internal/ingress/controller/store/pod.go @@ -0,0 +1,26 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "k8s.io/client-go/tools/cache" +) + +// PodLister makes a Store that lists Pods. +type PodLister struct { + cache.Store +} diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index 0870eda038..72ec713926 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -30,8 +30,11 @@ import ( corev1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -76,6 +79,9 @@ type Storer interface { // ListIngresses returns a list of all Ingresses in the store. ListIngresses() []*ingress.Ingress + // ListControllerPods returns a list of ingress-nginx controller Pods. + ListControllerPods() []*corev1.Pod + // GetLocalSSLCert returns the local copy of a SSLCert GetLocalSSLCert(name string) (*ingress.SSLCert, error) @@ -121,6 +127,7 @@ type Informer struct { Service cache.SharedIndexInformer Secret cache.SharedIndexInformer ConfigMap cache.SharedIndexInformer + Pod cache.SharedIndexInformer } // Lister contains object listers (stores). @@ -131,6 +138,7 @@ type Lister struct { Secret SecretLister ConfigMap ConfigMapLister IngressAnnotation IngressAnnotationsLister + Pod PodLister } // NotExistsError is returned when an object does not exist in a local store. @@ -147,6 +155,7 @@ func (i *Informer) Run(stopCh chan struct{}) { go i.Service.Run(stopCh) go i.Secret.Run(stopCh) go i.ConfigMap.Run(stopCh) + go i.Pod.Run(stopCh) // wait for all involved caches to be synced before processing items // from the queue @@ -211,6 +220,8 @@ type k8sStore struct { defaultSSLCertificate string isDynamicCertificatesEnabled bool + + pod *k8s.PodInfo } // New creates a new object store to be used in the ingress controller @@ -220,7 +231,8 @@ func New(checkOCSP bool, client clientset.Interface, fs file.Filesystem, updateCh *channels.RingChannel, - isDynamicCertificatesEnabled bool) Storer { + isDynamicCertificatesEnabled bool, + pod *k8s.PodInfo) Storer { store := &k8sStore{ isOCSPCheckEnabled: checkOCSP, @@ -234,6 +246,7 @@ func New(checkOCSP bool, secretIngressMap: NewObjectRefMap(), defaultSSLCertificate: defaultSSLCertificate, isDynamicCertificatesEnabled: isDynamicCertificatesEnabled, + pod: pod, } eventBroadcaster := record.NewBroadcaster() @@ -270,6 +283,26 @@ func New(checkOCSP bool, store.informers.Service = infFactory.Core().V1().Services().Informer() store.listers.Service.Store = store.informers.Service.GetStore() + labelSelector := labels.SelectorFromSet(store.pod.Labels) + store.informers.Pod = cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) { + + options.LabelSelector = labelSelector.String() + return client.CoreV1().Pods(store.pod.Namespace).List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + + options.LabelSelector = labelSelector.String() + return client.CoreV1().Pods(store.pod.Namespace).Watch(options) + }, + }, + &corev1.Pod{}, + resyncPeriod, + cache.Indexers{}, + ) + store.listers.Pod.Store = store.informers.Pod.GetStore() + ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ing := obj.(*extensions.Ingress) @@ -512,11 +545,40 @@ func New(checkOCSP bool, }, } + podEventHandler := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + updateCh.In() <- Event{ + Type: CreateEvent, + Obj: obj, + } + }, + UpdateFunc: func(old, cur interface{}) { + oldPod := old.(*corev1.Pod) + curPod := cur.(*corev1.Pod) + + if oldPod.Status.Phase == curPod.Status.Phase { + return + } + + updateCh.In() <- Event{ + Type: UpdateEvent, + Obj: cur, + } + }, + DeleteFunc: func(obj interface{}) { + updateCh.In() <- Event{ + Type: DeleteEvent, + Obj: obj, + } + }, + } + store.informers.Ingress.AddEventHandler(ingEventHandler) store.informers.Endpoint.AddEventHandler(epEventHandler) store.informers.Secret.AddEventHandler(secrEventHandler) store.informers.ConfigMap.AddEventHandler(cmEventHandler) store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{}) + store.informers.Pod.AddEventHandler(podEventHandler) // do not wait for informers to read the configmap configuration ns, name, _ := k8s.ParseNameNS(configmap) @@ -773,3 +835,20 @@ func (s k8sStore) Run(stopCh chan struct{}) { go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh) } } + +// ListControllerPods returns a list of ingress-nginx controller Pods +func (s k8sStore) ListControllerPods() []*corev1.Pod { + var pods []*corev1.Pod + + for _, i := range s.listers.Pod.List() { + pod := i.(*corev1.Pod) + + if pod.Status.Phase != corev1.PodRunning { + continue + } + + pods = append(pods, pod) + } + + return pods +} diff --git a/internal/ingress/controller/store/store_test.go b/internal/ingress/controller/store/store_test.go index a3ccd8cd63..a14cc81218 100644 --- a/internal/ingress/controller/store/store_test.go +++ b/internal/ingress/controller/store/store_test.go @@ -18,6 +18,7 @@ package store import ( "fmt" + "os" "sync" "sync/atomic" "testing" @@ -38,10 +39,19 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/ingress-nginx/internal/file" "k8s.io/ingress-nginx/internal/ingress/annotations/parser" + "k8s.io/ingress-nginx/internal/k8s" "k8s.io/ingress-nginx/test/e2e/framework" ) func TestStore(t *testing.T) { + pod := &k8s.PodInfo{ + Name: "testpod", + Namespace: v1.NamespaceDefault, + Labels: map[string]string{ + "pod-template-hash": "1234", + }, + } + clientSet := fake.NewSimpleClientset() t.Run("should return an error searching for non existing objects", func(t *testing.T) { @@ -70,7 +80,8 @@ func TestStore(t *testing.T) { clientSet, fs, updateCh, - false) + false, + pod) storer.Run(stopCh) @@ -158,7 +169,8 @@ func TestStore(t *testing.T) { clientSet, fs, updateCh, - false) + false, + pod) storer.Run(stopCh) @@ -306,7 +318,8 @@ func TestStore(t *testing.T) { clientSet, fs, updateCh, - false) + false, + pod) storer.Run(stopCh) @@ -395,7 +408,8 @@ func TestStore(t *testing.T) { clientSet, fs, updateCh, - false) + false, + pod) storer.Run(stopCh) @@ -507,7 +521,8 @@ func TestStore(t *testing.T) { clientSet, fs, updateCh, - false) + false, + pod) storer.Run(stopCh) @@ -727,17 +742,27 @@ func newStore(t *testing.T) *k8sStore { t.Fatalf("error: %v", err) } + pod := &k8s.PodInfo{ + Name: "ingress-1", + Namespace: v1.NamespaceDefault, + Labels: map[string]string{ + "pod-template-hash": "1234", + }, + } + return &k8sStore{ listers: &Lister{ // add more listers if needed Ingress: IngressLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, IngressAnnotation: IngressAnnotationsLister{cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)}, + Pod: PodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, }, sslStore: NewSSLCertTracker(), filesystem: fs, updateCh: channels.NewRingChannel(10), mu: new(sync.Mutex), secretIngressMap: NewObjectRefMap(), + pod: pod, } } @@ -943,3 +968,64 @@ func TestWriteSSLSessionTicketKey(t *testing.T) { } } } + +func TestListControllerPods(t *testing.T) { + os.Setenv("POD_NAMESPACE", "testns") + os.Setenv("POD_NAME", "ingress-1") + + s := newStore(t) + s.pod = &k8s.PodInfo{ + Name: "ingress-1", + Namespace: "testns", + Labels: map[string]string{ + "pod-template-hash": "1234", + }, + } + + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ingress-1", + Namespace: "testns", + Labels: map[string]string{ + "pod-template-hash": "1234", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + s.listers.Pod.Add(pod) + + pod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ingress-2", + Namespace: "testns", + Labels: map[string]string{ + "pod-template-hash": "1234", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + s.listers.Pod.Add(pod) + + pod = &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ingress-3", + Namespace: "testns", + Labels: map[string]string{ + "pod-template-hash": "1234", + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + } + s.listers.Pod.Add(pod) + + pods := s.ListControllerPods() + if s := len(pods); s != 2 { + t.Errorf("Expected 1 controller Pods but got %v", s) + } +}