Skip to content

Commit

Permalink
Add support for dynamic upstreams
Browse files Browse the repository at this point in the history
Previously, NKG used the cluster IP of the Service to route traffic to the
backend Services specified by HTTPRoutes. With this commit, NKG will use the
endpoints of the Pods corresponding to a Service as the upstream servers for a
backend Service.

This change adds the following components:
* EndpointSlice controller for caching and listing EndpointSlices.
* Relationship.Capturer for tracking and reporting on relationships between
  Gateway API resources and non-Gateway API resources (e.g. Services).
* ServiceResolver replaces the ServiceStore and resolves Service:Port to a list
* of endpoints.

This commit also adds upstreams to the nginx config generator. One upstream is
generated for each unique and valid Service:Port BackendRef. If a BackendRef
cannot be resolved, a 502 is returned.

Known Limitations:
* Traffic cannot be routed to Headless Services that do not have a defined port.
* If a user manually creates and EndpointSlice, they will need to populate the
  "kubernetes.io/service-name" label and set the ready condition of the
   endpoints to true. Otherwise, NKG will fail to resolve the Service endpoints.
  • Loading branch information
kate-osborn committed Sep 30, 2022
1 parent b946e9c commit 231937f
Show file tree
Hide file tree
Showing 53 changed files with 4,783 additions and 1,445 deletions.
7 changes: 7 additions & 0 deletions deploy/manifests/nginx-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ rules:
verbs:
- list
- watch
- apiGroups:
- discovery.k8s.io
resources:
- endpointslices
verbs:
- list
- watch
- apiGroups:
- gateway.networking.k8s.io
resources:
Expand Down
7 changes: 6 additions & 1 deletion docs/gateway-api-compatibility.md.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,16 @@ Fields:
* `headers` - partially supported. Only `Exact` type.
* `queryParams` - partially supported. Only `Exact` type.
* `method` - supported.
<<<<<<< HEAD
* `filters`
* `type` - supported.
* `requestRedirect` - supported except for the experimental `path` field. If multiple filters with `requestRedirect` are configured, NGINX Kubernetes Gateway will choose the first one and ignore the rest.
* `requestHeaderModifier`, `requestMirror`, `urlRewrite`, `extensionRef` - not supported.
* `backendRefs` - partially supported. Only a single backend ref without support for `weight`. Backend ref `filters` are not supported. NGINX Kubernetes Gateway will use the IP of the Service as a backend, not the IPs of the corresponding Pods. Watching for Service updates is not supported.
=======
* `filters` - not supported.
* `backendRefs` - partially supported. Only a single backend ref without support for `weight`. Backend ref `filters` are not supported.
>>>>>>> 1697a08... Add support for dynamic upstreams
* `status`
* `parents`
* `parentRef` - supported.
Expand Down Expand Up @@ -120,4 +125,4 @@ Fields:
Custom policies will be NGINX Kubernetes Gateway-specific CRDs that will allow supporting features like timeouts, load-balancing methods, authentication, etc. - important data-plane features that are not part of the Gateway API spec.

