Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
envoy|catalog: use TrafficMatch to build inbound filter config (#4814)
Browse files Browse the repository at this point in the history
Similar to how outbound filter config is built, this change
relies on the TrafficMatch type to build inbound filter
configs. The TrafficMatch has a 1-1 mapping to an Envoy
filter_chain config. This is necessary to be able to
cleanly apply TrafficMatch based policies (e.g. rate
limiting) without needing to rely on multiple catalog
APIs.

Part of #2018

Signed-off-by: Shashank Ram <shashr2204@gmail.com>
  • Loading branch information
shashankram authored Jun 14, 2022
1 parent c90f07a commit 3f72969
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 81 deletions.
40 changes: 28 additions & 12 deletions pkg/catalog/inbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
trafficTargets = mc.meshSpec.ListTrafficTargets(destinationFilter)
}

upstreamSvcSet := mapset.NewSet()
for _, svc := range upstreamServices {
upstreamSvcSet.Add(svc)
}

// A policy (traffic match, route, cluster) must be built for each upstream service. This
// includes apex/root services associated with the given upstream service.
allUpstreamServices := mc.getUpstreamServicesIncludeApex(upstreamServices)
Expand All @@ -59,25 +64,36 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
}
clusterConfigs = append(clusterConfigs, clusterConfigForSvc)

upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{MeshService: &upstreamSvc})

// ---
// Create a TrafficMatch for this upstream servic.
// The TrafficMatch will be used by LDS to program a filter chain match
// for this upstream service on the upstream server to accept inbound
// traffic.
trafficMatchForUpstreamSvc := &trafficpolicy.TrafficMatch{
Name: upstreamSvc.InboundTrafficMatchName(),
DestinationPort: int(upstreamSvc.TargetPort),
DestinationProtocol: upstreamSvc.Protocol,
}

upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{MeshService: &upstreamSvc})
if upstreamTrafficSetting != nil {
trafficMatchForUpstreamSvc.RateLimit = upstreamTrafficSetting.Spec.RateLimit
//
// Note: a TrafficMatch must exist only for a service part of the given
// 'upstreamServices' list, and not a virtual (apex) service that
// may be returned as a part of the 'allUpstreamServices' list.
// A virtual (apex) service is required for the purpose of building
// HTTP routing rules, but should not result in a TrafficMatch rule
// as TrafficMatch rules are meant to map to actual services backed
// by a proxy, defined by the 'upstreamServices' list.
if upstreamSvcSet.Contains(upstreamSvc) {
trafficMatchForUpstreamSvc := &trafficpolicy.TrafficMatch{
Name: upstreamSvc.InboundTrafficMatchName(),
DestinationPort: int(upstreamSvc.TargetPort),
DestinationProtocol: upstreamSvc.Protocol,
ServerNames: []string{upstreamSvc.ServerName()},
Cluster: upstreamSvc.EnvoyLocalClusterName(),
}
if upstreamTrafficSetting != nil {
trafficMatchForUpstreamSvc.RateLimit = upstreamTrafficSetting.Spec.RateLimit
}
trafficMatches = append(trafficMatches, trafficMatchForUpstreamSvc)
}

trafficMatches = append(trafficMatches, trafficMatchForUpstreamSvc)

// Build the HTTP route configs for this service and port combination.
// If the port's protocol corresponds to TCP, we can skip this step
if upstreamSvc.Protocol == constants.ProtocolTCP || upstreamSvc.Protocol == constants.ProtocolTCPServerFirst {
Expand Down
4 changes: 4 additions & 0 deletions pkg/catalog/inbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,15 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
Name: "inbound_ns1/mysql-0.mysql_3306_tcp",
DestinationPort: 3306,
DestinationProtocol: "tcp",
ServerNames: []string{"mysql-0.mysql.ns1.svc.cluster.local"},
Cluster: "ns1/mysql-0.mysql|3306|local",
},
{
Name: "inbound_ns1/s2_9090_http",
DestinationPort: 9090,
DestinationProtocol: "http",
ServerNames: []string{"s2.ns1.svc.cluster.local"},
Cluster: "ns1/s2|9090|local",
},
},
ClustersConfigs: []*trafficpolicy.MeshClusterConfig{
Expand Down
107 changes: 61 additions & 46 deletions pkg/envoy/lds/inmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/envoy/rds/route"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
)

