Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2478 from michelleN/cdsclusters
Browse files Browse the repository at this point in the history
fix(cds): return local cluster endpoints per  port
  • Loading branch information
Michelle Noorali authored Feb 10, 2021
2 parents 989b1cd + 418b47f commit 9e7b53f
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 36 deletions.
20 changes: 15 additions & 5 deletions pkg/envoy/cds/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"

Expand Down Expand Up @@ -73,6 +72,17 @@ func getOutboundPassthroughCluster() *xds_cluster.Cluster {
}
}

// getSyntheticCluster returns a static cluster with no endpoints
func getSyntheticCluster(name string) *xds_cluster.Cluster {
return &xds_cluster.Cluster{
Name: name,
ClusterDiscoveryType: &xds_cluster.Cluster_Type{
Type: xds_cluster.Cluster_STATIC,
},
LbPolicy: xds_cluster.Cluster_ROUND_ROBIN,
}
}

// getLocalServiceCluster returns an Envoy Cluster corresponding to the local service
func getLocalServiceCluster(catalog catalog.MeshCataloger, proxyServiceName service.MeshService, clusterName string) (*xds_cluster.Cluster, error) {
xdsCluster := xds_cluster.Cluster{
Expand All @@ -97,21 +107,21 @@ func getLocalServiceCluster(catalog catalog.MeshCataloger, proxyServiceName serv
Http2ProtocolOptions: &xds_core.Http2ProtocolOptions{},
}

endpoints, err := catalog.ListEndpointsForService(proxyServiceName)
ports, err := catalog.GetTargetPortToProtocolMappingForService(proxyServiceName)
if err != nil {
log.Error().Err(err).Msgf("Failed to get endpoints for service %s", proxyServiceName)
log.Error().Err(err).Msgf("Failed to get ports for service %s", proxyServiceName)
return nil, err
}

for _, ep := range endpoints {
for port := range ports {
localityEndpoint := &xds_endpoint.LocalityLbEndpoints{
Locality: &xds_core.Locality{
Zone: "zone",
},
LbEndpoints: []*xds_endpoint.LbEndpoint{{
HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
Endpoint: &xds_endpoint.Endpoint{
Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(ep.Port)),
Address: envoy.GetAddress(constants.WildcardIPAddr, port),
},
},
LoadBalancingWeight: &wrappers.UInt32Value{
Expand Down
158 changes: 131 additions & 27 deletions pkg/envoy/cds/cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,150 @@
package cds

import (
xds_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
"errors"
"testing"

xds_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
xds_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
xds_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers"
tassert "github.com/stretchr/testify/assert"

"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/envoy"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/tests"
)

var _ = Describe("Cluster configurations", func() {
var (
mockCtrl *gomock.Controller
mockConfigurator *configurator.MockConfigurator
)

mockCtrl = gomock.NewController(GinkgoT())
mockConfigurator = configurator.NewMockConfigurator(mockCtrl)
func TestGetUpstreamServiceCluster(t *testing.T) {
assert := tassert.New(t)

mockCtrl := gomock.NewController(t)
mockConfigurator := configurator.NewMockConfigurator(mockCtrl)
downstreamSvc := tests.BookbuyerService
upstreamSvc := tests.BookstoreV1Service
Context("Test getUpstreamServiceCluster", func() {
It("Returns an EDS based cluster when permissive mode is disabled", func() {
mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(false).Times(1)

testCases := []struct {
name string
permissiveMode bool
expectedClusterType xds_cluster.Cluster_DiscoveryType
expectedLbPolicy xds_cluster.Cluster_LbPolicy
expectedProtocolSelection xds_cluster.Cluster_ClusterProtocolSelection
}{
{
name: "Returns an EDS based cluster when permissive mode is disabled",
permissiveMode: false,
expectedClusterType: xds_cluster.Cluster_EDS,
expectedLbPolicy: xds_cluster.Cluster_ROUND_ROBIN,
expectedProtocolSelection: xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL,
},
{
name: "Returns an Original Destination based cluster when permissive mode is enabled",
permissiveMode: true,
expectedClusterType: xds_cluster.Cluster_ORIGINAL_DST,
expectedLbPolicy: xds_cluster.Cluster_CLUSTER_PROVIDED,
expectedProtocolSelection: xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(tc.permissiveMode).Times(1)
remoteCluster, err := getUpstreamServiceCluster(upstreamSvc, downstreamSvc, mockConfigurator)
Expect(err).ToNot(HaveOccurred())
Expect(remoteCluster.GetType()).To(Equal(xds_cluster.Cluster_EDS))
Expect(remoteCluster.LbPolicy).To(Equal(xds_cluster.Cluster_ROUND_ROBIN))
Expect(remoteCluster.ProtocolSelection).To(Equal(xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL))
assert.Nil(err)
assert.Equal(tc.expectedClusterType, remoteCluster.GetType())
assert.Equal(tc.expectedLbPolicy, remoteCluster.LbPolicy)
assert.Equal(tc.expectedProtocolSelection, remoteCluster.ProtocolSelection)
})
}
}

It("Returns an Original Destination based cluster when permissive mode is enabled", func() {
mockConfigurator.EXPECT().IsPermissiveTrafficPolicyMode().Return(true).Times(1)
func TestGetLocalServiceCluster(t *testing.T) {
assert := tassert.New(t)

remoteCluster, err := getUpstreamServiceCluster(upstreamSvc, downstreamSvc, mockConfigurator)
Expect(err).ToNot(HaveOccurred())
Expect(remoteCluster.GetType()).To(Equal(xds_cluster.Cluster_ORIGINAL_DST))
Expect(remoteCluster.LbPolicy).To(Equal(xds_cluster.Cluster_CLUSTER_PROVIDED))
Expect(remoteCluster.ProtocolSelection).To(Equal(xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL))
clusterName := "bookbuyer-local"
proxyService := service.MeshService{
Name: "bookbuyer",
Namespace: "bookbuyer-ns",
}

mockCtrl := gomock.NewController(t)
mockCatalog := catalog.NewMockMeshCataloger(mockCtrl)

testCases := []struct {
name string
proxyService service.MeshService
portToProtocolMapping map[uint32]string
expectedLocalityLbEndpoints []*xds_endpoint.LocalityLbEndpoints
expectedLbPolicy xds_cluster.Cluster_LbPolicy
expectedProtocolSelection xds_cluster.Cluster_ClusterProtocolSelection
expectedPortToProtocolMappingErr bool
expectedErr bool
}{
{
name: "when service returns a single port",
proxyService: proxyService,
portToProtocolMapping: map[uint32]string{uint32(8080): "something"},
expectedLocalityLbEndpoints: []*xds_endpoint.LocalityLbEndpoints{
{
Locality: &xds_core.Locality{
Zone: "zone",
},
LbEndpoints: []*xds_endpoint.LbEndpoint{{
HostIdentifier: &xds_endpoint.LbEndpoint_Endpoint{
Endpoint: &xds_endpoint.Endpoint{
Address: envoy.GetAddress(constants.WildcardIPAddr, uint32(8080)),
},
},
LoadBalancingWeight: &wrappers.UInt32Value{
Value: constants.ClusterWeightAcceptAll, // Local cluster accepts all traffic
},
}},
},
},
expectedPortToProtocolMappingErr: false,
expectedErr: false,
},
{
name: "when err fetching ports",
proxyService: proxyService,
portToProtocolMapping: map[uint32]string{},
expectedLocalityLbEndpoints: []*xds_endpoint.LocalityLbEndpoints{},
expectedPortToProtocolMappingErr: true,
expectedErr: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.expectedPortToProtocolMappingErr {
mockCatalog.EXPECT().GetTargetPortToProtocolMappingForService(tc.proxyService).Return(tc.portToProtocolMapping, errors.New("error")).Times(1)
} else {
mockCatalog.EXPECT().GetTargetPortToProtocolMappingForService(tc.proxyService).Return(tc.portToProtocolMapping, nil).Times(1)
}

cluster, err := getLocalServiceCluster(mockCatalog, tc.proxyService, clusterName)

if tc.expectedErr {
assert.NotNil(err)
assert.Nil(cluster)
} else {
assert.Nil(err)
assert.Equal(clusterName, cluster.Name)
assert.Equal(clusterName, cluster.AltStatName)
assert.Equal(ptypes.DurationProto(clusterConnectTimeout), cluster.ConnectTimeout)
assert.Equal(xds_cluster.Cluster_ROUND_ROBIN, cluster.LbPolicy)
assert.Equal(&xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_STRICT_DNS}, cluster.ClusterDiscoveryType)
assert.Equal(true, cluster.RespectDnsTtl)
assert.Equal(xds_cluster.Cluster_V4_ONLY, cluster.DnsLookupFamily)
assert.Equal(xds_cluster.Cluster_USE_DOWNSTREAM_PROTOCOL, cluster.ProtocolSelection)
assert.Equal(len(tc.expectedLocalityLbEndpoints), len(cluster.LoadAssignment.Endpoints))
assert.ElementsMatch(tc.expectedLocalityLbEndpoints, cluster.LoadAssignment.Endpoints)
}
})
})
})
}
}
13 changes: 9 additions & 4 deletions pkg/envoy/cds/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,15 @@ func NewResponse(meshCatalog catalog.MeshCataloger, proxy *envoy.Proxy, _ *xds_d
// Create a local cluster for the service.
// The local cluster will be used for incoming traffic.
localClusterName := envoy.GetLocalClusterNameForService(proxyServiceName)
localCluster, err := getLocalServiceCluster(meshCatalog, proxyServiceName, localClusterName)
if err != nil {
log.Error().Err(err).Msgf("Failed to get local cluster config for proxy %s", proxyServiceName)
return nil, err
var localCluster *xds_cluster.Cluster
if proxyServiceName.IsSyntheticService() {
localCluster = getSyntheticCluster(localClusterName)
} else {
localCluster, err = getLocalServiceCluster(meshCatalog, proxyServiceName, localClusterName)
if err != nil {
log.Error().Err(err).Msgf("Failed to get local cluster config for proxy %s", proxyServiceName)
return nil, err
}
}
clusters = append(clusters, localCluster)

Expand Down
6 changes: 6 additions & 0 deletions pkg/service/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func (sa K8sServiceAccount) GetSyntheticService() MeshService {
}
}

// IsSyntheticService evaluates the given service name and returns a boolean to denote whether the name
// looks like the name of a synthetic service or not
func (ms MeshService) IsSyntheticService() bool {
return strings.Contains(ms.Name, ".osm.synthetic-")
}

// ClusterName is a type for a service name
type ClusterName string

Expand Down

0 comments on commit 9e7b53f

Please sign in to comment.