Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
fix(cds): return local cluster endpoints per port
Browse files Browse the repository at this point in the history
* getLocalServiceCluster was returning an xds cluster with endpoints
based on Kubernetes service endpoints when it should have been building
lcoal cluster endpoints based on the port specified in the service spec.
i.e. 0.0.0.0:<port>
As a result, there were 18 extra, duplicate  entries in the envoy local
clusters that were being programmed per Kubernetes service endpoints
(or Pod replicas)
* This fixes the getLocalServiceCluster to return a local cluster with endpoints
based on the Kubernetes port rather than the Kubernetes Service endpoints.
* It also adds a validation that ensures that only one port is specified on
the Kubernetes service as multiple ports are not currently supported

Signed-off-by: Michelle Noorali <minooral@microsoft.com>
  • Loading branch information
Michelle Noorali committed Feb 8, 2021
1 parent c3ced5d commit 08a1715
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 45 deletions.
18 changes: 13 additions & 5 deletions pkg/envoy/cds/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/pkg/errors"

"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/configurator"
Expand All @@ -23,6 +23,9 @@ const (
clusterConnectTimeout = 1 * time.Second
)

// ErrMultiplePorts is an error for describing that a Kubernetes Service which exposes multiple ports in not supported
var ErrMultiplePorts = errors.New("Error multiple ports on service not supported")

