Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API Gateway XDS Primitives, endpoints and clusters #17002

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion agent/consul/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1906,7 +1906,7 @@ func TestServer_ReloadConfig(t *testing.T) {
defaults := DefaultConfig()
got := s.raft.ReloadableConfig()
require.Equal(t, uint64(4321), got.SnapshotThreshold,
"should have be reloaded to new value")
"should have been reloaded to new value")
require.Equal(t, defaults.RaftConfig.SnapshotInterval, got.SnapshotInterval,
"should have remained the default interval")
require.Equal(t, defaults.RaftConfig.TrailingLogs, got.TrailingLogs,
Expand Down
38 changes: 37 additions & 1 deletion agent/proxycfg/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (h *handlerAPIGateway) handleUpdate(ctx context.Context, u UpdateEvent, sna
return (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap)
}

return nil
return h.recompileDiscoveryChains(snap)
}

// handleRootCAUpdate responds to changes in the watched root CA for a gateway
Expand Down Expand Up @@ -420,6 +420,42 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
return nil
}

func (h *handlerAPIGateway) recompileDiscoveryChains(snap *ConfigSnapshot) error {
synthesizedChains := map[UpstreamID]*structs.CompiledDiscoveryChain{}

for name, listener := range snap.APIGateway.Listeners {
boundListener, ok := snap.APIGateway.BoundListeners[name]
if !ok {
// Skip any listeners that don't have a bound listener. Once the bound listener is created, this will be run again.
continue
}

if !snap.APIGateway.GatewayConfig.ListenerIsReady(name) {
// skip any listeners that might be in an invalid state
continue
}

// Create a synthesized discovery chain for each service.
services, upstreams, compiled, err := snap.APIGateway.synthesizeChains(h.source.Datacenter, listener, boundListener)
if err != nil {
return err
}

if len(upstreams) == 0 {
// skip if we can't construct any upstreams
continue
}

for i, service := range services {
id := NewUpstreamIDFromServiceName(structs.NewServiceName(service.Name, &service.EnterpriseMeta))
synthesizedChains[id] = compiled[i]
}
}

snap.APIGateway.DiscoveryChain = synthesizedChains
return nil
}

// referenceIsForListener returns whether the provided structs.ResourceReference
// targets the provided structs.APIGatewayListener. For this to be true, the kind
// and name must match the structs.APIGatewayConfigEntry containing the listener,
Expand Down
1 change: 1 addition & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ DOMAIN_LOOP:
return services, upstreams, compiled, err
}

// TODO use this in listener code
func (c *configSnapshotAPIGateway) toIngressTLS(key IngressListenerKey, listener structs.APIGatewayListener, bound structs.BoundAPIGatewayListener) (*structs.GatewayTLSConfig, error) {
if len(listener.TLS.Certificates) == 0 {
return nil, nil
Expand Down
54 changes: 47 additions & 7 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,7 @@ func (s *ResourceGenerator) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapsho
}
return res, nil
case structs.ServiceKindAPIGateway:
// TODO Find a cleaner solution, can't currently pass unexported property types
var err error
cfgSnap.IngressGateway, err = cfgSnap.APIGateway.ToIngress(cfgSnap.Datacenter)
if err != nil {
return nil, err
}
res, err := s.clustersFromSnapshotIngressGateway(cfgSnap)
res, err := s.clustersFromSnapshotAPIGateway(cfgSnap)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -816,6 +810,52 @@ func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg
return clusters, nil
}

func (s *ResourceGenerator) clustersFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var clusters []proto.Message
createdClusters := make(map[proxycfg.UpstreamID]bool)
readyUpstreams := getReadyUpstreams(cfgSnap)

for listenerKey, upstreams := range readyUpstreams {
for _, upstream := range upstreams {
uid := proxycfg.NewUpstreamID(&upstream)

// If we've already created a cluster for this upstream, skip it. Multiple listeners may
// reference the same upstream, so we don't need to create duplicate clusters in that case.
if createdClusters[uid] {
continue
}

// Grab the discovery chain compiled in handlerAPIGateway.recompileDiscoveryChains
chain, ok := cfgSnap.APIGateway.DiscoveryChain[uid]
if !ok {
// this should not happen
return nil, fmt.Errorf("no discovery chain for upstream %q", uid)
}

// Generate the list of upstream clusters for the discovery chain
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, &upstream, chain, cfgSnap, false)
if err != nil {
return nil, err
}

for _, cluster := range upstreamClusters {
// TODO Something analogous to s.configIngressUpstreamCluster(c, cfgSnap, listenerKey, &u)
// but not sure what that func does yet
s.configAPIUpstreamCluster(cluster, cfgSnap, listenerKey, &upstream)
clusters = append(clusters, cluster)
}
createdClusters[uid] = true

}
}
return clusters, nil
}

