Skip to content

Commit 8e2e2d8

Browse files
authored
Use state file for updating N+ upstreams (#2897)
Problem: When splitting the data and control plane, we want to be able to maintain the ability to dynamically update upstreams without a reload if using NGINX Plus. With the current process, we write the upstreams into the nginx conf but don't reload, then call the API to actually do the update. Writing into the conf will trigger a reload from the agent once we split, however. There were also two bugs: - if metrics were disabled, the nginx plus client was not initialized, preventing API calls from occuring and instead a reload occurred - stream upstreams were not updated with the API Solution: Don't write the upstream servers into the nginx conf anymore when using NGINX Plus. Instead, utilize the `state` file option that the NGINX Plus API will populate with the upstream servers. This way we can just call the API and don't unintentionally reload by writing servers into the conf. Also added support for updating stream upstreams, and fixed the initialization bug.
1 parent 0cbb726 commit 8e2e2d8

14 files changed

+725
-189
lines changed

internal/mode/static/handler.go

+87-61
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package static
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sync"
78
"time"
@@ -182,11 +183,11 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
182183

183184
h.setLatestConfiguration(&cfg)
184185

185-
err = h.updateUpstreamServers(
186-
ctx,
187-
logger,
188-
cfg,
189-
)
186+
if h.cfg.plus {
187+
err = h.updateUpstreamServers(cfg)
188+
} else {
189+
err = h.updateNginxConf(ctx, cfg)
190+
}
190191
case state.ClusterStateChange:
191192
h.version++
192193
cfg := dataplane.BuildConfiguration(ctx, gr, h.cfg.serviceResolver, h.version)
@@ -198,10 +199,7 @@ func (h *eventHandlerImpl) HandleEventBatch(ctx context.Context, logger logr.Log
198199

199200
h.setLatestConfiguration(&cfg)
200201

201-
err = h.updateNginxConf(
202-
ctx,
203-
cfg,
204-
)
202+
err = h.updateNginxConf(ctx, cfg)
205203
}
206204

207205
var nginxReloadRes status.NginxReloadResult
@@ -306,7 +304,10 @@ func (h *eventHandlerImpl) parseAndCaptureEvent(ctx context.Context, logger logr
306304
}
307305

308306
// updateNginxConf updates nginx conf files and reloads nginx.
309-
func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.Configuration) error {
307+
func (h *eventHandlerImpl) updateNginxConf(
308+
ctx context.Context,
309+
conf dataplane.Configuration,
310+
) error {
310311
files := h.cfg.generator.Generate(conf)
311312
if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
312313
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
@@ -316,89 +317,114 @@ func (h *eventHandlerImpl) updateNginxConf(ctx context.Context, conf dataplane.C
316317
return fmt.Errorf("failed to reload NGINX: %w", err)
317318
}
318319

320+
// If using NGINX Plus, update upstream servers using the API.
321+
if err := h.updateUpstreamServers(conf); err != nil {
322+
return fmt.Errorf("failed to update upstream servers: %w", err)
323+
}
324+
319325
return nil
320326
}
321327

322-
// updateUpstreamServers is called only when endpoints have changed. It updates nginx conf files and then:
323-
// - if using NGINX Plus, determines which servers have changed and uses the N+ API to update them;
324-
// - otherwise if not using NGINX Plus, or an error was returned from the API, reloads nginx.
325-
func (h *eventHandlerImpl) updateUpstreamServers(
326-
ctx context.Context,
327-
logger logr.Logger,
328-
conf dataplane.Configuration,
329-
) error {
330-
isPlus := h.cfg.nginxRuntimeMgr.IsPlus()
331-
332-
files := h.cfg.generator.Generate(conf)
333-
if err := h.cfg.nginxFileMgr.ReplaceFiles(files); err != nil {
334-
return fmt.Errorf("failed to replace NGINX configuration files: %w", err)
328+
// updateUpstreamServers determines which servers have changed and uses the NGINX Plus API to update them.
329+
// Only applicable when using NGINX Plus.
330+
func (h *eventHandlerImpl) updateUpstreamServers(conf dataplane.Configuration) error {
331+
if !h.cfg.plus {
332+
return nil
335333
}
336334

337-
reload := func() error {
338-
if err := h.cfg.nginxRuntimeMgr.Reload(ctx, conf.Version); err != nil {
339-
return fmt.Errorf("failed to reload NGINX: %w", err)
340-
}
335+
prevUpstreams, prevStreamUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams()
336+
if err != nil {
337+
return fmt.Errorf("failed to get upstreams from API: %w", err)
338+
}
341339

342-
return nil
340+
type upstream struct {
341+
name string
342+
servers []ngxclient.UpstreamServer
343343
}
344+
var upstreams []upstream
344345

345-
if isPlus {
346-
type upstream struct {
347-
name string
348-
servers []ngxclient.UpstreamServer
346+
for _, u := range conf.Upstreams {
347+
confUpstream := upstream{
348+
name: u.Name,
349+
servers: ngxConfig.ConvertEndpoints(u.Endpoints),
349350
}
350-
var upstreams []upstream
351351

352-
prevUpstreams, err := h.cfg.nginxRuntimeMgr.GetUpstreams()
353-
if err != nil {
354-
logger.Error(err, "failed to get upstreams from API, reloading configuration instead")
355-
return reload()
352+
if u, ok := prevUpstreams[confUpstream.name]; ok {
353+
if !serversEqual(confUpstream.servers, u.Peers) {
354+
upstreams = append(upstreams, confUpstream)
355+
}
356356
}
357+
}
357358

358-
for _, u := range conf.Upstreams {
359-
confUpstream := upstream{
360-
name: u.Name,
361-
servers: ngxConfig.ConvertEndpoints(u.Endpoints),
362-
}
359+
type streamUpstream struct {
360+
name string
361+
servers []ngxclient.StreamUpstreamServer
362+
}
363+
var streamUpstreams []streamUpstream
363364

364-
if u, ok := prevUpstreams[confUpstream.name]; ok {
365-
if !serversEqual(confUpstream.servers, u.Peers) {
366-
upstreams = append(upstreams, confUpstream)
367-
}
368-
}
365+
for _, u := range conf.StreamUpstreams {
366+
confUpstream := streamUpstream{
367+
name: u.Name,
368+
servers: ngxConfig.ConvertStreamEndpoints(u.Endpoints),
369369
}
370370

371-
var reloadPlus bool
372-
for _, upstream := range upstreams {
373-
if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil {
374-
logger.Error(
375-
err, "couldn't update upstream via the API, reloading configuration instead",
376-
"upstreamName", upstream.name,
377-
)
378-
reloadPlus = true
371+
if u, ok := prevStreamUpstreams[confUpstream.name]; ok {
372+
if !serversEqual(confUpstream.servers, u.Peers) {
373+
streamUpstreams = append(streamUpstreams, confUpstream)
379374
}
380375
}
376+
}
381377

382-
if !reloadPlus {
383-
return nil
378+
var updateErr error
379+
for _, upstream := range upstreams {
380+
if err := h.cfg.nginxRuntimeMgr.UpdateHTTPServers(upstream.name, upstream.servers); err != nil {
381+
updateErr = errors.Join(updateErr, fmt.Errorf(
382+
"couldn't update upstream %q via the API: %w", upstream.name, err))
384383
}
385384
}
386385

387-
return reload()
386+
for _, upstream := range streamUpstreams {
387+
if err := h.cfg.nginxRuntimeMgr.UpdateStreamServers(upstream.name, upstream.servers); err != nil {
388+
updateErr = errors.Join(updateErr, fmt.Errorf(
389+
"couldn't update stream upstream %q via the API: %w", upstream.name, err))
390+
}
391+
}
392+
393+
return updateErr
388394
}
389395

390-
func serversEqual(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer) bool {
396+
// serversEqual accepts lists of either UpstreamServer/Peer or StreamUpstreamServer/StreamPeer and determines
397+
// if the server names within these lists are equal.
398+
func serversEqual[
399+
upstreamServer ngxclient.UpstreamServer | ngxclient.StreamUpstreamServer,
400+
peer ngxclient.Peer | ngxclient.StreamPeer,
401+
](newServers []upstreamServer, oldServers []peer) bool {
391402
if len(newServers) != len(oldServers) {
392403
return false
393404
}
394405

406+
getServerVal := func(T any) string {
407+
var server string
408+
switch t := T.(type) {
409+
case ngxclient.UpstreamServer:
410+
server = t.Server
411+
case ngxclient.StreamUpstreamServer:
412+
server = t.Server
413+
case ngxclient.Peer:
414+
server = t.Server
415+
case ngxclient.StreamPeer:
416+
server = t.Server
417+
}
418+
return server
419+
}
420+
395421
diff := make(map[string]struct{}, len(newServers))
396422
for _, s := range newServers {
397-
diff[s.Server] = struct{}{}
423+
diff[getServerVal(s)] = struct{}{}
398424
}
399425

400426
for _, s := range oldServers {
401-
if _, ok := diff[s.Server]; !ok {
427+
if _, ok := diff[getServerVal(s)]; !ok {
402428
return false
403429
}
404430
}

internal/mode/static/handler_test.go

+81-41
Original file line numberDiff line numberDiff line change
@@ -423,20 +423,29 @@ var _ = Describe("eventHandler", func() {
423423
},
424424
},
425425
}
426-
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, nil)
426+
427+
streamUpstreams := ngxclient.StreamUpstreams{
428+
"two": ngxclient.StreamUpstream{
429+
Peers: []ngxclient.StreamPeer{
430+
{Server: "server2"},
431+
},
432+
},
433+
}
434+
435+
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, streamUpstreams, nil)
427436
})
428437

429438
When("running NGINX Plus", func() {
430439
It("should call the NGINX Plus API", func() {
431-
fakeNginxRuntimeMgr.IsPlusReturns(true)
440+
handler.cfg.plus = true
432441

433442
handler.HandleEventBatch(context.Background(), ctlrZap.New(), batch)
434443

435444
dcfg := dataplane.GetDefaultConfiguration(&graph.Graph{}, 1)
436445
Expect(helpers.Diff(handler.GetLatestConfiguration(), &dcfg)).To(BeEmpty())
437446

438-
Expect(fakeGenerator.GenerateCallCount()).To(Equal(1))
439-
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(1))
447+
Expect(fakeGenerator.GenerateCallCount()).To(Equal(0))
448+
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(0))
440449
Expect(fakeNginxRuntimeMgr.GetUpstreamsCallCount()).To(Equal(1))
441450
})
442451
})
@@ -463,19 +472,11 @@ var _ = Describe("eventHandler", func() {
463472
Name: "one",
464473
},
465474
},
466-
}
467-
468-
type callCounts struct {
469-
generate int
470-
update int
471-
reload int
472-
}
473-
474-
assertCallCounts := func(cc callCounts) {
475-
Expect(fakeGenerator.GenerateCallCount()).To(Equal(cc.generate))
476-
Expect(fakeNginxFileMgr.ReplaceFilesCallCount()).To(Equal(cc.generate))
477-
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(cc.update))
478-
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).To(Equal(cc.reload))
475+
StreamUpstreams: []dataplane.Upstream{
476+
{
477+
Name: "two",
478+
},
479+
},
479480
}
480481

481482
BeforeEach(func() {
@@ -486,47 +487,49 @@ var _ = Describe("eventHandler", func() {
486487
},
487488
},
488489
}
489-
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, nil)
490+
491+
streamUpstreams := ngxclient.StreamUpstreams{
492+
"two": ngxclient.StreamUpstream{
493+
Peers: []ngxclient.StreamPeer{
494+
{Server: "server2"},
495+
},
496+
},
497+
}
498+
499+
fakeNginxRuntimeMgr.GetUpstreamsReturns(upstreams, streamUpstreams, nil)
490500
})
491501

492502
When("running NGINX Plus", func() {
493503
BeforeEach(func() {
494-
fakeNginxRuntimeMgr.IsPlusReturns(true)
504+
handler.cfg.plus = true
495505
})
496506

497507
It("should update servers using the NGINX Plus API", func() {
498-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())
499-
500-
assertCallCounts(callCounts{generate: 1, update: 1, reload: 0})
508+
Expect(handler.updateUpstreamServers(conf)).To(Succeed())
509+
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(1))
501510
})
502511

503-
It("should reload when GET API returns an error", func() {
504-
fakeNginxRuntimeMgr.GetUpstreamsReturns(nil, errors.New("error"))
505-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())
506-
507-
assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
512+
It("should return error when GET API returns an error", func() {
513+
fakeNginxRuntimeMgr.GetUpstreamsReturns(nil, nil, errors.New("error"))
514+
Expect(handler.updateUpstreamServers(conf)).ToNot(Succeed())
508515
})
509516

510-
It("should reload when POST API returns an error", func() {
517+
It("should return error when UpdateHTTPServers API returns an error", func() {
511518
fakeNginxRuntimeMgr.UpdateHTTPServersReturns(errors.New("error"))
512-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())
519+
Expect(handler.updateUpstreamServers(conf)).ToNot(Succeed())
520+
})
513521

514-
assertCallCounts(callCounts{generate: 1, update: 1, reload: 1})
522+
It("should return error when UpdateStreamServers API returns an error", func() {
523+
fakeNginxRuntimeMgr.UpdateStreamServersReturns(errors.New("error"))
524+
Expect(handler.updateUpstreamServers(conf)).ToNot(Succeed())
515525
})
516526
})
517527

518528
When("not running NGINX Plus", func() {
519-
It("should update servers by reloading", func() {
520-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).To(Succeed())
521-
522-
assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
523-
})
529+
It("should not do anything", func() {
530+
Expect(handler.updateUpstreamServers(conf)).To(Succeed())
524531

525-
It("should return an error when reloading fails", func() {
526-
fakeNginxRuntimeMgr.ReloadReturns(errors.New("error"))
527-
Expect(handler.updateUpstreamServers(context.Background(), ctlrZap.New(), conf)).ToNot(Succeed())
528-
529-
assertCallCounts(callCounts{generate: 1, update: 0, reload: 1})
532+
Expect(fakeNginxRuntimeMgr.UpdateHTTPServersCallCount()).To(Equal(0))
530533
})
531534
})
532535
})
@@ -612,7 +615,7 @@ var _ = Describe("eventHandler", func() {
612615
})
613616

614617
var _ = Describe("serversEqual", func() {
615-
DescribeTable("determines if server lists are equal",
618+
DescribeTable("determines if HTTP server lists are equal",
616619
func(newServers []ngxclient.UpstreamServer, oldServers []ngxclient.Peer, equal bool) {
617620
Expect(serversEqual(newServers, oldServers)).To(Equal(equal))
618621
},
@@ -649,6 +652,43 @@ var _ = Describe("serversEqual", func() {
649652
true,
650653
),
651654
)
655+
DescribeTable("determines if stream server lists are equal",
656+
func(newServers []ngxclient.StreamUpstreamServer, oldServers []ngxclient.StreamPeer, equal bool) {
657+
Expect(serversEqual(newServers, oldServers)).To(Equal(equal))
658+
},
659+
Entry("different length",
660+
[]ngxclient.StreamUpstreamServer{
661+
{Server: "server1"},
662+
},
663+
[]ngxclient.StreamPeer{
664+
{Server: "server1"},
665+
{Server: "server2"},
666+
},
667+
false,
668+
),
669+
Entry("differing elements",
670+
[]ngxclient.StreamUpstreamServer{
671+
{Server: "server1"},
672+
{Server: "server2"},
673+
},
674+
[]ngxclient.StreamPeer{
675+
{Server: "server1"},
676+
{Server: "server3"},
677+
},
678+
false,
679+
),
680+
Entry("same elements",
681+
[]ngxclient.StreamUpstreamServer{
682+
{Server: "server1"},
683+
{Server: "server2"},
684+
},
685+
[]ngxclient.StreamPeer{
686+
{Server: "server1"},
687+
{Server: "server2"},
688+
},
689+
true,
690+
),
691+
)
652692
})
653693

654694
var _ = Describe("getGatewayAddresses", func() {

0 commit comments

Comments
 (0)