Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
feat(*): add traffic split in permissive mode
Browse files Browse the repository at this point in the history
resolves #2527

Signed-off-by: Michelle Noorali <minooral@microsoft.com>
  • Loading branch information
Michelle Noorali committed Aug 24, 2021
1 parent f78e72f commit 65a35eb
Show file tree
Hide file tree
Showing 11 changed files with 529 additions and 32 deletions.
34 changes: 32 additions & 2 deletions pkg/catalog/inbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ const (
// 2. for the given service account and upstream services from SMI Traffic Target and Traffic Split
// Note: ServiceIdentity must be in the format "name.namespace" [https://github.com/openservicemesh/osm/issues/3188]
func (mc *MeshCatalog) ListInboundTrafficPolicies(upstreamIdentity identity.ServiceIdentity, upstreamServices []service.MeshService) []*trafficpolicy.InboundTrafficPolicy {
inboundPoliciesFromSplits := mc.listInboundPoliciesForTrafficSplits(upstreamIdentity, upstreamServices)

if mc.configurator.IsPermissiveTrafficPolicyMode() {
var inboundPolicies []*trafficpolicy.InboundTrafficPolicy
for _, svc := range upstreamServices {
inboundPolicies = trafficpolicy.MergeInboundPolicies(DisallowPartialHostnamesMatch, inboundPolicies, mc.buildInboundPermissiveModePolicies(svc)...)
inboundPolicies = trafficpolicy.MergeInboundPolicies(DisallowPartialHostnamesMatch, mc.buildInboundPermissiveModePolicies(svc), inboundPoliciesFromSplits...)
}
return inboundPolicies
}

inbound := mc.listInboundPoliciesFromTrafficTargets(upstreamIdentity, upstreamServices)
inboundPoliciesFromSplits := mc.listInboundPoliciesForTrafficSplits(upstreamIdentity, upstreamServices)
inbound = trafficpolicy.MergeInboundPolicies(AllowPartialHostnamesMatch, inbound, inboundPoliciesFromSplits...)
return inbound
}
Expand Down Expand Up @@ -73,6 +74,35 @@ func (mc *MeshCatalog) listInboundPoliciesForTrafficSplits(upstreamIdentity iden
upstreamServiceAccount := upstreamIdentity.ToK8sServiceAccount()
var inboundPolicies []*trafficpolicy.InboundTrafficPolicy

if mc.configurator.IsPermissiveTrafficPolicyMode() {
for _, upstreamSvc := range upstreamServices {
//check if the upstream service belong to a traffic split
if !mc.isTrafficSplitBackendService(upstreamSvc) {
continue
}

apexServices := mc.getApexServicesForBackendService(upstreamSvc)
for _, apexService := range apexServices {
// build an inbound policy for every apex service
locality := service.LocalCluster
if apexService.Namespace == upstreamServiceAccount.Namespace {
locality = service.LocalNS
}
hostnames, err := mc.GetServiceHostnames(apexService, locality)
if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrServiceHostnames)).
Msgf("Error getting service hostnames for apex service %v", apexService)
continue
}
servicePolicy := trafficpolicy.NewInboundTrafficPolicy(apexService.FQDN(), hostnames)
weightedCluster := getDefaultWeightedClusterForService(upstreamSvc)
servicePolicy.AddRule(*trafficpolicy.NewRouteWeightedCluster(trafficpolicy.WildCardRouteMatch, []service.WeightedCluster{weightedCluster}), identity.WildcardServiceIdentity)
inboundPolicies = trafficpolicy.MergeInboundPolicies(AllowPartialHostnamesMatch, inboundPolicies, servicePolicy)
}
}
return inboundPolicies
}

