Skip to content

Commit

Permalink
process envoy resource by walking the map. use a map rather than arra…
Browse files Browse the repository at this point in the history
…y for envoy resource to prevent duplication.
  • Loading branch information
jmurret committed Nov 1, 2023
1 parent 3d8004d commit ba56049
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 533 deletions.
82 changes: 40 additions & 42 deletions agent/xdsv2/cluster_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package xdsv2
import (
"errors"
"fmt"

envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_aggregate_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"
Expand All @@ -19,43 +18,13 @@ import (
"github.com/hashicorp/consul/proto-public/pbmesh/v2beta1/pbproxystate"
)

func (pr *ProxyResources) doesEnvoyClusterAlreadyExist(name string) bool {
// TODO(proxystate): consider using a map instead of [] for this kind of lookup
for _, envoyCluster := range pr.envoyResources[xdscommon.ClusterType] {
if envoyCluster.(*envoy_cluster_v3.Cluster).Name == name {
return true
}
}
return false
}

func (pr *ProxyResources) makeXDSClusters() ([]proto.Message, error) {
clusters := make([]proto.Message, 0)

for clusterName := range pr.proxyState.Clusters {
protoCluster, err := pr.makeClusters(clusterName)
// TODO: aggregate errors for clusters and still return any properly formed clusters.
if err != nil {
return nil, err
}
clusters = append(clusters, protoCluster...)
}

return clusters, nil
}

func (pr *ProxyResources) makeClusters(name string) ([]proto.Message, error) {
clusters := make([]proto.Message, 0)
func (pr *ProxyResources) makeClusters(name string) (map[string]proto.Message, error) {
clusters := make(map[string]proto.Message)
proxyStateCluster, ok := pr.proxyState.Clusters[name]
if !ok {
return nil, fmt.Errorf("cluster %q not found", name)
}

if pr.doesEnvoyClusterAlreadyExist(name) {
// don't error
return []proto.Message{}, nil
}

switch proxyStateCluster.Group.(type) {
case *pbproxystate.Cluster_FailoverGroup:
fg := proxyStateCluster.GetFailoverGroup()
Expand All @@ -64,7 +33,7 @@ func (pr *ProxyResources) makeClusters(name string) ([]proto.Message, error) {
return nil, err
}
for _, c := range clusters {
clusters = append(clusters, c)
clusters[c.Name] = c
}

case *pbproxystate.Cluster_EndpointGroup:
Expand All @@ -73,7 +42,7 @@ func (pr *ProxyResources) makeClusters(name string) ([]proto.Message, error) {
if err != nil {
return nil, err
}
clusters = append(clusters, cluster)
clusters[cluster.Name] = cluster

default:
return nil, errors.New("cluster group type should be Endpoint Group or Failover Group")
Expand Down Expand Up @@ -207,8 +176,8 @@ func (pr *ProxyResources) makeEnvoyPassthroughCluster(name string, protocol pbpr
return cluster, nil
}

func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol pbproxystate.Protocol, fg *pbproxystate.FailoverGroup) ([]*envoy_cluster_v3.Cluster, error) {
var clusters []*envoy_cluster_v3.Cluster
func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol pbproxystate.Protocol, fg *pbproxystate.FailoverGroup) (map[string]*envoy_cluster_v3.Cluster, error) {
clusters := make(map[string]*envoy_cluster_v3.Cluster)
if fg != nil {
var egNames []string
for _, eg := range fg.EndpointGroups {
Expand All @@ -217,7 +186,7 @@ func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol pbprox
return nil, err
}
egNames = append(egNames, cluster.Name)
clusters = append(clusters, cluster)
clusters[cluster.Name] = cluster
}
aggregateClusterConfig, err := anypb.New(&envoy_aggregate_cluster_v3.ClusterConfig{
Clusters: egNames,
Expand Down Expand Up @@ -245,7 +214,7 @@ func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol pbprox
if err != nil {
return nil, err
}
clusters = append(clusters, c)
clusters[c.Name] = c
}
return clusters, nil
}
Expand Down Expand Up @@ -376,9 +345,38 @@ func addEnvoyLBToCluster(dynamicConfig *pbproxystate.DynamicEndpointGroupConfig,
return nil
}

// TODO(proxystate): In a future PR this will create clusters and add it to ProxyResources.proxyState
// Currently, we do not traverse the listener -> endpoint paths and instead just generate each resource by iterating
// through its top level map. In the future we want to traverse these paths to ensure each listener has a cluster, etc.
func (pr *ProxyResources) makeEnvoyClusterFromL4Destination(l4 *pbproxystate.L4Destination) error {
switch l4.Destination.(type) {
case *pbproxystate.L4Destination_Cluster:
clusterName := l4.GetCluster().Name
clusters, _ := pr.makeClusters(clusterName)
for name, cluster := range clusters {
pr.envoyResources[xdscommon.ClusterType][name] = cluster
}

eps := pr.proxyState.GetEndpoints()[clusterName]
if eps != nil {
protoEndpoint := makeEnvoyClusterLoadAssignment(clusterName, eps.Endpoints)
pr.envoyResources[xdscommon.EndpointType][protoEndpoint.ClusterName] = protoEndpoint
}

case *pbproxystate.L4Destination_WeightedClusters:
psWeightedClusters := l4.GetWeightedClusters()
for _, psCluster := range psWeightedClusters.GetClusters() {
clusters, _ := pr.makeClusters(psCluster.Name)
for name, cluster := range clusters {
pr.envoyResources[xdscommon.ClusterType][name] = cluster
}

eps := pr.proxyState.GetEndpoints()[psCluster.Name]
if eps != nil {
protoEndpoint := makeEnvoyClusterLoadAssignment(psCluster.Name, eps.Endpoints)
pr.envoyResources[xdscommon.EndpointType][psCluster.Name] = protoEndpoint
}
}
default:
return errors.New("cluster group type should be Endpoint Group or Failover Group")
}

return nil
}
16 changes: 7 additions & 9 deletions agent/xdsv2/listener_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,16 @@ const (
envoyHttpConnectionManagerFilterName = "envoy.filters.network.http_connection_manager"
)

func (pr *ProxyResources) makeXDSListeners() ([]proto.Message, error) {
listeners := make([]proto.Message, 0)

func (pr *ProxyResources) makeXDSListeners() error {
for _, l := range pr.proxyState.Listeners {
protoListener, err := pr.makeListener(l)
// TODO: aggregate errors for listeners and still return any properly formed listeners.
if err != nil {
return nil, err
return err
}
listeners = append(listeners, protoListener)
pr.envoyResources[xdscommon.ListenerType][protoListener.Name] = protoListener
}
return listeners, nil
return nil
}

func (pr *ProxyResources) makeListener(listener *pbproxystate.Listener) (*envoy_listener_v3.Listener, error) {
Expand Down Expand Up @@ -309,7 +307,7 @@ func (pr *ProxyResources) makeEnvoyResourcesForL4Destination(l4 *pbproxystate.Ro
if err != nil {
return nil, err
}
envoyFilters, err := makeL4Filters(l4.L4)
envoyFilters, err := pr.makeL4Filters(l4.L4)
return envoyFilters, err
}

Expand All @@ -334,7 +332,7 @@ func getAlpnProtocols(protocol pbproxystate.L7Protocol) []string {
return alpnProtocols
}

func makeL4Filters(l4 *pbproxystate.L4Destination) ([]*envoy_listener_v3.Filter, error) {
func (pr *ProxyResources) makeL4Filters(l4 *pbproxystate.L4Destination) ([]*envoy_listener_v3.Filter, error) {
var envoyFilters []*envoy_listener_v3.Filter
if l4 != nil {
rbacFilters, err := MakeRBACNetworkFilters(l4.TrafficPermissions)
Expand Down Expand Up @@ -442,7 +440,7 @@ func (pr *ProxyResources) makeL7Filters(l7 *pbproxystate.L7Destination) ([]*envo
}
} else {
// Add Envoy route under the route resource since it's not inlined.
pr.envoyResources[xdscommon.RouteType] = append(pr.envoyResources[xdscommon.RouteType], routeConfig)
pr.envoyResources[xdscommon.RouteType][routeConfig.Name] = routeConfig

httpConnMgr.RouteSpecifier = &envoy_http_v3.HttpConnectionManager_Rds{
Rds: &envoy_http_v3.Rds{
Expand Down
43 changes: 19 additions & 24 deletions agent/xdsv2/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,46 +30,41 @@ func NewResourceGenerator(

type ProxyResources struct {
proxyState *proxytracker.ProxyState
envoyResources map[string][]proto.Message
envoyResources map[string]map[string]proto.Message
}

func (g *ResourceGenerator) AllResourcesFromIR(proxyState *proxytracker.ProxyState) (map[string][]proto.Message, error) {
pr := &ProxyResources{
proxyState: proxyState,
envoyResources: make(map[string][]proto.Message),
envoyResources: make(map[string]map[string]proto.Message),
}
pr.envoyResources[xdscommon.ListenerType] = make(map[string]proto.Message)
pr.envoyResources[xdscommon.RouteType] = make(map[string]proto.Message)
pr.envoyResources[xdscommon.ClusterType] = make(map[string]proto.Message)
pr.envoyResources[xdscommon.EndpointType] = make(map[string]proto.Message)

err := pr.generateXDSResources()
if err != nil {
return nil, fmt.Errorf("failed to generate xDS resources for ProxyState: %v", err)
}
return pr.envoyResources, nil
envoyResources := make(map[string][]proto.Message)
envoyResources[xdscommon.ListenerType] = make([]proto.Message, 0)
envoyResources[xdscommon.RouteType] = make([]proto.Message, 0)
envoyResources[xdscommon.ClusterType] = make([]proto.Message, 0)
envoyResources[xdscommon.EndpointType] = make([]proto.Message, 0)
for resourceTypeName, resourceMap := range pr.envoyResources {
for _, resource := range resourceMap {
envoyResources[resourceTypeName] = append(envoyResources[resourceTypeName], resource)
}
}
return envoyResources, nil
}

func (pr *ProxyResources) generateXDSResources() error {
listeners, err := pr.makeXDSListeners()
if err != nil {
return err
}

pr.envoyResources[xdscommon.ListenerType] = listeners

clusters, err := pr.makeXDSClusters()
if err != nil {
return err
}
pr.envoyResources[xdscommon.ClusterType] = clusters

endpoints, err := pr.makeXDSEndpoints()
if err != nil {
return err
}
pr.envoyResources[xdscommon.EndpointType] = endpoints

routes, err := pr.makeXDSRoutes()
err := pr.makeXDSListeners()
if err != nil {
return err
}
pr.envoyResources[xdscommon.RouteType] = routes

return nil
}
35 changes: 29 additions & 6 deletions agent/xdsv2/route_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,32 @@ func (pr *ProxyResources) makeEnvoyRouteActionFromProxystateRouteDestination(psR
Cluster: psCluster.GetName(),
}
clusters, _ := pr.makeClusters(psCluster.Name)
pr.envoyResources[xdscommon.ClusterType] = append(pr.envoyResources[xdscommon.ClusterType], clusters...)
for name, cluster := range clusters {
pr.envoyResources[xdscommon.ClusterType][name] = cluster
}

eps := pr.proxyState.GetEndpoints()[psCluster.Name]
if eps != nil {
protoEndpoint := makeEnvoyClusterLoadAssignment(psCluster.Name, eps.Endpoints)
pr.envoyResources[xdscommon.EndpointType][protoEndpoint.ClusterName] = protoEndpoint
}

case *pbproxystate.RouteDestination_WeightedClusters:
psWeightedClusters := psRouteDestination.GetWeightedClusters()
envoyClusters := make([]*envoy_route_v3.WeightedCluster_ClusterWeight, 0, len(psWeightedClusters.GetClusters()))
totalWeight := 0
for _, psCluster := range psWeightedClusters.GetClusters() {
clusters, _ := pr.makeClusters(psCluster.Name)
pr.envoyResources[xdscommon.ClusterType] = append(pr.envoyResources[xdscommon.ClusterType], clusters...)
for name, cluster := range clusters {
pr.envoyResources[xdscommon.ClusterType][name] = cluster
}

eps := pr.proxyState.GetEndpoints()[psCluster.Name]
if eps != nil {
protoEndpoint := makeEnvoyClusterLoadAssignment(psCluster.Name, eps.Endpoints)
pr.envoyResources[xdscommon.EndpointType][protoEndpoint.ClusterName] = protoEndpoint
}

totalWeight += int(psCluster.Weight.GetValue())
envoyClusters = append(envoyClusters, makeEnvoyClusterWeightFromProxystateWeightedCluster(psCluster))
}
Expand Down Expand Up @@ -318,10 +335,7 @@ func (pr *ProxyResources) makeEnvoyRouteActionFromProxystateRouteDestination(psR
}

func makeEnvoyClusterWeightFromProxystateWeightedCluster(cluster *pbproxystate.L7WeightedDestinationCluster) *envoy_route_v3.WeightedCluster_ClusterWeight {
envoyClusterWeight := &envoy_route_v3.WeightedCluster_ClusterWeight{
Name: cluster.GetName(),
Weight: cluster.GetWeight(),
}
envoyClusterWeight := makeEnvoyClusterWeightFromNameAndWeight(cluster.GetName(), cluster.GetWeight())

for _, hm := range cluster.GetHeaderMutations() {
injectEnvoyClusterWeightWithProxystateHeaderMutation(envoyClusterWeight, hm)
Expand All @@ -330,6 +344,15 @@ func makeEnvoyClusterWeightFromProxystateWeightedCluster(cluster *pbproxystate.L
return envoyClusterWeight
}

func makeEnvoyClusterWeightFromNameAndWeight(name string, weight *wrapperspb.UInt32Value) *envoy_route_v3.WeightedCluster_ClusterWeight {
envoyClusterWeight := &envoy_route_v3.WeightedCluster_ClusterWeight{
Name: name,
Weight: weight,
}

return envoyClusterWeight
}

func injectEnvoyClusterWeightWithProxystateHeaderMutation(envoyClusterWeight *envoy_route_v3.WeightedCluster_ClusterWeight, mutation *pbproxystate.HeaderMutation) {
mutation.GetAction()
switch mutation.GetAction().(type) {
Expand Down
48 changes: 0 additions & 48 deletions agent/xdsv2/testdata/routes/source/l7-expose-paths.golden
Original file line number Diff line number Diff line change
@@ -1,53 +1,5 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
"name": "exposed_path_route_GetHealth1235",
"virtualHosts": [
{
"name": "exposed_path_route_GetHealth1235",
"domains": [
"*"
],
"routes": [
{
"match": {
"path": "GetHealth"
},
"route": {
"cluster": "exposed_cluster_9091"
}
}
]
}
],
"validateClusters": true
},
{
"@type": "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
"name": "exposed_path_route_health1234",
"virtualHosts": [
{
"name": "exposed_path_route_health1234",
"domains": [
"*"
],
"routes": [
{
"match": {
"path": "/health"
},
"route": {
"cluster": "exposed_cluster_9090"
}
}
]
}
],
"validateClusters": true
}
],
"typeUrl": "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
"nonce": "00000001"
}
Loading

0 comments on commit ba56049

Please sign in to comment.