Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for events #160

Merged
merged 1 commit into from
Jul 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 57 additions & 8 deletions nginx-controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
scheme "k8s.io/client-go/kubernetes/scheme"
core_v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api_v1 "k8s.io/client-go/pkg/api/v1"
Expand Down Expand Up @@ -60,6 +63,7 @@ type LoadBalancerController struct {
cnf *nginx.Configurator
watchNginxConfigMaps bool
nginxPlus bool
recorder record.EventRecorder
}

var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
Expand All @@ -73,6 +77,14 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, resyncPeriod tim
nginxPlus: nginxPlus,
}

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&core_v1.EventSinkImpl{
Interface: core_v1.New(kubeClient.Core().RESTClient()).Events(""),
})
lbc.recorder = eventBroadcaster.NewRecorder(scheme.Scheme,
api_v1.EventSource{Component: "nginx-ingress-controller"})

lbc.ingQueue = NewTaskQueue(lbc.syncIng)
lbc.endpQueue = NewTaskQueue(lbc.syncEndp)

Expand Down Expand Up @@ -289,15 +301,17 @@ func (lbc *LoadBalancerController) syncEndp(key string) {
}
ingEx, err := lbc.createIngress(&ing)
if err != nil {
glog.Warningf("Error updating endpoints for %v/%v: %v, skipping", ing.Namespace, ing.Name, err)
glog.Errorf("Error updating endpoints for %v/%v: %v, skipping", ing.Namespace, ing.Name, err)
continue
}
glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Name, ing.Namespace)
name := ing.Namespace + "-" + ing.Name
lbc.cnf.UpdateEndpoints(name, ingEx)
if err != nil {
glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err)
}
}
}

}

func (lbc *LoadBalancerController) syncCfgm(key string) {
Expand Down Expand Up @@ -498,14 +512,38 @@ func (lbc *LoadBalancerController) syncCfgm(key string) {
}

}
lbc.cnf.UpdateConfig(cfg)

var ingExes []*nginx.IngressEx
ings, _ := lbc.ingLister.List()
for _, ing := range ings.Items {
if !isNginxIngress(&ing) {
for i, _ := range ings.Items {
if !isNginxIngress(&ings.Items[i]) {
continue
}
lbc.ingQueue.enqueue(&ing)
ingEx, err := lbc.createIngress(&ings.Items[i])
if err != nil {
continue
}

ingExes = append(ingExes, ingEx)
}

if err := lbc.cnf.UpdateConfig(cfg, ingExes); err != nil {
if cfgmExists {
cfgm := obj.(*api_v1.ConfigMap)
lbc.recorder.Eventf(cfgm, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration from %v was updated, but not applied: %v", key, err)
}
for _, ingEx := range ingExes {
lbc.recorder.Eventf(ingEx.Ingress, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration for %v/%v was updated, but not applied: %v",
ingEx.Ingress.Name, ingEx.Ingress.Namespace, err)
}
} else {
if cfgmExists {
cfgm := obj.(*api_v1.ConfigMap)
lbc.recorder.Eventf(cfgm, api_v1.EventTypeNormal, "Updated", "Configuration from %v was updated", key)
}
for _, ingEx := range ingExes {
lbc.recorder.Eventf(ingEx.Ingress, api_v1.EventTypeNormal, "Updated", "Configuration for %v/%v was updated", ingEx.Ingress.Name, ingEx.Ingress.Namespace)
}
}
}

Expand All @@ -523,17 +561,26 @@ func (lbc *LoadBalancerController) syncIng(key string) {

if !ingExists {
glog.V(2).Infof("Deleting Ingress: %v\n", key)
lbc.cnf.DeleteIngress(name)
err := lbc.cnf.DeleteIngress(name)
if err != nil {
glog.Errorf("Error when deleting configuration for %v: %v", name, err)
}
} else {
glog.V(2).Infof("Adding or Updating Ingress: %v\n", key)

ing := obj.(*extensions.Ingress)
ingEx, err := lbc.createIngress(ing)
if err != nil {
lbc.ingQueue.requeueAfter(key, err, 5*time.Second)
lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "Rejected", "%v was rejected: %v", key, err)
return
}
lbc.cnf.AddOrUpdateIngress(name, ingEx)
err = lbc.cnf.AddOrUpdateIngress(name, ingEx)
if err != nil {
lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "AddedOrUpdatedWithError", "Configuration for %v was added or updated, but not applied: %v", key, err)
} else {
lbc.recorder.Eventf(ing, api_v1.EventTypeNormal, "AddedOrUpdated", "Configuration for %v was added or updated", key)
}
}
}