for _, t := range mc.meshSpec.ListTrafficTargets() { // loop through all traffic targets
if !isValidTrafficTarget(t) {
continue
Expand Down
87 changes: 86 additions & 1 deletion pkg/catalog/inbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,90 @@ func TestListInboundTrafficPolicies(t *testing.T) {
},
permissiveMode: true,
},
{
name: "permissive mode with traffic split",
downstreamSA: tests.BookbuyerServiceIdentity,
upstreamSA: tests.BookstoreV2ServiceIdentity,
upstreamServices: []service.MeshService{tests.BookstoreV2Service},
meshServices: []service.MeshService{tests.BookbuyerService, tests.BookstoreV1Service, tests.BookstoreV2Service, tests.BookstoreApexService},
meshServiceAccounts: []identity.K8sServiceAccount{tests.BookbuyerServiceAccount, tests.BookstoreV2ServiceAccount},
trafficSpec: spec.HTTPRouteGroup{},
trafficSplit: split.TrafficSplit{
ObjectMeta: v1.ObjectMeta{
Namespace: "default",
},
Spec: split.TrafficSplitSpec{
Service: "bookstore-apex",
Backends: []split.TrafficSplitBackend{
{
Service: tests.BookstoreV1ServiceName,
Weight: tests.Weight90,
},
{
Service: tests.BookstoreV2ServiceName,
Weight: tests.Weight10,
},
},
},
},
expectedInboundPolicies: []*trafficpolicy.InboundTrafficPolicy{
{
Name: "bookstore-v2.default.svc.cluster.local",
Hostnames: []string{
"bookstore-v2",
"bookstore-v2.default",
"bookstore-v2.default.svc",
"bookstore-v2.default.svc.cluster",
"bookstore-v2.default.svc.cluster.local",
"bookstore-v2:8888",
"bookstore-v2.default:8888",
"bookstore-v2.default.svc:8888",
"bookstore-v2.default.svc.cluster:8888",
"bookstore-v2.default.svc.cluster.local:8888",
},
Rules: []*trafficpolicy.Rule{
{
Route: trafficpolicy.RouteWeightedClusters{
HTTPRouteMatch: tests.WildCardRouteMatch,
WeightedClusters: mapset.NewSet(service.WeightedCluster{
ClusterName: "default/bookstore-v2",
Weight: 100,
}),
},
AllowedServiceIdentities: mapset.NewSet(identity.WildcardServiceIdentity),
},
},
},
{
Name: "bookstore-apex.default.svc.cluster.local",
Hostnames: []string{
"bookstore-apex",
"bookstore-apex.default",
"bookstore-apex.default.svc",
"bookstore-apex.default.svc.cluster",
"bookstore-apex.default.svc.cluster.local",
"bookstore-apex:8888",
"bookstore-apex.default:8888",
"bookstore-apex.default.svc:8888",
"bookstore-apex.default.svc.cluster:8888",
"bookstore-apex.default.svc.cluster.local:8888",
},
Rules: []*trafficpolicy.Rule{
{
Route: trafficpolicy.RouteWeightedClusters{
HTTPRouteMatch: tests.WildCardRouteMatch,
WeightedClusters: mapset.NewSet(service.WeightedCluster{
ClusterName: "default/bookstore-v2",
Weight: 100,
}),
},
AllowedServiceIdentities: mapset.NewSet(identity.WildcardServiceIdentity),
},
},
},
},
permissiveMode: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down Expand Up @@ -501,6 +585,8 @@ func TestListInboundTrafficPolicies(t *testing.T) {

mockEndpointProvider.EXPECT().GetID().Return("fake").AnyTimes()

mockMeshSpec.EXPECT().ListTrafficSplits().Return([]*split.TrafficSplit{&tc.trafficSplit}).AnyTimes()

if tc.permissiveMode {
var serviceAccounts []*corev1.ServiceAccount
for _, sa := range tc.meshServiceAccounts {
Expand All @@ -511,7 +597,6 @@ func TestListInboundTrafficPolicies(t *testing.T) {
mockKubeController.EXPECT().ListServiceAccounts().Return(serviceAccounts).AnyTimes()
} else {
mockMeshSpec.EXPECT().ListHTTPTrafficSpecs().Return([]*spec.HTTPRouteGroup{&tc.trafficSpec}).AnyTimes()
mockMeshSpec.EXPECT().ListTrafficSplits().Return([]*split.TrafficSplit{&tc.trafficSplit}).AnyTimes()
trafficTarget := tests.NewSMITrafficTarget(tc.downstreamSA, tc.upstreamSA)
mockMeshSpec.EXPECT().ListTrafficTargets().Return([]*access.TrafficTarget{&trafficTarget}).AnyTimes()
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/catalog/outbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ import (
// Note: ServiceIdentity must be in the format "name.namespace" [https://github.com/openservicemesh/osm/issues/3188]
func (mc *MeshCatalog) ListOutboundTrafficPolicies(downstreamIdentity identity.ServiceIdentity) []*trafficpolicy.OutboundTrafficPolicy {
downstreamServiceAccount := downstreamIdentity.ToK8sServiceAccount()
if mc.configurator.IsPermissiveTrafficPolicyMode() {
var outboundPolicies []*trafficpolicy.OutboundTrafficPolicy
mergedPolicies := trafficpolicy.MergeOutboundPolicies(DisallowPartialHostnamesMatch, outboundPolicies, mc.buildOutboundPermissiveModePolicies(downstreamServiceAccount.Namespace)...)
outboundPolicies = mergedPolicies
return outboundPolicies
}

outbound := mc.listOutboundPoliciesForTrafficTargets(downstreamIdentity)
outboundPoliciesFromSplits := mc.listOutboundTrafficPoliciesForTrafficSplits(downstreamServiceAccount.Namespace)
outbound = trafficpolicy.MergeOutboundPolicies(AllowPartialHostnamesMatch, outbound, outboundPoliciesFromSplits...)

return outbound
if mc.configurator.IsPermissiveTrafficPolicyMode() {
permissiveOutboundPolicies := mc.buildOutboundPermissiveModePolicies(downstreamServiceAccount.Namespace)
//TODO: I don't see a reason to disallow partial hostname match. Will follow up on this.
return trafficpolicy.MergeOutboundPolicies(DisallowPartialHostnamesMatch, permissiveOutboundPolicies, outboundPoliciesFromSplits...)
}

allowedOutboundPolicies := mc.listOutboundPoliciesForTrafficTargets(downstreamIdentity)
return trafficpolicy.MergeOutboundPolicies(AllowPartialHostnamesMatch, allowedOutboundPolicies, outboundPoliciesFromSplits...)
}

// listOutboundPoliciesForTrafficTargets loops through all SMI Traffic Target resources and returns outbound traffic policies
Expand Down
90 changes: 90 additions & 0 deletions pkg/catalog/outbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,95 @@ func TestListOutboundTrafficPolicies(t *testing.T) {
},
permissiveMode: true,
},
{
name: "permissive mode with traffic splits",
downstreamSA: tests.BookbuyerServiceIdentity,
apexMeshServices: []service.MeshService{tests.BookstoreApexService},
meshServices: []service.MeshService{tests.BookstoreV1Service, tests.BookstoreV2Service, tests.BookbuyerService},
meshServiceAccounts: []identity.K8sServiceAccount{tests.BookbuyerServiceAccount, tests.BookstoreServiceAccount},
trafficsplits: []*split.TrafficSplit{&tests.TrafficSplit},
traffictargets: []*access.TrafficTarget{},
trafficspecs: []*spec.HTTPRouteGroup{},
expectedOutbound: []*trafficpolicy.OutboundTrafficPolicy{
{
Name: "bookstore-apex.default.svc.cluster.local",
Hostnames: tests.BookstoreApexHostnames,
Routes: []*trafficpolicy.RouteWeightedClusters{
{
HTTPRouteMatch: tests.WildCardRouteMatch,
WeightedClusters: mapset.NewSetFromSlice([]interface{}{
service.WeightedCluster{ClusterName: "default/bookstore-v1", Weight: 90},
service.WeightedCluster{ClusterName: "default/bookstore-v2", Weight: 10},
}),
},
},
},
{
Name: "bookstore-v1.default.svc.cluster.local",
Hostnames: []string{
"bookstore-v1",
"bookstore-v1.default",
"bookstore-v1.default.svc",
"bookstore-v1.default.svc.cluster",
"bookstore-v1.default.svc.cluster.local",
"bookstore-v1:8888",
"bookstore-v1.default:8888",
"bookstore-v1.default.svc:8888",
"bookstore-v1.default.svc.cluster:8888",
"bookstore-v1.default.svc.cluster.local:8888",
},
Routes: []*trafficpolicy.RouteWeightedClusters{
{
HTTPRouteMatch: tests.WildCardRouteMatch,
WeightedClusters: mapset.NewSet(tests.BookstoreV1DefaultWeightedCluster),
},
},
},
{
Name: "bookstore-v2.default.svc.cluster.local",
Hostnames: []string{
"bookstore-v2",
"bookstore-v2.default",
"bookstore-v2.default.svc",
"bookstore-v2.default.svc.cluster",
"bookstore-v2.default.svc.cluster.local",
"bookstore-v2:8888",
"bookstore-v2.default:8888",
"bookstore-v2.default.svc:8888",
"bookstore-v2.default.svc.cluster:8888",
"bookstore-v2.default.svc.cluster.local:8888",
},
Routes: []*trafficpolicy.RouteWeightedClusters{
{
HTTPRouteMatch: tests.WildCardRouteMatch,
WeightedClusters: mapset.NewSet(tests.BookstoreV2DefaultWeightedCluster),
},
},
},
{
Name: "bookbuyer.default.svc.cluster.local",
Hostnames: []string{
"bookbuyer",
"bookbuyer.default",
"bookbuyer.default.svc",
"bookbuyer.default.svc.cluster",
"bookbuyer.default.svc.cluster.local",
"bookbuyer:8888",
"bookbuyer.default:8888",
"bookbuyer.default.svc:8888",
"bookbuyer.default.svc.cluster:8888",
"bookbuyer.default.svc.cluster.local:8888",
},
Routes: []*trafficpolicy.RouteWeightedClusters{
{
HTTPRouteMatch: tests.WildCardRouteMatch,
WeightedClusters: mapset.NewSet(tests.BookbuyerDefaultWeightedCluster),
},
},
},
},
permissiveMode: true,
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -364,6 +453,7 @@ func TestListOutboundTrafficPolicies(t *testing.T) {
}
mockKubeController.EXPECT().ListServices().Return(services).AnyTimes()
mockKubeController.EXPECT().ListServiceAccounts().Return(serviceAccounts).AnyTimes()
mockMeshSpec.EXPECT().ListTrafficSplits().Return(tc.trafficsplits).AnyTimes()
} else {
mockMeshSpec.EXPECT().ListTrafficSplits().Return(tc.trafficsplits).AnyTimes()
mockMeshSpec.EXPECT().ListTrafficTargets().Return(tc.traffictargets).AnyTimes()
Expand Down
14 changes: 4 additions & 10 deletions pkg/envoy/cds/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,10 @@ func getUpstreamServiceCluster(downstreamIdentity identity.ServiceIdentity, upst
},
}

if o.permissive {
// Since no traffic policies exist with permissive mode, rely on cluster provided service discovery.
remoteCluster.ClusterDiscoveryType = &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_ORIGINAL_DST}
remoteCluster.LbPolicy = xds_cluster.Cluster_CLUSTER_PROVIDED
} else {
// Configure service discovery based on traffic policies
remoteCluster.ClusterDiscoveryType = &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_EDS}
remoteCluster.EdsClusterConfig = &xds_cluster.Cluster_EdsClusterConfig{EdsConfig: envoy.GetADSConfigSource()}
remoteCluster.LbPolicy = xds_cluster.Cluster_ROUND_ROBIN
}
// Configure service discovery based on traffic policies
remoteCluster.ClusterDiscoveryType = &xds_cluster.Cluster_Type{Type: xds_cluster.Cluster_EDS}
remoteCluster.EdsClusterConfig = &xds_cluster.Cluster_EdsClusterConfig{EdsConfig: envoy.GetADSConfigSource()}
remoteCluster.LbPolicy = xds_cluster.Cluster_ROUND_ROBIN

if o.withActiveHealthChecks {
enableHealthChecksOnCluster(remoteCluster, upstreamSvc)
Expand Down
1 change: 1 addition & 0 deletions pkg/envoy/rds/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func NewResponse(cataloger catalog.MeshCataloger, proxy *envoy.Proxy, discoveryR
}

services, err := proxyRegistry.ListProxyServices(proxy)

if err != nil {
log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrFetchingServiceList)).
Msgf("Error looking up services for Envoy with serial number=%q", proxy.GetCertificateSerialNumber())
Expand Down
4 changes: 2 additions & 2 deletions pkg/trafficpolicy/trafficpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,15 @@ func mergeRules(originalRules, latestRules []*Rule) []*Rule {

// mergeRoutesWeightedClusters merges two slices of RouteWeightedClusters and returns a slice where there is one RouteWeightedCluster
// for any HTTPRouteMatch. Where there is an overlap in HTTPRouteMatch between the originalRoutes and latestRoutes, the WeightedClusters
// will be unioned as there can only be one set of WeightedClusters per HTTPRouteMatch.
// from latest will be used as there can only be one set of WeightedClusters per HTTPRouteMatch.
func mergeRoutesWeightedClusters(originalRoutes, latestRoutes []*RouteWeightedClusters) []*RouteWeightedClusters {
for _, latest := range latestRoutes {
foundRoute := false
for _, original := range originalRoutes {
if reflect.DeepEqual(original.HTTPRouteMatch, latest.HTTPRouteMatch) {
foundRoute = true
if !reflect.DeepEqual(original.WeightedClusters, latest.WeightedClusters) {
original.WeightedClusters = original.WeightedClusters.Union(latest.WeightedClusters)
original.WeightedClusters = latest.WeightedClusters
}
continue
}
Expand Down
Loading

0 comments on commit 65a35eb

Please sign in to comment.