func (s *ResourceGenerator) configAPIUpstreamCluster(c *envoy_cluster_v3.Cluster, cfgSnap *proxycfg.ConfigSnapshot, listenerKey proxycfg.APIGatewayListenerKey, u *structs.Upstream) {
//TODO I don't think this is currently needed with what api gateway supports, but will be needed in the future

}

func (s *ResourceGenerator) configIngressUpstreamCluster(c *envoy_cluster_v3.Cluster, cfgSnap *proxycfg.ConfigSnapshot, listenerKey proxycfg.IngressListenerKey, u *structs.Upstream) {
var threshold *envoy_cluster_v3.CircuitBreakers_Thresholds
setThresholdLimit := func(limitType string, limit int) {
Expand Down
68 changes: 61 additions & 7 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,7 @@ func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapsh
case structs.ServiceKindIngressGateway:
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
case structs.ServiceKindAPIGateway:
// TODO Find a cleaner solution, can't currently pass unexported property types
var err error
cfgSnap.IngressGateway, err = cfgSnap.APIGateway.ToIngress(cfgSnap.Datacenter)
if err != nil {
return nil, err
}
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
return s.endpointsFromSnapshotAPIGateway(cfgSnap)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
Expand Down Expand Up @@ -527,6 +521,65 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf
return resources, nil
}

func getReadyUpstreams(cfgSnap *proxycfg.ConfigSnapshot) map[proxycfg.APIGatewayListenerKey][]structs.Upstream {

readyUpstreams := map[proxycfg.APIGatewayListenerKey][]structs.Upstream{}
for _, l := range cfgSnap.APIGateway.Listeners {
//need to account for the state of the Listener when building the upstreams list
if !cfgSnap.APIGateway.GatewayConfig.ListenerIsReady(l.Name) {
continue
}
boundListener := cfgSnap.APIGateway.BoundListeners[l.Name]
//get route ref
for _, routeRef := range boundListener.Routes {
//get upstreams
upstreamMap := cfgSnap.APIGateway.Upstreams[routeRef]
for listenerKey, upstreams := range upstreamMap {
for _, u := range upstreams {
readyUpstreams[listenerKey] = append(readyUpstreams[listenerKey], u)
}
}
}
}
return readyUpstreams
}

func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message
createdClusters := make(map[proxycfg.UpstreamID]bool)

readyUpstreams := getReadyUpstreams(cfgSnap)

for _, upstreams := range readyUpstreams {
for _, u := range upstreams {
uid := proxycfg.NewUpstreamID(&u)

// If we've already created endpoints for this upstream, skip it. Multiple listeners may
// reference the same upstream, so we don't need to create duplicate endpoints in that case.
if createdClusters[uid] {
continue
}

es, err := s.endpointsFromDiscoveryChain(
uid,
cfgSnap.APIGateway.DiscoveryChain[uid],
cfgSnap,
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
u.Config,
cfgSnap.APIGateway.WatchedUpstreamEndpoints[uid],
cfgSnap.APIGateway.WatchedGatewayEndpoints[uid],
false,
)
if err != nil {
return nil, err
}
resources = append(resources, es...)
createdClusters[uid] = true
}
}
return resources, nil
}

// used in clusters.go
func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint {
return &envoy_endpoint_v3.LbEndpoint{
Expand Down Expand Up @@ -628,6 +681,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(

var escapeHatchCluster *envoy_cluster_v3.Cluster
if !forMeshGateway {

cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
Expand Down
41 changes: 27 additions & 14 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,20 +365,27 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
}},
},
}
}, []structs.BoundRoute{
&structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: "route",
Services: []structs.TCPService{{
Name: "service",
}},
},
}, []structs.InlineCertificateConfigEntry{{
Kind: structs.InlineCertificate,
Name: "certificate",
PrivateKey: gatewayTestPrivateKey,
Certificate: gatewayTestCertificate,
}}, nil)
},
[]structs.BoundRoute{
&structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: "route",
Services: []structs.TCPService{{
Name: "service",
}},
Parents: []structs.ResourceReference{
{
Kind: structs.APIGateway,
Name: "api-gateway",
},
},
},
}, []structs.InlineCertificateConfigEntry{{
Kind: structs.InlineCertificate,
Name: "certificate",
PrivateKey: gatewayTestPrivateKey,
Certificate: gatewayTestCertificate,
}}, nil)
},
},
{
Expand Down Expand Up @@ -410,6 +417,12 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
Name: "service",
}},
}},
Parents: []structs.ResourceReference{
{
Kind: structs.APIGateway,
Name: "api-gateway",
},
},
},
}, nil, []proxycfg.UpdateEvent{{
CorrelationID: "discovery-chain:" + serviceUID.String(),
Expand Down