From c89f47cd3fdf28198166fd07d353587b417b0bd8 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Thu, 8 Dec 2022 14:41:01 -0500 Subject: [PATCH 01/11] Create new event topics in subscribe proto --- agent/consul/state/memdb.go | 4 ++ proto/pbsubscribe/subscribe.pb.go | 78 +++++++++++++++++++------------ proto/pbsubscribe/subscribe.proto | 14 ++++++ 3 files changed, 67 insertions(+), 29 deletions(-) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 395325ee61f3..ce2f0b0dc8fe 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -186,6 +186,10 @@ var ( EventTopicServiceIntentions = pbsubscribe.Topic_ServiceIntentions EventTopicServiceDefaults = pbsubscribe.Topic_ServiceDefaults EventTopicServiceList = pbsubscribe.Topic_ServiceList + EventTopicAPIGateways = pbsubscribe.Topic_APIGateways + EventTopicTCPRoutes = pbsubscribe.Topic_TCPRoutes + EventTopicHTTPRoutes = pbsubscribe.Topic_HTTPRoutes + EventTopicInlineCertificates = pbsubscribe.Topic_InlineCertificates ) func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { diff --git a/proto/pbsubscribe/subscribe.pb.go b/proto/pbsubscribe/subscribe.pb.go index d890846cefd2..dbf8da98890d 100644 --- a/proto/pbsubscribe/subscribe.pb.go +++ b/proto/pbsubscribe/subscribe.pb.go @@ -58,20 +58,32 @@ const ( Topic_ServiceList Topic = 7 // ServiceDefaults topic contains events for changes to service-defaults. Topic_ServiceDefaults Topic = 8 + // APIGateway topic contains events for changes to api-gateways. + Topic_APIGateways Topic = 9 + // TCPRoutes topic contains events for changes to tcp-routes. + Topic_TCPRoutes Topic = 10 + // HTTPRoutes topic contains events for changes to http-routes. + Topic_HTTPRoutes Topic = 11 + // InlineCertificates topic contains events for changes to inline-certificates. + Topic_InlineCertificates Topic = 12 ) // Enum value maps for Topic. var ( Topic_name = map[int32]string{ - 0: "Unknown", - 1: "ServiceHealth", - 2: "ServiceHealthConnect", - 3: "MeshConfig", - 4: "ServiceResolver", - 5: "IngressGateway", - 6: "ServiceIntentions", - 7: "ServiceList", - 8: "ServiceDefaults", + 0: "Unknown", + 1: "ServiceHealth", + 2: "ServiceHealthConnect", + 3: "MeshConfig", + 4: "ServiceResolver", + 5: "IngressGateway", + 6: "ServiceIntentions", + 7: "ServiceList", + 8: "ServiceDefaults", + 9: "APIGateways", + 10: "TCPRoutes", + 11: "HTTPRoutes", + 12: "InlineCertificates", } Topic_value = map[string]int32{ "Unknown": 0, @@ -83,6 +95,10 @@ var ( "ServiceIntentions": 6, "ServiceList": 7, "ServiceDefaults": 8, + "APIGateways": 9, + "TCPRoutes": 10, + "HTTPRoutes": 11, + "InlineCertificates": 12, } ) @@ -961,7 +977,7 @@ var file_proto_pbsubscribe_subscribe_proto_rawDesc = []byte{ 0x45, 0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x0e, 0x45, 0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0xb7, 0x01, 0x0a, 0x05, 0x54, + 0x52, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0xff, 0x01, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, @@ -973,25 +989,29 @@ var file_proto_pbsubscribe_subscribe_proto_rawDesc = []byte{ 0x63, 0x65, 0x49, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x10, 0x06, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x10, 0x07, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, - 0x74, 0x73, 0x10, 0x08, 0x2a, 0x29, 0x0a, 0x09, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, - 0x70, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, - 0x0e, 0x0a, 0x0a, 0x44, 0x65, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, - 0x59, 0x0a, 0x17, 0x53, 0x74, 0x61, 0x74, 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x3e, 0x0a, 0x09, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x92, 0x01, 0x0a, 0x0d, 0x63, - 0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x42, 0x0e, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2d, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, - 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2f, 0x70, 0x62, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, - 0x53, 0x58, 0x58, 0xaa, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, - 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x73, 0x10, 0x08, 0x12, 0x0f, 0x0a, 0x0b, 0x41, 0x50, 0x49, 0x47, 0x61, 0x74, 0x65, 0x77, + 0x61, 0x79, 0x73, 0x10, 0x09, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x43, 0x50, 0x52, 0x6f, 0x75, 0x74, + 0x65, 0x73, 0x10, 0x0a, 0x12, 0x0e, 0x0a, 0x0a, 0x48, 0x54, 0x54, 0x50, 0x52, 0x6f, 0x75, 0x74, + 0x65, 0x73, 0x10, 0x0b, 0x12, 0x16, 0x0a, 0x12, 0x49, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x43, 0x65, + 0x72, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x73, 0x10, 0x0c, 0x2a, 0x29, 0x0a, 0x09, + 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x65, 0x72, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, 0x59, 0x0a, 0x17, 0x53, 0x74, 0x61, 0x74, 0x65, + 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x12, 0x3e, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, + 0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, + 0x30, 0x01, 0x42, 0x92, 0x01, 0x0a, 0x0d, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x42, 0x0e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, + 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x73, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0xaa, 0x02, 0x09, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x5c, + 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/pbsubscribe/subscribe.proto b/proto/pbsubscribe/subscribe.proto index c1b3e82ebb20..cbb2b5b3ebaa 100644 --- a/proto/pbsubscribe/subscribe.proto +++ b/proto/pbsubscribe/subscribe.proto @@ -73,6 +73,20 @@ enum Topic { // ServiceDefaults topic contains events for changes to service-defaults. ServiceDefaults = 8; + + // APIGateway topic contains events for changes to api-gateways. + APIGateways = 9; + + // TCPRoutes topic contains events for changes to tcp-routes. + TCPRoutes = 10; + + // HTTPRoutes topic contains events for changes to http-routes. + HTTPRoutes = 11; + + // InlineCertificates topic contains events for changes to inline-certificates. + InlineCertificates = 12; + + // TODO Do I add BoundAPIGateway here? I think I do... } message NamedSubject { From b3648be30c52012e1f9b36cfdfc1b6616ec051d2 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Thu, 8 Dec 2022 16:58:35 -0500 Subject: [PATCH 02/11] Add tests for PBSubscribe func --- agent/consul/state/events.go | 7 +- agent/consul/state/events_test.go | 148 ++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+), 2 deletions(-) create mode 100644 agent/consul/state/events_test.go diff --git a/agent/consul/state/events.go b/agent/consul/state/events.go index 93a4558f271d..0d2bf6df1c41 100644 --- a/agent/consul/state/events.go +++ b/agent/consul/state/events.go @@ -9,6 +9,8 @@ import ( "github.com/hashicorp/consul/proto/pbsubscribe" ) +// PBToStreamSubscribeRequest takes a protobuf subscribe request and enterprise +// metadata to properly generate the matching stream subscribe request. func PBToStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta acl.EnterpriseMeta) (*stream.SubscribeRequest, error) { var subject stream.Subject @@ -17,7 +19,7 @@ func PBToStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta acl.E } else { named := req.GetNamedSubject() - // Support the (deprcated) top-level Key, Partition, Namespace, and PeerName fields. + // Support the (deprecated) top-level Key, Partition, Namespace, and PeerName fields. if named == nil { named = &pbsubscribe.NamedSubject{ Key: req.Key, // nolint:staticcheck // SA1019 intentional use of deprecated field @@ -38,7 +40,8 @@ func PBToStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta acl.E EnterpriseMeta: entMeta, PeerName: named.PeerName, } - case EventTopicMeshConfig, EventTopicServiceResolver, EventTopicIngressGateway, EventTopicServiceIntentions, EventTopicServiceDefaults: + case EventTopicMeshConfig, EventTopicServiceResolver, EventTopicIngressGateway, + EventTopicServiceIntentions, EventTopicServiceDefaults, EventTopicAPIGateways: subject = EventSubjectConfigEntry{ Name: named.Key, EnterpriseMeta: &entMeta, diff --git a/agent/consul/state/events_test.go b/agent/consul/state/events_test.go new file mode 100644 index 000000000000..60d71800f126 --- /dev/null +++ b/agent/consul/state/events_test.go @@ -0,0 +1,148 @@ +package state + +import ( + "fmt" + "testing" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/stretchr/testify/require" +) + +const aclToken = "67b04fbc-e35f-494a-ad43-739f1c8b839c" + +func TestPBToStreamSubscribeRequest(t *testing.T) { + cases := map[string]struct { + req *pbsubscribe.SubscribeRequest + entMeta acl.EnterpriseMeta + expectedSubscribeRequest *stream.SubscribeRequest + err error + }{ + "Wildcard subject": { + req: &pbsubscribe.SubscribeRequest{ + Topic: EventTopicServiceList, + Subject: &pbsubscribe.SubscribeRequest_WildcardSubject{WildcardSubject: true}, + Token: aclToken, + Index: 1, + }, + entMeta: acl.EnterpriseMeta{}, + expectedSubscribeRequest: &stream.SubscribeRequest{ + Topic: EventTopicServiceList, + Subject: stream.SubjectWildcard, + Token: aclToken, + Index: 1, + }, + err: nil, + }, + "Deprecated top level fields": { + req: &pbsubscribe.SubscribeRequest{ + Topic: EventTopicServiceDefaults, + Key: "key", + Partition: "partition", + Namespace: "consul", + PeerName: "peer", + }, + entMeta: acl.EnterpriseMeta{}, + expectedSubscribeRequest: &stream.SubscribeRequest{ + Topic: EventTopicServiceDefaults, + Subject: EventSubjectConfigEntry{ + Name: "key", + EnterpriseMeta: &acl.EnterpriseMeta{}, + }, + }, + err: nil, + }, + "Service health": { + req: &pbsubscribe.SubscribeRequest{ + Topic: EventTopicServiceHealth, + Subject: &pbsubscribe.SubscribeRequest_NamedSubject{ + NamedSubject: &pbsubscribe.NamedSubject{ + Key: "key", + Namespace: "consul", + Partition: "partition", + PeerName: "peer", + }, + }, + Token: aclToken, + Index: 2, + }, + entMeta: acl.EnterpriseMeta{}, + expectedSubscribeRequest: &stream.SubscribeRequest{ + Topic: EventTopicServiceHealth, + Subject: EventSubjectService{ + Key: "key", + EnterpriseMeta: acl.EnterpriseMeta{}, + PeerName: "peer", + }, + Token: aclToken, + Index: 2, + }, + err: nil, + }, + "Config": { + req: &pbsubscribe.SubscribeRequest{ + Topic: EventTopicAPIGateways, + Subject: &pbsubscribe.SubscribeRequest_NamedSubject{ + NamedSubject: &pbsubscribe.NamedSubject{ + Key: "key", + Namespace: "consul", + Partition: "partition", + PeerName: "peer", + }, + }, + Token: aclToken, + Index: 2, + }, + entMeta: acl.EnterpriseMeta{}, + expectedSubscribeRequest: &stream.SubscribeRequest{ + Topic: EventTopicAPIGateways, + Subject: EventSubjectConfigEntry{ + Name: "key", + EnterpriseMeta: &acl.EnterpriseMeta{}, + }, + Token: aclToken, + Index: 2, + }, + err: nil, + }, + "Service list without wildcard returns error": { + req: &pbsubscribe.SubscribeRequest{ + Topic: EventTopicServiceList, + Subject: &pbsubscribe.SubscribeRequest_NamedSubject{ + NamedSubject: &pbsubscribe.NamedSubject{ + Key: "key", + Namespace: "consul", + Partition: "partition", + PeerName: "peer", + }, + }, + }, + entMeta: acl.EnterpriseMeta{}, + expectedSubscribeRequest: nil, + err: fmt.Errorf("topic %s can only be consumed using WildcardSubject", EventTopicServiceList), + }, + "Unrecognized topic returns error": { + req: &pbsubscribe.SubscribeRequest{ + Topic: 99999, + }, + entMeta: acl.EnterpriseMeta{}, + expectedSubscribeRequest: nil, + err: fmt.Errorf("cannot construct subject for topic 99999"), + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + actual, err := PBToStreamSubscribeRequest(tc.req, tc.entMeta) + + if tc.err != nil { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + require.Equal(t, tc.expectedSubscribeRequest, actual) + }) + } +} From f3e8c1b5688ba0cd452f756e53197c6692e6f2d6 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Fri, 9 Dec 2022 13:45:30 -0500 Subject: [PATCH 03/11] Make configs singular, add all configs to PBToStreamSubscribeRequest --- agent/consul/state/events.go | 4 +- agent/consul/state/events_test.go | 4 +- agent/consul/state/memdb.go | 9 ++-- proto/pbsubscribe/subscribe.pb.go | 83 ++++++++++++++++--------------- proto/pbsubscribe/subscribe.proto | 17 ++++--- 5 files changed, 63 insertions(+), 54 deletions(-) diff --git a/agent/consul/state/events.go b/agent/consul/state/events.go index 0d2bf6df1c41..f17f38f65763 100644 --- a/agent/consul/state/events.go +++ b/agent/consul/state/events.go @@ -41,7 +41,9 @@ func PBToStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta acl.E PeerName: named.PeerName, } case EventTopicMeshConfig, EventTopicServiceResolver, EventTopicIngressGateway, - EventTopicServiceIntentions, EventTopicServiceDefaults, EventTopicAPIGateways: + EventTopicServiceIntentions, EventTopicServiceDefaults, EventTopicAPIGateway, + EventTopicTCPRoute, EventTopicHTTPRoute, EventTopicInlineCertificate, + EventTopicBoundAPIGateway: subject = EventSubjectConfigEntry{ Name: named.Key, EnterpriseMeta: &entMeta, diff --git a/agent/consul/state/events_test.go b/agent/consul/state/events_test.go index 60d71800f126..e5c4714f2478 100644 --- a/agent/consul/state/events_test.go +++ b/agent/consul/state/events_test.go @@ -82,7 +82,7 @@ func TestPBToStreamSubscribeRequest(t *testing.T) { }, "Config": { req: &pbsubscribe.SubscribeRequest{ - Topic: EventTopicAPIGateways, + Topic: EventTopicAPIGateway, Subject: &pbsubscribe.SubscribeRequest_NamedSubject{ NamedSubject: &pbsubscribe.NamedSubject{ Key: "key", @@ -96,7 +96,7 @@ func TestPBToStreamSubscribeRequest(t *testing.T) { }, entMeta: acl.EnterpriseMeta{}, expectedSubscribeRequest: &stream.SubscribeRequest{ - Topic: EventTopicAPIGateways, + Topic: EventTopicAPIGateway, Subject: EventSubjectConfigEntry{ Name: "key", EnterpriseMeta: &acl.EnterpriseMeta{}, diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index ce2f0b0dc8fe..a0b2f116acb2 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -186,10 +186,11 @@ var ( EventTopicServiceIntentions = pbsubscribe.Topic_ServiceIntentions EventTopicServiceDefaults = pbsubscribe.Topic_ServiceDefaults EventTopicServiceList = pbsubscribe.Topic_ServiceList - EventTopicAPIGateways = pbsubscribe.Topic_APIGateways - EventTopicTCPRoutes = pbsubscribe.Topic_TCPRoutes - EventTopicHTTPRoutes = pbsubscribe.Topic_HTTPRoutes - EventTopicInlineCertificates = pbsubscribe.Topic_InlineCertificates + EventTopicAPIGateway = pbsubscribe.Topic_APIGateway + EventTopicTCPRoute = pbsubscribe.Topic_TCPRoute + EventTopicHTTPRoute = pbsubscribe.Topic_HTTPRoute + EventTopicInlineCertificate = pbsubscribe.Topic_InlineCertificate + EventTopicBoundAPIGateway = pbsubscribe.Topic_BoundAPIGateway ) func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { diff --git a/proto/pbsubscribe/subscribe.pb.go b/proto/pbsubscribe/subscribe.pb.go index dbf8da98890d..adbe8d23758e 100644 --- a/proto/pbsubscribe/subscribe.pb.go +++ b/proto/pbsubscribe/subscribe.pb.go @@ -59,13 +59,15 @@ const ( // ServiceDefaults topic contains events for changes to service-defaults. Topic_ServiceDefaults Topic = 8 // APIGateway topic contains events for changes to api-gateways. - Topic_APIGateways Topic = 9 - // TCPRoutes topic contains events for changes to tcp-routes. - Topic_TCPRoutes Topic = 10 - // HTTPRoutes topic contains events for changes to http-routes. - Topic_HTTPRoutes Topic = 11 - // InlineCertificates topic contains events for changes to inline-certificates. - Topic_InlineCertificates Topic = 12 + Topic_APIGateway Topic = 9 + // TCPRoute topic contains events for changes to tcp-routes. + Topic_TCPRoute Topic = 10 + // HTTPRoute topic contains events for changes to http-routes. + Topic_HTTPRoute Topic = 11 + // InlineCertificate topic contains events for changes to inline-certificates. + Topic_InlineCertificate Topic = 12 + // BoundAPIGateway topic contains events for changes to bound-api-gateways. + Topic_BoundAPIGateway Topic = 13 ) // Enum value maps for Topic. @@ -80,10 +82,11 @@ var ( 6: "ServiceIntentions", 7: "ServiceList", 8: "ServiceDefaults", - 9: "APIGateways", - 10: "TCPRoutes", - 11: "HTTPRoutes", - 12: "InlineCertificates", + 9: "APIGateway", + 10: "TCPRoute", + 11: "HTTPRoute", + 12: "InlineCertificate", + 13: "BoundAPIGateway", } Topic_value = map[string]int32{ "Unknown": 0, @@ -95,10 +98,11 @@ var ( "ServiceIntentions": 6, "ServiceList": 7, "ServiceDefaults": 8, - "APIGateways": 9, - "TCPRoutes": 10, - "HTTPRoutes": 11, - "InlineCertificates": 12, + "APIGateway": 9, + "TCPRoute": 10, + "HTTPRoute": 11, + "InlineCertificate": 12, + "BoundAPIGateway": 13, } ) @@ -977,7 +981,7 @@ var file_proto_pbsubscribe_subscribe_proto_rawDesc = []byte{ 0x45, 0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x52, 0x0e, 0x45, 0x6e, 0x74, 0x65, 0x72, 0x70, 0x72, 0x69, 0x73, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0xff, 0x01, 0x0a, 0x05, 0x54, + 0x52, 0x08, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0x90, 0x02, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x48, @@ -989,29 +993,30 @@ var file_proto_pbsubscribe_subscribe_proto_rawDesc = []byte{ 0x63, 0x65, 0x49, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x10, 0x06, 0x12, 0x0f, 0x0a, 0x0b, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x10, 0x07, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, - 0x74, 0x73, 0x10, 0x08, 0x12, 0x0f, 0x0a, 0x0b, 0x41, 0x50, 0x49, 0x47, 0x61, 0x74, 0x65, 0x77, - 0x61, 0x79, 0x73, 0x10, 0x09, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x43, 0x50, 0x52, 0x6f, 0x75, 0x74, - 0x65, 0x73, 0x10, 0x0a, 0x12, 0x0e, 0x0a, 0x0a, 0x48, 0x54, 0x54, 0x50, 0x52, 0x6f, 0x75, 0x74, - 0x65, 0x73, 0x10, 0x0b, 0x12, 0x16, 0x0a, 0x12, 0x49, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x43, 0x65, - 0x72, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x73, 0x10, 0x0c, 0x2a, 0x29, 0x0a, 0x09, - 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65, 0x67, - 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x65, 0x72, 0x65, 0x67, - 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, 0x59, 0x0a, 0x17, 0x53, 0x74, 0x61, 0x74, 0x65, - 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x3e, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, - 0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73, - 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, - 0x30, 0x01, 0x42, 0x92, 0x01, 0x0a, 0x0d, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x62, 0x65, 0x42, 0x0e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, - 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, 0x6e, - 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x73, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0xaa, 0x02, 0x09, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x5c, - 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x73, 0x10, 0x08, 0x12, 0x0e, 0x0a, 0x0a, 0x41, 0x50, 0x49, 0x47, 0x61, 0x74, 0x65, 0x77, + 0x61, 0x79, 0x10, 0x09, 0x12, 0x0c, 0x0a, 0x08, 0x54, 0x43, 0x50, 0x52, 0x6f, 0x75, 0x74, 0x65, + 0x10, 0x0a, 0x12, 0x0d, 0x0a, 0x09, 0x48, 0x54, 0x54, 0x50, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x10, + 0x0b, 0x12, 0x15, 0x0a, 0x11, 0x49, 0x6e, 0x6c, 0x69, 0x6e, 0x65, 0x43, 0x65, 0x72, 0x74, 0x69, + 0x66, 0x69, 0x63, 0x61, 0x74, 0x65, 0x10, 0x0c, 0x12, 0x13, 0x0a, 0x0f, 0x42, 0x6f, 0x75, 0x6e, + 0x64, 0x41, 0x50, 0x49, 0x47, 0x61, 0x74, 0x65, 0x77, 0x61, 0x79, 0x10, 0x0d, 0x2a, 0x29, 0x0a, + 0x09, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x4f, 0x70, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x44, 0x65, 0x72, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x10, 0x01, 0x32, 0x59, 0x0a, 0x17, 0x53, 0x74, 0x61, 0x74, + 0x65, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x3e, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x12, 0x1b, 0x2e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, + 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, + 0x00, 0x30, 0x01, 0x42, 0x92, 0x01, 0x0a, 0x0d, 0x63, 0x6f, 0x6d, 0x2e, 0x73, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0x42, 0x0e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x61, 0x73, 0x68, 0x69, 0x63, 0x6f, 0x72, 0x70, 0x2f, 0x63, 0x6f, + 0x6e, 0x73, 0x75, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x62, 0x73, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xa2, 0x02, 0x03, 0x53, 0x58, 0x58, 0xaa, 0x02, 0x09, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0xca, 0x02, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0xe2, 0x02, 0x15, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, + 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x09, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/pbsubscribe/subscribe.proto b/proto/pbsubscribe/subscribe.proto index cbb2b5b3ebaa..8eaf66b2b3d6 100644 --- a/proto/pbsubscribe/subscribe.proto +++ b/proto/pbsubscribe/subscribe.proto @@ -75,18 +75,19 @@ enum Topic { ServiceDefaults = 8; // APIGateway topic contains events for changes to api-gateways. - APIGateways = 9; + APIGateway = 9; - // TCPRoutes topic contains events for changes to tcp-routes. - TCPRoutes = 10; + // TCPRoute topic contains events for changes to tcp-routes. + TCPRoute = 10; - // HTTPRoutes topic contains events for changes to http-routes. - HTTPRoutes = 11; + // HTTPRoute topic contains events for changes to http-routes. + HTTPRoute = 11; - // InlineCertificates topic contains events for changes to inline-certificates. - InlineCertificates = 12; + // InlineCertificate topic contains events for changes to inline-certificates. + InlineCertificate = 12; - // TODO Do I add BoundAPIGateway here? I think I do... + // BoundAPIGateway topic contains events for changes to bound-api-gateways. + BoundAPIGateway = 13; } message NamedSubject { From e319e2d04161faec726a858159759a335383e0e0 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Fri, 9 Dec 2022 14:19:19 -0500 Subject: [PATCH 04/11] Add snapshot methods --- agent/consul/state/config_entry_events.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/agent/consul/state/config_entry_events.go b/agent/consul/state/config_entry_events.go index 5fc71f0275c9..f763f7535b13 100644 --- a/agent/consul/state/config_entry_events.go +++ b/agent/consul/state/config_entry_events.go @@ -117,6 +117,26 @@ func (s *Store) ServiceDefaultsSnapshot(req stream.SubscribeRequest, buf stream. return s.configEntrySnapshot(structs.ServiceDefaults, req, buf) } +func (s *Store) APIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return s.configEntrySnapshot(structs.APIGateway, req, buf) +} + +func (s *Store) InlineCertificateSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return s.configEntrySnapshot(structs.InlineCertificate, req, buf) +} + +func (s *Store) TCPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return s.configEntrySnapshot(structs.TCPRoute, req, buf) +} + +func (s *Store) HTTPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return s.configEntrySnapshot(structs.HTTPRoute, req, buf) +} + +func (s *Store) BoundAPIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return s.configEntrySnapshot(structs.BoundAPIGateway, req, buf) +} + func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { var ( idx uint64 From 60c52b84b4cee09f43a03e429f332a04c083515f Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Mon, 12 Dec 2022 13:04:40 -0500 Subject: [PATCH 05/11] Add config_entry_events tests --- .../consul/state/config_entry_events_test.go | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/agent/consul/state/config_entry_events_test.go b/agent/consul/state/config_entry_events_test.go index 2e50b677bda2..a1b7b332f694 100644 --- a/agent/consul/state/config_entry_events_test.go +++ b/agent/consul/state/config_entry_events_test.go @@ -547,3 +547,72 @@ func TestServiceDefaultsSnapshot(t *testing.T) { }) } } + +func TestAPIGatewaySnapshot(t *testing.T) { + const index uint64 = 123 + + ixn1 := &structs.APIGatewayConfigEntry{ + Kind: structs.APIGateway, + Name: "agw1", + } + ixn2 := &structs.APIGatewayConfigEntry{ + Kind: structs.APIGateway, + Name: "agw2", + } + + store := testStateStore(t) + require.NoError(t, store.EnsureConfigEntry(index, ixn1)) + require.NoError(t, store.EnsureConfigEntry(index, ixn2)) + + testCases := map[string]struct { + subject stream.Subject + events []stream.Event + }{ + "named entry": { + subject: EventSubjectConfigEntry{Name: ixn1.Name}, + events: []stream.Event{ + { + Topic: EventTopicAPIGateway, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + }, + }, + "wildcard": { + subject: stream.SubjectWildcard, + events: []stream.Event{ + { + Topic: EventTopicAPIGateway, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + { + Topic: EventTopicAPIGateway, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn2, + }, + }, + }, + }, + } + + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + buf := &snapshotAppender{} + + idx, err := store.APIGatewaySnapshot(stream.SubscribeRequest{Subject: tc.subject}, buf) + require.NoError(t, err) + require.Equal(t, index, idx) + require.Len(t, buf.events, 1) + require.ElementsMatch(t, tc.events, buf.events[0]) + }) + } +} From ac39dd2f49709685e6f74c0129c838587545b9bf Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Mon, 12 Dec 2022 16:35:33 -0500 Subject: [PATCH 06/11] Add config entry kind to topic for new configs --- agent/consul/state/config_entry_events.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/agent/consul/state/config_entry_events.go b/agent/consul/state/config_entry_events.go index f763f7535b13..d9af30aee4ad 100644 --- a/agent/consul/state/config_entry_events.go +++ b/agent/consul/state/config_entry_events.go @@ -17,6 +17,11 @@ var configEntryKindToTopic = map[string]stream.Topic{ structs.IngressGateway: EventTopicIngressGateway, structs.ServiceIntentions: EventTopicServiceIntentions, structs.ServiceDefaults: EventTopicServiceDefaults, + structs.APIGateway: EventTopicAPIGateway, + structs.TCPRoute: EventTopicTCPRoute, + structs.HTTPRoute: EventTopicHTTPRoute, + structs.InlineCertificate: EventTopicInlineCertificate, + structs.BoundAPIGateway: EventTopicBoundAPIGateway, } // EventSubjectConfigEntry is a stream.Subject used to route and receive events From f1f3626e8e8dfe501b60f67b0840941eca5f2c85 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Mon, 12 Dec 2022 16:59:51 -0500 Subject: [PATCH 07/11] Add unit tests for snapshot methods --- agent/consul/state/config_entry_events.go | 18 +- .../consul/state/config_entry_events_test.go | 276 ++++++++++++++++++ 2 files changed, 290 insertions(+), 4 deletions(-) diff --git a/agent/consul/state/config_entry_events.go b/agent/consul/state/config_entry_events.go index d9af30aee4ad..f22f40d5e14a 100644 --- a/agent/consul/state/config_entry_events.go +++ b/agent/consul/state/config_entry_events.go @@ -122,22 +122,32 @@ func (s *Store) ServiceDefaultsSnapshot(req stream.SubscribeRequest, buf stream. return s.configEntrySnapshot(structs.ServiceDefaults, req, buf) } +// APIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of +// api-gateway config entries. func (s *Store) APIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { return s.configEntrySnapshot(structs.APIGateway, req, buf) } -func (s *Store) InlineCertificateSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { - return s.configEntrySnapshot(structs.InlineCertificate, req, buf) -} - +// TCPRouteSnapshot is a stream.SnapshotFunc that returns a snapshot of +// tcp-route config entries. func (s *Store) TCPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { return s.configEntrySnapshot(structs.TCPRoute, req, buf) } +// HTTPRouteSnapshot is a stream.SnapshotFunc that retuns a snapshot of +// http-route config entries. func (s *Store) HTTPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { return s.configEntrySnapshot(structs.HTTPRoute, req, buf) } +// InlineCertificateSnapshot is a stream.SnapshotFunc that returns a snapshot of +// inline-certificate config entries. +func (s *Store) InlineCertificateSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return s.configEntrySnapshot(structs.InlineCertificate, req, buf) +} + +// BoundAPIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of +// bound-api-gateways config entries. func (s *Store) BoundAPIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { return s.configEntrySnapshot(structs.BoundAPIGateway, req, buf) } diff --git a/agent/consul/state/config_entry_events_test.go b/agent/consul/state/config_entry_events_test.go index a1b7b332f694..3de3a1841ade 100644 --- a/agent/consul/state/config_entry_events_test.go +++ b/agent/consul/state/config_entry_events_test.go @@ -616,3 +616,279 @@ func TestAPIGatewaySnapshot(t *testing.T) { }) } } + +func TestTCPRouteSnapshot(t *testing.T) { + const index uint64 = 123 + + ixn1 := &structs.TCPRouteConfigEntry{ + Kind: structs.TCPRoute, + Name: "tcprt1", + } + ixn2 := &structs.TCPRouteConfigEntry{ + Kind: structs.TCPRoute, + Name: "tcprt2", + } + + store := testStateStore(t) + require.NoError(t, store.EnsureConfigEntry(index, ixn1)) + require.NoError(t, store.EnsureConfigEntry(index, ixn2)) + + testCases := map[string]struct { + subject stream.Subject + events []stream.Event + }{ + "named entry": { + subject: EventSubjectConfigEntry{Name: ixn1.Name}, + events: []stream.Event{ + { + Topic: EventTopicTCPRoute, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + }, + }, + "wildcard": { + subject: stream.SubjectWildcard, + events: []stream.Event{ + { + Topic: EventTopicTCPRoute, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + { + Topic: EventTopicTCPRoute, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn2, + }, + }, + }, + }, + } + + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + buf := &snapshotAppender{} + + idx, err := store.TCPRouteSnapshot(stream.SubscribeRequest{Subject: tc.subject}, buf) + require.NoError(t, err) + require.Equal(t, index, idx) + require.Len(t, buf.events, 1) + require.ElementsMatch(t, tc.events, buf.events[0]) + }) + } +} + +func TestHTTPRouteSnapshot(t *testing.T) { + const index uint64 = 123 + + ixn1 := &structs.HTTPRouteConfigEntry{ + Kind: structs.HTTPRoute, + Name: "httprt1", + } + ixn2 := &structs.HTTPRouteConfigEntry{ + Kind: structs.HTTPRoute, + Name: "httprt2", + } + + store := testStateStore(t) + require.NoError(t, store.EnsureConfigEntry(index, ixn1)) + require.NoError(t, store.EnsureConfigEntry(index, ixn2)) + + testCases := map[string]struct { + subject stream.Subject + events []stream.Event + }{ + "named entry": { + subject: EventSubjectConfigEntry{Name: ixn1.Name}, + events: []stream.Event{ + { + Topic: EventTopicHTTPRoute, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + }, + }, + "wildcard": { + subject: stream.SubjectWildcard, + events: []stream.Event{ + { + Topic: EventTopicHTTPRoute, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + { + Topic: EventTopicHTTPRoute, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn2, + }, + }, + }, + }, + } + + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + buf := &snapshotAppender{} + + idx, err := store.HTTPRouteSnapshot(stream.SubscribeRequest{Subject: tc.subject}, buf) + require.NoError(t, err) + require.Equal(t, index, idx) + require.Len(t, buf.events, 1) + require.ElementsMatch(t, tc.events, buf.events[0]) + }) + } +} + +func TestInlineCertificateSnapshot(t *testing.T) { + const index uint64 = 123 + + ixn1 := &structs.InlineCertificateConfigEntry{ + Kind: structs.InlineCertificate, + Name: "inlinecert1", + } + ixn2 := &structs.InlineCertificateConfigEntry{ + Kind: structs.InlineCertificate, + Name: "inlinecert2", + } + + store := testStateStore(t) + require.NoError(t, store.EnsureConfigEntry(index, ixn1)) + require.NoError(t, store.EnsureConfigEntry(index, ixn2)) + + testCases := map[string]struct { + subject stream.Subject + events []stream.Event + }{ + "named entry": { + subject: EventSubjectConfigEntry{Name: ixn1.Name}, + events: []stream.Event{ + { + Topic: EventTopicInlineCertificate, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + }, + }, + "wildcard": { + subject: stream.SubjectWildcard, + events: []stream.Event{ + { + Topic: EventTopicInlineCertificate, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + { + Topic: EventTopicInlineCertificate, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn2, + }, + }, + }, + }, + } + + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + buf := &snapshotAppender{} + + idx, err := store.InlineCertificateSnapshot(stream.SubscribeRequest{Subject: tc.subject}, buf) + require.NoError(t, err) + require.Equal(t, index, idx) + require.Len(t, buf.events, 1) + require.ElementsMatch(t, tc.events, buf.events[0]) + }) + } +} + +func TestBoundAPIGatewaySnapshot(t *testing.T) { + const index uint64 = 123 + + ixn1 := &structs.BoundAPIGatewayConfigEntry{ + Kind: structs.BoundAPIGateway, + Name: "boundapigw1", + } + ixn2 := &structs.BoundAPIGatewayConfigEntry{ + Kind: structs.BoundAPIGateway, + Name: "boundapigw2", + } + + store := testStateStore(t) + require.NoError(t, store.EnsureConfigEntry(index, ixn1)) + require.NoError(t, store.EnsureConfigEntry(index, ixn2)) + + testCases := map[string]struct { + subject stream.Subject + events []stream.Event + }{ + "named entry": { + subject: EventSubjectConfigEntry{Name: ixn1.Name}, + events: []stream.Event{ + { + Topic: EventTopicBoundAPIGateway, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + }, + }, + "wildcard": { + subject: stream.SubjectWildcard, + events: []stream.Event{ + { + Topic: EventTopicBoundAPIGateway, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn1, + }, + }, + { + Topic: EventTopicBoundAPIGateway, + Index: index, + Payload: EventPayloadConfigEntry{ + Op: pbsubscribe.ConfigEntryUpdate_Upsert, + Value: ixn2, + }, + }, + }, + }, + } + + for desc, tc := range testCases { + t.Run(desc, func(t *testing.T) { + buf := &snapshotAppender{} + + idx, err := store.BoundAPIGatewaySnapshot(stream.SubscribeRequest{Subject: tc.subject}, buf) + require.NoError(t, err) + require.Equal(t, index, idx) + require.Len(t, buf.events, 1) + require.ElementsMatch(t, tc.events, buf.events[0]) + }) + } +} From 996e9aa1767b804778560a2122133f4716b58317 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Wed, 14 Dec 2022 11:11:34 -0500 Subject: [PATCH 08/11] Start adding integration test --- agent/consul/subscription_test.go | 81 +++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 agent/consul/subscription_test.go diff --git a/agent/consul/subscription_test.go b/agent/consul/subscription_test.go new file mode 100644 index 000000000000..f3c349433091 --- /dev/null +++ b/agent/consul/subscription_test.go @@ -0,0 +1,81 @@ +package consul + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" +) + +func TestConfigEntrySubscriptions(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + configEntry func(string) structs.ConfigEntry + topic stream.Topic + }{ + "Subscribe to API Gateway Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.APIGatewayConfigEntry{ + Kind: structs.APIGateway, + Name: name, + } + }, + topic: state.EventTopicAPIGateway, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := stream.NewEventPublisher(1 * time.Millisecond) + go publisher.Run(ctx) + + store := fsm.NewFromDeps(fsm.Deps{ + Logger: hclog.New(nil), + NewStateStore: func() *state.Store { + return state.NewStateStoreWithEventPublisher(nil, publisher) + }, + Publisher: publisher, + }).State() + + // Push 200 instances of the config entry to the store. + for i := 0; i < 200; i++ { + entryIndex := uint64(i + 1) + name := fmt.Sprintf("foo-%d", i) + require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name))) + } + + received := []string{} + + go func() { + subscribeRequest := &stream.SubscribeRequest{ + Topic: tc.topic, + } + fmt.Println(subscribeRequest) // TODO + }() + + LOOP: + for { + select { + case <-ctx.Done(): + break LOOP + } + } + + require.Len(t, received, 200) + for i := 0; i < 200; i++ { + require.Contains(t, received, fmt.Sprintf("foo-%d", i)) + } + }) + } +} From e1b2260f16436410cd6105381ea261e117954111 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Mon, 19 Dec 2022 16:03:05 -0500 Subject: [PATCH 09/11] Test using the new controller code --- agent/consul/controller/controller_test.go | 146 ++++++++++++++++++++- agent/consul/fsm/fsm.go | 35 +++++ agent/consul/subscription_test.go | 81 ------------ 3 files changed, 180 insertions(+), 82 deletions(-) delete mode 100644 agent/consul/subscription_test.go diff --git a/agent/consul/controller/controller_test.go b/agent/consul/controller/controller_test.go index d71b7403a7db..fc270a6564f5 100644 --- a/agent/consul/controller/controller_test.go +++ b/agent/consul/controller/controller_test.go @@ -64,7 +64,7 @@ LOOP: } } - // since we only modified each entry once, we should have exactly 200 reconcliation calls + // since we only modified each entry once, we should have exactly 200 reconciliation calls require.Len(t, received, 200) for i := 0; i < 200; i++ { require.Contains(t, received, fmt.Sprintf("foo-%d", i)) @@ -271,3 +271,147 @@ func TestBasicController_RunPanicAssertions(t *testing.T) { controller.WithQueueFactory(RunWorkQueue) }) } + +func TestConfigEntrySubscriptions(t *testing.T) { + t.Parallel() + + cases := map[string]struct { + configEntry func(string) structs.ConfigEntry + topic stream.Topic + kind string + }{ + "Subscribe to Service Resolver Config Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: name, + } + }, + topic: state.EventTopicServiceResolver, + kind: structs.ServiceResolver, + }, + "Subscribe to Ingress Gateway Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: name, + } + }, + topic: state.EventTopicIngressGateway, + kind: structs.IngressGateway, + }, + "Subscribe to Service Intentions Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.ServiceIntentionsConfigEntry{ + Kind: structs.ServiceIntentions, + Name: name, + } + }, + topic: state.EventTopicServiceIntentions, + kind: structs.ServiceIntentions, + }, + "Subscribe to API Gateway Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.APIGatewayConfigEntry{ + Kind: structs.APIGateway, + Name: name, + } + }, + topic: state.EventTopicAPIGateway, + kind: structs.APIGateway, + }, + "Subscribe to Inline Certificate Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.InlineCertificateConfigEntry{ + Kind: structs.InlineCertificate, + Name: name, + } + }, + topic: state.EventTopicInlineCertificate, + kind: structs.InlineCertificate, + }, + "Subscribe to HTTP Route Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.HTTPRouteConfigEntry{ + Kind: structs.HTTPRoute, + Name: name, + } + }, + topic: state.EventTopicHTTPRoute, + kind: structs.HTTPRoute, + }, + "Subscribe to TCP Route Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.TCPRouteConfigEntry{ + Kind: structs.TCPRoute, + Name: name, + } + }, + topic: state.EventTopicTCPRoute, + kind: structs.TCPRoute, + }, + "Subscribe to Bound API Gateway Changes": { + configEntry: func(name string) structs.ConfigEntry { + return &structs.BoundAPIGatewayConfigEntry{ + Kind: structs.BoundAPIGateway, + Name: name, + } + }, + topic: state.EventTopicBoundAPIGateway, + kind: structs.BoundAPIGateway, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + reconciler := newTestReconciler(false) + + publisher := stream.NewEventPublisher(1 * time.Millisecond) + go publisher.Run(ctx) + + // get the store through the FSM since the publisher handlers get registered through it + store := fsm.NewFromDeps(fsm.Deps{ + Logger: hclog.New(nil), + NewStateStore: func() *state.Store { + return state.NewStateStoreWithEventPublisher(nil, publisher) + }, + Publisher: publisher, + }).State() + + for i := 0; i < 200; i++ { + entryIndex := uint64(i + 1) + name := fmt.Sprintf("foo-%d", i) + require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name))) + } + + go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{ + Topic: tc.topic, + Subject: stream.SubjectWildcard, + }).WithWorkers(10).Run(ctx) + + received := []string{} + LOOP: + for { + select { + case request := <-reconciler.received: + require.Equal(t, tc.kind, request.Kind) + received = append(received, request.Name) + if len(received) == 200 { + break LOOP + } + case <-ctx.Done(): + break LOOP + } + } + + // since we only modified each entry once, we should have exactly 200 reconciliation calls + require.Len(t, received, 200) + for i := 0; i < 200; i++ { + require.Contains(t, received, fmt.Sprintf("foo-%d", i)) + } + }) + } +} diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 46d7d7731020..87f761f154d6 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -342,4 +342,39 @@ func (c *FSM) registerStreamSnapshotHandlers() { if err != nil { panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err)) } + + err = c.deps.Publisher.RegisterHandler(state.EventTopicAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return c.State().APIGatewaySnapshot(req, buf) + }, true) + if err != nil { + panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err)) + } + + err = c.deps.Publisher.RegisterHandler(state.EventTopicInlineCertificate, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return c.State().InlineCertificateSnapshot(req, buf) + }, true) + if err != nil { + panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err)) + } + + err = c.deps.Publisher.RegisterHandler(state.EventTopicHTTPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return c.State().HTTPRouteSnapshot(req, buf) + }, true) + if err != nil { + panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err)) + } + + err = c.deps.Publisher.RegisterHandler(state.EventTopicTCPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return c.State().TCPRouteSnapshot(req, buf) + }, true) + if err != nil { + panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err)) + } + + err = c.deps.Publisher.RegisterHandler(state.EventTopicBoundAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + return c.State().BoundAPIGatewaySnapshot(req, buf) + }, true) + if err != nil { + panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err)) + } } diff --git a/agent/consul/subscription_test.go b/agent/consul/subscription_test.go deleted file mode 100644 index f3c349433091..000000000000 --- a/agent/consul/subscription_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package consul - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/hashicorp/consul/agent/consul/fsm" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/consul/stream" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-hclog" - "github.com/stretchr/testify/require" -) - -func TestConfigEntrySubscriptions(t *testing.T) { - t.Parallel() - - cases := map[string]struct { - configEntry func(string) structs.ConfigEntry - topic stream.Topic - }{ - "Subscribe to API Gateway Changes": { - configEntry: func(name string) structs.ConfigEntry { - return &structs.APIGatewayConfigEntry{ - Kind: structs.APIGateway, - Name: name, - } - }, - topic: state.EventTopicAPIGateway, - }, - } - - for name, tc := range cases { - t.Run(name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - publisher := stream.NewEventPublisher(1 * time.Millisecond) - go publisher.Run(ctx) - - store := fsm.NewFromDeps(fsm.Deps{ - Logger: hclog.New(nil), - NewStateStore: func() *state.Store { - return state.NewStateStoreWithEventPublisher(nil, publisher) - }, - Publisher: publisher, - }).State() - - // Push 200 instances of the config entry to the store. - for i := 0; i < 200; i++ { - entryIndex := uint64(i + 1) - name := fmt.Sprintf("foo-%d", i) - require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name))) - } - - received := []string{} - - go func() { - subscribeRequest := &stream.SubscribeRequest{ - Topic: tc.topic, - } - fmt.Println(subscribeRequest) // TODO - }() - - LOOP: - for { - select { - case <-ctx.Done(): - break LOOP - } - } - - require.Len(t, received, 200) - for i := 0; i < 200; i++ { - require.Contains(t, received, fmt.Sprintf("foo-%d", i)) - } - }) - } -} From 4b0a1babeebacbe863ba0080950fa4f34f04d534 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Tue, 20 Dec 2022 11:28:27 -0500 Subject: [PATCH 10/11] Update agent/consul/state/config_entry_events.go Co-authored-by: Nathan Coleman --- agent/consul/state/config_entry_events.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/consul/state/config_entry_events.go b/agent/consul/state/config_entry_events.go index f22f40d5e14a..c081d778b8d2 100644 --- a/agent/consul/state/config_entry_events.go +++ b/agent/consul/state/config_entry_events.go @@ -147,7 +147,7 @@ func (s *Store) InlineCertificateSnapshot(req stream.SubscribeRequest, buf strea } // BoundAPIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of -// bound-api-gateways config entries. +// bound-api-gateway config entries. func (s *Store) BoundAPIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { return s.configEntrySnapshot(structs.BoundAPIGateway, req, buf) } From 85c480d98f910f3b93c6acfa3ce018cc2645f6d4 Mon Sep 17 00:00:00 2001 From: Thomas Eckert Date: Tue, 20 Dec 2022 11:39:55 -0500 Subject: [PATCH 11/11] Check value of error --- agent/consul/state/events_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/agent/consul/state/events_test.go b/agent/consul/state/events_test.go index e5c4714f2478..a619356aada4 100644 --- a/agent/consul/state/events_test.go +++ b/agent/consul/state/events_test.go @@ -125,6 +125,14 @@ func TestPBToStreamSubscribeRequest(t *testing.T) { "Unrecognized topic returns error": { req: &pbsubscribe.SubscribeRequest{ Topic: 99999, + Subject: &pbsubscribe.SubscribeRequest_NamedSubject{ + NamedSubject: &pbsubscribe.NamedSubject{ + Key: "key", + Namespace: "consul", + Partition: "partition", + PeerName: "peer", + }, + }, }, entMeta: acl.EnterpriseMeta{}, expectedSubscribeRequest: nil, @@ -137,7 +145,7 @@ func TestPBToStreamSubscribeRequest(t *testing.T) { actual, err := PBToStreamSubscribeRequest(tc.req, tc.entMeta) if tc.err != nil { - require.Error(t, err) + require.EqualError(t, err, tc.err.Error()) } else { require.NoError(t, err) }