Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add watching for secret resource updates #166

Merged
merged 1 commit into from
Aug 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 148 additions & 13 deletions nginx-controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ type LoadBalancerController struct {
svcController cache.Controller
endpController cache.Controller
cfgmController cache.Controller
secrController cache.Controller
ingLister StoreToIngressLister
svcLister cache.Store
endpLister StoreToEndpointLister
cfgmLister StoreToConfigMapLister
secrLister StoreToSecretLister
syncQueue *taskQueue
stopCh chan struct{}
cnf *nginx.Configurator
Expand Down Expand Up @@ -200,8 +202,49 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, resyncPeriod tim
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "endpoints", namespace, fields.Everything()),
&api_v1.Endpoints{}, resyncPeriod, endpHandlers)

secrHandlers := cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
remSecr, isSecr := obj.(*api_v1.Secret)
if !isSecr {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.V(3).Infof("Error received unexpected object: %v", obj)
return
}
remSecr, ok = deletedState.Obj.(*api_v1.Secret)
if !ok {
glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Secret object: %v", deletedState.Obj)
return
}
}
if err := ValidateTLSSecret(remSecr); err != nil {
return
}

glog.V(3).Infof("Removing Secret: %v", remSecr.Name)
lbc.syncQueue.enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
errOld := ValidateTLSSecret(old.(*api_v1.Secret))
errCur := ValidateTLSSecret(cur.(*api_v1.Secret))
if errOld != nil && errCur != nil {
return
}

if !reflect.DeepEqual(old, cur) {
glog.V(3).Infof("Secret %v changed, syncing",
cur.(*api_v1.Secret).Name)
lbc.syncQueue.enqueue(cur)
}
},
}

lbc.secrLister.Store, lbc.secrController = cache.NewInformer(
cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "secrets", namespace, fields.Everything()),
&api_v1.Secret{}, resyncPeriod, secrHandlers)

