Skip to content
This repository has been archived by the owner on Apr 14, 2024. It is now read-only.

Fix Multi Cluster Feature issues. #119

Merged
merged 11 commits into from
Nov 25, 2022
4 changes: 2 additions & 2 deletions charts/osm/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 1.2.0
version: 1.3.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: v1.2.0
appVersion: v1.3.0

# This specifies the minimum Kubernetes version OSM is compatible with.
kubeVersion: ">= 1.19.0-0"
Expand Down
8 changes: 4 additions & 4 deletions charts/osm/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Open Service Mesh Edge Helm Chart

![Version: 1.2.0](https://img.shields.io/badge/Version-1.2.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: v1.2.0](https://img.shields.io/badge/AppVersion-v1.2.0-informational?style=flat-square)
![Version: 1.3.0](https://img.shields.io/badge/Version-1.3.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: v1.3.0](https://img.shields.io/badge/AppVersion-v1.3.0-informational?style=flat-square)

A Helm chart to install the [osm-edge](https://github.com/flomesh-io/osm-edge) control plane on Kubernetes.

Expand Down Expand Up @@ -224,7 +224,7 @@ The following table lists the configurable parameters of the osm chart and their
| osm.outboundIPRangeExclusionList | list | `[]` | Specifies a global list of IP ranges to exclude from outbound traffic interception by the sidecar proxy. If specified, must be a list of IP ranges of the form a.b.c.d/x. |
| osm.outboundIPRangeInclusionList | list | `[]` | Specifies a global list of IP ranges to include for outbound traffic interception by the sidecar proxy. If specified, must be a list of IP ranges of the form a.b.c.d/x. |
| osm.outboundPortExclusionList | list | `[]` | Specifies a global list of ports to exclude from outbound traffic interception by the sidecar proxy. If specified, must be a list of positive integers. |
| osm.pipyRepoImage | string | `"flomesh/pipy-repo-nightly:latest"` | Pipy repo image for Pipy sidecar's proxy control plane container |
| osm.pipyRepoImage | string | `"flomesh/pipy-repo:0.70.0-46"` | Pipy repo image for Pipy sidecar's proxy control plane container |
| osm.preinstall.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[0].matchExpressions[0].key | string | `"kubernetes.io/os"` | |
| osm.preinstall.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[0].matchExpressions[0].operator | string | `"In"` | |
| osm.preinstall.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[0].matchExpressions[0].values[0] | string | `"linux"` | |
Expand Down Expand Up @@ -257,9 +257,9 @@ The following table lists the configurable parameters of the osm chart and their
| osm.remoteLogging.endpoint | string | `""` | Remote logging's API path where the spans will be sent to |
| osm.remoteLogging.port | int | `30514` | Port of the remote logging service |
| osm.sidecarClass | string | `"pipy"` | The class of the OSM Sidecar Driver |
| osm.sidecarDrivers | list | `[{"proxyServerPort":6060,"sidecarImage":"flomesh/pipy-nightly:latest","sidecarName":"pipy"},{"proxyServerPort":15128,"sidecarImage":"envoyproxy/envoy:v1.19.3","sidecarName":"envoy","sidecarWindowsImage":"envoyproxy/envoy-windows:latest"}]` | Sidecar drivers supported by osm-edge |
| osm.sidecarDrivers | list | `[{"proxyServerPort":6060,"sidecarImage":"flomesh/pipy:0.70.0-46","sidecarName":"pipy"},{"proxyServerPort":15128,"sidecarImage":"envoyproxy/envoy:v1.19.3","sidecarName":"envoy","sidecarWindowsImage":"envoyproxy/envoy-windows:latest"}]` | Sidecar drivers supported by osm-edge |
| osm.sidecarDrivers[0].proxyServerPort | int | `6060` | Remote destination port on which the Discovery Service listens for new connections from Sidecars. |
| osm.sidecarDrivers[0].sidecarImage | string | `"flomesh/pipy-nightly:latest"` | Sidecar image for Linux workloads |
| osm.sidecarDrivers[0].sidecarImage | string | `"flomesh/pipy:0.70.0-46"` | Sidecar image for Linux workloads |
| osm.sidecarDrivers[1].proxyServerPort | int | `15128` | Remote destination port on which the Discovery Service listens for new connections from Sidecars. |
| osm.sidecarDrivers[1].sidecarImage | string | `"envoyproxy/envoy:v1.19.3"` | Sidecar image for Linux workloads |
| osm.sidecarDrivers[1].sidecarWindowsImage | string | `"envoyproxy/envoy-windows:latest"` | Sidecar image for Windows workloads |
Expand Down
4 changes: 2 additions & 2 deletions charts/osm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ osm:
sidecarDrivers:
- sidecarName: pipy
# -- Sidecar image for Linux workloads
sidecarImage: flomesh/pipy-nightly:latest
sidecarImage: flomesh/pipy:0.70.0-46
# -- Remote destination port on which the Discovery Service listens for new connections from Sidecars.
proxyServerPort: 6060
- sidecarName: envoy
Expand All @@ -70,7 +70,7 @@ osm:
# -- Curl image for control plane init container
curlImage: curlimages/curl
# -- Pipy repo image for Pipy sidecar's proxy control plane container
pipyRepoImage: flomesh/pipy-repo-nightly:latest
pipyRepoImage: flomesh/pipy-repo:0.70.0-46
#
# -- OSM controller parameters
osmController:
Expand Down
6 changes: 6 additions & 0 deletions docs/release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release Notes

## Release v1.3.0

### Notable changes

- Multi cluster service support

## Release v1.2.1

### Notable changes
Expand Down
9 changes: 9 additions & 0 deletions pkg/catalog/access_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
)
Expand Down Expand Up @@ -41,13 +42,21 @@ func (mc *MeshCatalog) GetAccessControlTrafficPolicy(svc service.MeshService) (*
continue
}

upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{MeshService: &svc})

trafficMatch := &trafficpolicy.AccessControlTrafficMatch{
Name: service.AccessControlTrafficMatchName(svc.Name, svc.Namespace, uint16(backend.Port.Number), backend.Port.Protocol),
Port: uint32(backend.Port.Number),
Protocol: backend.Port.Protocol,
TLS: backend.TLS,
}

if upstreamTrafficSetting != nil {
trafficMatch.RateLimit = upstreamTrafficSetting.Spec.RateLimit
trafficMatch.HeaderRateLimit = &upstreamTrafficSetting.Spec.HTTPHeaders
}

var sourceIPRanges []string
sourceIPSet := mapset.NewSet() // Used to avoid duplicate IP ranges
for _, source := range aclPolicy.Spec.Sources {
Expand Down
9 changes: 9 additions & 0 deletions pkg/catalog/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
)
Expand Down Expand Up @@ -47,13 +48,21 @@ func (mc *MeshCatalog) GetIngressTrafficPolicy(svc service.MeshService) (*traffi
continue
}

upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{MeshService: &svc})