// getUpstreamServiceCluster returns an Envoy Cluster corresponding to the given upstream service
func getUpstreamServiceCluster(upstreamSvc, downstreamSvc service.MeshService, cfg configurator.Configurator) (*xds_cluster.Cluster, error) {
clusterName := upstreamSvc.String()
Expand Down Expand Up @@ -97,21 +100,26 @@ func getLocalServiceCluster(catalog catalog.MeshCataloger, proxyServiceName serv
Http2ProtocolOptions: &xds_core.Http2ProtocolOptions{},
}

endpoints, err := catalog.ListEndpointsForService(proxyServiceName)
ports, err := catalog.GetPortToProtocolMappingForService(proxyServiceName)
if err != nil {
log.Error().Err(err).Msgf("Failed to get endpoints for service %s", proxyServiceName)
log.Error().Err(err).Msgf("Failed to get ports for service %s", proxyServiceName)
return nil, err
}

for _, ep := range endpoints {
if len(ports) > 1 {
log.Error().Err(ErrMultiplePorts).Msgf("Error multiple ports on service %s in namespace %s", proxyServiceName.Name, proxyServiceName.Namespace)
return nil, ErrMultiplePorts
}

for port := range ports {
localityEndpoint := &xds_endpoint.LocalityLbEndpoints{
Locality: &xds_core.Locality{
Zone: "zone",
},
LbEndpoints: []*xds_endpoint.LbEndpoint{{
HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
Endpoint: &xds_endpoint.Endpoint{
Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(ep.Port)),
Address: envoy.GetAddress(constants.WildcardIPAddr, port),
},
},
LoadBalancingWeight: &wrappers.UInt32Value{
Expand Down
67 changes: 27 additions & 40 deletions pkg/envoy/cds/cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cds

import (
"net"
"testing"

xds_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
Expand All @@ -15,7 +14,6 @@ import (
"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/endpoint"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/tests"
Expand Down Expand Up @@ -78,14 +76,15 @@ func TestGetLocalServiceCluster(t *testing.T) {

testCases := []struct {
name string
endpoints []endpoint.Endpoint
portToProtocolMapping map[uint32]string
expectedLocalityLbEndpoints []*xds_endpoint.LocalityLbEndpoints
expectedLbPolicy xds_cluster.Cluster_LbPolicy
expectedProtocolSelection xds_cluster.Cluster_ClusterProtocolSelection
expectedErr bool
}{
{
name: "when service returns one endpoint",
endpoints: []endpoint.Endpoint{tests.Endpoint},
name: "when service returns one endpoint",
portToProtocolMapping: map[uint32]string{uint32(8080): "something"},
expectedLocalityLbEndpoints: []*xds_endpoint.LocalityLbEndpoints{
{
Locality: &xds_core.Locality{
Expand All @@ -94,7 +93,7 @@ func TestGetLocalServiceCluster(t *testing.T) {
LbEndpoints: []*xds_endpoint.LbEndpoint{{
HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
Endpoint: &xds_endpoint.Endpoint{
Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(tests.ServicePort)),
Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(8080)),
},
},
LoadBalancingWeight: &wrappers.UInt32Value{
Expand All @@ -103,48 +102,36 @@ func TestGetLocalServiceCluster(t *testing.T) {
}},
},
},
expectedErr: false,
},
{
name: "when service returns two endpoints with same port",
endpoints: []endpoint.Endpoint{tests.Endpoint, {
IP: net.ParseIP("1.2.3.4"),
Port: endpoint.Port(tests.ServicePort),
}},
expectedLocalityLbEndpoints: []*xds_endpoint.LocalityLbEndpoints{
{
Locality: &xds_core.Locality{
Zone: "zone",
},
LbEndpoints: []*xds_endpoint.LbEndpoint{{
HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
Endpoint: &xds_endpoint.Endpoint{
Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(tests.ServicePort)),
},
},
LoadBalancingWeight: &wrappers.UInt32Value{
Value: constants.ClusterWeightAcceptAll, // Local cluster accepts all traffic
},
}},
},
},
name: "when service returns multiple ports",
portToProtocolMapping: map[uint32]string{uint32(8080): "something", uint32(8081): "something"},
expectedLocalityLbEndpoints: []*xds_endpoint.LocalityLbEndpoints{},
expectedErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockCatalog.EXPECT().ListEndpointsForService(proxyService).Return(tc.endpoints, nil).Times(1)
mockCatalog.EXPECT().GetPortToProtocolMappingForService(proxyService).Return(tc.portToProtocolMapping, nil).Times(1)
cluster, err := getLocalServiceCluster(mockCatalog, proxyService, clusterName)
assert.Nil(err)
assert.Equal(clusterName, cluster.Name)
assert.Equal(clusterName, cluster.AltStatName)
assert.Equal(ptypes.DurationProto(clusterConnectTimeout), cluster.ConnectTimeout)
assert.Equal(xds_cluster.Cluster_ROUND_ROBIN, cluster.LbPolicy)
assert.Equal(&xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STRICT_DNS}, cluster.ClusterDiscoveryType)
assert.Equal(true, cluster.RespectDnsTtl)
assert.Equal(xds_cluster.Cluster_V4_ONLY, cluster.DnsLookupFamily)
assert.Equal(xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL, cluster.ProtocolSelection)
assert.Equal(len(tc.expectedLocalityLbEndpoints), len(cluster.LoadAssignment.Endpoints))
//assert.Equal(tc.expectedLocalityLbEndpoints, cluster.LoadAssignment.Endpoints)
if tc.expectedErr {
assert.NotNil(err)
assert.Nil(cluster)
} else {
assert.Nil(err)
assert.Equal(clusterName, cluster.Name)
assert.Equal(clusterName, cluster.AltStatName)
assert.Equal(ptypes.DurationProto(clusterConnectTimeout), cluster.ConnectTimeout)
assert.Equal(xds_cluster.Cluster_ROUND_ROBIN, cluster.LbPolicy)
assert.Equal(&xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STRICT_DNS}, cluster.ClusterDiscoveryType)
assert.Equal(true, cluster.RespectDnsTtl)
assert.Equal(xds_cluster.Cluster_V4_ONLY, cluster.DnsLookupFamily)
assert.Equal(xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL, cluster.ProtocolSelection)
assert.Equal(len(tc.expectedLocalityLbEndpoints), len(cluster.LoadAssignment.Endpoints))
assert.ElementsMatch(tc.expectedLocalityLbEndpoints, cluster.LoadAssignment.Endpoints)
}
})
}
}

0 comments on commit 08a1715

Please sign in to comment.