Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/dynamic upstreams #221

Merged
merged 1 commit into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions deploy/manifests/nginx-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ rules:
verbs:
- list
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- list
- watch
- apiGroups:
- gateway.networking.k8s.io
resources:
Expand Down
7 changes: 6 additions & 1 deletion docs/gateway-api-compatibility.md.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,16 @@ Fields:
* `headers` - partially supported. Only `Exact` type.
* `queryParams` - partially supported. Only `Exact` type.
* `method` - supported.
<<<<<<< HEAD
* `filters`
* `type` - supported.
* `requestRedirect` - supported except for the experimental `path` field. If multiple filters with `requestRedirect` are configured, NGINX Kubernetes Gateway will choose the first one and ignore the rest.
* `requestHeaderModifier`, `requestMirror`, `urlRewrite`, `extensionRef` - not supported.
* `backendRefs` - partially supported. Only a single backend ref without support for `weight`. Backend ref `filters` are not supported. NGINX Kubernetes Gateway will use the IP of the Service as a backend, not the IPs of the corresponding Pods. Watching for Service updates is not supported.
=======
* `filters` - not supported.
* `backendRefs` - partially supported. Only a single backend ref without support for `weight`. Backend ref `filters` are not supported.
>>>>>>> 1697a08... Add support for dynamic upstreams
* `status`
* `parents`
* `parentRef` - supported.
Expand Down Expand Up @@ -120,4 +125,4 @@ Fields:

Custom policies will be NGINX Kubernetes Gateway-specific CRDs that will allow supporting features like timeouts, load-balancing methods, authentication, etc. - important data-plane features that are not part of the Gateway API spec.

