Skip to content

Commit

Permalink
EgressQoS controller implementation
Browse files Browse the repository at this point in the history
Add the EgressQoS controller as described in openshift/enhancements#1035

Signed-off-by: Ori Braunshtein <obraunsh@redhat.com>
  • Loading branch information
oribon committed Apr 26, 2022
1 parent 0702904 commit a3e7a78
Show file tree
Hide file tree
Showing 14 changed files with 1,836 additions and 11 deletions.
7 changes: 7 additions & 0 deletions go-controller/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ type MetricsConfig struct {
type OVNKubernetesFeatureConfig struct {
EnableEgressIP bool `gcfg:"enable-egress-ip"`
EnableEgressFirewall bool `gcfg:"enable-egress-firewall"`
EnableEgressQoS bool `gcfg:"enable-egress-qos"`
}

// GatewayMode holds the node gateway mode
Expand Down Expand Up @@ -834,6 +835,12 @@ var OVNK8sFeatureFlags = []cli.Flag{
Destination: &cliConfig.OVNKubernetesFeature.EnableEgressFirewall,
Value: OVNKubernetesFeature.EnableEgressFirewall,
},
&cli.BoolFlag{
Name: "enable-egress-qos",
Usage: "Configure to use EgressQoS CRD feature with ovn-kubernetes.",
Destination: &cliConfig.OVNKubernetesFeature.EnableEgressQoS,
Value: OVNKubernetesFeature.EnableEgressQoS,
},
}

// K8sFlags capture Kubernetes-related options
Expand Down
66 changes: 55 additions & 11 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
ocpcloudnetworkinformerfactory "github.com/openshift/client-go/cloudnetwork/informers/externalversions"
ocpcloudnetworklister "github.com/openshift/client-go/cloudnetwork/listers/cloudnetwork/v1"

egressqosapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1"
egressqosscheme "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned/scheme"
egressqosinformerfactory "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/informers/externalversions"
egressqosinformer "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/informers/externalversions/egressqos/v1"

kapi "k8s.io/api/core/v1"
knet "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -43,11 +48,12 @@ type WatchFactory struct {
// requirements with atomic accesses
handlerCounter uint64

iFactory informerfactory.SharedInformerFactory
eipFactory egressipinformerfactory.SharedInformerFactory
efFactory egressfirewallinformerfactory.SharedInformerFactory
cpipcFactory ocpcloudnetworkinformerfactory.SharedInformerFactory
informers map[reflect.Type]*informer
iFactory informerfactory.SharedInformerFactory
eipFactory egressipinformerfactory.SharedInformerFactory
efFactory egressfirewallinformerfactory.SharedInformerFactory
cpipcFactory ocpcloudnetworkinformerfactory.SharedInformerFactory
egressQoSFactory egressqosinformerfactory.SharedInformerFactory
informers map[reflect.Type]*informer