While those CRDs are not part of the Gateway API, the mechanism of attaching them to Gateway API resources is part of the Gateway API. See the [Policy Attachment doc](https://gateway-api.sigs.k8s.io/references/policy-attachment/).
While those CRDs are not part of the Gateway API, the mechanism of attaching them to Gateway API resources is part of the Gateway API. See the [Policy Attachment doc](https://gateway-api.sigs.k8s.io/references/policy-attachment/).
32 changes: 11 additions & 21 deletions internal/events/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/go-logr/logr"
apiv1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
"sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config"
Expand All @@ -28,8 +29,6 @@ type EventHandler interface {
type EventHandlerConfig struct {
// Processor is the state ChangeProcessor.
Processor state.ChangeProcessor
// ServiceStore is the state ServiceStore.
ServiceStore state.ServiceStore
// SecretStore is the state SecretStore.
SecretStore state.SecretStore
// SecretMemoryManager is the state SecretMemoryManager.
Expand Down Expand Up @@ -73,7 +72,7 @@ func (h *EventHandlerImpl) HandleEventBatch(ctx context.Context, batch EventBatc
}
}

changed, conf, statuses := h.cfg.Processor.Process()
changed, conf, statuses := h.cfg.Processor.Process(ctx)
if !changed {
h.cfg.Logger.Info("Handling events didn't result into NGINX configuration changes")
return
Expand All @@ -98,27 +97,16 @@ func (h *EventHandlerImpl) updateNginx(ctx context.Context, conf state.Configura
return err
}

cfg, warnings := h.cfg.Generator.Generate(conf)
cfg := h.cfg.Generator.Generate(conf)

// For now, we keep all http servers in one config
// For now, we keep all http servers and upstreams in one config file.
// We might rethink that. For example, we can write each server to its file
// or group servers in some way.
err = h.cfg.NginxFileMgr.WriteHTTPServersConfig("http-servers", cfg)
err = h.cfg.NginxFileMgr.WriteHTTPConfig("http", cfg)
if err != nil {
return err
}

for obj, objWarnings := range warnings {
for _, w := range objWarnings {
// FIXME(pleshakov): report warnings via Object status
h.cfg.Logger.Info("Got warning while generating config",
"kind", obj.GetObjectKind().GroupVersionKind().Kind,
"namespace", obj.GetNamespace(),
"name", obj.GetName(),
"warning", w)
}
}

return h.cfg.NginxRuntimeMgr.Reload(ctx)
}

Expand All @@ -131,11 +119,12 @@ func (h *EventHandlerImpl) propagateUpsert(e *UpsertEvent) {
case *v1beta1.HTTPRoute:
h.cfg.Processor.CaptureUpsertChange(r)
case *apiv1.Service:
// FIXME(pleshakov): make sure the affected hosts are updated
h.cfg.ServiceStore.Upsert(r)
h.cfg.Processor.CaptureUpsertChange(r)
case *apiv1.Secret:
// FIXME(kate-osborn): need to handle certificate rotation
h.cfg.SecretStore.Upsert(r)
case *discoveryV1.EndpointSlice:
h.cfg.Processor.CaptureUpsertChange(r)
default:
panic(fmt.Errorf("unknown resource type %T", e.Resource))
}
Expand All @@ -150,11 +139,12 @@ func (h *EventHandlerImpl) propagateDelete(e *DeleteEvent) {
case *v1beta1.HTTPRoute:
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
case *apiv1.Service:
// FIXME(pleshakov): make sure the affected hosts are updated
h.cfg.ServiceStore.Delete(e.NamespacedName)
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
case *apiv1.Secret:
// FIXME(kate-osborn): make sure that affected servers are updated
h.cfg.SecretStore.Delete(e.NamespacedName)
case *discoveryV1.EndpointSlice:
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
default:
panic(fmt.Errorf("unknown resource type %T", e.Type))
}
Expand Down
125 changes: 45 additions & 80 deletions internal/events/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apiv1 "k8s.io/api/core/v1"
discoveryV1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -15,7 +16,6 @@ import (
"sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/nginxinc/nginx-kubernetes-gateway/internal/events"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config/configfakes"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/file/filefakes"
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/runtime/runtimefakes"
Expand All @@ -40,11 +40,10 @@ var _ = Describe("EventHandler", func() {
var (
handler *events.EventHandlerImpl
fakeProcessor *statefakes.FakeChangeProcessor
fakeServiceStore *statefakes.FakeServiceStore
fakeSecretStore *statefakes.FakeSecretStore
fakeSecretMemoryManager *statefakes.FakeSecretDiskMemoryManager
fakeGenerator *configfakes.FakeGenerator
fakeNginxFimeMgr *filefakes.FakeManager
fakeNginxFileMgr *filefakes.FakeManager
fakeNginxRuntimeMgr *runtimefakes.FakeManager
fakeStatusUpdater *statusfakes.FakeUpdater
)
Expand All @@ -55,9 +54,9 @@ var _ = Describe("EventHandler", func() {
Expect(fakeGenerator.GenerateCallCount()).Should(Equal(1))
Expect(fakeGenerator.GenerateArgsForCall(0)).Should(Equal(expectedConf))

Expect(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount()).Should(Equal(1))
name, cfg := fakeNginxFimeMgr.WriteHTTPServersConfigArgsForCall(0)
Expect(name).Should(Equal("http-servers"))
Expect(fakeNginxFileMgr.WriteHTTPConfigCallCount()).Should(Equal(1))
name, cfg := fakeNginxFileMgr.WriteHTTPConfigArgsForCall(0)
Expect(name).Should(Equal("http"))
Expect(cfg).Should(Equal(expectedCfg))

Expect(fakeNginxRuntimeMgr.ReloadCallCount()).Should(Equal(1))
Expand All @@ -69,22 +68,20 @@ var _ = Describe("EventHandler", func() {

BeforeEach(func() {
fakeProcessor = &statefakes.FakeChangeProcessor{}
fakeServiceStore = &statefakes.FakeServiceStore{}
fakeSecretMemoryManager = &statefakes.FakeSecretDiskMemoryManager{}
fakeSecretStore = &statefakes.FakeSecretStore{}
fakeGenerator = &configfakes.FakeGenerator{}
fakeNginxFimeMgr = &filefakes.FakeManager{}
fakeNginxFileMgr = &filefakes.FakeManager{}
fakeNginxRuntimeMgr = &runtimefakes.FakeManager{}
fakeStatusUpdater = &statusfakes.FakeUpdater{}

handler = events.NewEventHandlerImpl(events.EventHandlerConfig{
Processor: fakeProcessor,
ServiceStore: fakeServiceStore,
SecretStore: fakeSecretStore,
SecretMemoryManager: fakeSecretMemoryManager,
Generator: fakeGenerator,
Logger: zap.New(),
NginxFileMgr: fakeNginxFimeMgr,
NginxFileMgr: fakeNginxFileMgr,
NginxRuntimeMgr: fakeNginxRuntimeMgr,
StatusUpdater: fakeStatusUpdater,
})
Expand All @@ -99,7 +96,7 @@ var _ = Describe("EventHandler", func() {
fakeProcessor.ProcessReturns(changed, fakeConf, fakeStatuses)

fakeCfg := []byte("fake")
fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{})
fakeGenerator.GenerateReturns(fakeCfg)

batch := []interface{}{e}

Expand All @@ -125,85 +122,58 @@ var _ = Describe("EventHandler", func() {
Entry("HTTPRoute upsert", &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}}),
Entry("Gateway upsert", &events.UpsertEvent{Resource: &v1beta1.Gateway{}}),
Entry("GatewayClass upsert", &events.UpsertEvent{Resource: &v1beta1.GatewayClass{}}),
Entry("Service upsert", &events.UpsertEvent{Resource: &apiv1.Service{}}),
Entry("EndpointSlice upsert", &events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{}}),

Entry("HTTPRoute delete", &events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}}),
Entry("Gateway delete", &events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}}),
Entry("GatewayClass delete", &events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}}),
Entry("Service delete", &events.DeleteEvent{Type: &apiv1.Service{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "service"}}),
Entry("EndpointSlice deleted", &events.DeleteEvent{Type: &discoveryV1.EndpointSlice{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "endpointslice"}}),
)
})

Describe("Process Kubernetes resources events", func() {
Describe("Process Secret events", func() {
expectNoReconfig := func() {
Expect(fakeProcessor.ProcessCallCount()).Should(Equal(1))
Expect(fakeGenerator.GenerateCallCount()).Should(Equal(0))
Expect(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount()).Should(Equal(0))
Expect(fakeNginxFileMgr.WriteHTTPConfigCallCount()).Should(Equal(0))
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).Should(Equal(0))
Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(0))
}
It("should process upsert event", func() {
secret := &apiv1.Secret{}

Describe("Process Service events", func() {
It("should process upsert event", func() {
svc := &apiv1.Service{}

batch := []interface{}{&events.UpsertEvent{
Resource: svc,
}}

handler.HandleEventBatch(context.TODO(), batch)

Expect(fakeServiceStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc))

expectNoReconfig()
})

It("should process delete event", func() {
nsname := types.NamespacedName{Namespace: "test", Name: "service"}

batch := []interface{}{&events.DeleteEvent{
NamespacedName: nsname,
Type: &apiv1.Service{},
}}

handler.HandleEventBatch(context.TODO(), batch)

Expect(fakeServiceStore.DeleteCallCount()).Should(Equal(1))
Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(nsname))

expectNoReconfig()
})
})

Describe("Process Secret events", func() {
It("should process upsert event", func() {
secret := &apiv1.Secret{}

batch := []interface{}{&events.UpsertEvent{
batch := []interface{}{
&events.UpsertEvent{
Resource: secret,
}}
},
}

handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.TODO(), batch)

Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))
Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))

expectNoReconfig()
})
expectNoReconfig()
})