While those CRDs are not part of the Gateway API, the mechanism of attaching them to Gateway API resources is part of the Gateway API. See the [Policy Attachment doc](https://gateway-api.sigs.k8s.io/references/policy-attachment/).
While those CRDs are not part of the Gateway API, the mechanism of attaching them to Gateway API resources is part of the Gateway API. See the [Policy Attachment doc](https://gateway-api.sigs.k8s.io/references/policy-attachment/).
32 changes: 11 additions & 21 deletions internal/events/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/go-logr/logr"
apiv1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
"sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config"
Expand All @@ -28,8 +29,6 @@ type EventHandler interface {
type EventHandlerConfig struct {
// Processor is the state ChangeProcessor.
Processor state.ChangeProcessor
// ServiceStore is the state ServiceStore.
ServiceStore state.ServiceStore
// SecretStore is the state SecretStore.
SecretStore state.SecretStore
// SecretMemoryManager is the state SecretMemoryManager.
Expand Down Expand Up @@ -73,7 +72,7 @@ func (h *EventHandlerImpl) HandleEventBatch(ctx context.Context, batch EventBatc
}
}

changed, conf, statuses := h.cfg.Processor.Process()
changed, conf, statuses := h.cfg.Processor.Process(ctx)
if !changed {
h.cfg.Logger.Info("Handling events didn't result into NGINX configuration changes")
return
Expand All @@ -98,27 +97,16 @@ func (h *EventHandlerImpl) updateNginx(ctx context.Context, conf state.Configura
return err
}

cfg, warnings := h.cfg.Generator.Generate(conf)
cfg := h.cfg.Generator.Generate(conf)

// For now, we keep all http servers in one config
// For now, we keep all http servers and upstreams in one config file.
// We might rethink that. For example, we can write each server to its file
// or group servers in some way.
err = h.cfg.NginxFileMgr.WriteHTTPServersConfig("http-servers", cfg)
err = h.cfg.NginxFileMgr.WriteHTTPConfig("http", cfg)
if err != nil {
return err
}

for obj, objWarnings := range warnings {
for _, w := range objWarnings {
// FIXME(pleshakov): report warnings via Object status
h.cfg.Logger.Info("Got warning while generating config",
"kind", obj.GetObjectKind().GroupVersionKind().Kind,
"namespace", obj.GetNamespace(),
"name", obj.GetName(),
"warning", w)
}
}

return h.cfg.NginxRuntimeMgr.Reload(ctx)
}

Expand All @@ -131,11 +119,12 @@ func (h *EventHandlerImpl) propagateUpsert(e *UpsertEvent) {
case *v1beta1.HTTPRoute:
h.cfg.Processor.CaptureUpsertChange(r)
case *apiv1.Service:
// FIXME(pleshakov): make sure the affected hosts are updated
h.cfg.ServiceStore.Upsert(r)
h.cfg.Processor.CaptureUpsertChange(r)
case *apiv1.Secret:
// FIXME(kate-osborn): need to handle certificate rotation
h.cfg.SecretStore.Upsert(r)
case *discoveryV1.EndpointSlice:
h.cfg.Processor.CaptureUpsertChange(r)
default:
panic(fmt.Errorf("unknown resource type %T", e.Resource))
}
Expand All @@ -150,11 +139,12 @@ func (h *EventHandlerImpl) propagateDelete(e *DeleteEvent) {
case *v1beta1.HTTPRoute:
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
case *apiv1.Service:
// FIXME(pleshakov): make sure the affected hosts are updated
h.cfg.ServiceStore.Delete(e.NamespacedName)
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
case *apiv1.Secret:
// FIXME(kate-osborn): make sure that affected servers are updated
h.cfg.SecretStore.Delete(e.NamespacedName)
case *discoveryV1.EndpointSlice:
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
default:
panic(fmt.Errorf("unknown resource type %T", e.Type))
}
Expand Down
125 changes: 45 additions & 80 deletions internal/events/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apiv1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -15,7 +16,6 @@ import (
"sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/nginxinc/nginx-kubernetes-gateway/internal/events"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config/configfakes"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/file/filefakes"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/runtime/runtimefakes"
Expand All @@ -40,11 +40,10 @@ var _ = Describe("EventHandler", func() {
var (
handler *events.EventHandlerImpl
fakeProcessor *statefakes.FakeChangeProcessor
fakeServiceStore *statefakes.FakeServiceStore
fakeSecretStore *statefakes.FakeSecretStore
fakeSecretMemoryManager *statefakes.FakeSecretDiskMemoryManager
fakeGenerator *configfakes.FakeGenerator
fakeNginxFimeMgr *filefakes.FakeManager
fakeNginxFileMgr *filefakes.FakeManager
fakeNginxRuntimeMgr *runtimefakes.FakeManager
fakeStatusUpdater *statusfakes.FakeUpdater
)
Expand All @@ -55,9 +54,9 @@ var _ = Describe("EventHandler", func() {
Expect(fakeGenerator.GenerateCallCount()).Should(Equal(1))
Expect(fakeGenerator.GenerateArgsForCall(0)).Should(Equal(expectedConf))

Expect(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount()).Should(Equal(1))
kate-osborn marked this conversation as resolved.
Show resolved Hide resolved
name, cfg := fakeNginxFimeMgr.WriteHTTPServersConfigArgsForCall(0)
Expect(name).Should(Equal("http-servers"))
Expect(fakeNginxFileMgr.WriteHTTPConfigCallCount()).Should(Equal(1))
name, cfg := fakeNginxFileMgr.WriteHTTPConfigArgsForCall(0)
Expect(name).Should(Equal("http"))
Expect(cfg).Should(Equal(expectedCfg))

Expect(fakeNginxRuntimeMgr.ReloadCallCount()).Should(Equal(1))
Expand All @@ -69,22 +68,20 @@ var _ = Describe("EventHandler", func() {

BeforeEach(func() {
fakeProcessor = &statefakes.FakeChangeProcessor{}
fakeServiceStore = &statefakes.FakeServiceStore{}
fakeSecretMemoryManager = &statefakes.FakeSecretDiskMemoryManager{}
fakeSecretStore = &statefakes.FakeSecretStore{}
fakeGenerator = &configfakes.FakeGenerator{}
fakeNginxFimeMgr = &filefakes.FakeManager{}
fakeNginxFileMgr = &filefakes.FakeManager{}
fakeNginxRuntimeMgr = &runtimefakes.FakeManager{}
fakeStatusUpdater = &statusfakes.FakeUpdater{}

handler = events.NewEventHandlerImpl(events.EventHandlerConfig{
Processor: fakeProcessor,
ServiceStore: fakeServiceStore,
SecretStore: fakeSecretStore,
SecretMemoryManager: fakeSecretMemoryManager,
Generator: fakeGenerator,
Logger: zap.New(),
NginxFileMgr: fakeNginxFimeMgr,
NginxFileMgr: fakeNginxFileMgr,
NginxRuntimeMgr: fakeNginxRuntimeMgr,
StatusUpdater: fakeStatusUpdater,
})
Expand All @@ -99,7 +96,7 @@ var _ = Describe("EventHandler", func() {
fakeProcessor.ProcessReturns(changed, fakeConf, fakeStatuses)

fakeCfg := []byte("fake")
fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{})
fakeGenerator.GenerateReturns(fakeCfg)

batch := []interface{}{e}

Expand All @@ -125,85 +122,58 @@ var _ = Describe("EventHandler", func() {
Entry("HTTPRoute upsert", &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}}),
Entry("Gateway upsert", &events.UpsertEvent{Resource: &v1beta1.Gateway{}}),
Entry("GatewayClass upsert", &events.UpsertEvent{Resource: &v1beta1.GatewayClass{}}),
Entry("Service upsert", &events.UpsertEvent{Resource: &apiv1.Service{}}),
Entry("EndpointSlice upsert", &events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{}}),

Entry("HTTPRoute delete", &events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}}),
Entry("Gateway delete", &events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}}),
Entry("GatewayClass delete", &events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}}),
Entry("Service delete", &events.DeleteEvent{Type: &apiv1.Service{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "service"}}),
Entry("EndpointSlice deleted", &events.DeleteEvent{Type: &discoveryV1.EndpointSlice{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "endpointslice"}}),
)
})

Describe("Process Kubernetes resources events", func() {
Describe("Process Secret events", func() {
expectNoReconfig := func() {
Expect(fakeProcessor.ProcessCallCount()).Should(Equal(1))
Expect(fakeGenerator.GenerateCallCount()).Should(Equal(0))
Expect(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount()).Should(Equal(0))
Expect(fakeNginxFileMgr.WriteHTTPConfigCallCount()).Should(Equal(0))
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).Should(Equal(0))
Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(0))
}
It("should process upsert event", func() {
secret := &apiv1.Secret{}

Describe("Process Service events", func() {
It("should process upsert event", func() {
svc := &apiv1.Service{}

batch := []interface{}{&events.UpsertEvent{
Resource: svc,
}}

handler.HandleEventBatch(context.TODO(), batch)

Expect(fakeServiceStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc))

expectNoReconfig()
})

It("should process delete event", func() {
nsname := types.NamespacedName{Namespace: "test", Name: "service"}

batch := []interface{}{&events.DeleteEvent{
NamespacedName: nsname,
Type: &apiv1.Service{},
}}

handler.HandleEventBatch(context.TODO(), batch)

Expect(fakeServiceStore.DeleteCallCount()).Should(Equal(1))
Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(nsname))

expectNoReconfig()
})
})

Describe("Process Secret events", func() {
It("should process upsert event", func() {
secret := &apiv1.Secret{}

batch := []interface{}{&events.UpsertEvent{
batch := []interface{}{
&events.UpsertEvent{
Resource: secret,
}}
},
}

handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.TODO(), batch)

Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))
Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))

expectNoReconfig()
})
expectNoReconfig()
})

