Skip to content

Commit 978cfba

Browse files
committed
Add fan-out support for multiple NGINX+ Edge Load-Balancers
Settings using Informer instead of Watch
1 parent 6983e3f commit 978cfba

File tree

10 files changed

+341
-82
lines changed

10 files changed

+341
-82
lines changed

cmd/nginx-k8s-edge-controller/main.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ package main
77
import (
88
"context"
99
"fmt"
10+
"github.com/nginxinc/kubernetes-nginx-ingress/internal/config"
1011
"github.com/nginxinc/kubernetes-nginx-ingress/internal/observation"
1112
"github.com/nginxinc/kubernetes-nginx-ingress/internal/synchronization"
1213
"github.com/sirupsen/logrus"
14+
"k8s.io/client-go/kubernetes"
15+
"k8s.io/client-go/rest"
1316
)
1417

1518
func main() {
@@ -23,7 +26,22 @@ func run() error {
2326
ctx := context.Background()
2427
var err error
2528

26-
synchronizer, err := synchronization.NewSynchronizer()
29+
k8sClient, err := buildKubernetesClient()
30+
if err != nil {
31+
return fmt.Errorf(`error building a Kubernetes client: %w`, err)
32+
}
33+
34+
settings, err := config.NewSettings(ctx, k8sClient)
35+
if err != nil {
36+
return fmt.Errorf(`error occurred creating settings: %w`, err)
37+
}
38+
39+
err = settings.Initialize()
40+
if err != nil {
41+
return fmt.Errorf(`error occurred initializing settings: %w`, err)
42+
}
43+
44+
synchronizer, err := synchronization.NewSynchronizer(settings)
2745
if err != nil {
2846
return fmt.Errorf(`error initializing synchronizer: %w`, err)
2947
}
@@ -36,7 +54,7 @@ func run() error {
3654
handler := observation.NewHandler(synchronizer)
3755
handler.Initialize()
3856

39-
watcher, err := observation.NewWatcher(ctx, handler)
57+
watcher, err := observation.NewWatcher(ctx, handler, k8sClient)
4058
if err != nil {
4159
return fmt.Errorf(`error occurred creating a watcher: %w`, err)
4260
}
@@ -46,6 +64,7 @@ func run() error {
4664
return fmt.Errorf(`error occurred initializing the watcher: %w`, err)
4765
}
4866

67+
go settings.Run()
4968
go handler.Run(ctx.Done())
5069
go synchronizer.Run(ctx.Done())
5170

@@ -57,3 +76,20 @@ func run() error {
5776
<-ctx.Done()
5877
return nil
5978
}
79+
80+
func buildKubernetesClient() (*kubernetes.Clientset, error) {
81+
logrus.Debug("Watcher::buildKubernetesClient")
82+
k8sConfig, err := rest.InClusterConfig()
83+
if err == rest.ErrNotInCluster {
84+
return nil, fmt.Errorf(`not running in a Cluster: %w`, err)
85+
} else if err != nil {
86+
return nil, fmt.Errorf(`error occurred getting the Cluster config: %w`, err)
87+
}
88+
89+
client, err := kubernetes.NewForConfig(k8sConfig)
90+
if err != nil {
91+
return nil, fmt.Errorf(`error occurred creating a client: %w`, err)
92+
}
93+
94+
return client, nil
95+
}

deployment/nkl-configmap.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
apiVersion: v1
2+
kind: ConfigMap
3+
data:
4+
nginx-hosts:
5+
"http://10.1.1.4:9000/api,http://10.1.1.5:9000/api"
6+
metadata:
7+
name: nkl-config
8+
namespace: nkl

deployment/nkl-deployment.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,22 @@ kind: Deployment
33
metadata:
44
name: nkl-deployment
55
labels:
6-
app: nec
6+
app: nkl
77
spec:
88
replicas: 1
99
selector:
1010
matchLabels:
11-
app: nec
11+
app: nkl
1212
template:
1313
metadata:
1414
labels:
15-
app: nec
15+
app: nkl
1616
spec:
1717
containers:
1818
- name: nginx-k8s-edge-controller
1919
env:
2020
- name: NGINX_PLUS_HOST
21-
value: "http://192.168.1.109:9000/api"
21+
value: "http://10.1.1.4:9000/api"
2222
image: ciroque/nginx-k8s-edge-controller:latest
2323
imagePullPolicy: Always
2424
serviceAccountName: nginx-k8s-edge-controller

deployment/nkl-namespace.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
apiVersion: v1
2+
kind: Namespace
3+
metadata:
4+
name: nkl
5+
labels:
6+
name: nkl

internal/config/settings.go

Lines changed: 118 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,131 @@
55
package config
66

77
import (
8-
"errors"
9-
"os"
8+
"context"
9+
"fmt"
10+
"github.com/sirupsen/logrus"
11+
corev1 "k8s.io/api/core/v1"
12+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
13+
"k8s.io/client-go/informers"
14+
"k8s.io/client-go/kubernetes"
15+
"k8s.io/client-go/tools/cache"
16+
"strings"
17+
)
18+
19+
const (
20+
ConfigMapsNamespace = "nkl"
21+
ResyncPeriod = 0
1022
)
1123

1224
type Settings struct {
13-
NginxPlusHost string
25+
ctx context.Context
26+
NginxPlusHosts []string
27+
k8sClient *kubernetes.Clientset
28+
informer cache.SharedInformer
29+
eventHandlerRegistration cache.ResourceEventHandlerRegistration
1430
}
1531

16-
func NewSettings() (*Settings, error) {
32+
func NewSettings(ctx context.Context, k8sClient *kubernetes.Clientset) (*Settings, error) {
1733
config := new(Settings)
1834

19-
config.NginxPlusHost = os.Getenv("NGINX_PLUS_HOST")
20-
if config.NginxPlusHost == "" {
21-
return nil, errors.New("the NGINX_PLUS_HOST variable is not defined. This is required")
22-
}
35+
config.k8sClient = k8sClient
36+
config.ctx = ctx
2337

2438
return config, nil
2539
}
40+
41+
func (s *Settings) Initialize() error {
42+
logrus.Info("Settings::Initialize")
43+
44+
var err error
45+
46+
informer, err := s.buildInformer()
47+
if err != nil {
48+
return fmt.Errorf(`error occurred building ConfigMap informer: %w`, err)
49+
}
50+
51+
s.informer = informer
52+
53+
err = s.initializeEventListeners()
54+
if err != nil {
55+
return fmt.Errorf(`error occurred initializing event listeners: %w`, err)
56+
}
57+
58+
return nil
59+
}
60+
61+
func (s *Settings) Run() {
62+
logrus.Debug("Settings::Run")
63+
64+
defer utilruntime.HandleCrash()
65+
66+
go s.informer.Run(s.ctx.Done())
67+
68+
<-s.ctx.Done()
69+
}
70+
71+
func (s *Settings) buildInformer() (cache.SharedInformer, error) {
72+
options := informers.WithNamespace(ConfigMapsNamespace)
73+
factory := informers.NewSharedInformerFactoryWithOptions(s.k8sClient, ResyncPeriod, options)
74+
informer := factory.Core().V1().ConfigMaps().Informer()
75+
76+
return informer, nil
77+
}
78+
79+
func (s *Settings) initializeEventListeners() error {
80+
logrus.Debug("Settings::initializeEventListeners")
81+
82+
var err error
83+
84+
handlers := cache.ResourceEventHandlerFuncs{
85+
AddFunc: s.handleAddEvent,
86+
UpdateFunc: s.handleUpdateEvent,
87+
DeleteFunc: s.handleDeleteEvent,
88+
}
89+
90+
s.eventHandlerRegistration, err = s.informer.AddEventHandler(handlers)
91+
if err != nil {
92+
return fmt.Errorf(`error occurred registering event handlers: %w`, err)
93+
}
94+
95+
return nil
96+
}
97+
98+
func (s *Settings) handleAddEvent(obj interface{}) {
99+
logrus.Debug("Settings::handleAddEvent")
100+
101+
s.handleUpdateEvent(obj, nil)
102+
}
103+
104+
func (s *Settings) handleDeleteEvent(_ interface{}) {
105+
logrus.Debug("Settings::handleDeleteEvent")
106+
107+
s.updateHosts([]string{})
108+
}
109+
110+
func (s *Settings) handleUpdateEvent(obj interface{}, _ interface{}) {
111+
logrus.Debug("Settings::handleUpdateEvent")
112+
113+
configMap, ok := obj.(*corev1.ConfigMap)
114+
if !ok {
115+
logrus.Errorf("Settings::handleUpdateEvent: could not convert obj to ConfigMap")
116+
return
117+
}
118+
119+
hosts, found := configMap.Data["nginx-hosts"]
120+
if !found {
121+
logrus.Errorf("Settings::handleUpdateEvent: nginx-hosts key not found in ConfigMap")
122+
return
123+
}
124+
125+
newHosts := s.parseHosts(hosts)
126+
s.updateHosts(newHosts)
127+
}
128+
129+
func (s *Settings) parseHosts(hosts string) []string {
130+
return strings.Split(hosts, ",")
131+
}
132+
133+
func (s *Settings) updateHosts(hosts []string) {
134+
s.NginxPlusHosts = hosts
135+
}

internal/core/events.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ type Event struct {
2121
}
2222

2323
type ServerUpdateEvent struct {
24+
Id string
25+
NginxHost string
2426
Type EventType
2527
UpstreamName string
2628
Servers []nginxClient.StreamUpstreamServer
@@ -45,6 +47,16 @@ func NewServerUpdateEvent(eventType EventType, upstreamName string, servers []ng
4547
}
4648
}
4749

50+
func ServerUpdateEventWithIdAndHost(event *ServerUpdateEvent, id string, nginxHost string) *ServerUpdateEvent {
51+
return &ServerUpdateEvent{
52+
Id: id,
53+
NginxHost: nginxHost,
54+
Type: event.Type,
55+
UpstreamName: event.UpstreamName,
56+
Servers: event.Servers,
57+
}
58+
}
59+
4860
func (e *ServerUpdateEvent) TypeName() string {
4961
switch e.Type {
5062
case Created:

internal/observation/watcher.go

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1515
"k8s.io/client-go/informers"
1616
"k8s.io/client-go/kubernetes"
17-
"k8s.io/client-go/rest"
1817
"k8s.io/client-go/tools/cache"
1918
"time"
2019
)
@@ -30,9 +29,10 @@ type Watcher struct {
3029
informer cache.SharedIndexInformer
3130
}
3231

33-
func NewWatcher(ctx context.Context, handler *Handler) (*Watcher, error) {
32+
func NewWatcher(ctx context.Context, handler *Handler, k8sClient *kubernetes.Clientset) (*Watcher, error) {
3433
return &Watcher{
3534
ctx: ctx,
35+
client: k8sClient,
3636
handler: handler,
3737
}, nil
3838
}
@@ -41,11 +41,6 @@ func (w *Watcher) Initialize() error {
4141
logrus.Debug("Watcher::Initialize")
4242
var err error
4343

44-
w.client, err = w.buildKubernetesClient()
45-
if err != nil {
46-
return fmt.Errorf(`initalization error: %w`, err)
47-
}
48-
4944
w.informer, err = w.buildInformer()
5045
if err != nil {
5146
return fmt.Errorf(`initialization error: %w`, err)
@@ -129,23 +124,6 @@ func (w *Watcher) buildInformer() (cache.SharedIndexInformer, error) {
129124
return informer, nil
130125
}
131126

132-
func (w *Watcher) buildKubernetesClient() (*kubernetes.Clientset, error) {
133-
logrus.Debug("Watcher::buildKubernetesClient")
134-
k8sConfig, err := rest.InClusterConfig()
135-
if err == rest.ErrNotInCluster {
136-
return nil, fmt.Errorf(`not running in a Cluster: %w`, err)
137-
} else if err != nil {
138-
return nil, fmt.Errorf(`error occurred getting the Cluster config: %w`, err)
139-
}
140-
141-
client, err := kubernetes.NewForConfig(k8sConfig)
142-
if err != nil {
143-
return nil, fmt.Errorf(`error occurred creating a client: %w`, err)
144-
}
145-
146-
return client, nil
147-
}
148-
149127
func (w *Watcher) initializeEventListeners() error {
150128
logrus.Debug("Watcher::initializeEventListeners")
151129
var err error
@@ -186,7 +164,8 @@ func (w *Watcher) retrieveNodeIps() ([]string, error) {
186164
}
187165
}
188166

189-
logrus.Infof("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds())
167+
logrus.Debugf("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds())
168+
190169
return nodeIps, nil
191170
}
192171

internal/synchronization/rand.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2023 f5 Inc. All rights reserved.
2+
// Use of this source code is governed by the Apache
3+
// license that can be found in the LICENSE file.
4+
5+
package synchronization
6+
7+
import (
8+
"math/rand"
9+
"time"
10+
)
11+
12+
var charset = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
13+
var number = []byte("0123456789")
14+
var alphaNumeric = append(charset, number...)
15+
16+
// RandomString where n is the length of random string we want to generate
17+
func RandomString(n int) string {
18+
b := make([]byte, n)
19+
for i := range b {
20+
// randomly select 1 character from given charset
21+
b[i] = alphaNumeric[rand.Intn(len(alphaNumeric))]
22+
}
23+
return string(b)
24+
}
25+
26+
func RandomMilliseconds(min, max int) time.Duration {
27+
randomizer := rand.New(rand.NewSource(time.Now().UnixNano()))
28+
random := randomizer.Intn(max-min) + min
29+
30+
return time.Millisecond * time.Duration(random)
31+
}

0 commit comments

Comments
 (0)