stopChan chan struct{}
}
Expand Down Expand Up @@ -88,6 +94,7 @@ var (
EgressFirewallType reflect.Type = reflect.TypeOf(&egressfirewallapi.EgressFirewall{})
EgressIPType reflect.Type = reflect.TypeOf(&egressipapi.EgressIP{})
CloudPrivateIPConfigType reflect.Type = reflect.TypeOf(&ocpcloudnetworkapi.CloudPrivateIPConfig{})
EgressQoSType reflect.Type = reflect.TypeOf(&egressqosapi.EgressQoS{})
PeerServiceType reflect.Type = reflect.TypeOf(&peerService{})
PeerNamespaceAndPodSelectorType reflect.Type = reflect.TypeOf(&peerNamespaceAndPodSelector{})
PeerPodForNamespaceAndPodSelectorType reflect.Type = reflect.TypeOf(&peerPodForNamespaceAndPodSelector{})
Expand All @@ -105,12 +112,13 @@ func NewMasterWatchFactory(ovnClientset *util.OVNClientset) (*WatchFactory, erro
// the downside of making it tight (like 10 minutes) is needless spinning on all resources
// However, AddEventHandlerWithResyncPeriod can specify a per handler resync period
wf := &WatchFactory{
iFactory: informerfactory.NewSharedInformerFactory(ovnClientset.KubeClient, resyncInterval),
eipFactory: egressipinformerfactory.NewSharedInformerFactory(ovnClientset.EgressIPClient, resyncInterval),
efFactory: egressfirewallinformerfactory.NewSharedInformerFactory(ovnClientset.EgressFirewallClient, resyncInterval),
cpipcFactory: ocpcloudnetworkinformerfactory.NewSharedInformerFactory(ovnClientset.CloudNetworkClient, resyncInterval),
informers: make(map[reflect.Type]*informer),
stopChan: make(chan struct{}),
iFactory: informerfactory.NewSharedInformerFactory(ovnClientset.KubeClient, resyncInterval),
eipFactory: egressipinformerfactory.NewSharedInformerFactory(ovnClientset.EgressIPClient, resyncInterval),
efFactory: egressfirewallinformerfactory.NewSharedInformerFactory(ovnClientset.EgressFirewallClient, resyncInterval),
cpipcFactory: ocpcloudnetworkinformerfactory.NewSharedInformerFactory(ovnClientset.CloudNetworkClient, resyncInterval),
egressQoSFactory: egressqosinformerfactory.NewSharedInformerFactory(ovnClientset.EgressQoSClient, resyncInterval),
informers: make(map[reflect.Type]*informer),
stopChan: make(chan struct{}),
}

if err := egressipapi.AddToScheme(egressipscheme.Scheme); err != nil {
Expand All @@ -119,6 +127,9 @@ func NewMasterWatchFactory(ovnClientset *util.OVNClientset) (*WatchFactory, erro
if err := egressfirewallapi.AddToScheme(egressfirewallscheme.Scheme); err != nil {
return nil, err
}
if err := egressqosapi.AddToScheme(egressqosscheme.Scheme); err != nil {
return nil, err
}

// For Services and Endpoints, pre-populate the shared Informer with one that
// has a label selector excluding headless services.
Expand Down Expand Up @@ -189,6 +200,13 @@ func NewMasterWatchFactory(ovnClientset *util.OVNClientset) (*WatchFactory, erro
return nil, err
}
}
if config.OVNKubernetesFeature.EnableEgressQoS {
wf.informers[EgressQoSType], err = newInformer(EgressQoSType, wf.egressQoSFactory.K8s().V1().EgressQoSes().Informer())
if err != nil {
return nil, err
}
}

return wf, nil
}

Expand Down Expand Up @@ -224,6 +242,15 @@ func (wf *WatchFactory) Start() error {
}
}
}
if config.OVNKubernetesFeature.EnableEgressQoS && wf.egressQoSFactory != nil {
wf.egressQoSFactory.Start(wf.stopChan)
for oType, synced := range wf.egressQoSFactory.WaitForCacheSync(wf.stopChan) {
if !synced {
return fmt.Errorf("error in syncing cache for %v informer", oType)
}
}
}

return nil
}

Expand Down Expand Up @@ -499,6 +526,11 @@ func (wf *WatchFactory) RemoveEgressFirewallHandler(handler *Handler) {
wf.removeHandler(EgressFirewallType, handler)
}

// RemoveEgressQoSHandler removes an EgressQoS object event handler function
func (wf *WatchFactory) RemoveEgressQoSHandler(handler *Handler) {
wf.removeHandler(EgressQoSType, handler)
}

// AddEgressIPHandler adds a handler function that will be executed on EgressIP object changes
func (wf *WatchFactory) AddEgressIPHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) *Handler {
return wf.addHandler(EgressIPType, "", nil, handlerFuncs, processExisting)
Expand Down Expand Up @@ -659,6 +691,10 @@ func (wf *WatchFactory) NodeInformer() cache.SharedIndexInformer {
return wf.informers[NodeType].inf
}

func (wf *WatchFactory) NodeCoreInformer() v1coreinformers.NodeInformer {
return wf.iFactory.Core().V1().Nodes()
}

// LocalPodInformer returns a shared Informer that may or may not only
// return pods running on the local node.
func (wf *WatchFactory) LocalPodInformer() cache.SharedIndexInformer {
Expand All @@ -669,6 +705,10 @@ func (wf *WatchFactory) PodInformer() cache.SharedIndexInformer {
return wf.informers[PodType].inf
}

func (wf *WatchFactory) PodCoreInformer() v1coreinformers.PodInformer {
return wf.iFactory.Core().V1().Pods()
}

func (wf *WatchFactory) NamespaceInformer() cache.SharedIndexInformer {
return wf.informers[NamespaceType].inf
}
Expand All @@ -677,6 +717,10 @@ func (wf *WatchFactory) ServiceInformer() cache.SharedIndexInformer {
return wf.informers[ServiceType].inf
}

func (wf *WatchFactory) EgressQoSInformer() egressqosinformer.EgressQoSInformer {
return wf.egressQoSFactory.K8s().V1().EgressQoSes()
}

// noHeadlessServiceSelector is a LabelSelector added to the watch for
// Endpoints (and, eventually, EndpointSlices) that excludes endpoints
// for headless services.
Expand Down
97 changes: 97 additions & 0 deletions go-controller/pkg/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"

egressfirewall "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1"
egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake"
Expand All @@ -26,6 +27,9 @@ import (
egressip "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1"
egressipfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/clientset/versioned/fake"

egressqos "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1"
egressqosfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned/fake"

ocpcloudnetworkapi "github.com/openshift/api/cloudnetwork/v1"
ocpconfigapi "github.com/openshift/api/config/v1"
ocpcloudnetworkclientsetfake "github.com/openshift/client-go/cloudnetwork/clientset/versioned/fake"
Expand Down Expand Up @@ -148,6 +152,20 @@ func newCloudPrivateIPConfig(name string) *ocpcloudnetworkapi.CloudPrivateIPConf
}
}

func newEgressQoS(name, namespace string) *egressqos.EgressQoS {
return &egressqos.EgressQoS{
ObjectMeta: newObjectMeta(name, namespace),
Spec: egressqos.EgressQoSSpec{
Egress: []egressqos.EgressQoSRule{
{
DSCP: 50,
DstCIDR: pointer.String("1.2.3.4/32"),
},
},
},
}
}

func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher {
w := watch.NewFake()
c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil))
Expand Down Expand Up @@ -176,6 +194,13 @@ func cloudPrivateIPConfigObjSetup(c *ocpcloudnetworkclientsetfake.Clientset, obj
return w
}

