Skip to content

Commit

Permalink
xds: generate endpoints directly from API gateway snapshot (#17390)
Browse files Browse the repository at this point in the history
* endpoints xds cluster configuration

* resources test fix

* fix reversion in resources_test

* Update agent/proxycfg/api_gateway.go

Co-authored-by: John Maguire <john.maguire@hashicorp.com>

* gofmt

* Modify getReadyUpstreams to filter upstreams by listener (#17410)

Each listener would previously have all upstreams from any route that bound to the listener. This is problematic when a route bound to one listener also binds to other listeners and so includes upstreams for multiple listeners. The list for a given listener would then wind up including upstreams for other listeners.

* Update agent/proxycfg/api_gateway.go

Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>

* Restore import blocking

* Skip to next route if route has no upstreams

* cleanup

* change set from bool to empty struct

---------

Co-authored-by: John Maguire <john.maguire@hashicorp.com>
Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>
  • Loading branch information
3 people authored May 19, 2023
1 parent 1d6a0c8 commit 134aac7
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 35 deletions.
50 changes: 45 additions & 5 deletions agent/proxycfg/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ func (h *handlerAPIGateway) handleUpdate(ctx context.Context, u UpdateEvent, sna
return err
}
default:
return (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap)
if err := (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap); err != nil {
return err
}
}

return nil
return h.recompileDiscoveryChains(snap)
}

// handleRootCAUpdate responds to changes in the watched root CA for a gateway
Expand Down Expand Up @@ -308,15 +310,14 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
DestinationNamespace: service.NamespaceOrDefault(),
DestinationPartition: service.PartitionOrDefault(),
LocalBindPort: listener.Port,
// TODO IngressHosts: g.Hosts,
// Pass the protocol that was configured on the listener in order
// to force that protocol on the Envoy listener.
Config: map[string]interface{}{
"protocol": "http",
},
}

listenerKey := APIGatewayListenerKey{Protocol: string(listener.Protocol), Port: listener.Port}
listenerKey := APIGatewayListenerKeyFromListener(listener)
upstreams[listenerKey] = append(upstreams[listenerKey], upstream)
}

Expand Down Expand Up @@ -370,7 +371,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
},
}

listenerKey := APIGatewayListenerKey{Protocol: string(listener.Protocol), Port: listener.Port}
listenerKey := APIGatewayListenerKeyFromListener(listener)
upstreams[listenerKey] = append(upstreams[listenerKey], upstream)
}

Expand Down Expand Up @@ -420,6 +421,45 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
return nil
}

func (h *handlerAPIGateway) recompileDiscoveryChains(snap *ConfigSnapshot) error {
synthesizedChains := map[UpstreamID]*structs.CompiledDiscoveryChain{}

for name, listener := range snap.APIGateway.Listeners {
boundListener, ok := snap.APIGateway.BoundListeners[name]
if !(ok && snap.APIGateway.GatewayConfig.ListenerIsReady(name)) {
// Skip any listeners that don't have a bound listener. Once the bound listener is created, this will be run again.
// skip any listeners that might be in an invalid state
continue
}

// Create a synthesized discovery chain for each service.
services, upstreams, compiled, err := snap.APIGateway.synthesizeChains(h.source.Datacenter, listener, boundListener)
if err != nil {
return err
}

if len(upstreams) == 0 {
// skip if we can't construct any upstreams
continue
}

for i, service := range services {
id := NewUpstreamIDFromServiceName(structs.NewServiceName(service.Name, &service.EnterpriseMeta))
if compiled[i].ServiceName != service.Name {
return fmt.Errorf("Compiled Discovery chain for %s does not match service %s", compiled[i].ServiceName, id)
}
synthesizedChains[id] = compiled[i]
}
}

// Merge in additional discovery chains
for id, chain := range synthesizedChains {
snap.APIGateway.DiscoveryChain[id] = chain
}

return nil
}