It("should process delete event", func() {
nsname := types.NamespacedName{Namespace: "test", Name: "secret"}
It("should process delete event", func() {
nsname := types.NamespacedName{Namespace: "test", Name: "secret"}

batch := []interface{}{&events.DeleteEvent{
batch := []interface{}{
&events.DeleteEvent{
NamespacedName: nsname,
Type: &apiv1.Secret{},
}}
},
}

handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.TODO(), batch)

Expect(fakeSecretStore.DeleteCallCount()).Should(Equal(1))
Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(nsname))
Expect(fakeSecretStore.DeleteCallCount()).Should(Equal(1))
Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(nsname))

expectNoReconfig()
})
expectNoReconfig()
})
})

Expand All @@ -218,13 +188,15 @@ var _ = Describe("EventHandler", func() {
&events.UpsertEvent{Resource: &v1beta1.Gateway{}},
&events.UpsertEvent{Resource: &v1beta1.GatewayClass{}},
&events.UpsertEvent{Resource: svc},
&events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{}},
&events.UpsertEvent{Resource: secret},
}
deletes := []interface{}{
&events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}},
&events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}},
&events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}},
&events.DeleteEvent{Type: &apiv1.Service{}, NamespacedName: svcNsName},
&events.DeleteEvent{Type: &discoveryV1.EndpointSlice{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "endpointslice"}},
&events.DeleteEvent{Type: &apiv1.Secret{}, NamespacedName: secretNsName},
}

