diff --git a/pkg/envoy/cds/cluster.go b/pkg/envoy/cds/cluster.go index c17deccfbd..2ecf4a5fb3 100644 --- a/pkg/envoy/cds/cluster.go +++ b/pkg/envoy/cds/cluster.go @@ -7,9 +7,9 @@ import ( xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" "github.com/envoyproxy/go-control-plane/pkg/wellknown" - "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/wrappers" + "github.com/pkg/errors" "github.com/openservicemesh/osm/pkg/catalog" "github.com/openservicemesh/osm/pkg/configurator" @@ -23,6 +23,9 @@ const ( clusterConnectTimeout = 1 * time.Second ) +// ErrMultiplePorts is an error for describing that a Kubernetes Service which exposes multiple ports in not supported +var ErrMultiplePorts = errors.New("Error multiple ports on service not supported") + // getUpstreamServiceCluster returns an Envoy Cluster corresponding to the given upstream service func getUpstreamServiceCluster(upstreamSvc, downstreamSvc service.MeshService, cfg configurator.Configurator) (*xds_cluster.Cluster, error) { clusterName := upstreamSvc.String() @@ -97,13 +100,18 @@ func getLocalServiceCluster(catalog catalog.MeshCataloger, proxyServiceName serv Http2ProtocolOptions: &xds_core.Http2ProtocolOptions{}, } - endpoints, err := catalog.ListEndpointsForService(proxyServiceName) + ports, err := catalog.GetPortToProtocolMappingForService(proxyServiceName) if err != nil { - log.Error().Err(err).Msgf("Failed to get endpoints for service %s", proxyServiceName) + log.Error().Err(err).Msgf("Failed to get ports for service %s", proxyServiceName) return nil, err } - for _, ep := range endpoints { + if len(ports) > 1 { + log.Error().Err(ErrMultiplePorts).Msgf("Error multiple ports on service %s in namespace %s", proxyServiceName.Name, proxyServiceName.Namespace) + return nil, ErrMultiplePorts + } + + for port := range ports { localityEndpoint := &xds_endpoint.LocalityLbEndpoints{ Locality: &xds_core.Locality{ Zone: "zone", @@ -111,7 +119,7 @@ func getLocalServiceCluster(catalog catalog.MeshCataloger, proxyServiceName serv LbEndpoints: []*xds_endpoint.LbEndpoint{{ HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{ Endpoint: &xds_endpoint.Endpoint{ - Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(ep.Port)), + Address: envoy.GetAddress(constants.WildcardIPAddr, port), }, }, LoadBalancingWeight: &wrappers.UInt32Value{ diff --git a/pkg/envoy/cds/cluster_test.go b/pkg/envoy/cds/cluster_test.go index 2b8d12f9ec..40a0de5725 100644 --- a/pkg/envoy/cds/cluster_test.go +++ b/pkg/envoy/cds/cluster_test.go @@ -1,7 +1,6 @@ package cds import ( - "net" "testing" xds_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" @@ -15,7 +14,6 @@ import ( "github.com/openservicemesh/osm/pkg/catalog" "github.com/openservicemesh/osm/pkg/configurator" "github.com/openservicemesh/osm/pkg/constants" - "github.com/openservicemesh/osm/pkg/endpoint" "github.com/openservicemesh/osm/pkg/envoy" "github.com/openservicemesh/osm/pkg/service" "github.com/openservicemesh/osm/pkg/tests" @@ -78,14 +76,15 @@ func TestGetLocalServiceCluster(t *testing.T) { testCases := []struct { name string - endpoints []endpoint.Endpoint + portToProtocolMapping map[uint32]string expectedLocalityLbEndpoints []*xds_endpoint.LocalityLbEndpoints expectedLbPolicy xds_cluster.Cluster_LbPolicy expectedProtocolSelection xds_cluster.Cluster_ClusterProtocolSelection + expectedErr bool }{ { - name: "when service returns one endpoint", - endpoints: []endpoint.Endpoint{tests.Endpoint}, + name: "when service returns one endpoint", + portToProtocolMapping: map[uint32]string{uint32(8080): "something"}, expectedLocalityLbEndpoints: []*xds_endpoint.LocalityLbEndpoints{ { Locality: &xds_core.Locality{ @@ -94,7 +93,7 @@ func TestGetLocalServiceCluster(t *testing.T) { LbEndpoints: []*xds_endpoint.LbEndpoint{{ HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{ Endpoint: &xds_endpoint.Endpoint{ - Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(tests.ServicePort)), + Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(8080)), }, }, LoadBalancingWeight: &wrappers.UInt32Value{ @@ -103,48 +102,36 @@ func TestGetLocalServiceCluster(t *testing.T) { }}, }, }, + expectedErr: false, }, { - name: "when service returns two endpoints with same port", - endpoints: []endpoint.Endpoint{tests.Endpoint, { - IP: net.ParseIP("1.2.3.4"), - Port: endpoint.Port(tests.ServicePort), - }}, - expectedLocalityLbEndpoints: []*xds_endpoint.LocalityLbEndpoints{ - { - Locality: &xds_core.Locality{ - Zone: "zone", - }, - LbEndpoints: []*xds_endpoint.LbEndpoint{{ - HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{ - Endpoint: &xds_endpoint.Endpoint{ - Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(tests.ServicePort)), - }, - }, - LoadBalancingWeight: &wrappers.UInt32Value{ - Value: constants.ClusterWeightAcceptAll, // Local cluster accepts all traffic - }, - }}, - }, - }, + name: "when service returns multiple ports", + portToProtocolMapping: map[uint32]string{uint32(8080): "something", uint32(8081): "something"}, + expectedLocalityLbEndpoints: []*xds_endpoint.LocalityLbEndpoints{}, + expectedErr: true, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - mockCatalog.EXPECT().ListEndpointsForService(proxyService).Return(tc.endpoints, nil).Times(1) + mockCatalog.EXPECT().GetPortToProtocolMappingForService(proxyService).Return(tc.portToProtocolMapping, nil).Times(1) cluster, err := getLocalServiceCluster(mockCatalog, proxyService, clusterName) - assert.Nil(err) - assert.Equal(clusterName, cluster.Name) - assert.Equal(clusterName, cluster.AltStatName) - assert.Equal(ptypes.DurationProto(clusterConnectTimeout), cluster.ConnectTimeout) - assert.Equal(xds_cluster.Cluster_ROUND_ROBIN, cluster.LbPolicy) - assert.Equal(&xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STRICT_DNS}, cluster.ClusterDiscoveryType) - assert.Equal(true, cluster.RespectDnsTtl) - assert.Equal(xds_cluster.Cluster_V4_ONLY, cluster.DnsLookupFamily) - assert.Equal(xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL, cluster.ProtocolSelection) - assert.Equal(len(tc.expectedLocalityLbEndpoints), len(cluster.LoadAssignment.Endpoints)) - //assert.Equal(tc.expectedLocalityLbEndpoints, cluster.LoadAssignment.Endpoints) + if tc.expectedErr { + assert.NotNil(err) + assert.Nil(cluster) + } else { + assert.Nil(err) + assert.Equal(clusterName, cluster.Name) + assert.Equal(clusterName, cluster.AltStatName) + assert.Equal(ptypes.DurationProto(clusterConnectTimeout), cluster.ConnectTimeout) + assert.Equal(xds_cluster.Cluster_ROUND_ROBIN, cluster.LbPolicy) + assert.Equal(&xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STRICT_DNS}, cluster.ClusterDiscoveryType) + assert.Equal(true, cluster.RespectDnsTtl) + assert.Equal(xds_cluster.Cluster_V4_ONLY, cluster.DnsLookupFamily) + assert.Equal(xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL, cluster.ProtocolSelection) + assert.Equal(len(tc.expectedLocalityLbEndpoints), len(cluster.LoadAssignment.Endpoints)) + assert.ElementsMatch(tc.expectedLocalityLbEndpoints, cluster.LoadAssignment.Endpoints) + } }) } }