// referenceIsForListener returns whether the provided structs.ResourceReference
// targets the provided structs.APIGatewayListener. For this to be true, the kind
// and name must match the structs.APIGatewayConfigEntry containing the listener,
Expand Down
4 changes: 4 additions & 0 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,10 @@ func (c *configSnapshotIngressGateway) isEmpty() bool {

type APIGatewayListenerKey = IngressListenerKey

func APIGatewayListenerKeyFromListener(l structs.APIGatewayListener) APIGatewayListenerKey {
return APIGatewayListenerKey{Protocol: string(l.Protocol), Port: l.Port}
}

type IngressListenerKey struct {
Protocol string
Port int
Expand Down
101 changes: 94 additions & 7 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,7 @@ func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapsh
case structs.ServiceKindIngressGateway:
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
case structs.ServiceKindAPIGateway:
// TODO Find a cleaner solution, can't currently pass unexported property types
var err error
cfgSnap.IngressGateway, err = cfgSnap.APIGateway.ToIngress(cfgSnap.Datacenter)
if err != nil {
return nil, err
}
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
return s.endpointsFromSnapshotAPIGateway(cfgSnap)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
Expand Down Expand Up @@ -527,6 +521,98 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf
return resources, nil
}

// helper struct to persist upstream parent information when ready upstream list is built out
type readyUpstreams struct {
listenerKey proxycfg.APIGatewayListenerKey
listenerCfg structs.APIGatewayListener
boundListenerCfg structs.BoundAPIGatewayListener
routeReference structs.ResourceReference
upstreams []structs.Upstream
}

// getReadyUpstreams returns a map containing the list of upstreams for each listener that is ready
func getReadyUpstreams(cfgSnap *proxycfg.ConfigSnapshot) map[string]readyUpstreams {

ready := map[string]readyUpstreams{}
for _, l := range cfgSnap.APIGateway.Listeners {
// Only include upstreams for listeners that are ready
if !cfgSnap.APIGateway.GatewayConfig.ListenerIsReady(l.Name) {
continue
}

// For each route bound to the listener
boundListener := cfgSnap.APIGateway.BoundListeners[l.Name]
for _, routeRef := range boundListener.Routes {
// Get all upstreams for the route
routeUpstreams, ok := cfgSnap.APIGateway.Upstreams[routeRef]
if !ok {
continue
}

// Filter to upstreams that attach to this specific listener since
// a route can bind to + have upstreams for multiple listeners
listenerKey := proxycfg.APIGatewayListenerKeyFromListener(l)
routeUpstreamsForListener, ok := routeUpstreams[listenerKey]
if !ok {
continue
}

for _, upstream := range routeUpstreamsForListener {
// Insert or update readyUpstreams for the listener to include this upstream
r, ok := ready[l.Name]
if !ok {
r = readyUpstreams{
listenerKey: listenerKey,
listenerCfg: l,
boundListenerCfg: boundListener,
routeReference: routeRef,
}
}
r.upstreams = append(r.upstreams, upstream)
ready[l.Name] = r
}
}
}
return ready
}

func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message
createdClusters := make(map[proxycfg.UpstreamID]struct{})

readyUpstreamsList := getReadyUpstreams(cfgSnap)

for _, readyUpstreams := range readyUpstreamsList {
for _, u := range readyUpstreams.upstreams {
uid := proxycfg.NewUpstreamID(&u)

// If we've already created endpoints for this upstream, skip it. Multiple listeners may
// reference the same upstream, so we don't need to create duplicate endpoints in that case.
_, ok := createdClusters[uid]
if ok {
continue
}

endpoints, err := s.endpointsFromDiscoveryChain(
uid,
cfgSnap.APIGateway.DiscoveryChain[uid],
cfgSnap,
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
u.Config,
cfgSnap.APIGateway.WatchedUpstreamEndpoints[uid],
cfgSnap.APIGateway.WatchedGatewayEndpoints[uid],
false,
)
if err != nil {
return nil, err
}
resources = append(resources, endpoints...)
createdClusters[uid] = struct{}{}
}
}
return resources, nil
}

// used in clusters.go
func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint {
return &envoy_endpoint_v3.LbEndpoint{
Expand Down Expand Up @@ -628,6 +714,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(

var escapeHatchCluster *envoy_cluster_v3.Cluster
if !forMeshGateway {

cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
Expand Down
41 changes: 27 additions & 14 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,20 +365,27 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
}},
},
}
}, []structs.BoundRoute{
&structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: "route",
Services: []structs.TCPService{{
Name: "service",
}},
},
}, []structs.InlineCertificateConfigEntry{{
Kind: structs.InlineCertificate,
Name: "certificate",
PrivateKey: gatewayTestPrivateKey,
Certificate: gatewayTestCertificate,
}}, nil)
},
[]structs.BoundRoute{
&structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: "route",
Services: []structs.TCPService{{
Name: "service",
}},
Parents: []structs.ResourceReference{
{
Kind: structs.APIGateway,
Name: "api-gateway",
},
},
},
}, []structs.InlineCertificateConfigEntry{{
Kind: structs.InlineCertificate,
Name: "certificate",
PrivateKey: gatewayTestPrivateKey,
Certificate: gatewayTestCertificate,
}}, nil)
},
},
{
Expand Down Expand Up @@ -410,6 +417,12 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
Name: "service",
}},
}},
Parents: []structs.ResourceReference{
{
Kind: structs.APIGateway,
Name: "api-gateway",
},
},
},
}, nil, []proxycfg.UpdateEvent{{
CorrelationID: "discovery-chain:" + serviceUID.String(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ load helpers
retry_default curl -f -s localhost:20000/stats -o /dev/null
}

@test "api gateway should have be accepted and not conflicted" {
@test "api gateway should have been accepted and not conflicted" {
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
}
Expand Down Expand Up @@ -63,4 +63,4 @@ load helpers
run retry_long curl -H "Host: foo.bar.baz" -s -f -d hello localhost:9995
[ "$status" -eq 0 ]
[[ "$output" == *"hello"* ]]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ load helpers
retry_default curl -f -s localhost:20000/stats -o /dev/null
}

@test "api gateway should have be accepted and not conflicted" {
@test "api gateway should have been accepted and not conflicted" {
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
}
Expand All @@ -31,4 +31,4 @@ load helpers
run retry_default sh -c "curl -s localhost:9998 | grep RBAC"
[ "$status" -eq 0 ]
[[ "$output" == "RBAC: access denied" ]]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ load helpers
retry_default curl -f -s localhost:20000/stats -o /dev/null
}

@test "api gateway should have be accepted and not conflicted" {
@test "api gateway should have been accepted and not conflicted" {
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
}
Expand Down Expand Up @@ -45,4 +45,4 @@ load helpers

@test "api gateway should fall back to a connect certificate on conflicted SNI on listener 2" {
assert_cert_has_cn localhost:9998 pri host.consul.example
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ load helpers
retry_default curl -f -s localhost:20000/stats -o /dev/null
}

@test "api gateway should have be accepted and not conflicted" {
@test "api gateway should have been accepted and not conflicted" {
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ load helpers
retry_default curl -f -s localhost:20000/stats -o /dev/null
}

@test "api gateway should have be accepted and not conflicted" {
@test "api gateway should have been accepted and not conflicted" {
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
}
Expand Down Expand Up @@ -40,4 +40,4 @@ load helpers

@test "api gateway should fall back to a connect certificate on conflicted SNI on listener 2" {
assert_cert_has_cn localhost:9998 pri host.consul.example
}
}

0 comments on commit 134aac7

Please sign in to comment.