func egressQoSObjSetup(c *egressqosfake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher {
w := watch.NewFake()
c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil))
c.AddReactor("list", objType, listFn)
return w
}

type handlerCalls struct {
added int32
updated int32
Expand All @@ -201,11 +226,13 @@ var _ = Describe("Watch Factory Operations", func() {
egressIPFakeClient *egressipfake.Clientset
egressFirewallFakeClient *egressfirewallfake.Clientset
cloudNetworkFakeClient *ocpcloudnetworkclientsetfake.Clientset
egressQoSFakeClient *egressqosfake.Clientset
podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher
policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher
egressFirewallWatch *watch.FakeWatcher
egressIPWatch *watch.FakeWatcher
cloudPrivateIPConfigWatch *watch.FakeWatcher
egressQoSWatch *watch.FakeWatcher
pods []*v1.Pod
namespaces []*v1.Namespace
nodes []*v1.Node
Expand All @@ -216,6 +243,7 @@ var _ = Describe("Watch Factory Operations", func() {
cloudPrivateIPConfigs []*ocpcloudnetworkapi.CloudPrivateIPConfig
wf *WatchFactory
egressFirewalls []*egressfirewall.EgressFirewall
egressQoSes []*egressqos.EgressQoS
err error
)

Expand All @@ -225,18 +253,21 @@ var _ = Describe("Watch Factory Operations", func() {
config.PrepareTestConfig()
config.OVNKubernetesFeature.EnableEgressIP = true
config.OVNKubernetesFeature.EnableEgressFirewall = true
config.OVNKubernetesFeature.EnableEgressQoS = true
config.Kubernetes.PlatformType = string(ocpconfigapi.AWSPlatformType)

fakeClient = &fake.Clientset{}
egressFirewallFakeClient = &egressfirewallfake.Clientset{}
egressIPFakeClient = &egressipfake.Clientset{}
cloudNetworkFakeClient = &ocpcloudnetworkclientsetfake.Clientset{}
egressQoSFakeClient = &egressqosfake.Clientset{}

ovnClientset = &util.OVNClientset{
KubeClient: fakeClient,
EgressIPClient: egressIPFakeClient,
EgressFirewallClient: egressFirewallFakeClient,
CloudNetworkClient: cloudNetworkFakeClient,
EgressQoSClient: egressQoSFakeClient,
}

pods = make([]*v1.Pod, 0)
Expand Down Expand Up @@ -319,6 +350,15 @@ var _ = Describe("Watch Factory Operations", func() {
}
return true, obj, nil
})

egressQoSes = make([]*egressqos.EgressQoS, 0)
egressQoSWatch = egressQoSObjSetup(egressQoSFakeClient, "egressqoses", func(core.Action) (bool, runtime.Object, error) {
obj := &egressqos.EgressQoSList{}
for _, p := range egressQoSes {
obj.Items = append(obj.Items, *p)
}
return true, obj, nil
})
})

AfterEach(func() {
Expand Down Expand Up @@ -383,6 +423,10 @@ var _ = Describe("Watch Factory Operations", func() {
cloudPrivateIPConfigs = append(cloudPrivateIPConfigs, newCloudPrivateIPConfig("192.168.176.25"))
testExisting(CloudPrivateIPConfigType, "", nil)
})
It("is called for each existing egressQoS", func() {
egressQoSes = append(egressQoSes, newEgressQoS("myEgressQoS", "default"))
testExisting(EgressQoSType, "", nil)
})

It("is called for each existing pod that matches a given namespace and label", func() {
pod := newPod("pod1", "default")
Expand Down Expand Up @@ -469,6 +513,11 @@ var _ = Describe("Watch Factory Operations", func() {
cloudPrivateIPConfigs = append(cloudPrivateIPConfigs, newCloudPrivateIPConfig("192.168.126.26"))
testExisting(CloudPrivateIPConfigType)
})
It("calls ADD for each existing egressQoS", func() {
egressQoSes = append(egressQoSes, newEgressQoS("myEgressQoS", "default"))
egressQoSes = append(egressQoSes, newEgressQoS("myEgressQoS1", "default"))
testExisting(EgressQoSType)
})
})

Context("when EgressIP is disabled", func() {
Expand Down Expand Up @@ -497,6 +546,19 @@ var _ = Describe("Watch Factory Operations", func() {
testExisting(EgressFirewallType)
})
})
Context("when EgressQoS is disabled", func() {
testExisting := func(objType reflect.Type) {
wf, err = NewMasterWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())
err = wf.Start()
Expect(err).NotTo(HaveOccurred())
Expect(wf.informers).NotTo(HaveKey(objType))
}
It("does not contain EgressQoS informer", func() {
config.OVNKubernetesFeature.EnableEgressQoS = false
testExisting(EgressQoSType)
})
})

addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, sel labels.Selector, funcs cache.ResourceEventHandlerFuncs) (*Handler, *handlerCalls) {
calls := handlerCalls{}
Expand Down Expand Up @@ -1181,6 +1243,41 @@ var _ = Describe("Watch Factory Operations", func() {

wf.RemoveCloudPrivateIPConfigHandler(h)
})
It("responds to egressQoS add/update/delete events", func() {
wf, err = NewMasterWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())
err = wf.Start()
Expect(err).NotTo(HaveOccurred())

added := newEgressQoS("myEgressQoS", "default")
h, c := addHandler(wf, EgressQoSType, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
egressQoS := obj.(*egressqos.EgressQoS)
Expect(reflect.DeepEqual(egressQoS, added)).To(BeTrue())
},
UpdateFunc: func(old, new interface{}) {
newEgressQoS := new.(*egressqos.EgressQoS)
Expect(reflect.DeepEqual(newEgressQoS, added)).To(BeTrue())
Expect(newEgressQoS.Spec.Egress[0].DSCP).To(Equal(40))
},
DeleteFunc: func(obj interface{}) {
egressQoS := obj.(*egressqos.EgressQoS)
Expect(reflect.DeepEqual(egressQoS, added)).To(BeTrue())
},
})

egressQoSes = append(egressQoSes, added)
egressQoSWatch.Add(added)
Eventually(c.getAdded, 2).Should(Equal(1))
added.Spec.Egress[0].DSCP = 40
egressQoSWatch.Modify(added)
Eventually(c.getUpdated, 2).Should(Equal(1))
egressQoSes = egressQoSes[:0]
egressQoSWatch.Delete(added)
Eventually(c.getDeleted, 2).Should(Equal(1))

wf.RemoveEgressQoSHandler(h)
})
It("stops processing events after the handler is removed", func() {
wf, err = NewMasterWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())
Expand Down
3 changes: 3 additions & 0 deletions go-controller/pkg/factory/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"

egressfirewalllister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/listers/egressfirewall/v1"
egressqoslister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/listers/egressqos/v1"

cloudprivateipconfiglister "github.com/openshift/client-go/cloudnetwork/listers/cloudnetwork/v1"
egressiplister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/listers/egressip/v1"
Expand Down Expand Up @@ -374,6 +375,8 @@ func newInformerLister(oType reflect.Type, sharedInformer cache.SharedIndexInfor
return egressiplister.NewEgressIPLister(sharedInformer.GetIndexer()), nil
case CloudPrivateIPConfigType:
return cloudprivateipconfiglister.NewCloudPrivateIPConfigLister(sharedInformer.GetIndexer()), nil
case EgressQoSType:
return egressqoslister.NewEgressQoSLister(sharedInformer.GetIndexer()), nil
}

return nil, fmt.Errorf("cannot create lister from type %v", oType)
Expand Down
Loading

0 comments on commit a3e7a78

Please sign in to comment.