It("should process delete event", func() {
nsname := types.NamespacedName{Namespace: "test", Name: "secret"}
It("should process delete event", func() {
nsname := types.NamespacedName{Namespace: "test", Name: "secret"}

batch := []interface{}{&events.DeleteEvent{
batch := []interface{}{
&events.DeleteEvent{
NamespacedName: nsname,
Type: &apiv1.Secret{},
}}
},
}

handler.HandleEventBatch(context.TODO(), batch)
handler.HandleEventBatch(context.TODO(), batch)

Expect(fakeSecretStore.DeleteCallCount()).Should(Equal(1))
Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(nsname))
Expect(fakeSecretStore.DeleteCallCount()).Should(Equal(1))
Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(nsname))

expectNoReconfig()
})
expectNoReconfig()
})
})

Expand All @@ -218,13 +188,15 @@ var _ = Describe("EventHandler", func() {
&events.UpsertEvent{Resource: &v1beta1.Gateway{}},
&events.UpsertEvent{Resource: &v1beta1.GatewayClass{}},
&events.UpsertEvent{Resource: svc},
&events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{}},
&events.UpsertEvent{Resource: secret},
}
deletes := []interface{}{
&events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}},
&events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}},
&events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}},
&events.DeleteEvent{Type: &apiv1.Service{}, NamespacedName: svcNsName},
&events.DeleteEvent{Type: &discoveryV1.EndpointSlice{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "endpointslice"}},
&events.DeleteEvent{Type: &apiv1.Secret{}, NamespacedName: secretNsName},
}

