diff --git a/pkg/catalog/inbound_traffic_policies.go b/pkg/catalog/inbound_traffic_policies.go index d50e6adbce..e3dc7763f4 100644 --- a/pkg/catalog/inbound_traffic_policies.go +++ b/pkg/catalog/inbound_traffic_policies.go @@ -41,6 +41,11 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser trafficTargets = mc.meshSpec.ListTrafficTargets(destinationFilter) } + upstreamSvcSet := mapset.NewSet() + for _, svc := range upstreamServices { + upstreamSvcSet.Add(svc) + } + // A policy (traffic match, route, cluster) must be built for each upstream service. This // includes apex/root services associated with the given upstream service. allUpstreamServices := mc.getUpstreamServicesIncludeApex(upstreamServices) @@ -59,25 +64,36 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser } clusterConfigs = append(clusterConfigs, clusterConfigForSvc) + upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting( + policy.UpstreamTrafficSettingGetOpt{MeshService: &upstreamSvc}) + // --- // Create a TrafficMatch for this upstream servic. // The TrafficMatch will be used by LDS to program a filter chain match // for this upstream service on the upstream server to accept inbound // traffic. - trafficMatchForUpstreamSvc := &trafficpolicy.TrafficMatch{ - Name: upstreamSvc.InboundTrafficMatchName(), - DestinationPort: int(upstreamSvc.TargetPort), - DestinationProtocol: upstreamSvc.Protocol, - } - - upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting( - policy.UpstreamTrafficSettingGetOpt{MeshService: &upstreamSvc}) - if upstreamTrafficSetting != nil { - trafficMatchForUpstreamSvc.RateLimit = upstreamTrafficSetting.Spec.RateLimit + // + // Note: a TrafficMatch must exist only for a service part of the given + // 'upstreamServices' list, and not a virtual (apex) service that + // may be returned as a part of the 'allUpstreamServices' list. + // A virtual (apex) service is required for the purpose of building + // HTTP routing rules, but should not result in a TrafficMatch rule + // as TrafficMatch rules are meant to map to actual services backed + // by a proxy, defined by the 'upstreamServices' list. + if upstreamSvcSet.Contains(upstreamSvc) { + trafficMatchForUpstreamSvc := &trafficpolicy.TrafficMatch{ + Name: upstreamSvc.InboundTrafficMatchName(), + DestinationPort: int(upstreamSvc.TargetPort), + DestinationProtocol: upstreamSvc.Protocol, + ServerNames: []string{upstreamSvc.ServerName()}, + Cluster: upstreamSvc.EnvoyLocalClusterName(), + } + if upstreamTrafficSetting != nil { + trafficMatchForUpstreamSvc.RateLimit = upstreamTrafficSetting.Spec.RateLimit + } + trafficMatches = append(trafficMatches, trafficMatchForUpstreamSvc) } - trafficMatches = append(trafficMatches, trafficMatchForUpstreamSvc) - // Build the HTTP route configs for this service and port combination. // If the port's protocol corresponds to TCP, we can skip this step if upstreamSvc.Protocol == constants.ProtocolTCP || upstreamSvc.Protocol == constants.ProtocolTCPServerFirst { diff --git a/pkg/catalog/inbound_traffic_policies_test.go b/pkg/catalog/inbound_traffic_policies_test.go index ba4fada9d6..fe8d5af4d8 100644 --- a/pkg/catalog/inbound_traffic_policies_test.go +++ b/pkg/catalog/inbound_traffic_policies_test.go @@ -307,11 +307,15 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) { Name: "inbound_ns1/mysql-0.mysql_3306_tcp", DestinationPort: 3306, DestinationProtocol: "tcp", + ServerNames: []string{"mysql-0.mysql.ns1.svc.cluster.local"}, + Cluster: "ns1/mysql-0.mysql|3306|local", }, { Name: "inbound_ns1/s2_9090_http", DestinationPort: 9090, DestinationProtocol: "http", + ServerNames: []string{"s2.ns1.svc.cluster.local"}, + Cluster: "ns1/s2|9090|local", }, }, ClustersConfigs: []*trafficpolicy.MeshClusterConfig{ diff --git a/pkg/envoy/lds/inmesh.go b/pkg/envoy/lds/inmesh.go index baad04885a..6a65cd9f00 100644 --- a/pkg/envoy/lds/inmesh.go +++ b/pkg/envoy/lds/inmesh.go @@ -17,7 +17,6 @@ import ( "github.com/openservicemesh/osm/pkg/envoy" "github.com/openservicemesh/osm/pkg/envoy/rds/route" "github.com/openservicemesh/osm/pkg/errcode" - "github.com/openservicemesh/osm/pkg/service" "github.com/openservicemesh/osm/pkg/trafficpolicy" ) @@ -26,34 +25,42 @@ const ( outboundMeshTCPProxyStatPrefix = "outbound-mesh-tcp-proxy" ) -func (lb *listenerBuilder) getInboundMeshFilterChains(proxyService service.MeshService) []*xds_listener.FilterChain { +func (lb *listenerBuilder) getInboundMeshFilterChains(trafficMatches []*trafficpolicy.TrafficMatch) []*xds_listener.FilterChain { var filterChains []*xds_listener.FilterChain - // Create protocol specific inbound filter chains for MeshService's TargetPort - switch strings.ToLower(proxyService.Protocol) { - case constants.ProtocolHTTP, constants.ProtocolGRPC: - // Filter chain for HTTP port - filterChainForPort, err := lb.getInboundMeshHTTPFilterChain(proxyService) - if err != nil { - log.Error().Err(err).Msgf("Error building inbound HTTP filter chain for proxy:port %s:%d", proxyService, proxyService.TargetPort) - } - filterChains = append(filterChains, filterChainForPort) + for _, match := range trafficMatches { + // Create protocol specific inbound filter chains for MeshService's TargetPort + switch strings.ToLower(match.DestinationProtocol) { + case constants.ProtocolHTTP, constants.ProtocolGRPC: + // Filter chain for HTTP port + filterChainForPort, err := lb.getInboundMeshHTTPFilterChain(match) + if err != nil { + log.Error().Err(err).Msgf("Error building inbound HTTP filter chain for traffic match %s", match.Name) + } else { + filterChains = append(filterChains, filterChainForPort) + } - case constants.ProtocolTCP, constants.ProtocolTCPServerFirst: - filterChainForPort, err := lb.getInboundMeshTCPFilterChain(proxyService) - if err != nil { - log.Error().Err(err).Msgf("Error building inbound TCP filter chain for proxy:port %s:%d", proxyService, proxyService.TargetPort) - } - filterChains = append(filterChains, filterChainForPort) + case constants.ProtocolTCP, constants.ProtocolTCPServerFirst: + filterChainForPort, err := lb.getInboundMeshTCPFilterChain(match) + if err != nil { + log.Error().Err(err).Msgf("Error building inbound TCP filter chain for traffic match %s", match.Name) + } else { + filterChains = append(filterChains, filterChainForPort) + } - default: - log.Error().Msgf("Cannot build inbound filter chain, unsupported protocol %s for proxy-service:port %s:%d", proxyService.Protocol, proxyService, proxyService.TargetPort) + default: + log.Error().Msgf("Cannot build inbound filter chain, unsupported protocol %s for traffic match %s", match.DestinationProtocol, match.Name) + } } return filterChains } -func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshService) ([]*xds_listener.Filter, error) { +func (lb *listenerBuilder) getInboundHTTPFilters(trafficMatch *trafficpolicy.TrafficMatch) ([]*xds_listener.Filter, error) { + if trafficMatch == nil { + return nil, nil + } + var filters []*xds_listener.Filter // Apply an RBAC filter when permissive mode is disabled. The RBAC filter must be the first filter in the list of filters. @@ -61,7 +68,7 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic // Apply RBAC policies on the inbound filters based on configured policies rbacFilter, err := lb.buildRBACFilter() if err != nil { - log.Error().Err(err).Msgf("Error applying RBAC filter for proxy service %s", proxyService) + log.Error().Err(err).Msgf("Error applying RBAC filter for traffic match %s", trafficMatch.Name) return nil, err } // RBAC filter should be the very first filter in the filter chain @@ -71,7 +78,7 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic // Build the HTTP Connection Manager filter from its options inboundConnManager, err := httpConnManagerOptions{ direction: inbound, - rdsRoutConfigName: route.GetInboundMeshRouteConfigNameForPort(int(proxyService.TargetPort)), + rdsRoutConfigName: route.GetInboundMeshRouteConfigNameForPort(trafficMatch.DestinationPort), // Additional filters wasmStatsHeaders: lb.getWASMStatsHeaders(), @@ -83,12 +90,12 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic tracingAPIEndpoint: lb.cfg.GetTracingEndpoint(), }.build() if err != nil { - return nil, errors.Wrapf(err, "Error building inbound HTTP connection manager for proxy with identity %s and service %s", lb.serviceIdentity, proxyService) + return nil, errors.Wrapf(err, "Error building inbound HTTP connection manager for proxy with identity %s and traffic match %s", lb.serviceIdentity, trafficMatch.Name) } marshalledInboundConnManager, err := anypb.New(inboundConnManager) if err != nil { - return nil, errors.Wrapf(err, "Error marshalling inbound HTTP connection manager for proxy with identity %s and service %s", lb.serviceIdentity, proxyService) + return nil, errors.Wrapf(err, "Error marshalling inbound HTTP connection manager for proxy with identity %s and traffic match %s", lb.serviceIdentity, trafficMatch.Name) } httpConnectionManagerFilter := &xds_listener.Filter{ Name: wellknown.HTTPConnectionManager, @@ -101,11 +108,15 @@ func (lb *listenerBuilder) getInboundHTTPFilters(proxyService service.MeshServic return filters, nil } -func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(proxyService service.MeshService) (*xds_listener.FilterChain, error) { +func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(trafficMatch *trafficpolicy.TrafficMatch) (*xds_listener.FilterChain, error) { + if trafficMatch == nil { + return nil, nil + } + // Construct HTTP filters - filters, err := lb.getInboundHTTPFilters(proxyService) + filters, err := lb.getInboundHTTPFilters(trafficMatch) if err != nil { - log.Error().Err(err).Msgf("Error constructing inbound HTTP filters for proxy service %s", proxyService) + log.Error().Err(err).Msgf("Error constructing inbound HTTP filters for traffic match %s", trafficMatch.Name) return nil, err } @@ -113,26 +124,24 @@ func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(proxyService service.Me marshalledDownstreamTLSContext, err := anypb.New(envoy.GetDownstreamTLSContext(lb.serviceIdentity, true /* mTLS */, lb.cfg.GetMeshConfig().Spec.Sidecar)) if err != nil { log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMarshallingXDSResource)). - Msgf("Error marshalling DownstreamTLSContext for proxy service %s", proxyService) + Msgf("Error marshalling DownstreamTLSContext for traffic match %s", trafficMatch.Name) return nil, err } - serverNames := []string{proxyService.ServerName()} - filterChain := &xds_listener.FilterChain{ - Name: proxyService.InboundTrafficMatchName(), + Name: trafficMatch.Name, Filters: filters, // The 'FilterChainMatch' field defines the criteria for matching traffic against filters in this filter chain FilterChainMatch: &xds_listener.FilterChainMatch{ // The DestinationPort is the service port the downstream directs traffic to DestinationPort: &wrapperspb.UInt32Value{ - Value: uint32(proxyService.TargetPort), + Value: uint32(trafficMatch.DestinationPort), }, // The ServerName is the SNI set by the downstream in the UptreamTlsContext by GetUpstreamTLSContext() // This is not a field obtained from the mTLS Certificate. - ServerNames: serverNames, + ServerNames: trafficMatch.ServerNames, // Only match when transport protocol is TLS TransportProtocol: envoy.TransportProtocolTLS, @@ -152,11 +161,15 @@ func (lb *listenerBuilder) getInboundMeshHTTPFilterChain(proxyService service.Me return filterChain, nil } -func (lb *listenerBuilder) getInboundMeshTCPFilterChain(proxyService service.MeshService) (*xds_listener.FilterChain, error) { +func (lb *listenerBuilder) getInboundMeshTCPFilterChain(trafficMatch *trafficpolicy.TrafficMatch) (*xds_listener.FilterChain, error) { + if trafficMatch == nil { + return nil, nil + } + // Construct TCP filters - filters, err := lb.getInboundTCPFilters(proxyService) + filters, err := lb.getInboundTCPFilters(trafficMatch) if err != nil { - log.Error().Err(err).Msgf("Error constructing inbound TCP filters for proxy service %s", proxyService) + log.Error().Err(err).Msgf("Error constructing inbound TCP filters for traffic match %s", trafficMatch.Name) return nil, err } @@ -164,23 +177,21 @@ func (lb *listenerBuilder) getInboundMeshTCPFilterChain(proxyService service.Mes marshalledDownstreamTLSContext, err := anypb.New(envoy.GetDownstreamTLSContext(lb.serviceIdentity, true /* mTLS */, lb.cfg.GetMeshConfig().Spec.Sidecar)) if err != nil { log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrMarshallingXDSResource)). - Msgf("Error marshalling DownstreamTLSContext for proxy service %s", proxyService) + Msgf("Error marshalling DownstreamTLSContext for traffic match %s", trafficMatch.Name) return nil, err } - serverNames := []string{proxyService.ServerName()} - return &xds_listener.FilterChain{ - Name: proxyService.InboundTrafficMatchName(), + Name: trafficMatch.Name, FilterChainMatch: &xds_listener.FilterChainMatch{ // The DestinationPort is the service port the downstream directs traffic to DestinationPort: &wrapperspb.UInt32Value{ - Value: uint32(proxyService.TargetPort), + Value: uint32(trafficMatch.DestinationPort), }, // The ServerName is the SNI set by the downstream in the UptreamTlsContext by GetUpstreamTLSContext() // This is not a field obtained from the mTLS Certificate. - ServerNames: serverNames, + ServerNames: trafficMatch.ServerNames, // Only match when transport protocol is TLS TransportProtocol: envoy.TransportProtocolTLS, @@ -198,7 +209,11 @@ func (lb *listenerBuilder) getInboundMeshTCPFilterChain(proxyService service.Mes }, nil } -func (lb *listenerBuilder) getInboundTCPFilters(proxyService service.MeshService) ([]*xds_listener.Filter, error) { +func (lb *listenerBuilder) getInboundTCPFilters(trafficMatch *trafficpolicy.TrafficMatch) ([]*xds_listener.Filter, error) { + if trafficMatch == nil { + return nil, nil + } + var filters []*xds_listener.Filter // Apply an RBAC filter when permissive mode is disabled. The RBAC filter must be the first filter in the list of filters. @@ -206,7 +221,7 @@ func (lb *listenerBuilder) getInboundTCPFilters(proxyService service.MeshService // Apply RBAC policies on the inbound filters based on configured policies rbacFilter, err := lb.buildRBACFilter() if err != nil { - log.Error().Err(err).Msgf("Error applying RBAC filter for proxy service %s", proxyService) + log.Error().Err(err).Msgf("Error applying RBAC filter for traffic match %s", trafficMatch.Name) return nil, err } // RBAC filter should be the very first filter in the filter chain @@ -215,8 +230,8 @@ func (lb *listenerBuilder) getInboundTCPFilters(proxyService service.MeshService // Apply the TCP Proxy Filter tcpProxy := &xds_tcp_proxy.TcpProxy{ - StatPrefix: fmt.Sprintf("%s.%s", inboundMeshTCPProxyStatPrefix, proxyService.EnvoyLocalClusterName()), - ClusterSpecifier: &xds_tcp_proxy.TcpProxy_Cluster{Cluster: proxyService.EnvoyLocalClusterName()}, + StatPrefix: fmt.Sprintf("%s.%s", inboundMeshTCPProxyStatPrefix, trafficMatch.Cluster), + ClusterSpecifier: &xds_tcp_proxy.TcpProxy_Cluster{Cluster: trafficMatch.Cluster}, } marshalledTCPProxy, err := anypb.New(tcpProxy) if err != nil { diff --git a/pkg/envoy/lds/inmesh_test.go b/pkg/envoy/lds/inmesh_test.go index bd14c6fdff..4829f95ef8 100644 --- a/pkg/envoy/lds/inmesh_test.go +++ b/pkg/envoy/lds/inmesh_test.go @@ -226,12 +226,10 @@ func TestGetInboundMeshHTTPFilterChain(t *testing.T) { serviceIdentity: tests.BookbuyerServiceIdentity, } - proxyService := tests.BookbuyerService - testCases := []struct { name string permissiveMode bool - port uint16 + trafficMatch *trafficpolicy.TrafficMatch expectedFilterChainMatch *xds_listener.FilterChainMatch expectedFilterNames []string @@ -240,10 +238,15 @@ func TestGetInboundMeshHTTPFilterChain(t *testing.T) { { name: "inbound HTTP filter chain with permissive mode disabled", permissiveMode: false, - port: 80, + trafficMatch: &trafficpolicy.TrafficMatch{ + Name: "inbound_ns1/svc1_80_http", + DestinationPort: 80, + DestinationProtocol: "http", + ServerNames: []string{"svc1.ns1.svc.cluster.local"}, + }, expectedFilterChainMatch: &xds_listener.FilterChainMatch{ DestinationPort: &wrapperspb.UInt32Value{Value: 80}, - ServerNames: []string{proxyService.ServerName()}, + ServerNames: []string{"svc1.ns1.svc.cluster.local"}, TransportProtocol: "tls", ApplicationProtocols: []string{"osm"}, }, @@ -253,10 +256,15 @@ func TestGetInboundMeshHTTPFilterChain(t *testing.T) { { name: "inbound HTTP filter chain with permissive mode enabled", permissiveMode: true, - port: 90, + trafficMatch: &trafficpolicy.TrafficMatch{ + Name: "inbound_ns1/svc1_90_http", + DestinationPort: 90, + DestinationProtocol: "http", + ServerNames: []string{"svc1.ns1.svc.cluster.local"}, + }, expectedFilterChainMatch: &xds_listener.FilterChainMatch{ DestinationPort: &wrapperspb.UInt32Value{Value: 90}, - ServerNames: []string{proxyService.ServerName()}, + ServerNames: []string{"svc1.ns1.svc.cluster.local"}, TransportProtocol: "tls", ApplicationProtocols: []string{"osm"}, }, @@ -287,8 +295,7 @@ func TestGetInboundMeshHTTPFilterChain(t *testing.T) { mockCatalog.EXPECT().ListInboundTrafficTargetsWithRoutes(lb.serviceIdentity).Return(trafficTargets, nil).Times(1) } - proxyService.TargetPort = tc.port - filterChain, err := lb.getInboundMeshHTTPFilterChain(proxyService) + filterChain, err := lb.getInboundMeshHTTPFilterChain(tc.trafficMatch) assert.Equal(err != nil, tc.expectError) assert.Equal(filterChain.FilterChainMatch, tc.expectedFilterChainMatch) @@ -324,12 +331,10 @@ func TestGetInboundMeshTCPFilterChain(t *testing.T) { serviceIdentity: tests.BookbuyerServiceIdentity, } - proxyService := tests.BookbuyerService - testCases := []struct { name string permissiveMode bool - port uint16 + trafficMatch *trafficpolicy.TrafficMatch expectedFilterChainMatch *xds_listener.FilterChainMatch expectedFilterNames []string @@ -338,10 +343,15 @@ func TestGetInboundMeshTCPFilterChain(t *testing.T) { { name: "inbound TCP filter chain with permissive mode disabled", permissiveMode: false, - port: 80, + trafficMatch: &trafficpolicy.TrafficMatch{ + Name: "inbound_ns1/svc1_80_http", + DestinationPort: 80, + DestinationProtocol: "tcp", + ServerNames: []string{"svc1.ns1.svc.cluster.local"}, + }, expectedFilterChainMatch: &xds_listener.FilterChainMatch{ DestinationPort: &wrapperspb.UInt32Value{Value: 80}, - ServerNames: []string{proxyService.ServerName()}, + ServerNames: []string{"svc1.ns1.svc.cluster.local"}, TransportProtocol: "tls", ApplicationProtocols: []string{"osm"}, }, @@ -352,10 +362,15 @@ func TestGetInboundMeshTCPFilterChain(t *testing.T) { { name: "inbound TCP filter chain with permissive mode enabled", permissiveMode: true, - port: 90, + trafficMatch: &trafficpolicy.TrafficMatch{ + Name: "inbound_ns1/svc1_90_http", + DestinationPort: 90, + DestinationProtocol: "tcp", + ServerNames: []string{"svc1.ns1.svc.cluster.local"}, + }, expectedFilterChainMatch: &xds_listener.FilterChainMatch{ DestinationPort: &wrapperspb.UInt32Value{Value: 90}, - ServerNames: []string{proxyService.ServerName()}, + ServerNames: []string{"svc1.ns1.svc.cluster.local"}, TransportProtocol: "tls", ApplicationProtocols: []string{"osm"}, }, @@ -386,8 +401,7 @@ func TestGetInboundMeshTCPFilterChain(t *testing.T) { mockCatalog.EXPECT().ListInboundTrafficTargetsWithRoutes(lb.serviceIdentity).Return(trafficTargets, nil).Times(1) } - proxyService.TargetPort = tc.port - filterChain, err := lb.getInboundMeshTCPFilterChain(proxyService) + filterChain, err := lb.getInboundMeshTCPFilterChain(tc.trafficMatch) assert.Equal(err != nil, tc.expectError) assert.Equal(filterChain.FilterChainMatch, tc.expectedFilterChainMatch) diff --git a/pkg/envoy/lds/response.go b/pkg/envoy/lds/response.go index dae9e23721..e4bf869db3 100644 --- a/pkg/envoy/lds/response.go +++ b/pkg/envoy/lds/response.go @@ -63,12 +63,14 @@ func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, _ *xds_d Str("proxy", proxy.String()).Msgf("Error looking up MeshServices associated with proxy") return nil, err } - // Create inbound filter chains per service behind proxy - for _, proxyService := range svcList { - // Add in-mesh filter chains - inboundSvcFilterChains := lb.getInboundMeshFilterChains(proxyService) - inboundListener.FilterChains = append(inboundListener.FilterChains, inboundSvcFilterChains...) + // Create inbound mesh filter chains based on mesh traffic policies + inboundMeshTrafficPolicy := meshCatalog.GetInboundMeshTrafficPolicy(lb.serviceIdentity, svcList) + if inboundMeshTrafficPolicy != nil { + inboundListener.FilterChains = append(inboundListener.FilterChains, lb.getInboundMeshFilterChains(inboundMeshTrafficPolicy.TrafficMatches)...) + } + // Create ingress filter chains per service behind proxy + for _, proxyService := range svcList { // Add ingress filter chains ingressFilterChains := lb.getIngressFilterChains(proxyService) inboundListener.FilterChains = append(inboundListener.FilterChains, ingressFilterChains...)