-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xds: generate endpoints directly from API gateway snapshot #17390
Changes from 7 commits
463c5fe
94188ea
ed82d8c
8f5ea9e
8728cac
a13a37d
35cb61b
f1c14b2
ed11b43
e3e8bb8
5f6d248
4f1e110
550c33a
78994bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ package proxycfg | |
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/hashicorp/consul/acl" | ||
cachetype "github.com/hashicorp/consul/agent/cache-types" | ||
"github.com/hashicorp/consul/agent/proxycfg/internal/watch" | ||
|
@@ -125,10 +124,12 @@ func (h *handlerAPIGateway) handleUpdate(ctx context.Context, u UpdateEvent, sna | |
return err | ||
} | ||
default: | ||
return (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap) | ||
if err := (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
return h.recompileDiscoveryChains(snap) | ||
} | ||
|
||
// handleRootCAUpdate responds to changes in the watched root CA for a gateway | ||
|
@@ -308,15 +309,14 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat | |
DestinationNamespace: service.NamespaceOrDefault(), | ||
DestinationPartition: service.PartitionOrDefault(), | ||
LocalBindPort: listener.Port, | ||
// TODO IngressHosts: g.Hosts, | ||
// Pass the protocol that was configured on the listener in order | ||
// to force that protocol on the Envoy listener. | ||
Config: map[string]interface{}{ | ||
"protocol": "http", | ||
}, | ||
} | ||
|
||
listenerKey := APIGatewayListenerKey{Protocol: string(listener.Protocol), Port: listener.Port} | ||
listenerKey := APIGatewayListenerKeyFromListener(listener) | ||
upstreams[listenerKey] = append(upstreams[listenerKey], upstream) | ||
} | ||
|
||
|
@@ -370,7 +370,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat | |
}, | ||
} | ||
|
||
listenerKey := APIGatewayListenerKey{Protocol: string(listener.Protocol), Port: listener.Port} | ||
listenerKey := APIGatewayListenerKeyFromListener(listener) | ||
upstreams[listenerKey] = append(upstreams[listenerKey], upstream) | ||
} | ||
|
||
|
@@ -420,6 +420,42 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat | |
return nil | ||
} | ||
|
||
func (h *handlerAPIGateway) recompileDiscoveryChains(snap *ConfigSnapshot) error { | ||
synthesizedChains := map[UpstreamID]*structs.CompiledDiscoveryChain{} | ||
|
||
for name, listener := range snap.APIGateway.Listeners { | ||
boundListener, ok := snap.APIGateway.BoundListeners[name] | ||
if !(ok && snap.APIGateway.GatewayConfig.ListenerIsReady(name)) { | ||
// Skip any listeners that don't have a bound listener. Once the bound listener is created, this will be run again. | ||
// skip any listeners that might be in an invalid state | ||
continue | ||
} | ||
|
||
// Create a synthesized discovery chain for each service. | ||
services, upstreams, compiled, err := snap.APIGateway.synthesizeChains(h.source.Datacenter, listener, boundListener) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if len(upstreams) == 0 { | ||
// skip if we can't construct any upstreams | ||
continue | ||
} | ||
|
||
for i, service := range services { | ||
id := NewUpstreamIDFromServiceName(structs.NewServiceName(service.Name, &service.EnterpriseMeta)) | ||
synthesizedChains[id] = compiled[i] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it guaranteed that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For what its worth, I believe this was code was copied directly from the ToIngress function. I'll add a quick sanity check though since I'm not sure if its guaranteed. |
||
} | ||
} | ||
|
||
// Merge in additional discovery chains | ||
for id, chain := range synthesizedChains { | ||
snap.APIGateway.DiscoveryChain[id] = chain | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// referenceIsForListener returns whether the provided structs.ResourceReference | ||
// targets the provided structs.APIGatewayListener. For this to be true, the kind | ||
// and name must match the structs.APIGatewayConfigEntry containing the listener, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,13 +41,7 @@ func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapsh | |
case structs.ServiceKindIngressGateway: | ||
return s.endpointsFromSnapshotIngressGateway(cfgSnap) | ||
case structs.ServiceKindAPIGateway: | ||
// TODO Find a cleaner solution, can't currently pass unexported property types | ||
var err error | ||
cfgSnap.IngressGateway, err = cfgSnap.APIGateway.ToIngress(cfgSnap.Datacenter) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return s.endpointsFromSnapshotIngressGateway(cfgSnap) | ||
return s.endpointsFromSnapshotAPIGateway(cfgSnap) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the crux of the whole change that we're making: instead of converting to an ingress gateway snapshot and generating xDS resources from that, we generate xDS resources directly from our API gateway snapshot 🎉 |
||
default: | ||
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind) | ||
} | ||
|
@@ -527,6 +521,91 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf | |
return resources, nil | ||
} | ||
|
||
// helper struct to persist upstream parent information when ready upstream list is built out | ||
type readyUpstreams struct { | ||
listenerKey proxycfg.APIGatewayListenerKey | ||
listenerCfg structs.APIGatewayListener | ||
boundListenerCfg structs.BoundAPIGatewayListener | ||
routeReference structs.ResourceReference | ||
upstreams []structs.Upstream | ||
} | ||
|
||
// getReadyUpstreams returns a map containing the list of upstreams for each listener that is ready | ||
func getReadyUpstreams(cfgSnap *proxycfg.ConfigSnapshot) map[string]readyUpstreams { | ||
|
||
ready := map[string]readyUpstreams{} | ||
for _, l := range cfgSnap.APIGateway.Listeners { | ||
// Only include upstreams for listeners that are ready | ||
if !cfgSnap.APIGateway.GatewayConfig.ListenerIsReady(l.Name) { | ||
continue | ||
} | ||
|
||
// For each route bound to the listener | ||
boundListener := cfgSnap.APIGateway.BoundListeners[l.Name] | ||
for _, routeRef := range boundListener.Routes { | ||
// Get all upstreams for the route | ||
routeUpstreams := cfgSnap.APIGateway.Upstreams[routeRef] | ||
|
||
// Filter to upstreams that attach to this specific listener since | ||
// a route can bind to + have upstreams for multiple listeners | ||
listenerKey := proxycfg.APIGatewayListenerKeyFromListener(l) | ||
routeUpstreamsForListener := routeUpstreams[listenerKey] | ||
nathancoleman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
for _, upstream := range routeUpstreamsForListener { | ||
// Insert or update readyUpstreams for the listener to include this upstream | ||
r, ok := ready[l.Name] | ||
if !ok { | ||
r = readyUpstreams{ | ||
listenerKey: listenerKey, | ||
listenerCfg: l, | ||
boundListenerCfg: boundListener, | ||
routeReference: routeRef, | ||
} | ||
} | ||
r.upstreams = append(r.upstreams, upstream) | ||
ready[l.Name] = r | ||
} | ||
} | ||
} | ||
return ready | ||
} | ||
|
||
func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { | ||
var resources []proto.Message | ||
createdClusters := make(map[proxycfg.UpstreamID]bool) | ||
nathancoleman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
readyUpstreamsList := getReadyUpstreams(cfgSnap) | ||
|
||
for _, readyUpstreams := range readyUpstreamsList { | ||
for _, u := range readyUpstreams.upstreams { | ||
uid := proxycfg.NewUpstreamID(&u) | ||
|
||
// If we've already created endpoints for this upstream, skip it. Multiple listeners may | ||
// reference the same upstream, so we don't need to create duplicate endpoints in that case. | ||
if createdClusters[uid] { | ||
continue | ||
} | ||
|
||
es, err := s.endpointsFromDiscoveryChain( | ||
nathancoleman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
uid, | ||
cfgSnap.APIGateway.DiscoveryChain[uid], | ||
cfgSnap, | ||
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition}, | ||
u.Config, | ||
cfgSnap.APIGateway.WatchedUpstreamEndpoints[uid], | ||
cfgSnap.APIGateway.WatchedGatewayEndpoints[uid], | ||
false, | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
resources = append(resources, es...) | ||
createdClusters[uid] = true | ||
} | ||
} | ||
return resources, nil | ||
} | ||
|
||
// used in clusters.go | ||
func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint { | ||
return &envoy_endpoint_v3.LbEndpoint{ | ||
|
@@ -628,6 +707,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( | |
|
||
var escapeHatchCluster *envoy_cluster_v3.Cluster | ||
if !forMeshGateway { | ||
|
||
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap) | ||
if err != nil { | ||
// Don't hard fail on a config typo, just warn. The parse func returns | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, we would recompile the discovery chain every time xDS resources were generated, sort of a "pull" style trigger. Instead, this will recompile the discovery chains every time a related resource that we're watching changes, so more of a "push" style trigger.