Expand All @@ -238,34 +210,27 @@ var _ = Describe("EventHandler", func() {
fakeProcessor.ProcessReturns(changed, fakeConf, fakeStatuses)

fakeCfg := []byte("fake")
fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{})
fakeGenerator.GenerateReturns(fakeCfg)

handler.HandleEventBatch(context.TODO(), batch)

// Check that the events for Gateway API resources were captured

// 3, not 5, because the last 2 do not result into CaptureUpsertChange() call
Expect(fakeProcessor.CaptureUpsertChangeCallCount()).Should(Equal(3))
for i := 0; i < 3; i++ {
// 5, not 6, because secret upsert events do not result into CaptureUpsertChange() call
Expect(fakeProcessor.CaptureUpsertChangeCallCount()).Should(Equal(5))
for i := 0; i < 5; i++ {
Expect(fakeProcessor.CaptureUpsertChangeArgsForCall(i)).Should(Equal(upserts[i].(*events.UpsertEvent).Resource))
}
Expect(fakeProcessor.CaptureDeleteChangeCallCount()).Should(Equal(3))

// 3, not 5, because the last 2 do not result into CaptureDeleteChange() call
for i := 0; i < 3; i++ {
// 5, not 6, because secret delete events do not result into CaptureDeleteChange() call
Expect(fakeProcessor.CaptureDeleteChangeCallCount()).Should(Equal(5))
for i := 0; i < 5; i++ {
d := deletes[i].(*events.DeleteEvent)
passedObj, passedNsName := fakeProcessor.CaptureDeleteChangeArgsForCall(i)
Expect(passedObj).Should(Equal(d.Type))
Expect(passedNsName).Should(Equal(d.NamespacedName))
}

// Check Service-related expectations
Expect(fakeServiceStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc))

Expect(fakeServiceStore.DeleteCallCount()).Should(Equal(1))
Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(svcNsName))

// Check Secret-related expectations
Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))
Expand Down
5 changes: 5 additions & 0 deletions internal/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ func GetQueryParamMatchTypePointer(t v1beta1.QueryParamMatchType) *v1beta1.Query
func GetTLSModePointer(t v1beta1.TLSModeType) *v1beta1.TLSModeType {
return &t
}

// GetBoolPointer takes a bool and returns a pointer to it. Useful in unit tests when initializing structs.
func GetBoolPointer(b bool) *bool {
return &b
}
Loading

0 comments on commit 231937f

Please sign in to comment.