trafficMatch := &trafficpolicy.IngressTrafficMatch{
Name: service.IngressTrafficMatchName(svc.Name, svc.Namespace, uint16(backend.Port.Number), backend.Port.Protocol),
Port: uint32(backend.Port.Number),
Protocol: backend.Port.Protocol,
TLS: backend.TLS,
}

if upstreamTrafficSetting != nil {
trafficMatch.RateLimit = upstreamTrafficSetting.Spec.RateLimit
trafficMatch.HeaderRateLimit = &upstreamTrafficSetting.Spec.HTTPHeaders
}

if backend.TLS != nil {
trafficMatch.ServerNames = backend.TLS.SNIHosts
trafficMatch.SkipClientCertValidation = backend.TLS.SkipClientCertValidation
Expand Down
1 change: 1 addition & 0 deletions pkg/catalog/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ func TestGetIngressTrafficPolicy(t *testing.T) {
// Note: if AnyTimes() is used with a mock function, it implies the function may or may not be called
// depending on the test case.
mockPolicyController.EXPECT().GetIngressBackendPolicy(tc.meshSvc).Return(tc.ingressBackend).AnyTimes()
mockPolicyController.EXPECT().GetUpstreamTrafficSetting(policy.UpstreamTrafficSettingGetOpt{MeshService: &tc.meshSvc}).Return(nil).AnyTimes()
mockServiceProvider.EXPECT().GetID().Return("mock").AnyTimes()
mockEndpointsProvider.EXPECT().ListEndpointsForService(ingressSourceSvc).Return(ingressBackendSvcEndpoints).AnyTimes()
mockEndpointsProvider.EXPECT().ListEndpointsForService(sourceSvcWithoutEndpoints).Return(nil).AnyTimes()
Expand Down
108 changes: 83 additions & 25 deletions pkg/catalog/outbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package catalog

import (
mapset "github.com/deckarep/golang-set"
split "github.com/servicemeshinterface/smi-sdk-go/pkg/apis/split/v1alpha2"
"k8s.io/apimachinery/pkg/types"

"github.com/openservicemesh/osm/pkg/constants"
Expand Down Expand Up @@ -34,10 +35,24 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.
downstreamSvcAccount := downstreamIdentity.ToK8sServiceAccount()
servicesResolvableSet := make(map[string][]interface{})

var egressPolicy *trafficpolicy.EgressTrafficPolicy
var egressPolicyGetted bool

// For each service, build the traffic policies required to access it.
// It is important to aggregate HTTP route configs by the service's port.
for _, meshSvc := range mc.ListOutboundServicesForIdentity(downstreamIdentity) {
meshSvc := meshSvc // To prevent loop variable memory aliasing in for loop
egressEnabled := mc.configurator.IsEgressEnabled()
if !egressEnabled {
if !egressPolicyGetted {
egressPolicy, _ = mc.GetEgressTrafficPolicy(downstreamIdentity)
egressPolicyGetted = true
}
if egressPolicy != nil {
egressEnabled = mc.isEgressService(meshSvc, egressPolicy)
}
}
monitoredNamespace := mc.kubeController.IsMonitoredNamespace(meshSvc.Namespace)
existIntraEndpoints := false

// Retrieve the destination IP address from the endpoints for this service
Expand All @@ -51,7 +66,7 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.
destinationIPRanges = append(destinationIPRanges, ipCIDR)
}
if !existIntraEndpoints {
if len(endp.ClusterKey) == 0 {
if len(endp.ClusterKey) == 0 && (monitoredNamespace || egressEnabled) {
existIntraEndpoints = true
}
}
Expand Down Expand Up @@ -93,8 +108,8 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.
split := trafficSplits[0] // TODO(#2759): support multiple traffic splits per apex service

for _, backend := range split.Spec.Backends {
var cns []service.ClusterName
cnsLocal := make(map[service.ClusterName]bool)
var aas []service.ClusterName
var fos []service.ClusterName
{
backendMeshSvc := service.MeshService{
Expand All @@ -105,7 +120,7 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.
types.NamespacedName{Namespace: backendMeshSvc.Namespace, Name: backendMeshSvc.Name}, meshSvc.Port)
if err == nil {
backendMeshSvc.TargetPort = targetPort
cns = append(cns, service.ClusterName(backendMeshSvc.SidecarClusterName()))
aas = append(aas, service.ClusterName(backendMeshSvc.SidecarClusterName()))
cnsLocal[service.ClusterName(backendMeshSvc.SidecarClusterName())] = true
}
}
Expand All @@ -122,35 +137,16 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.
backendMeshSvc.TargetPort = targetPort
if _, exists := cnsLocal[service.ClusterName(backendMeshSvc.SidecarClusterName())]; !exists {
if aa {
cns = append(cns, service.ClusterName(backendMeshSvc.SidecarClusterName()))
aas = append(aas, service.ClusterName(backendMeshSvc.SidecarClusterName()))
} else {
fos = append(fos, service.ClusterName(backendMeshSvc.SidecarClusterName()))
}
}
}
}
}
if len(cns) > 0 {
totalWeight := backend.Weight
for index, cn := range cns {
weight := totalWeight / (len(cns) - index)
totalWeight -= weight
wc := service.WeightedCluster{
ClusterName: cn,
Weight: weight,
}
upstreamClusters = append(upstreamClusters, wc)
}
}
if len(fos) > 0 {
for _, cn := range fos {
wc := service.WeightedCluster{
ClusterName: cn,
Weight: constants.ClusterWeightFailOver,
}
upstreamClusters = append(upstreamClusters, wc)
}
}
upstreamClusters = activeUpstreamClusters(aas, backend, upstreamClusters)
upstreamClusters = failOverUpstreamClusters(fos, upstreamClusters)
}
} else {
wc := service.WeightedCluster{
Expand Down Expand Up @@ -210,6 +206,68 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.
}
}