Expand All @@ -238,34 +210,27 @@ var _ = Describe("EventHandler", func() {
fakeProcessor.ProcessReturns(changed, fakeConf, fakeStatuses)

fakeCfg := []byte("fake")
fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{})
fakeGenerator.GenerateReturns(fakeCfg)

handler.HandleEventBatch(context.TODO(), batch)

// Check that the events for Gateway API resources were captured

// 3, not 5, because the last 2 do not result into CaptureUpsertChange() call
Expect(fakeProcessor.CaptureUpsertChangeCallCount()).Should(Equal(3))
for i := 0; i < 3; i++ {
// 5, not 6, because secret upsert events do not result into CaptureUpsertChange() call
Expect(fakeProcessor.CaptureUpsertChangeCallCount()).Should(Equal(5))
for i := 0; i < 5; i++ {
Expect(fakeProcessor.CaptureUpsertChangeArgsForCall(i)).Should(Equal(upserts[i].(*events.UpsertEvent).Resource))
}
Expect(fakeProcessor.CaptureDeleteChangeCallCount()).Should(Equal(3))

// 3, not 5, because the last 2 do not result into CaptureDeleteChange() call
for i := 0; i < 3; i++ {
// 5, not 6, because secret delete events do not result into CaptureDeleteChange() call
Expect(fakeProcessor.CaptureDeleteChangeCallCount()).Should(Equal(5))
for i := 0; i < 5; i++ {
d := deletes[i].(*events.DeleteEvent)
passedObj, passedNsName := fakeProcessor.CaptureDeleteChangeArgsForCall(i)
Expect(passedObj).Should(Equal(d.Type))
Expect(passedNsName).Should(Equal(d.NamespacedName))
}

// Check Service-related expectations
Expect(fakeServiceStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc))

Expect(fakeServiceStore.DeleteCallCount()).Should(Equal(1))
Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(svcNsName))

// Check Secret-related expectations
Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))
Expand Down
5 changes: 5 additions & 0 deletions internal/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ func GetQueryParamMatchTypePointer(t v1beta1.QueryParamMatchType) *v1beta1.Query
func GetTLSModePointer(t v1beta1.TLSModeType) *v1beta1.TLSModeType {
return &t
}

// GetBoolPointer takes a bool and returns a pointer to it. Useful in unit tests when initializing structs.
func GetBoolPointer(b bool) *bool {
return &b
}
Loading