Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

traffic-split: update root service selector & targetPort usage #4902

Merged
merged 2 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions ci/cmd/maestro.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ const (
// Pod labels
bookBuyerLabel = "bookbuyer"
bookThiefLabel = "bookthief"
bookstoreV1Label = "bookstore-v1"
bookstoreV2Label = "bookstore-v2"
bookstoreLabel = "bookstore"
bookWarehouseLabel = "bookwarehouse"
mySQLLabel = "mysql"
)
Expand All @@ -37,8 +36,8 @@ var (
osmControllerPodSelector = fmt.Sprintf("%s=%s", constants.AppLabel, constants.OSMControllerName)
bookThiefSelector = fmt.Sprintf("%s=%s", constants.AppLabel, bookThiefLabel)
bookBuyerSelector = fmt.Sprintf("%s=%s", constants.AppLabel, bookBuyerLabel)
bookstoreV1Selector = fmt.Sprintf("%s=%s", constants.AppLabel, bookstoreV1Label)
bookstoreV2Selector = fmt.Sprintf("%s=%s", constants.AppLabel, bookstoreV2Label)
bookstoreV1Selector = fmt.Sprintf("%s=%s,version=v1", constants.AppLabel, bookstoreLabel)
bookstoreV2Selector = fmt.Sprintf("%s=%s,version=v2", constants.AppLabel, bookstoreLabel)
bookWarehouseSelector = fmt.Sprintf("%s=%s", constants.AppLabel, bookWarehouseLabel)
mySQLSelector = fmt.Sprintf("%s=%s", constants.AppLabel, mySQLLabel)

Expand All @@ -61,7 +60,7 @@ var (
)

func main() {
log.Debug().Msgf("Looking for: %s/%s, %s/%s, %s/%s, %s/%s, %s/%s %s/%s", bookBuyerLabel, bookbuyerNS, bookThiefLabel, bookthiefNS, bookstoreV1Label, bookstoreNS, bookstoreV2Label, bookstoreNS, bookWarehouseLabel, bookWarehouseNS, mySQLLabel, bookWarehouseNS)
log.Debug().Msgf("Looking for: %s/%s, %s/%s, %s/%s, %s/%s %s/%s", bookBuyerLabel, bookbuyerNS, bookThiefLabel, bookthiefNS, bookstoreLabel, bookstoreNS, bookWarehouseLabel, bookWarehouseNS, mySQLLabel, bookWarehouseNS)

kubeClient := maestro.GetKubernetesClient()

Expand Down
13 changes: 7 additions & 6 deletions demo/deploy-bookstore.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ KUBE_CONTEXT=$(kubectl config current-context)

kubectl delete deployment "$SVC" -n "$BOOKSTORE_NAMESPACE" --ignore-not-found

echo -e "Deploy bookstore Service"
echo -e "Deploy root bookstore Service"
kubectl apply -f - <<EOF
apiVersion: v1
kind: Service
Expand Down Expand Up @@ -53,14 +53,15 @@ metadata:
name: $SVC
namespace: $BOOKSTORE_NAMESPACE
labels:
app: $SVC
app: bookstore
version: $VERSION
spec:
ports:
- port: 14001
name: bookstore-port

selector:
app: $SVC
app: bookstore
version: $VERSION
EOF

echo -e "Deploy $SVC Deployment"
Expand All @@ -74,12 +75,12 @@ spec:
replicas: 1
selector:
matchLabels:
app: $SVC
app: bookstore
version: $VERSION
template:
metadata:
labels:
app: $SVC
app: bookstore
version: $VERSION
spec:
serviceAccountName: "$SVC"
Expand Down
2 changes: 2 additions & 0 deletions pkg/catalog/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func NewFakeMeshCatalog(kubeClient kubernetes.Interface, meshConfigClient config
mockKubeController.EXPECT().ListServiceIdentitiesForService(tests.BookstoreV1Service).Return([]identity.K8sServiceAccount{tests.BookstoreServiceAccount}, nil).AnyTimes()
mockKubeController.EXPECT().ListServiceIdentitiesForService(tests.BookstoreV2Service).Return([]identity.K8sServiceAccount{tests.BookstoreV2ServiceAccount}, nil).AnyTimes()
mockKubeController.EXPECT().ListServiceIdentitiesForService(tests.BookbuyerService).Return([]identity.K8sServiceAccount{tests.BookbuyerServiceAccount}, nil).AnyTimes()
mockKubeController.EXPECT().GetTargetPortForServicePort(
gomock.Any(), gomock.Any()).Return(uint16(tests.ServicePort), nil).AnyTimes()

mockPolicyController.EXPECT().ListEgressPoliciesForSourceIdentity(gomock.Any()).Return(nil).AnyTimes()
mockPolicyController.EXPECT().GetIngressBackendPolicy(gomock.Any()).Return(nil).AnyTimes()
Expand Down
77 changes: 13 additions & 64 deletions pkg/catalog/outbound_traffic_policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package catalog

import (
mapset "github.com/deckarep/golang-set"
"k8s.io/apimachinery/pkg/types"

"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/errcode"
Expand Down Expand Up @@ -34,7 +35,7 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.

// For each service, build the traffic policies required to access it.
// It is important to aggregate HTTP route configs by the service's port.
for _, meshSvc := range mc.listAllowedUpstreamServicesIncludeApex(downstreamIdentity) {
for _, meshSvc := range mc.ListOutboundServicesForIdentity(downstreamIdentity) {
meshSvc := meshSvc // To prevent loop variable memory aliasing in for loop

// Retrieve the destination IP address from the endpoints for this service
Expand Down Expand Up @@ -72,12 +73,20 @@ func (mc *MeshCatalog) GetOutboundMeshTrafficPolicy(downstreamIdentity identity.
if len(trafficSplits) != 0 {
// Program routes to the backends specified in the traffic split
split := trafficSplits[0] // TODO(#2759): support multiple traffic splits per apex service

for _, backend := range split.Spec.Backends {
backendMeshSvc := service.MeshService{
Namespace: meshSvc.Namespace, // Backends belong to the same namespace as the apex service
Name: backend.Service,
TargetPort: meshSvc.TargetPort,
Namespace: meshSvc.Namespace, // Backends belong to the same namespace as the apex service
Name: backend.Service,
}
targetPort, err := mc.kubeController.GetTargetPortForServicePort(
types.NamespacedName{Namespace: backendMeshSvc.Namespace, Name: backendMeshSvc.Name}, meshSvc.Port)
if err != nil {
log.Error().Err(err).Msgf("Error fetching target port for leaf service %s, ignoring it", backendMeshSvc)
continue
}
backendMeshSvc.TargetPort = targetPort

wc := service.WeightedCluster{
ClusterName: service.ClusterName(backendMeshSvc.EnvoyClusterName()),
Weight: backend.Weight,
Expand Down Expand Up @@ -166,63 +175,3 @@ func (mc *MeshCatalog) ListOutboundServicesForIdentity(serviceIdentity identity.

return allowedServices
}

// listAllowedUpstreamServicesIncludeApex returns a list of services the given downstream service identity
// is authorized to communicate with, including traffic split apex services that are not backed by
// pods as well as other sibling pods from the same headless service.
func (mc *MeshCatalog) listAllowedUpstreamServicesIncludeApex(downstreamIdentity identity.ServiceIdentity) []service.MeshService {
upstreamServices := mc.ListOutboundServicesForIdentity(downstreamIdentity)
if len(upstreamServices) == 0 {
log.Debug().Msgf("Downstream identity %s does not have any allowed upstream services", downstreamIdentity)
return nil
}

dstServicesSet := make(map[service.MeshService]struct{}) // mapset to avoid duplicates
for _, upstreamSvc := range upstreamServices {
// All upstreams with an endpoint are expected to have TargetPort set.
// Only a TrafficSplit apex service (virtual service) that does not have endpoints
// will have an unset TargetPort. We will not include such a service in the initial
// set because it will be correctly added to the set later on when each upstream
// service is matched to a TrafficSplit object. This is important to avoid duplicate
// TrafficSplit apex/virtual service from being computed with and without TargetPort set.
if upstreamSvc.TargetPort != 0 {
dstServicesSet[upstreamSvc] = struct{}{}
}
}

// Getting apex services referring to the outbound services
// We get possible apexes which could traffic split to any of the possible
// outbound services
splitPolicy := mc.meshSpec.ListTrafficSplits()

for upstreamSvc := range dstServicesSet {
for _, split := range splitPolicy {
// Split policy must be in the same namespace as the upstream service that is a backend
if split.Namespace != upstreamSvc.Namespace {
continue
}
for _, backend := range split.Spec.Backends {
if backend.Service == upstreamSvc.Name {
rootServiceName := k8s.GetServiceFromHostname(mc.kubeController, split.Spec.Service)
rootMeshService := service.MeshService{
Namespace: split.Namespace,
Name: rootServiceName,
Port: upstreamSvc.Port,
TargetPort: upstreamSvc.TargetPort,
Protocol: upstreamSvc.Protocol,
}

// Add this root service into the set
dstServicesSet[rootMeshService] = struct{}{}
}
}
}
}

dstServices := make([]service.MeshService, 0, len(dstServicesSet))
for svc := range dstServicesSet {
dstServices = append(dstServices, svc)
}

return dstServices
}
Loading