func activeUpstreamClusters(aas []service.ClusterName, backend split.TrafficSplitBackend, upstreamClusters []service.WeightedCluster) []service.WeightedCluster {
if len(aas) > 0 {
totalWeight := backend.Weight
for index, cn := range aas {
weight := totalWeight / (len(aas) - index)
totalWeight -= weight
wc := service.WeightedCluster{
ClusterName: cn,
Weight: weight,
}
upstreamClusters = append(upstreamClusters, wc)
}
}
return upstreamClusters
}

func failOverUpstreamClusters(fos []service.ClusterName, upstreamClusters []service.WeightedCluster) []service.WeightedCluster {
if len(fos) > 0 {
for _, cn := range fos {
wc := service.WeightedCluster{
ClusterName: cn,
Weight: constants.ClusterWeightFailOver,
}
upstreamClusters = append(upstreamClusters, wc)
}
}
return upstreamClusters
}

func (mc *MeshCatalog) isEgressService(meshSvc service.MeshService, egressPolicy *trafficpolicy.EgressTrafficPolicy) bool {
egressEnabled := false
hostnames := k8s.GetHostnamesForService(meshSvc, true)
for _, routeConfigs := range egressPolicy.HTTPRouteConfigsPerPort {
if egressEnabled {
break
}
if len(routeConfigs) == 0 {
continue
}
for _, routeConfig := range routeConfigs {
if egressEnabled {
break
}
if len(routeConfig.Hostnames) == 0 {
continue
}
for _, host := range routeConfig.Hostnames {
if egressEnabled {
break
}
for _, hostname := range hostnames {
if hostname == host {
egressEnabled = true
break
}
}
}
}
}
return egressEnabled
}