if nginxConfigMaps != "" {
nginxConfigMapsNS, nginxConfigMapsName, err := parseNginxConfigMaps(nginxConfigMaps)
nginxConfigMapsNS, nginxConfigMapsName, err := parseNamespaceName(nginxConfigMaps)
if err != nil {
glog.Warning(err)
} else {
Expand Down Expand Up @@ -259,6 +302,7 @@ func (lbc *LoadBalancerController) Run() {
go lbc.ingController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
go lbc.endpController.Run(lbc.stopCh)
go lbc.secrController.Run(lbc.stopCh)
go lbc.syncQueue.run(time.Second, lbc.stopCh)
if lbc.watchNginxConfigMaps {
go lbc.cfgmController.Run(lbc.stopCh)
Expand Down Expand Up @@ -296,8 +340,7 @@ func (lbc *LoadBalancerController) syncEndp(task Task) {
continue
}
glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Name, ing.Namespace)
name := ing.Namespace + "-" + ing.Name
lbc.cnf.UpdateEndpoints(name, ingEx)
lbc.cnf.UpdateEndpoints(ingEx)
if err != nil {
glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err)
}
Expand Down Expand Up @@ -507,7 +550,7 @@ func (lbc *LoadBalancerController) syncCfgm(task Task) {

var ingExes []*nginx.IngressEx
ings, _ := lbc.ingLister.List()
for i, _ := range ings.Items {
for i := range ings.Items {
if !isNginxIngress(&ings.Items[i]) {
continue
}
Expand Down Expand Up @@ -551,6 +594,8 @@ func (lbc *LoadBalancerController) sync(task Task) {
case Endpoints:
lbc.syncEndp(task)
return
case Secret:
lbc.syncSecret(task)
}
}

Expand All @@ -562,14 +607,11 @@ func (lbc *LoadBalancerController) syncIng(task Task) {
return
}

// defaut/some-ingress -> default-some-ingress
name := strings.Replace(key, "/", "-", -1)

if !ingExists {
glog.V(2).Infof("Deleting Ingress: %v\n", key)
err := lbc.cnf.DeleteIngress(name)
err := lbc.cnf.DeleteIngress(key)
if err != nil {
glog.Errorf("Error when deleting configuration for %v: %v", name, err)
glog.Errorf("Error when deleting configuration for %v: %v", key, err)
}
} else {
glog.V(2).Infof("Adding or Updating Ingress: %v\n", key)
Expand All @@ -581,7 +623,7 @@ func (lbc *LoadBalancerController) syncIng(task Task) {
lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "Rejected", "%v was rejected: %v", key, err)
return
}
err = lbc.cnf.AddOrUpdateIngress(name, ingEx)
err = lbc.cnf.AddOrUpdateIngress(ingEx)
if err != nil {
lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "AddedOrUpdatedWithError", "Configuration for %v was added or updated, but not applied: %v", key, err)
} else {
Expand All @@ -590,6 +632,95 @@ func (lbc *LoadBalancerController) syncIng(task Task) {
}
}

func (lbc *LoadBalancerController) syncSecret(task Task) {
key := task.Key
obj, secrExists, err := lbc.secrLister.Store.GetByKey(key)
if err != nil {
lbc.syncQueue.requeue(task, err)
return
}

_, name, err := parseNamespaceName(key)
if err != nil {
glog.Warningf("Secret key %v is invalid: %v", key, err)
return
}

ings, err := lbc.findIngressesForSecret(name)
if err != nil {
glog.Warningf("Failed to find Ingress resources for Secret %v: %v", key, err)
lbc.syncQueue.requeueAfter(task, err, 5*time.Second)
}

glog.V(2).Infof("Found %v Ingress resources with Secret %v", len(ings), key)

if !secrExists {
glog.V(2).Infof("Deleting Secret: %v\n", key)

if err := lbc.cnf.DeleteTLSSecret(key, ings); err != nil {
glog.Errorf("Error when deleting Secret: %v: %v", key, err)
}

for _, ing := range ings {
lbc.syncQueue.enqueue(&ing)
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "Rejected", "%v/%v was rejected due to deleted Secret %v: %v", ing.Namespace, ing.Name, key)
}
} else {
glog.V(2).Infof("Updating Secret: %v\n", key)

secret := obj.(*api_v1.Secret)

if len(ings) > 0 {
err := ValidateTLSSecret(secret)
if err != nil {
glog.Errorf("Couldn't validate secret %v: %v", key, err)
if err := lbc.cnf.DeleteTLSSecret(key, ings); err != nil {
glog.Errorf("Error when deleting Secret: %v: %v", key, err)
}
for _, ing := range ings {
lbc.syncQueue.enqueue(&ing)
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "Rejected", "%v/%v was rejected due to invalid Secret %v: %v", ing.Namespace, ing.Name, key, err)
}
lbc.recorder.Eventf(secret, api_v1.EventTypeWarning, "Rejected", "%v was rejected: %v", key, err)
return
}

if err := lbc.cnf.AddOrUpdateTLSSecret(secret, true); err != nil {
glog.Errorf("Error when updating Secret %v: %v", key, err)
lbc.recorder.Eventf(secret, api_v1.EventTypeWarning, "UpdatedWithError", "%v was updated, but not applied: %v", key, err)
for _, ing := range ings {
lbc.recorder.Eventf(&ing, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration for %v/%v was updated, but not applied: %v", ing.Namespace, ing.Name, err)
}
} else {
lbc.recorder.Eventf(secret, api_v1.EventTypeNormal, "Updated", "%v was updated", key)
for _, ing := range ings {
lbc.recorder.Eventf(&ing, api_v1.EventTypeNormal, "Updated", "Configuration for %v/%v was updated", ing.Namespace, ing.Name)
}
}
}
}
}

func (lbc *LoadBalancerController) findIngressesForSecret(secret string) ([]extensions.Ingress, error) {
res := []extensions.Ingress{}
ings, err := lbc.ingLister.List()
if err != nil {
return nil, fmt.Errorf("Couldn't get the list of Ingress resources: %v", err)
}
for _, ing := range ings.Items {
if !isNginxIngress(&ing) {
continue
}
for _, tls := range ing.Spec.TLS {
if tls.SecretName == secret {
res = append(res, ing)
}
}
}

return res, nil
}

func (lbc *LoadBalancerController) enqueueIngressForService(svc *api_v1.Service) {
ings := lbc.getIngressesForService(svc)
for _, ing := range ings {
Expand Down Expand Up @@ -636,6 +767,10 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin
if err != nil {
return nil, fmt.Errorf("Error retrieving secret %v for Ingress %v: %v", secretName, ing.Name, err)
}
err = ValidateTLSSecret(secret)
if err != nil {
return nil, fmt.Errorf("Error validating secret %v for Ingress %v: %v", secretName, ing.Name, err)
}
ingEx.Secrets[secretName] = secret
}

Expand Down Expand Up @@ -768,10 +903,10 @@ func (lbc *LoadBalancerController) getServiceForIngressBackend(backend *extensio
return nil, fmt.Errorf("service %s doesn't exists", svcKey)
}

func parseNginxConfigMaps(nginxConfigMaps string) (string, string, error) {
res := strings.Split(nginxConfigMaps, "/")
func parseNamespaceName(value string) (ns string, name string, err error) {
res := strings.Split(value, "/")
if len(res) != 2 {
return "", "", fmt.Errorf("NGINX configmaps name must follow the format <namespace>/<name>, got: %v", nginxConfigMaps)
return "", "", fmt.Errorf("%v must follow the format <namespace>/<name>", value)
}
return res[0], res[1], nil
}
Expand Down
20 changes: 20 additions & 0 deletions nginx-controller/controller/secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package controller

import (
"fmt"

api_v1 "k8s.io/client-go/pkg/api/v1"
)

// ValidateTLSSecret validates the secret. If it is valid, the function returns nil.
func ValidateTLSSecret(secret *api_v1.Secret) error {
if _, exists := secret.Data[api_v1.TLSCertKey]; !exists {
return fmt.Errorf("Secret doesn't have %v", api_v1.TLSCertKey)
}

if _, exists := secret.Data[api_v1.TLSPrivateKeyKey]; !exists {
return fmt.Errorf("Secret doesn't have %v", api_v1.TLSPrivateKeyKey)
}

return nil
}
4 changes: 4 additions & 0 deletions nginx-controller/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ const (
Endpoints
// ConfigMap resource
ConfigMap
// Secret resource
Secret
)

// Task is an element of a taskQueue
Expand All @@ -136,6 +138,8 @@ func NewTask(key string, obj interface{}) (Task, error) {
k = Endpoints
case *api_v1.ConfigMap:
k = ConfigMap
case *api_v1.Secret:
k = Secret
default:
return Task{}, fmt.Errorf("Unknow type: %v", t)
}
Expand Down
Loading