Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventPublisher subscriptions for Consul Native API Gateway #15757

Merged
merged 11 commits into from
Dec 20, 2022
146 changes: 145 additions & 1 deletion agent/consul/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ LOOP:
}
}

// since we only modified each entry once, we should have exactly 200 reconcliation calls
// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
Expand Down Expand Up @@ -271,3 +271,147 @@ func TestBasicController_RunPanicAssertions(t *testing.T) {
controller.WithQueueFactory(RunWorkQueue)
})
}

func TestConfigEntrySubscriptions(t *testing.T) {
t.Parallel()

cases := map[string]struct {
configEntry func(string) structs.ConfigEntry
topic stream.Topic
kind string
}{
"Subscribe to Service Resolver Config Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: name,
}
},
topic: state.EventTopicServiceResolver,
kind: structs.ServiceResolver,
},
"Subscribe to Ingress Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: name,
}
},
topic: state.EventTopicIngressGateway,
kind: structs.IngressGateway,
},
"Subscribe to Service Intentions Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: name,
}
},
topic: state.EventTopicServiceIntentions,
kind: structs.ServiceIntentions,
},
"Subscribe to API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.APIGatewayConfigEntry{
Kind: structs.APIGateway,
Name: name,
}
},
topic: state.EventTopicAPIGateway,
kind: structs.APIGateway,
},
"Subscribe to Inline Certificate Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.InlineCertificateConfigEntry{
Kind: structs.InlineCertificate,
Name: name,
}
},
topic: state.EventTopicInlineCertificate,
kind: structs.InlineCertificate,
},
"Subscribe to HTTP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.HTTPRouteConfigEntry{
Kind: structs.HTTPRoute,
Name: name,
}
},
topic: state.EventTopicHTTPRoute,
kind: structs.HTTPRoute,
},
"Subscribe to TCP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: name,
}
},
topic: state.EventTopicTCPRoute,
kind: structs.TCPRoute,
},
"Subscribe to Bound API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.BoundAPIGatewayConfigEntry{
Kind: structs.BoundAPIGateway,
Name: name,
}
},
topic: state.EventTopicBoundAPIGateway,
kind: structs.BoundAPIGateway,
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

reconciler := newTestReconciler(false)

publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)

// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()

for i := 0; i < 200; i++ {
entryIndex := uint64(i + 1)
name := fmt.Sprintf("foo-%d", i)
require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name)))
}

go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: tc.topic,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)

received := []string{}
LOOP:
for {
select {
case request := <-reconciler.received:
require.Equal(t, tc.kind, request.Kind)
received = append(received, request.Name)
if len(received) == 200 {
break LOOP
}
case <-ctx.Done():
break LOOP
}
}

// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
}
})
}
}
35 changes: 35 additions & 0 deletions agent/consul/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,39 @@ func (c *FSM) registerStreamSnapshotHandlers() {
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().APIGatewaySnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicInlineCertificate, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().InlineCertificateSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicHTTPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().HTTPRouteSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicTCPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().TCPRouteSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicBoundAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().BoundAPIGatewaySnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
}
35 changes: 35 additions & 0 deletions agent/consul/state/config_entry_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ var configEntryKindToTopic = map[string]stream.Topic{
structs.IngressGateway: EventTopicIngressGateway,
structs.ServiceIntentions: EventTopicServiceIntentions,
structs.ServiceDefaults: EventTopicServiceDefaults,
structs.APIGateway: EventTopicAPIGateway,
structs.TCPRoute: EventTopicTCPRoute,
structs.HTTPRoute: EventTopicHTTPRoute,
structs.InlineCertificate: EventTopicInlineCertificate,
structs.BoundAPIGateway: EventTopicBoundAPIGateway,
}

// EventSubjectConfigEntry is a stream.Subject used to route and receive events
Expand Down Expand Up @@ -117,6 +122,36 @@ func (s *Store) ServiceDefaultsSnapshot(req stream.SubscribeRequest, buf stream.
return s.configEntrySnapshot(structs.ServiceDefaults, req, buf)
}

// APIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of
// api-gateway config entries.
func (s *Store) APIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.APIGateway, req, buf)
}

// TCPRouteSnapshot is a stream.SnapshotFunc that returns a snapshot of
// tcp-route config entries.
func (s *Store) TCPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.TCPRoute, req, buf)
}

// HTTPRouteSnapshot is a stream.SnapshotFunc that retuns a snapshot of
// http-route config entries.
func (s *Store) HTTPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.HTTPRoute, req, buf)
}

// InlineCertificateSnapshot is a stream.SnapshotFunc that returns a snapshot of
// inline-certificate config entries.
func (s *Store) InlineCertificateSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.InlineCertificate, req, buf)
}

// BoundAPIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of
// bound-api-gateways config entries.
t-eckert marked this conversation as resolved.
Show resolved Hide resolved
func (s *Store) BoundAPIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.BoundAPIGateway, req, buf)
}

func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
var (
idx uint64
Expand Down
Loading