Expand Down Expand Up @@ -591,6 +638,7 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin
endps, err := lbc.getEndpointsForIngressBackend(ing.Spec.Backend, ing.Namespace)
if err != nil {
glog.Warningf("Error retrieving endpoints for the service %v: %v", ing.Spec.Backend.ServiceName, err)
ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = []string{}
} else {
ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = endps
}
Expand All @@ -605,6 +653,7 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin
endps, err := lbc.getEndpointsForIngressBackend(&path.Backend, ing.Namespace)
if err != nil {
glog.Warningf("Error retrieving endpoints for the service %v: %v", path.Backend.ServiceName, err)
ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = []string{}
} else {
ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = endps
}
Expand Down
60 changes: 39 additions & 21 deletions nginx-controller/nginx/configurator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/golang/glog"
"github.com/nginxinc/kubernetes-ingress/nginx-controller/nginx/plus"
Expand Down Expand Up @@ -38,21 +37,22 @@ func (cnf *Configurator) AddOrUpdateDHParam(content string) (string, error) {
}

// AddOrUpdateIngress adds or updates NGINX configuration for an Ingress resource
func (cnf *Configurator) AddOrUpdateIngress(name string, ingEx *IngressEx) {
func (cnf *Configurator) AddOrUpdateIngress(name string, ingEx *IngressEx) error {
cnf.lock.Lock()
defer cnf.lock.Unlock()

cnf.addOrUpdateIngress(name, ingEx)

if err := cnf.nginx.Reload(); err != nil {
return fmt.Errorf("Error when adding or updating ingress %v: %v", name, err)
}
return nil
}

func (cnf *Configurator) addOrUpdateIngress(name string, ingEx *IngressEx) {
pems := cnf.updateCertificates(ingEx)
nginxCfg := cnf.generateNginxCfg(ingEx, pems)
cnf.nginx.AddOrUpdateIngress(name, nginxCfg)
if err := cnf.nginx.Reload(); err != nil {
glog.Errorf("Error when adding or updating ingress %q: %q", name, err)
} else {
if cnf.isPlus() {
time.Sleep(500 * time.Millisecond)
cnf.updatePlusEndpoints(name, ingEx)
}
}
}

func (cnf *Configurator) updateCertificates(ingEx *IngressEx) map[string]string {
Expand Down Expand Up @@ -439,12 +439,14 @@ func createLocation(path string, upstream Upstream, cfg *Config, websocket bool,
}

func (cnf *Configurator) createUpstream(ingEx *IngressEx, name string, backend *extensions.IngressBackend, namespace string, stickyCookie string) Upstream {
var ups Upstream

if cnf.isPlus() {
return Upstream{Name: name, StickyCookie: stickyCookie}
ups = Upstream{Name: name, StickyCookie: stickyCookie}
} else {
ups = NewUpstreamWithDefaultServer(name)
}

ups := NewUpstreamWithDefaultServer(name)

endps, exists := ingEx.Endpoints[backend.ServiceName+backend.ServicePort.String()]
if exists {
var upsServers []UpstreamServer
Expand Down Expand Up @@ -482,26 +484,32 @@ func upstreamMapToSlice(upstreams map[string]Upstream) []Upstream {
}

// DeleteIngress deletes NGINX configuration for an Ingress resource
func (cnf *Configurator) DeleteIngress(name string) {
func (cnf *Configurator) DeleteIngress(name string) error {
cnf.lock.Lock()
defer cnf.lock.Unlock()

cnf.nginx.DeleteIngress(name)
if err := cnf.nginx.Reload(); err != nil {
glog.Errorf("Error when removing ingress %q: %q", name, err)
return fmt.Errorf("Error when removing ingress %v: %v", name, err)
}
return nil
}

// UpdateEndpoints updates endpoints in NGINX configuration for an Ingress resource
func (cnf *Configurator) UpdateEndpoints(name string, ingEx *IngressEx) {
if cnf.isPlus() {
cnf.lock.Lock()
defer cnf.lock.Unlock()
func (cnf *Configurator) UpdateEndpoints(name string, ingEx *IngressEx) error {
cnf.lock.Lock()
defer cnf.lock.Unlock()

if cnf.isPlus() {
cnf.addOrUpdateIngress(name, ingEx)
cnf.updatePlusEndpoints(name, ingEx)
} else {
cnf.AddOrUpdateIngress(name, ingEx)
cnf.addOrUpdateIngress(name, ingEx)
if err := cnf.nginx.Reload(); err != nil {
return fmt.Errorf("Error reloading NGINX when updating endpoints for %v: %v", name, err)
}
}
return nil
}

func (cnf *Configurator) updatePlusEndpoints(name string, ingEx *IngressEx) {
Expand Down Expand Up @@ -533,7 +541,7 @@ func (cnf *Configurator) updatePlusEndpoints(name string, ingEx *IngressEx) {
}

// UpdateConfig updates NGINX Configuration parameters
func (cnf *Configurator) UpdateConfig(config *Config) {
func (cnf *Configurator) UpdateConfig(config *Config, ingExes []*IngressEx) error {
cnf.lock.Lock()
defer cnf.lock.Unlock()

Expand All @@ -550,6 +558,16 @@ func (cnf *Configurator) UpdateConfig(config *Config) {
}

cnf.nginx.UpdateMainConfigFile(mainCfg)

for _, ingEx := range ingExes {
cnf.addOrUpdateIngress(ingEx.Ingress.Namespace+"-"+ingEx.Ingress.Name, ingEx)
}

if err := cnf.nginx.Reload(); err != nil {
return fmt.Errorf("Error when updating config from ConfigMap: %v", err)
}

return nil
}

func (cnf *Configurator) isPlus() bool {
Expand Down
6 changes: 3 additions & 3 deletions nginx-controller/nginx/plus/nginx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (client *NginxClient) AddHTTPServer(upstream string, server string) error {
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to add %v server to %v upstream: got expected 200 response, got %v", server, upstream, resp.StatusCode)
return fmt.Errorf("Failed to add %v server to %v upstream: expected 200 response, got %v", server, upstream, resp.StatusCode)
}

return nil
Expand All @@ -153,8 +153,8 @@ func (client *NginxClient) DeleteHTTPServer(upstream string, server string) erro
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to add %v server to %v upstream: got expected 200 response, got %v", server, upstream, resp.StatusCode)
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
return fmt.Errorf("Failed to add %v server to %v upstream: expected 200 or 204 response, got %v", server, upstream, resp.StatusCode)
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion nginx-controller/nginx/templates/nginx-plus.ingress.tmpl
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{{range $upstream := .Upstreams}}
upstream {{$upstream.Name}} {
zone {{$upstream.Name}} 256k;
{{range $server := $upstream.UpstreamServers}}
server {{$server.Address}}:{{$server.Port}};{{end}}
{{if $upstream.StickyCookie}}
sticky cookie {{$upstream.StickyCookie}};
{{end}}
state /var/lib/nginx/state/{{$upstream.Name}}.state;
}{{end}}

{{range $server := .Servers}}
Expand Down
Loading