Skip to content

Commit

Permalink
backpressure: filter policy based on app label (openservicemesh#1454)
Browse files Browse the repository at this point in the history
Only apply the backpressure policy for an upstream cluster
if the namespace and app label in the policy match.
  • Loading branch information
shashankram authored Aug 11, 2020
1 parent cc75e51 commit d543df0
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 39 deletions.
38 changes: 38 additions & 0 deletions demo/deploy-backpressure-policy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/bash

set -aueo pipefail

# shellcheck disable=SC1091
source .env

echo "Apply backpressure policy for the bookstore-v1 service"
kubectl apply -f - <<EOF
apiVersion: policy.openservicemesh.io/v1alpha1
kind: Backpressure
metadata:
name: max-connections-bookstore-v1
namespace: "${BOOKSTORE_NAMESPACE}"
labels:
app: bookstore-v1
spec:
maxConnections: 5
EOF

echo "Apply backpressure policy for the bookstore-v2 service"
kubectl apply -f - <<EOF
apiVersion: policy.openservicemesh.io/v1alpha1
kind: Backpressure
metadata:
name: max-connections-connections-bookstore-v2
namespace: "${BOOKSTORE_NAMESPACE}"
labels:
app: bookstore-v2
spec:
maxConnections: 5
EOF
23 changes: 0 additions & 23 deletions demo/deploy-backpressure-spec.sh

This file was deleted.

23 changes: 10 additions & 13 deletions pkg/envoy/cds/backpressure.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,21 @@ import (
xds_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"

"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/service"
)

func enableBackpressure(catalog catalog.MeshCataloger, remoteCluster *xds_cluster.Cluster) {
func enableBackpressure(catalog catalog.MeshCataloger, remoteCluster *xds_cluster.Cluster, svc service.MeshService) {
log.Info().Msgf("Enabling backpressure in service cluster")
// Backpressure CRD only has one backpressure obj as a global config
// TODO: Add specific backpressure settings for individual clients
backpressures := catalog.GetSMISpec().ListBackpressures()

// TODO: filter backpressures on labels (backpressures[i].ObjectMeta.Labels) that match that of the destination service (trafficPolicies.Destination)

log.Trace().Msgf("Backpressures (%d found): %+v", len(backpressures), backpressures)

if len(backpressures) > 0 {
log.Trace().Msgf("Backpressure Spec: %+v", backpressures[0].Spec)

remoteCluster.CircuitBreakers = &xds_cluster.CircuitBreakers{
Thresholds: makeThresholds(&backpressures[0].Spec.MaxConnections),
}
backpressure := catalog.GetSMISpec().GetBackpressurePolicy(svc)
if backpressure == nil {
log.Trace().Msgf("Backpressure policy not found for service %s", svc)
return
}

log.Trace().Msgf("Backpressure Spec: %+v", backpressure.Spec)
remoteCluster.CircuitBreakers = &xds_cluster.CircuitBreakers{
Thresholds: makeThresholds(&backpressure.Spec.MaxConnections),
}
}
2 changes: 1 addition & 1 deletion pkg/envoy/cds/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewResponse(_ context.Context, catalog catalog.MeshCataloger, proxy *envoy.
}

if featureflags.IsBackpressureEnabled() {
enableBackpressure(catalog, remoteCluster)
enableBackpressure(catalog, remoteCluster, dstService)
}

clusterFactories[remoteCluster.Name] = remoteCluster
Expand Down
33 changes: 32 additions & 1 deletion pkg/smi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (c *Client) ListBackpressures() []*backpressure.Backpressure {
for _, pressureIface := range c.caches.Backpressure.List() {
bpressure, ok := pressureIface.(*backpressure.Backpressure)
if !ok {
log.Error().Err(errInvalidObjectType).Msgf("Object obtained from cache is not *Backpressure")
log.Error().Err(errInvalidObjectType).Msgf("Failed type assertion for Backpressure in cache")
continue
}

Expand All @@ -243,6 +243,37 @@ func (c *Client) ListBackpressures() []*backpressure.Backpressure {
return backpressureList
}

// GetBackpressurePolicy gets the Backpressure policy corresponding to the MeshService
func (c *Client) GetBackpressurePolicy(svc service.MeshService) *backpressure.Backpressure {
if !featureflags.IsBackpressureEnabled() {
log.Info().Msgf("Backpressure turned off!")
return nil
}

for _, iface := range c.caches.Backpressure.List() {
backpressure, ok := iface.(*backpressure.Backpressure)
if !ok {
log.Error().Err(errInvalidObjectType).Msgf("Failed type assertion for Backpressure in cache")
continue
}

if !c.namespaceController.IsMonitoredNamespace(backpressure.Namespace) {
continue
}

app, ok := backpressure.Labels["app"]
if !ok {
continue
}

if svc.Namespace == backpressure.Namespace && svc.Name == app {
return backpressure
}
}

return nil
}

// ListTrafficSplitServices implements mesh.MeshSpec by returning the services observed from the given compute provider
func (c *Client) ListTrafficSplitServices() []service.WeightedService {
var services []service.WeightedService
Expand Down
13 changes: 13 additions & 0 deletions pkg/smi/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ func (f fakeMeshSpec) ListBackpressures() []*backpressure.Backpressure {
return f.backpressures
}

func (f fakeMeshSpec) GetBackpressurePolicy(svc service.MeshService) *backpressure.Backpressure {
for _, backpressure := range f.backpressures {
app, ok := backpressure.Labels["app"]
if !ok {
continue
}
if svc.Namespace == backpressure.Namespace && svc.Name == app {
return backpressure
}
}
return nil
}

// GetAnnouncementsChannel returns the channel on which SMI makes announcements for the fake Mesh Spec.
func (f fakeMeshSpec) GetAnnouncementsChannel() <-chan interface{} {
return make(chan interface{})
Expand Down
5 changes: 4 additions & 1 deletion pkg/smi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@ type MeshSpec interface {
// ListTrafficTargets lists TrafficTarget SMI resources.
ListTrafficTargets() []*target.TrafficTarget

// ListBackpressures lists Backpressure CRD resources.
// ListBackpressures lists Backpressure resources.
// This is an experimental feature, which will eventually
// in some shape or form make its way into SMI Spec.
ListBackpressures() []*backpressure.Backpressure

// GetBackpressurePolicy gets the Backpressure policy corresponding to the MeshService
GetBackpressurePolicy(service.MeshService) *backpressure.Backpressure

// GetAnnouncementsChannel returns the channel on which SMI makes announcements
GetAnnouncementsChannel() <-chan interface{}

Expand Down

0 comments on commit d543df0

Please sign in to comment.