// ListOutboundServicesForIdentity list the services the given service account is allowed to initiate outbound connections to
// Note: ServiceIdentity must be in the format "name.namespace" [https://github.com/openservicemesh/osm/issues/3188]
func (mc *MeshCatalog) ListOutboundServicesForIdentity(serviceIdentity identity.ServiceIdentity) []service.MeshService {
Expand Down
2 changes: 2 additions & 0 deletions pkg/catalog/outbound_traffic_policies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ func TestGetOutboundMeshTrafficPolicy(t *testing.T) {

// Mock calls to k8s client caches
mockCfg.EXPECT().IsPermissiveTrafficPolicyMode().Return(tc.permissiveMode).AnyTimes()
mockCfg.EXPECT().IsEgressEnabled().Return(false).AnyTimes()
mockCfg.EXPECT().GetFeatureFlags().Return(configv1alpha2.FeatureFlags{}).AnyTimes()
mockServiceProvider.EXPECT().ListServices().Return(allMeshServices).AnyTimes()
mockMeshSpec.EXPECT().ListTrafficTargets().Return(trafficTargets).AnyTimes()
Expand All @@ -616,6 +617,7 @@ func TestGetOutboundMeshTrafficPolicy(t *testing.T) {
}
return nil
}).AnyTimes()
mockKubeController.EXPECT().IsMonitoredNamespace(gomock.Any()).Return(true).AnyTimes()
mockKubeController.EXPECT().GetTargetPortForServicePort(
types.NamespacedName{Namespace: meshSvc3V1.Namespace, Name: meshSvc3V1.Name}, meshSvc3.Port).Return(meshSvc3V1.TargetPort, nil).AnyTimes()
mockKubeController.EXPECT().GetTargetPortForServicePort(
Expand Down
10 changes: 5 additions & 5 deletions pkg/multicluster/service_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (c *Client) GetEndpoints(svc service.MeshService) (*corev1.Endpoints, error
targetEndpoints := new(corev1.Endpoints)
targetEndpoints.Namespace = importedService.Namespace
targetEndpoints.Name = importedService.Name
targetEndpoints.Annotations = make(map[string]string)
for _, endpoint := range port.Endpoints {
if svc.TargetPort > 0 && svc.TargetPort != uint16(endpoint.Target.Port) {
continue
Expand All @@ -215,11 +216,10 @@ func (c *Client) GetEndpoints(svc service.MeshService) (*corev1.Endpoints, error
lbWeight = weight
}
}
targetEndpoints.Annotations = make(map[string]string)
targetEndpoints.Annotations[fmt.Sprintf(ServiceImportClusterKeyAnnotation, endpoint.Target.Port)] = endpoint.ClusterKey
targetEndpoints.Annotations[fmt.Sprintf(ServiceImportContextPathAnnotation, endpoint.Target.Port)] = endpoint.Target.Path
targetEndpoints.Annotations[fmt.Sprintf(ServiceImportLBTypeAnnotation, endpoint.Target.Port)] = string(lbType)
targetEndpoints.Annotations[fmt.Sprintf(ServiceImportLBWeightAnnotation, endpoint.Target.Port)] = fmt.Sprintf("%d", lbWeight)
targetEndpoints.Annotations[fmt.Sprintf(ServiceImportClusterKeyAnnotation, endpoint.Target.IP, endpoint.Target.Port)] = endpoint.ClusterKey
targetEndpoints.Annotations[fmt.Sprintf(ServiceImportContextPathAnnotation, endpoint.Target.IP, endpoint.Target.Port)] = endpoint.Target.Path
targetEndpoints.Annotations[fmt.Sprintf(ServiceImportLBTypeAnnotation, endpoint.Target.IP, endpoint.Target.Port)] = string(lbType)
targetEndpoints.Annotations[fmt.Sprintf(ServiceImportLBWeightAnnotation, endpoint.Target.IP, endpoint.Target.Port)] = fmt.Sprintf("%d", lbWeight)
targetEndpoints.Subsets = append(targetEndpoints.Subsets, corev1.EndpointSubset{
Addresses: []corev1.EndpointAddress{
{
Expand Down
8 changes: 4 additions & 4 deletions pkg/multicluster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ import (

const (
// ServiceImportClusterKeyAnnotation is the annotation used to configure context path for imported service
ServiceImportClusterKeyAnnotation = "flomesh.io/ServiceImport/ClusterKey/%d"
ServiceImportClusterKeyAnnotation = "flomesh.io/ServiceImport/ClusterKey/%s/%d"

// ServiceImportContextPathAnnotation is the annotation used to configure context path for imported service
ServiceImportContextPathAnnotation = "flomesh.io/ServiceImport/ContextPath/%d"
ServiceImportContextPathAnnotation = "flomesh.io/ServiceImport/ContextPath/%s/%d"

// ServiceImportLBTypeAnnotation is the annotation used to configure load balancer type for imported service
ServiceImportLBTypeAnnotation = "flomesh.io/ServiceImport/LBType/%d"
ServiceImportLBTypeAnnotation = "flomesh.io/ServiceImport/LBType/%s/%d"

// ServiceImportLBWeightAnnotation is the annotation used to configure load balancer weight for imported service
ServiceImportLBWeightAnnotation = "flomesh.io/ServiceImport/LBWeight/%d"
ServiceImportLBWeightAnnotation = "flomesh.io/ServiceImport/LBWeight/%s/%d"

// AnyServiceAccount defines wildcard service account
AnyServiceAccount = "*"
Expand Down
8 changes: 4 additions & 4 deletions pkg/providers/fsm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ func (c *client) ListEndpointsForService(svc service.MeshService) []endpoint.End
log.Error().Msgf("Error parsing endpoint IP address %s for MeshService %s", address.IP, svc)
continue
}
weight, _ := strconv.ParseUint(kubernetesEndpoints.Annotations[fmt.Sprintf(multicluster.ServiceImportLBWeightAnnotation, port.Port)], 10, 32)
weight, _ := strconv.ParseUint(kubernetesEndpoints.Annotations[fmt.Sprintf(multicluster.ServiceImportLBWeightAnnotation, address.IP, port.Port)], 10, 32)
ept := endpoint.Endpoint{
IP: ip,
Port: endpoint.Port(port.Port),
ClusterKey: kubernetesEndpoints.Annotations[fmt.Sprintf(multicluster.ServiceImportClusterKeyAnnotation, port.Port)],
LBType: kubernetesEndpoints.Annotations[fmt.Sprintf(multicluster.ServiceImportLBTypeAnnotation, port.Port)],
ClusterKey: kubernetesEndpoints.Annotations[fmt.Sprintf(multicluster.ServiceImportClusterKeyAnnotation, address.IP, port.Port)],
LBType: kubernetesEndpoints.Annotations[fmt.Sprintf(multicluster.ServiceImportLBTypeAnnotation, address.IP, port.Port)],
Weight: endpoint.Weight(weight),
Path: kubernetesEndpoints.Annotations[fmt.Sprintf(multicluster.ServiceImportContextPathAnnotation, port.Port)],
Path: kubernetesEndpoints.Annotations[fmt.Sprintf(multicluster.ServiceImportContextPathAnnotation, address.IP, port.Port)],
}
endpoints = append(endpoints, ept)
}
Expand Down
Loading