Expand All @@ -26,42 +25,50 @@ const (
outboundMeshTCPProxyStatPrefix = "outbound-mesh-tcp-proxy"
)

func (lb *listenerBuilder) getInboundMeshFilterChains(proxyService service.MeshService) []*xds_listener.FilterChain {
func (lb *listenerBuilder) getInboundMeshFilterChains(trafficMatches []*trafficpolicy.TrafficMatch) []*xds_listener.FilterChain {
var filterChains []*xds_listener.FilterChain

// Create protocol specific inbound filter chains for MeshService's TargetPort
switch strings.ToLower(proxyService.Protocol) {
case constants.ProtocolHTTP, constants.ProtocolGRPC:
// Filter chain for HTTP port
filterChainForPort, err := lb.getInboundMeshHTTPFilterChain(proxyService)
if err != nil {
log.Error().Err(err).Msgf("Error building inbound HTTP filter chain for proxy:port %s:%d", proxyService, proxyService.TargetPort)
}
filterChains = append(filterChains, filterChainForPort)
for _, match := range trafficMatches {
// Create protocol specific inbound filter chains for MeshService's TargetPort
switch strings.ToLower(match.DestinationProtocol) {
case constants.ProtocolHTTP, constants.ProtocolGRPC:
// Filter chain for HTTP port
filterChainForPort, err := lb.getInboundMeshHTTPFilterChain(match)
if err != nil {
log.Error().Err(err).Msgf("Error building inbound HTTP filter chain for traffic match %s", match.Name)
} else {
filterChains = append(filterChains, filterChainForPort)
}

case constants.ProtocolTCP, constants.ProtocolTCPServerFirst:
filterChainForPort, err := lb.getInboundMeshTCPFilterChain(proxyService)
if err != nil {
log.Error().Err(err).Msgf("Error building inbound TCP filter chain for proxy:port %s:%d", proxyService, proxyService.TargetPort)
}
filterChains = append(filterChains, filterChainForPort)
case constants.ProtocolTCP, constants.ProtocolTCPServerFirst:
filterChainForPort, err := lb.getInboundMeshTCPFilterChain(match)
if err != nil {
log.Error().Err(err).Msgf("Error building inbound TCP filter chain for traffic match %s", match.Name)
} else {
filterChains = append(filterChains, filterChainForPort)
}

default:
log.Error().Msgf("Cannot build inbound filter chain, unsupported protocol %s for proxy-service:port %s:%d", proxyService.Protocol, proxyService, proxyService.TargetPort)
default:
log.Error().Msgf("Cannot build inbound filter chain, unsupported protocol %s for traffic match %s", match.DestinationProtocol, match.Name)
}
}

return filterChains
}

func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshService) ([]*xds_listener.Filter, error) {
func (lb *listenerBuilder) getInboundHTTPFilters(trafficMatch *trafficpolicy.TrafficMatch) ([]*xds_listener.Filter, error) {
if trafficMatch == nil {
return nil, nil
}

var filters []*xds_listener.Filter

// Apply an RBAC filter when permissive mode is disabled. The RBAC filter must be the first filter in the list of filters.
if !lb.cfg.IsPermissiveTrafficPolicyMode() {
// Apply RBAC policies on the inbound filters based on configured policies
rbacFilter, err := lb.buildRBACFilter()
if err != nil {
log.Error().Err(err).Msgf("Error applying RBAC filter for proxy service %s", proxyService)
log.Error().Err(err).Msgf("Error applying RBAC filter for traffic match %s", trafficMatch.Name)
return nil, err
}
// RBAC filter should be the very first filter in the filter chain
Expand All @@ -71,7 +78,7 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic
// Build the HTTP Connection Manager filter from its options
inboundConnManager, err := httpConnManagerOptions{
direction: inbound,
rdsRoutConfigName: route.GetInboundMeshRouteConfigNameForPort(int(proxyService.TargetPort)),
rdsRoutConfigName: route.GetInboundMeshRouteConfigNameForPort(trafficMatch.DestinationPort),

// Additional filters
wasmStatsHeaders: lb.getWASMStatsHeaders(),
Expand All @@ -83,12 +90,12 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic
tracingAPIEndpoint: lb.cfg.GetTracingEndpoint(),
}.build()
if err != nil {
return nil, errors.Wrapf(err, "Error building inbound HTTP connection manager for proxy with identity %s and service %s", lb.serviceIdentity, proxyService)
return nil, errors.Wrapf(err, "Error building inbound HTTP connection manager for proxy with identity %s and traffic match %s", lb.serviceIdentity, trafficMatch.Name)
}

marshalledInboundConnManager, err := anypb.New(inboundConnManager)
if err != nil {
return nil, errors.Wrapf(err, "Error marshalling inbound HTTP connection manager for proxy with identity %s and service %s", lb.serviceIdentity, proxyService)
return nil, errors.Wrapf(err, "Error marshalling inbound HTTP connection manager for proxy with identity %s and traffic match %s", lb.serviceIdentity, trafficMatch.Name)
}
httpConnectionManagerFilter := &xds_listener.Filter{
Name: wellknown.HTTPConnectionManager,
Expand All @@ -101,38 +108,40 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic
return filters, nil
}

func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(proxyService service.MeshService) (*xds_listener.FilterChain, error) {
func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(trafficMatch *trafficpolicy.TrafficMatch) (*xds_listener.FilterChain, error) {
if trafficMatch == nil {
return nil, nil
}

// Construct HTTP filters
filters, err := lb.getInboundHTTPFilters(proxyService)
filters, err := lb.getInboundHTTPFilters(trafficMatch)
if err != nil {
log.Error().Err(err).Msgf("Error constructing inbound HTTP filters for proxy service %s", proxyService)
log.Error().Err(err).Msgf("Error constructing inbound HTTP filters for traffic match %s", trafficMatch.Name)
return nil, err
}

// Construct downstream TLS context
marshalledDownstreamTLSContext, err := anypb.New(envoy.GetDownstreamTLSContext(lb.serviceIdentity, true /* mTLS */, lb.cfg.GetMeshConfig().Spec.Sidecar))
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMarshallingXDSResource)).
Msgf("Error marshalling DownstreamTLSContext for proxy service %s", proxyService)
Msgf("Error marshalling DownstreamTLSContext for traffic match %s", trafficMatch.Name)
return nil, err
}

serverNames := []string{proxyService.ServerName()}

filterChain := &xds_listener.FilterChain{
Name: proxyService.InboundTrafficMatchName(),
Name: trafficMatch.Name,
Filters: filters,

// The 'FilterChainMatch' field defines the criteria for matching traffic against filters in this filter chain
FilterChainMatch: &xds_listener.FilterChainMatch{
// The DestinationPort is the service port the downstream directs traffic to
DestinationPort: &wrapperspb.UInt32Value{
Value: uint32(proxyService.TargetPort),
Value: uint32(trafficMatch.DestinationPort),
},

// The ServerName is the SNI set by the downstream in the UptreamTlsContext by GetUpstreamTLSContext()
// This is not a field obtained from the mTLS Certificate.
ServerNames: serverNames,
ServerNames: trafficMatch.ServerNames,

// Only match when transport protocol is TLS
TransportProtocol: envoy.TransportProtocolTLS,
Expand All @@ -152,35 +161,37 @@ func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(proxyService service.Me
return filterChain, nil
}

func (lb *listenerBuilder) getInboundMeshTCPFilterChain(proxyService service.MeshService) (*xds_listener.FilterChain, error) {
func (lb *listenerBuilder) getInboundMeshTCPFilterChain(trafficMatch *trafficpolicy.TrafficMatch) (*xds_listener.FilterChain, error) {
if trafficMatch == nil {
return nil, nil
}

// Construct TCP filters
filters, err := lb.getInboundTCPFilters(proxyService)
filters, err := lb.getInboundTCPFilters(trafficMatch)
if err != nil {
log.Error().Err(err).Msgf("Error constructing inbound TCP filters for proxy service %s", proxyService)
log.Error().Err(err).Msgf("Error constructing inbound TCP filters for traffic match %s", trafficMatch.Name)
return nil, err
}

// Construct downstream TLS context
marshalledDownstreamTLSContext, err := anypb.New(envoy.GetDownstreamTLSContext(lb.serviceIdentity, true /* mTLS */, lb.cfg.GetMeshConfig().Spec.Sidecar))
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMarshallingXDSResource)).
Msgf("Error marshalling DownstreamTLSContext for proxy service %s", proxyService)
Msgf("Error marshalling DownstreamTLSContext for traffic match %s", trafficMatch.Name)
return nil, err
}

serverNames := []string{proxyService.ServerName()}

return &xds_listener.FilterChain{
Name: proxyService.InboundTrafficMatchName(),
Name: trafficMatch.Name,
FilterChainMatch: &xds_listener.FilterChainMatch{
// The DestinationPort is the service port the downstream directs traffic to
DestinationPort: &wrapperspb.UInt32Value{
Value: uint32(proxyService.TargetPort),
Value: uint32(trafficMatch.DestinationPort),
},

// The ServerName is the SNI set by the downstream in the UptreamTlsContext by GetUpstreamTLSContext()
// This is not a field obtained from the mTLS Certificate.
ServerNames: serverNames,
ServerNames: trafficMatch.ServerNames,

// Only match when transport protocol is TLS
TransportProtocol: envoy.TransportProtocolTLS,
Expand All @@ -198,15 +209,19 @@ func (lb *listenerBuilder) getInboundMeshTCPFilterChain(proxyService service.Mes
}, nil
}

func (lb *listenerBuilder) getInboundTCPFilters(proxyService service.MeshService) ([]*xds_listener.Filter, error) {
func (lb *listenerBuilder) getInboundTCPFilters(trafficMatch *trafficpolicy.TrafficMatch) ([]*xds_listener.Filter, error) {
if trafficMatch == nil {
return nil, nil
}

var filters []*xds_listener.Filter

// Apply an RBAC filter when permissive mode is disabled. The RBAC filter must be the first filter in the list of filters.
if !lb.cfg.IsPermissiveTrafficPolicyMode() {
// Apply RBAC policies on the inbound filters based on configured policies
rbacFilter, err := lb.buildRBACFilter()
if err != nil {
log.Error().Err(err).Msgf("Error applying RBAC filter for proxy service %s", proxyService)
log.Error().Err(err).Msgf("Error applying RBAC filter for traffic match %s", trafficMatch.Name)
return nil, err
}
// RBAC filter should be the very first filter in the filter chain
Expand All @@ -215,8 +230,8 @@ func (lb *listenerBuilder) getInboundTCPFilters(proxyService service.MeshService

// Apply the TCP Proxy Filter
tcpProxy := &xds_tcp_proxy.TcpProxy{
StatPrefix: fmt.Sprintf("%s.%s", inboundMeshTCPProxyStatPrefix, proxyService.EnvoyLocalClusterName()),
ClusterSpecifier: &xds_tcp_proxy.TcpProxy_Cluster{Cluster: proxyService.EnvoyLocalClusterName()},
StatPrefix: fmt.Sprintf("%s.%s", inboundMeshTCPProxyStatPrefix, trafficMatch.Cluster),
ClusterSpecifier: &xds_tcp_proxy.TcpProxy_Cluster{Cluster: trafficMatch.Cluster},
}
marshalledTCPProxy, err := anypb.New(tcpProxy)
if err != nil {
Expand Down
Loading

0 comments on commit 3f72969

Please sign in to comment.