diff --git a/go-controller/pkg/config/config.go b/go-controller/pkg/config/config.go index ecdbe3915da..f7be7f8b8b9 100644 --- a/go-controller/pkg/config/config.go +++ b/go-controller/pkg/config/config.go @@ -315,6 +315,7 @@ type MetricsConfig struct { type OVNKubernetesFeatureConfig struct { EnableEgressIP bool `gcfg:"enable-egress-ip"` EnableEgressFirewall bool `gcfg:"enable-egress-firewall"` + EnableEgressQoS bool `gcfg:"enable-egress-qos"` } // GatewayMode holds the node gateway mode @@ -834,6 +835,12 @@ var OVNK8sFeatureFlags = []cli.Flag{ Destination: &cliConfig.OVNKubernetesFeature.EnableEgressFirewall, Value: OVNKubernetesFeature.EnableEgressFirewall, }, + &cli.BoolFlag{ + Name: "enable-egress-qos", + Usage: "Configure to use EgressQoS CRD feature with ovn-kubernetes.", + Destination: &cliConfig.OVNKubernetesFeature.EnableEgressQoS, + Value: OVNKubernetesFeature.EnableEgressQoS, + }, } // K8sFlags capture Kubernetes-related options diff --git a/go-controller/pkg/factory/factory.go b/go-controller/pkg/factory/factory.go index 45ea03b69db..b14cb52cd50 100644 --- a/go-controller/pkg/factory/factory.go +++ b/go-controller/pkg/factory/factory.go @@ -22,6 +22,11 @@ import ( ocpcloudnetworkinformerfactory "github.com/openshift/client-go/cloudnetwork/informers/externalversions" ocpcloudnetworklister "github.com/openshift/client-go/cloudnetwork/listers/cloudnetwork/v1" + egressqosapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1" + egressqosscheme "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned/scheme" + egressqosinformerfactory "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/informers/externalversions" + egressqosinformer "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/informers/externalversions/egressqos/v1" + kapi "k8s.io/api/core/v1" knet "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,11 +48,12 @@ type 
WatchFactory struct { // requirements with atomic accesses handlerCounter uint64 - iFactory informerfactory.SharedInformerFactory - eipFactory egressipinformerfactory.SharedInformerFactory - efFactory egressfirewallinformerfactory.SharedInformerFactory - cpipcFactory ocpcloudnetworkinformerfactory.SharedInformerFactory - informers map[reflect.Type]*informer + iFactory informerfactory.SharedInformerFactory + eipFactory egressipinformerfactory.SharedInformerFactory + efFactory egressfirewallinformerfactory.SharedInformerFactory + cpipcFactory ocpcloudnetworkinformerfactory.SharedInformerFactory + egressQoSFactory egressqosinformerfactory.SharedInformerFactory + informers map[reflect.Type]*informer stopChan chan struct{} } @@ -88,6 +94,7 @@ var ( EgressFirewallType reflect.Type = reflect.TypeOf(&egressfirewallapi.EgressFirewall{}) EgressIPType reflect.Type = reflect.TypeOf(&egressipapi.EgressIP{}) CloudPrivateIPConfigType reflect.Type = reflect.TypeOf(&ocpcloudnetworkapi.CloudPrivateIPConfig{}) + EgressQoSType reflect.Type = reflect.TypeOf(&egressqosapi.EgressQoS{}) PeerServiceType reflect.Type = reflect.TypeOf(&peerService{}) PeerNamespaceAndPodSelectorType reflect.Type = reflect.TypeOf(&peerNamespaceAndPodSelector{}) PeerPodForNamespaceAndPodSelectorType reflect.Type = reflect.TypeOf(&peerPodForNamespaceAndPodSelector{}) @@ -105,12 +112,13 @@ func NewMasterWatchFactory(ovnClientset *util.OVNClientset) (*WatchFactory, erro // the downside of making it tight (like 10 minutes) is needless spinning on all resources // However, AddEventHandlerWithResyncPeriod can specify a per handler resync period wf := &WatchFactory{ - iFactory: informerfactory.NewSharedInformerFactory(ovnClientset.KubeClient, resyncInterval), - eipFactory: egressipinformerfactory.NewSharedInformerFactory(ovnClientset.EgressIPClient, resyncInterval), - efFactory: egressfirewallinformerfactory.NewSharedInformerFactory(ovnClientset.EgressFirewallClient, resyncInterval), - cpipcFactory: 
ocpcloudnetworkinformerfactory.NewSharedInformerFactory(ovnClientset.CloudNetworkClient, resyncInterval), - informers: make(map[reflect.Type]*informer), - stopChan: make(chan struct{}), + iFactory: informerfactory.NewSharedInformerFactory(ovnClientset.KubeClient, resyncInterval), + eipFactory: egressipinformerfactory.NewSharedInformerFactory(ovnClientset.EgressIPClient, resyncInterval), + efFactory: egressfirewallinformerfactory.NewSharedInformerFactory(ovnClientset.EgressFirewallClient, resyncInterval), + cpipcFactory: ocpcloudnetworkinformerfactory.NewSharedInformerFactory(ovnClientset.CloudNetworkClient, resyncInterval), + egressQoSFactory: egressqosinformerfactory.NewSharedInformerFactory(ovnClientset.EgressQoSClient, resyncInterval), + informers: make(map[reflect.Type]*informer), + stopChan: make(chan struct{}), } if err := egressipapi.AddToScheme(egressipscheme.Scheme); err != nil { @@ -119,6 +127,9 @@ func NewMasterWatchFactory(ovnClientset *util.OVNClientset) (*WatchFactory, erro if err := egressfirewallapi.AddToScheme(egressfirewallscheme.Scheme); err != nil { return nil, err } + if err := egressqosapi.AddToScheme(egressqosscheme.Scheme); err != nil { + return nil, err + } // For Services and Endpoints, pre-populate the shared Informer with one that // has a label selector excluding headless services. 
@@ -189,6 +200,13 @@ func NewMasterWatchFactory(ovnClientset *util.OVNClientset) (*WatchFactory, erro return nil, err } } + if config.OVNKubernetesFeature.EnableEgressQoS { + wf.informers[EgressQoSType], err = newInformer(EgressQoSType, wf.egressQoSFactory.K8s().V1().EgressQoSes().Informer()) + if err != nil { + return nil, err + } + } + return wf, nil } @@ -224,6 +242,15 @@ func (wf *WatchFactory) Start() error { } } } + if config.OVNKubernetesFeature.EnableEgressQoS && wf.egressQoSFactory != nil { + wf.egressQoSFactory.Start(wf.stopChan) + for oType, synced := range wf.egressQoSFactory.WaitForCacheSync(wf.stopChan) { + if !synced { + return fmt.Errorf("error in syncing cache for %v informer", oType) + } + } + } + return nil } @@ -499,6 +526,11 @@ func (wf *WatchFactory) RemoveEgressFirewallHandler(handler *Handler) { wf.removeHandler(EgressFirewallType, handler) } +// RemoveEgressQoSHandler removes an EgressQoS object event handler function +func (wf *WatchFactory) RemoveEgressQoSHandler(handler *Handler) { + wf.removeHandler(EgressQoSType, handler) +} + // AddEgressIPHandler adds a handler function that will be executed on EgressIP object changes func (wf *WatchFactory) AddEgressIPHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) *Handler { return wf.addHandler(EgressIPType, "", nil, handlerFuncs, processExisting) @@ -659,6 +691,11 @@ func (wf *WatchFactory) NodeInformer() cache.SharedIndexInformer { return wf.informers[NodeType].inf } +// TODO/WIP: should be obsoleted when node controller moves to level-driven +func (wf *WatchFactory) WIPNodeInformer() v1coreinformers.NodeInformer { + return wf.iFactory.Core().V1().Nodes() +} + // LocalPodInformer returns a shared Informer that may or may not only // return pods running on the local node. 
func (wf *WatchFactory) LocalPodInformer() cache.SharedIndexInformer { @@ -669,6 +706,11 @@ func (wf *WatchFactory) PodInformer() cache.SharedIndexInformer { return wf.informers[PodType].inf } +// TODO/WIP: should be obsoleted when pod controller moves to level-driven +func (wf *WatchFactory) WIPPodInformer() v1coreinformers.PodInformer { + return wf.iFactory.Core().V1().Pods() +} + func (wf *WatchFactory) NamespaceInformer() cache.SharedIndexInformer { return wf.informers[NamespaceType].inf } @@ -677,6 +719,10 @@ func (wf *WatchFactory) ServiceInformer() cache.SharedIndexInformer { return wf.informers[ServiceType].inf } +func (wf *WatchFactory) EgressQoSInformer() egressqosinformer.EgressQoSInformer { + return wf.egressQoSFactory.K8s().V1().EgressQoSes() +} + // noHeadlessServiceSelector is a LabelSelector added to the watch for // Endpoints (and, eventually, EndpointSlices) that excludes endpoints // for headless services. diff --git a/go-controller/pkg/factory/factory_test.go b/go-controller/pkg/factory/factory_test.go index ebec52bd322..2137eebd0e3 100644 --- a/go-controller/pkg/factory/factory_test.go +++ b/go-controller/pkg/factory/factory_test.go @@ -17,6 +17,7 @@ import ( "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" + "k8s.io/utils/pointer" egressfirewall "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1" egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake" @@ -26,6 +27,9 @@ import ( egressip "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1" egressipfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/clientset/versioned/fake" + egressqos "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1" + egressqosfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned/fake" + ocpcloudnetworkapi 
"github.com/openshift/api/cloudnetwork/v1" ocpconfigapi "github.com/openshift/api/config/v1" ocpcloudnetworkclientsetfake "github.com/openshift/client-go/cloudnetwork/clientset/versioned/fake" @@ -148,6 +152,20 @@ func newCloudPrivateIPConfig(name string) *ocpcloudnetworkapi.CloudPrivateIPConf } } +func newEgressQoS(name, namespace string) *egressqos.EgressQoS { + return &egressqos.EgressQoS{ + ObjectMeta: newObjectMeta(name, namespace), + Spec: egressqos.EgressQoSSpec{ + Egress: []egressqos.EgressQoSRule{ + { + DSCP: 50, + DstCIDR: pointer.String("1.2.3.4/32"), + }, + }, + }, + } +} + func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher { w := watch.NewFake() c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil)) @@ -176,6 +194,13 @@ func cloudPrivateIPConfigObjSetup(c *ocpcloudnetworkclientsetfake.Clientset, obj return w } +func egressQoSObjSetup(c *egressqosfake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher { + w := watch.NewFake() + c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil)) + c.AddReactor("list", objType, listFn) + return w +} + type handlerCalls struct { added int32 updated int32 @@ -201,11 +226,13 @@ var _ = Describe("Watch Factory Operations", func() { egressIPFakeClient *egressipfake.Clientset egressFirewallFakeClient *egressfirewallfake.Clientset cloudNetworkFakeClient *ocpcloudnetworkclientsetfake.Clientset + egressQoSFakeClient *egressqosfake.Clientset podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher egressFirewallWatch *watch.FakeWatcher egressIPWatch *watch.FakeWatcher cloudPrivateIPConfigWatch *watch.FakeWatcher + egressQoSWatch *watch.FakeWatcher pods []*v1.Pod namespaces []*v1.Namespace nodes []*v1.Node @@ -216,6 +243,7 @@ var _ = Describe("Watch Factory Operations", func() { cloudPrivateIPConfigs 
[]*ocpcloudnetworkapi.CloudPrivateIPConfig wf *WatchFactory egressFirewalls []*egressfirewall.EgressFirewall + egressQoSes []*egressqos.EgressQoS err error ) @@ -225,18 +253,21 @@ var _ = Describe("Watch Factory Operations", func() { config.PrepareTestConfig() config.OVNKubernetesFeature.EnableEgressIP = true config.OVNKubernetesFeature.EnableEgressFirewall = true + config.OVNKubernetesFeature.EnableEgressQoS = true config.Kubernetes.PlatformType = string(ocpconfigapi.AWSPlatformType) fakeClient = &fake.Clientset{} egressFirewallFakeClient = &egressfirewallfake.Clientset{} egressIPFakeClient = &egressipfake.Clientset{} cloudNetworkFakeClient = &ocpcloudnetworkclientsetfake.Clientset{} + egressQoSFakeClient = &egressqosfake.Clientset{} ovnClientset = &util.OVNClientset{ KubeClient: fakeClient, EgressIPClient: egressIPFakeClient, EgressFirewallClient: egressFirewallFakeClient, CloudNetworkClient: cloudNetworkFakeClient, + EgressQoSClient: egressQoSFakeClient, } pods = make([]*v1.Pod, 0) @@ -319,6 +350,15 @@ var _ = Describe("Watch Factory Operations", func() { } return true, obj, nil }) + + egressQoSes = make([]*egressqos.EgressQoS, 0) + egressQoSWatch = egressQoSObjSetup(egressQoSFakeClient, "egressqoses", func(core.Action) (bool, runtime.Object, error) { + obj := &egressqos.EgressQoSList{} + for _, p := range egressQoSes { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) }) AfterEach(func() { @@ -383,6 +423,10 @@ var _ = Describe("Watch Factory Operations", func() { cloudPrivateIPConfigs = append(cloudPrivateIPConfigs, newCloudPrivateIPConfig("192.168.176.25")) testExisting(CloudPrivateIPConfigType, "", nil) }) + It("is called for each existing egressQoS", func() { + egressQoSes = append(egressQoSes, newEgressQoS("myEgressQoS", "default")) + testExisting(EgressQoSType, "", nil) + }) It("is called for each existing pod that matches a given namespace and label", func() { pod := newPod("pod1", "default") @@ -469,6 +513,11 @@ var _ = Describe("Watch 
Factory Operations", func() { cloudPrivateIPConfigs = append(cloudPrivateIPConfigs, newCloudPrivateIPConfig("192.168.126.26")) testExisting(CloudPrivateIPConfigType) }) + It("calls ADD for each existing egressQoS", func() { + egressQoSes = append(egressQoSes, newEgressQoS("myEgressQoS", "default")) + egressQoSes = append(egressQoSes, newEgressQoS("myEgressQoS1", "default")) + testExisting(EgressQoSType) + }) }) Context("when EgressIP is disabled", func() { @@ -497,6 +546,19 @@ var _ = Describe("Watch Factory Operations", func() { testExisting(EgressFirewallType) }) }) + Context("when EgressQoS is disabled", func() { + testExisting := func(objType reflect.Type) { + wf, err = NewMasterWatchFactory(ovnClientset) + Expect(err).NotTo(HaveOccurred()) + err = wf.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(wf.informers).NotTo(HaveKey(objType)) + } + It("does not contain EgressQoS informer", func() { + config.OVNKubernetesFeature.EnableEgressQoS = false + testExisting(EgressQoSType) + }) + }) addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, sel labels.Selector, funcs cache.ResourceEventHandlerFuncs) (*Handler, *handlerCalls) { calls := handlerCalls{} @@ -1181,6 +1243,41 @@ var _ = Describe("Watch Factory Operations", func() { wf.RemoveCloudPrivateIPConfigHandler(h) }) + It("responds to egressQoS add/update/delete events", func() { + wf, err = NewMasterWatchFactory(ovnClientset) + Expect(err).NotTo(HaveOccurred()) + err = wf.Start() + Expect(err).NotTo(HaveOccurred()) + + added := newEgressQoS("myEgressQoS", "default") + h, c := addHandler(wf, EgressQoSType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + egressQoS := obj.(*egressqos.EgressQoS) + Expect(reflect.DeepEqual(egressQoS, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newEgressQoS := new.(*egressqos.EgressQoS) + Expect(reflect.DeepEqual(newEgressQoS, added)).To(BeTrue()) + 
Expect(newEgressQoS.Spec.Egress[0].DSCP).To(Equal(40)) + }, + DeleteFunc: func(obj interface{}) { + egressQoS := obj.(*egressqos.EgressQoS) + Expect(reflect.DeepEqual(egressQoS, added)).To(BeTrue()) + }, + }) + + egressQoSes = append(egressQoSes, added) + egressQoSWatch.Add(added) + Eventually(c.getAdded, 2).Should(Equal(1)) + added.Spec.Egress[0].DSCP = 40 + egressQoSWatch.Modify(added) + Eventually(c.getUpdated, 2).Should(Equal(1)) + egressQoSes = egressQoSes[:0] + egressQoSWatch.Delete(added) + Eventually(c.getDeleted, 2).Should(Equal(1)) + + wf.RemoveEgressQoSHandler(h) + }) It("stops processing events after the handler is removed", func() { wf, err = NewMasterWatchFactory(ovnClientset) Expect(err).NotTo(HaveOccurred()) diff --git a/go-controller/pkg/factory/handler.go b/go-controller/pkg/factory/handler.go index e84e38b4501..b07a5ff8efb 100644 --- a/go-controller/pkg/factory/handler.go +++ b/go-controller/pkg/factory/handler.go @@ -11,6 +11,7 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics" egressfirewalllister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/listers/egressfirewall/v1" + egressqoslister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/listers/egressqos/v1" cloudprivateipconfiglister "github.com/openshift/client-go/cloudnetwork/listers/cloudnetwork/v1" egressiplister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/listers/egressip/v1" @@ -374,6 +375,8 @@ func newInformerLister(oType reflect.Type, sharedInformer cache.SharedIndexInfor return egressiplister.NewEgressIPLister(sharedInformer.GetIndexer()), nil case CloudPrivateIPConfigType: return cloudprivateipconfiglister.NewCloudPrivateIPConfigLister(sharedInformer.GetIndexer()), nil + case EgressQoSType: + return egressqoslister.NewEgressQoSLister(sharedInformer.GetIndexer()), nil } return nil, fmt.Errorf("cannot create lister from type %v", oType) diff --git 
a/go-controller/pkg/libovsdbops/model.go b/go-controller/pkg/libovsdbops/model.go index 1a493ba79b0..a7c54fac1b3 100644 --- a/go-controller/pkg/libovsdbops/model.go +++ b/go-controller/pkg/libovsdbops/model.go @@ -56,6 +56,8 @@ func getUUID(model model.Model) string { return t.UUID case *sbdb.SBGlobal: return t.UUID + case *nbdb.QoS: + return t.UUID default: panic(fmt.Sprintf("getUUID: unknown model %T", t)) } @@ -105,6 +107,8 @@ func setUUID(model model.Model, uuid string) { t.UUID = uuid case *sbdb.SBGlobal: t.UUID = uuid + case *nbdb.QoS: + t.UUID = uuid default: panic(fmt.Sprintf("setUUID: unknown model %T", t)) } @@ -209,6 +213,10 @@ func copyIndexes(model model.Model) model.Model { return &sbdb.SBGlobal{ UUID: t.UUID, } + case *nbdb.QoS: + return &nbdb.QoS{ + UUID: t.UUID, + } default: panic(fmt.Sprintf("copyIndexes: unknown model %T", t)) } @@ -256,6 +264,8 @@ func getListFromModel(model model.Model) interface{} { return &[]*sbdb.Chassis{} case *sbdb.MACBinding: return &[]*sbdb.MACBinding{} + case *nbdb.QoS: + return &[]nbdb.QoS{} default: panic(fmt.Sprintf("getModelList: unknown model %T", t)) } diff --git a/go-controller/pkg/libovsdbops/qos.go b/go-controller/pkg/libovsdbops/qos.go new file mode 100644 index 00000000000..2dded667d3c --- /dev/null +++ b/go-controller/pkg/libovsdbops/qos.go @@ -0,0 +1,107 @@ +package libovsdbops + +import ( + "context" + "strings" + + libovsdbclient "github.com/ovn-org/libovsdb/client" + libovsdb "github.com/ovn-org/libovsdb/ovsdb" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" +) + +type QoSPredicate func(*nbdb.QoS) bool + +// FindQoSesWithPredicate looks up QoSes from the cache based on a +// given predicate +func FindQoSesWithPredicate(nbClient libovsdbclient.Client, p QoSPredicate) ([]*nbdb.QoS, error) { + ctx, cancel := context.WithTimeout(context.Background(), types.OVSDBTimeout) + defer cancel() + found := []*nbdb.QoS{} + err := 
nbClient.WhereCache(p).List(ctx, &found) + return found, err +} + +// CreateOrUpdateQoSesOps returns the ops to create or update the provided QoSes. +func CreateOrUpdateQoSesOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, qoses ...*nbdb.QoS) ([]libovsdb.Operation, error) { + opModels := make([]operationModel, 0, len(qoses)) + for i := range qoses { + // can't use i in the predicate, for loop replaces it in-memory + qos := qoses[i] + opModel := operationModel{ + Model: qos, + ModelPredicate: func(q *nbdb.QoS) bool { + return strings.Contains(q.Match, qos.Match) && q.Priority == qos.Priority + }, + OnModelUpdates: []interface{}{}, // update all fields + ErrNotFound: false, + BulkOp: false, + } + opModels = append(opModels, opModel) + } + + modelClient := newModelClient(nbClient) + return modelClient.CreateOrUpdateOps(ops, opModels...) +} + +// AddQoSesToLogicalSwitchOps returns the ops to add the provided QoSes to the switch +func AddQoSesToLogicalSwitchOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, name string, qoses ...*nbdb.QoS) ([]libovsdb.Operation, error) { + sw := &nbdb.LogicalSwitch{ + Name: name, + QOSRules: make([]string, 0, len(qoses)), + } + for _, qos := range qoses { + sw.QOSRules = append(sw.QOSRules, qos.UUID) + } + + opModels := operationModel{ + Model: sw, + ModelPredicate: func(item *nbdb.LogicalSwitch) bool { return item.Name == sw.Name }, + OnModelMutations: []interface{}{&sw.QOSRules}, + ErrNotFound: true, + BulkOp: false, + } + + modelClient := newModelClient(nbClient) + return modelClient.CreateOrUpdateOps(ops, opModels) +} + +// DeleteQoSesOps returns the ops to delete the provided QoSes. 
+func DeleteQoSesOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, qoses ...*nbdb.QoS) ([]libovsdb.Operation, error) { + opModels := make([]operationModel, 0, len(qoses)) + for i := range qoses { + // can't use i in the predicate, for loop replaces it in-memory + qos := qoses[i] + opModel := operationModel{ + Model: qos, + ErrNotFound: false, + BulkOp: false, + } + opModels = append(opModels, opModel) + } + + modelClient := newModelClient(nbClient) + return modelClient.DeleteOps(ops, opModels...) +} + +// RemoveQoSesFromLogicalSwitchOps returns the ops to remove the provided QoSes from the provided switch. +func RemoveQoSesFromLogicalSwitchOps(nbClient libovsdbclient.Client, ops []libovsdb.Operation, name string, qoses ...*nbdb.QoS) ([]libovsdb.Operation, error) { + sw := &nbdb.LogicalSwitch{ + Name: name, + QOSRules: make([]string, 0, len(qoses)), + } + for _, qos := range qoses { + sw.QOSRules = append(sw.QOSRules, qos.UUID) + } + + opModels := operationModel{ + Model: sw, + ModelPredicate: func(item *nbdb.LogicalSwitch) bool { return item.Name == sw.Name }, + OnModelMutations: []interface{}{&sw.QOSRules}, + ErrNotFound: true, + BulkOp: false, + } + + modelClient := newModelClient(nbClient) + return modelClient.DeleteOps(ops, opModels) +} diff --git a/go-controller/pkg/node/gateway_init_linux_test.go b/go-controller/pkg/node/gateway_init_linux_test.go index 114f7fb875b..a94bcb6ac8b 100644 --- a/go-controller/pkg/node/gateway_init_linux_test.go +++ b/go-controller/pkg/node/gateway_init_linux_test.go @@ -33,6 +33,7 @@ import ( egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake" egressipfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/clientset/versioned/fake" + egressqosfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned/fake" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" @@ -169,9 +170,11 @@ func shareGatewayInterfaceTest(app *cli.App, testNS ns.NetNS, }) egressFirewallFakeClient := &egressfirewallfake.Clientset{} egressIPFakeClient := &egressipfake.Clientset{} + egressQoSFakeClient := &egressqosfake.Clientset{} fakeClient := &util.OVNClientset{ KubeClient: kubeFakeClient, EgressFirewallClient: egressFirewallFakeClient, + EgressQoSClient: egressQoSFakeClient, } stop := make(chan struct{}) @@ -443,9 +446,11 @@ func shareGatewayInterfaceDPUTest(app *cli.App, testNS ns.NetNS, }) egressFirewallFakeClient := &egressfirewallfake.Clientset{} egressIPFakeClient := &egressipfake.Clientset{} + egressQoSFakeClient := &egressqosfake.Clientset{} fakeClient := &util.OVNClientset{ KubeClient: kubeFakeClient, EgressFirewallClient: egressFirewallFakeClient, + EgressQoSClient: egressQoSFakeClient, } _, nodeNet, err := net.ParseCIDR(nodeSubnet) @@ -784,9 +789,11 @@ OFPT_GET_CONFIG_REPLY (xid=0x4): frags=normal miss_send_len=0`, ) egressFirewallFakeClient := &egressfirewallfake.Clientset{} egressIPFakeClient := &egressipfake.Clientset{} + egressQoSFakeClient := &egressqosfake.Clientset{} fakeClient := &util.OVNClientset{ KubeClient: kubeFakeClient, EgressFirewallClient: egressFirewallFakeClient, + EgressQoSClient: egressQoSFakeClient, } stop := make(chan struct{}) diff --git a/go-controller/pkg/ovn/egressqos.go b/go-controller/pkg/ovn/egressqos.go new file mode 100644 index 00000000000..59e42e3baf9 --- /dev/null +++ b/go-controller/pkg/ovn/egressqos.go @@ -0,0 +1,909 @@ +package ovn + +import ( + "fmt" + "net" + "strings" + "sync" + "time" + + "github.com/ovn-org/libovsdb/ovsdb" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" + egressqosapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1" + egressqosinformer "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/informers/externalversions/egressqos/v1" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops" + 
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" + addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" + "github.com/pkg/errors" + kapi "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + v1coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + utilnet "k8s.io/utils/net" +) + +const ( + maxEgressQoSRetries = 10 + defaultEgressQoSName = "default" + EgressQoSFlowStartPriority = 1000 +) + +type egressQoS struct { + sync.RWMutex + name string + namespace string + rules []*egressQoSRule + stale bool +} + +type egressQoSRule struct { + priority int + dscp int + destination string + addrSet addressset.AddressSet + pods *sync.Map // pods name -> ips in the addrSet + podSelector metav1.LabelSelector +} + +// shallow copies the EgressQoS object provided. 
+func (oc *Controller) cloneEgressQoS(raw *egressqosapi.EgressQoS) (*egressQoS, error) { + eq := &egressQoS{ + name: raw.Name, + namespace: raw.Namespace, + rules: make([]*egressQoSRule, 0), + } + + if len(raw.Spec.Egress) > EgressQoSFlowStartPriority { + return nil, fmt.Errorf("cannot create EgressQoS with %d rules - maximum is %d", len(raw.Spec.Egress), EgressQoSFlowStartPriority) + } + + addErrors := errors.New("") + for i, rule := range raw.Spec.Egress { + eqr, err := oc.cloneEgressQoSRule(rule, EgressQoSFlowStartPriority-i) + if err != nil { + dst := "any" + if rule.DstCIDR != nil { + dst = *rule.DstCIDR + } + addErrors = errors.Wrapf(addErrors, "error: cannot create egressqos Rule to destination %s for namespace %s - %v", + dst, eq.namespace, err) + continue + } + eq.rules = append(eq.rules, eqr) + } + + if addErrors.Error() == "" { + addErrors = nil + } + + return eq, addErrors +} + +// shallow copies the EgressQoSRule object provided. +func (oc *Controller) cloneEgressQoSRule(raw egressqosapi.EgressQoSRule, priority int) (*egressQoSRule, error) { + dst := "" + if raw.DstCIDR != nil { + _, _, err := net.ParseCIDR(*raw.DstCIDR) + if err != nil { + return nil, err + } + dst = *raw.DstCIDR + } + + _, err := metav1.LabelSelectorAsSelector(&raw.PodSelector) + if err != nil { + return nil, err + } + + eqr := &egressQoSRule{ + priority: priority, + dscp: raw.DSCP, + destination: dst, + podSelector: raw.PodSelector, + } + + return eqr, nil +} + +func (oc *Controller) createASForEgressQoSRule(podSelector metav1.LabelSelector, namespace string, priority int) (addressset.AddressSet, *sync.Map, error) { + var addrSet addressset.AddressSet + + selector, _ := metav1.LabelSelectorAsSelector(&podSelector) + if selector.Empty() { // empty selector means that the rule applies to all pods in the namespace + addrSet, err := oc.addressSetFactory.EnsureAddressSet(namespace) + if err != nil { + return nil, nil, fmt.Errorf("cannot ensure that addressSet for namespace %s exists %v", 
namespace, err) + } + return addrSet, nil, nil + } + + podsCache := sync.Map{} + + pods, err := oc.watchFactory.GetPodsBySelector(namespace, podSelector) + if err != nil { + return nil, nil, err + } + + addrSet, err = oc.addressSetFactory.EnsureAddressSet(fmt.Sprintf("%s%s-%d", types.EgressQoSRulePrefix, namespace, priority)) + if err != nil { + return nil, nil, err + } + + podsIps := []net.IP{} + for _, pod := range pods { + // we don't handle HostNetworked or completed pods + if util.PodWantsNetwork(pod) && !util.PodCompleted(pod) { + podIPs, err := util.GetAllPodIPs(pod) + if err != nil { + return nil, nil, err + } + podsCache.Store(pod.Name, podIPs) + podsIps = append(podsIps, podIPs...) + } + } + err = addrSet.SetIPs(podsIps) + if err != nil { + return nil, nil, err + } + + return addrSet, &podsCache, nil +} + +// initEgressQoSController initializes the EgressQoS controller. +func (oc *Controller) initEgressQoSController( + eqInformer egressqosinformer.EgressQoSInformer, + podInformer v1coreinformers.PodInformer, + nodeInformer v1coreinformers.NodeInformer) { + klog.Info("Setting up event handlers for EgressQoS") + oc.egressQoSLister = eqInformer.Lister() + oc.egressQoSSynced = eqInformer.Informer().HasSynced + oc.egressQoSQueue = workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemFastSlowRateLimiter(1*time.Second, 5*time.Second, 5), + "egressqos", + ) + eqInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: oc.onEgressQoSAdd, + UpdateFunc: oc.onEgressQoSUpdate, + DeleteFunc: oc.onEgressQoSDelete, + }) + + oc.egressQoSPodLister = podInformer.Lister() + oc.egressQoSPodSynced = podInformer.Informer().HasSynced + oc.egressQoSPodQueue = workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemFastSlowRateLimiter(1*time.Second, 5*time.Second, 5), + "egressqospods", + ) + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: oc.onEgressQoSPodAdd, + UpdateFunc: oc.onEgressQoSPodUpdate, + DeleteFunc: 
oc.onEgressQoSPodDelete, + }) + + oc.egressQoSNodeLister = nodeInformer.Lister() + oc.egressQoSNodeSynced = nodeInformer.Informer().HasSynced + oc.egressQoSNodeQueue = workqueue.NewNamedRateLimitingQueue( + workqueue.NewItemFastSlowRateLimiter(1*time.Second, 5*time.Second, 5), + "egressqosnodes", + ) + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: oc.onEgressQoSNodeAdd, // we only care about new logical switches being added + UpdateFunc: func(o, n interface{}) {}, + DeleteFunc: func(obj interface{}) {}, + }) +} + +func (oc *Controller) runEgressQoSController(threadiness int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + + klog.Infof("Starting EgressQoS Controller") + + if !cache.WaitForNamedCacheSync("egressqosnodes", stopCh, oc.egressQoSNodeSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + klog.Infof("Synchronization failed") + return + } + + if !cache.WaitForNamedCacheSync("egressqospods", stopCh, oc.egressQoSPodSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + klog.Infof("Synchronization failed") + return + } + + if !cache.WaitForNamedCacheSync("egressqos", stopCh, oc.egressQoSSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + klog.Infof("Synchronization failed") + return + } + + klog.Infof("Repairing EgressQoSes") + err := oc.repairEgressQoSes() + if err != nil { + klog.Errorf("Failed to delete stale EgressQoS entries: %v", err) + } + + wg := &sync.WaitGroup{} + for i := 0; i < threadiness; i++ { + wg.Add(1) + go func() { + defer wg.Done() + wait.Until(func() { + oc.runEgressQoSWorker(wg) + }, time.Second, stopCh) + }() + } + + for i := 0; i < threadiness; i++ { + wg.Add(1) + go func() { + defer wg.Done() + wait.Until(func() { + oc.runEgressQoSPodWorker(wg) + }, time.Second, stopCh) + }() + } + + for i := 0; i < threadiness; i++ { + wg.Add(1) + go func() { + defer wg.Done() + 
wait.Until(func() { + oc.runEgressQoSNodeWorker(wg) + }, time.Second, stopCh) + }() + } + + // wait until we're told to stop + <-stopCh + + klog.Infof("Shutting down EgressQoS controller") + oc.egressQoSQueue.ShutDown() + oc.egressQoSPodQueue.ShutDown() + oc.egressQoSNodeQueue.ShutDown() + + wg.Wait() +} + +// onEgressQoSAdd queues the EgressQoS for processing. +func (oc *Controller) onEgressQoSAdd(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) + return + } + klog.V(4).Infof("Adding EgressQoS %s", key) + oc.egressQoSQueue.Add(key) +} + +// onEgressQoSUpdate queues the EgressQoS for processing. +func (oc *Controller) onEgressQoSUpdate(oldObj, newObj interface{}) { + oldEQ := oldObj.(*egressqosapi.EgressQoS) + newEQ := newObj.(*egressqosapi.EgressQoS) + + if oldEQ.ResourceVersion == newEQ.ResourceVersion || + !newEQ.GetDeletionTimestamp().IsZero() { + return + } + + key, err := cache.MetaNamespaceKeyFunc(newObj) + if err == nil { + oc.egressQoSQueue.Add(key) + } +} + +// onEgressQoSDelete queues the EgressQoS for processing. 
+func (oc *Controller) onEgressQoSDelete(obj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+	klog.V(4).Infof("Deleting EgressQoS %s", key)
+	// Deletions are queued like adds/updates: the sync handler notices the
+	// object is gone from the lister and performs the cleanup.
+	oc.egressQoSQueue.Add(key)
+}
+
+// runEgressQoSWorker processes items from the EgressQoS queue until the
+// queue is shut down.
+func (oc *Controller) runEgressQoSWorker(wg *sync.WaitGroup) {
+	for oc.processNextEgressQoSWorkItem(wg) {
+	}
+}
+
+// processNextEgressQoSWorkItem dequeues a single EgressQoS key, syncs it
+// and handles retries. It returns false only when the queue has been shut
+// down.
+func (oc *Controller) processNextEgressQoSWorkItem(wg *sync.WaitGroup) bool {
+	wg.Add(1)
+	defer wg.Done()
+
+	key, quit := oc.egressQoSQueue.Get()
+	if quit {
+		return false
+	}
+
+	defer oc.egressQoSQueue.Done(key)
+
+	err := oc.syncEgressQoS(key.(string))
+	if err == nil {
+		oc.egressQoSQueue.Forget(key)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
+
+	// Requeue with rate limiting up to maxEgressQoSRetries attempts, then
+	// give up on this key until the next event for it arrives.
+	if oc.egressQoSQueue.NumRequeues(key) < maxEgressQoSRetries {
+		oc.egressQoSQueue.AddRateLimited(key)
+		return true
+	}
+
+	oc.egressQoSQueue.Forget(key)
+	return true
+}
+
+// This takes care of syncing stale data which we might have in OVN if
+// there's no ovnkube-master running for a while.
+// It deletes all QoSes and Address Sets from OVN that belong to EgressQoSes.
+func (oc *Controller) repairEgressQoSes() error {
+	startTime := time.Now()
+	klog.V(4).Infof("Starting repairing loop for egressqos")
+	defer func() {
+		klog.V(4).Infof("Finished repairing loop for egressqos: %v", time.Since(startTime))
+	}()
+
+	p := func(q *nbdb.QoS) bool {
+		_, ok := q.ExternalIDs["EgressQoS"]
+		return ok
+	}
+	existingQoSes, err := libovsdbops.FindQoSesWithPredicate(oc.nbClient, p)
+	if err != nil {
+		return err
+	}
+
+	if len(existingQoSes) > 0 {
+		allOps := []ovsdb.Operation{}
+
+		ops, err := libovsdbops.DeleteQoSesOps(oc.nbClient, nil, existingQoSes...)
+		if err != nil {
+			return err
+		}
+		allOps = append(allOps, ops...)
+ + logicalSwitches, err := oc.egressQoSSwitches() + if err != nil { + return err + } + + for _, sw := range logicalSwitches { + ops, err := libovsdbops.RemoveQoSesFromLogicalSwitchOps(oc.nbClient, nil, sw, existingQoSes...) + if err != nil { + return err + } + allOps = append(allOps, ops...) + } + + if _, err := libovsdbops.TransactAndCheck(oc.nbClient, allOps); err != nil { + return fmt.Errorf("unable to remove stale qoses, err: %v", err) + } + } + + asPredicate := func(as *nbdb.AddressSet) bool { + return strings.Contains(as.ExternalIDs["name"], types.EgressQoSRulePrefix) + } + if err := libovsdbops.DeleteAddressSetsWithPredicate(oc.nbClient, asPredicate); err != nil { + return fmt.Errorf("failed to remove stale egress qos address sets, err: %v", err) + } + + return nil +} + +func (oc *Controller) syncEgressQoS(key string) error { + startTime := time.Now() + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + klog.Infof("Processing sync for EgressQoS %s/%s", namespace, name) + + defer func() { + klog.V(4).Infof("Finished syncing EgressQoS %s on namespace %s : %v", name, namespace, time.Since(startTime)) + }() + + eq, err := oc.egressQoSLister.EgressQoSes(namespace).Get(name) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + + if name != defaultEgressQoSName { + klog.Errorf("EgressQoS name %s is invalid, must be %s", name, defaultEgressQoSName) + return nil // Return nil to avoid requeues + } + + err = oc.cleanEgressQoSNS(namespace) + if err != nil { + return fmt.Errorf("unable to delete EgressQoS %s/%s, err: %v", namespace, name, err) + } + + if eq == nil { // it was deleted no need to process further + return nil + } + + klog.V(5).Infof("EgressQoS %s retrieved from lister: %v", eq.Name, eq) + + return oc.addEgressQoS(eq) +} + +func (oc *Controller) cleanEgressQoSNS(namespace string) error { + obj, loaded := oc.egressQoSCache.Load(namespace) + if !loaded { + // the namespace is clean + 
klog.V(4).Infof("EgressQoS for namespace %s not found in cache", namespace) + return nil + } + + eq := obj.(*egressQoS) + + eq.Lock() + defer eq.Unlock() + + p := func(q *nbdb.QoS) bool { + eqNs, ok := q.ExternalIDs["EgressQoS"] + if !ok { // the QoS is not managed by an EgressQoS + return false + } + return eqNs == eq.namespace + } + existingQoSes, err := libovsdbops.FindQoSesWithPredicate(oc.nbClient, p) + if err != nil { + return err + } + + if len(existingQoSes) > 0 { + allOps := []ovsdb.Operation{} + + ops, err := libovsdbops.DeleteQoSesOps(oc.nbClient, nil, existingQoSes...) + if err != nil { + return err + } + allOps = append(allOps, ops...) + + logicalSwitches, err := oc.egressQoSSwitches() + if err != nil { + return err + } + + for _, sw := range logicalSwitches { + ops, err := libovsdbops.RemoveQoSesFromLogicalSwitchOps(oc.nbClient, nil, sw, existingQoSes...) + if err != nil { + return err + } + allOps = append(allOps, ops...) + } + + if _, err := libovsdbops.TransactAndCheck(oc.nbClient, allOps); err != nil { + return fmt.Errorf("failed to delete qos, err: %s", err) + } + } + + asPredicate := func(as *nbdb.AddressSet) bool { + return strings.Contains(as.ExternalIDs["name"], types.EgressQoSRulePrefix+eq.namespace) + } + if err := libovsdbops.DeleteAddressSetsWithPredicate(oc.nbClient, asPredicate); err != nil { + return fmt.Errorf("failed to remove egress qos address sets, err: %v", err) + } + + // we can delete the object from the cache now. + // we also mark it as stale to prevent pod processing if RLock + // acquired after removal from cache. 
+ oc.egressQoSCache.Delete(namespace) + eq.stale = true + + return nil +} + +func (oc *Controller) addEgressQoS(eqObj *egressqosapi.EgressQoS) error { + eq, err := oc.cloneEgressQoS(eqObj) + if err != nil { + return err + } + + eq.Lock() + defer eq.Unlock() + eq.stale = true // until we finish processing successfully + + // there should not be an item in the cache for the given namespace + // as we first attempt to delete before create. + if _, loaded := oc.egressQoSCache.LoadOrStore(eq.namespace, eq); loaded { + return fmt.Errorf("error attempting to add egressQoS %s to namespace %s when it already has an EgressQoS", + eq.name, eq.namespace) + } + + for _, rule := range eq.rules { + rule.addrSet, rule.pods, err = oc.createASForEgressQoSRule(rule.podSelector, eq.namespace, rule.priority) + if err != nil { + return err + } + } + + logicalSwitches, err := oc.egressQoSSwitches() + if err != nil { + return err + } + + allOps := []ovsdb.Operation{} + qoses := []*nbdb.QoS{} + for _, r := range eq.rules { + hashedIPv4, hashedIPv6 := r.addrSet.GetASHashNames() + match := generateEgressQoSMatch(r, hashedIPv4, hashedIPv6) + qos := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: match, + Priority: r.priority, + Action: map[string]int{nbdb.QoSActionDSCP: r.dscp}, + ExternalIDs: map[string]string{"EgressQoS": eq.namespace}, + } + qoses = append(qoses, qos) + } + + ops, err := libovsdbops.CreateOrUpdateQoSesOps(oc.nbClient, nil, qoses...) + if err != nil { + return err + } + allOps = append(allOps, ops...) + + for _, sw := range logicalSwitches { + ops, err := libovsdbops.AddQoSesToLogicalSwitchOps(oc.nbClient, nil, sw, qoses...) + if err != nil { + return err + } + allOps = append(allOps, ops...) 
+ } + + if _, err := libovsdbops.TransactAndCheck(oc.nbClient, allOps); err != nil { + return fmt.Errorf("failed to create qos, err: %s", err) + } + + eq.stale = false // we can mark it as "ready" now + return nil +} + +func generateEgressQoSMatch(eq *egressQoSRule, hashedAddressSetNameIPv4, hashedAddressSetNameIPv6 string) string { + var src string + var dst string + + switch { + case config.IPv4Mode && config.IPv6Mode: + src = fmt.Sprintf("(ip4.src == $%s || ip6.src == $%s)", hashedAddressSetNameIPv4, hashedAddressSetNameIPv6) + case config.IPv4Mode: + src = fmt.Sprintf("ip4.src == $%s", hashedAddressSetNameIPv4) + case config.IPv6Mode: + src = fmt.Sprintf("ip6.src == $%s", hashedAddressSetNameIPv6) + } + + dst = "ip4.dst == 0.0.0.0/0 || ip6.dst == ::/0" // if the dstCIDR field was not set we treat it as "any" destination + if eq.destination != "" { + dst = fmt.Sprintf("ip4.dst == %s", eq.destination) + if utilnet.IsIPv6CIDRString(eq.destination) { + dst = fmt.Sprintf("ip6.dst == %s", eq.destination) + } + } + + return fmt.Sprintf("(%s) && %s", dst, src) +} + +func (oc *Controller) egressQoSSwitches() ([]string, error) { + logicalSwitches := []string{} + + // Find all node switches + p := func(item *nbdb.LogicalSwitch) bool { + // Ignore external and Join switches(both legacy and current) + return !(strings.HasPrefix(item.Name, types.JoinSwitchPrefix) || item.Name == "join" || strings.HasPrefix(item.Name, types.ExternalSwitchPrefix)) + } + + nodeLocalSwitches, err := libovsdbops.FindLogicalSwitchesWithPredicate(oc.nbClient, p) + if err != nil { + return nil, fmt.Errorf("unable to fetch local switches for EgressQoS, err: %v", err) + } + + for _, nodeLocalSwitch := range nodeLocalSwitches { + logicalSwitches = append(logicalSwitches, nodeLocalSwitch.Name) + } + + return logicalSwitches, nil +} + +type mapOp int + +const ( + mapInsert mapOp = iota + mapDelete +) + +type mapAndOp struct { + m *sync.Map + op mapOp +} + +func (oc *Controller) syncEgressQoSPod(key 
string) error { + startTime := time.Now() + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + + klog.Infof("Processing sync for EgressQoS pod %s/%s", namespace, name) + + defer func() { + klog.V(4).Infof("Finished syncing EgressQoS pod %s on namespace %s : %v", name, namespace, time.Since(startTime)) + }() + + obj, loaded := oc.egressQoSCache.Load(namespace) + if !loaded { // no EgressQoS in the namespace + return nil + } + + eq := obj.(*egressQoS) + eq.RLock() // allow multiple pods to sync + defer eq.RUnlock() + if eq.stale { // was deleted or not created properly + return nil + } + + pod, err := oc.egressQoSPodLister.Pods(namespace).Get(name) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + + allOps := []ovsdb.Operation{} + + // on delete/complete we remove the pod from the relevant address sets + if pod == nil || util.PodCompleted(pod) { + podsCaches := []*sync.Map{} + for _, rule := range eq.rules { + obj, loaded := rule.pods.Load(name) + if !loaded { + continue + } + ips := obj.([]net.IP) + ops, err := rule.addrSet.DeleteIPsReturnOps(ips) + if err != nil { + return err + } + podsCaches = append(podsCaches, rule.pods) + allOps = append(allOps, ops...) 
+ } + _, err = libovsdbops.TransactAndCheck(oc.nbClient, allOps) + if err != nil { + return err + } + + for _, pc := range podsCaches { + pc.Delete(name) + } + + return nil + } + + klog.V(5).Infof("Pod %s retrieved from lister: %v", pod.Name, pod) + + if !util.PodWantsNetwork(pod) { // we don't handle HostNetworked pods + return nil + } + + podIPs, err := util.GetAllPodIPs(pod) + if err != nil { + return err + } + + podLabels := labels.Set(pod.Labels) + podMapOps := []mapAndOp{} + for _, r := range eq.rules { + selector, _ := metav1.LabelSelectorAsSelector(&r.podSelector) + if selector.Empty() { // rule applies to all pods in the namespace, no need to modify address set + continue + } + + _, loaded := r.pods.Load(pod.Name) + if selector.Matches(podLabels) && !loaded { + ops, err := r.addrSet.AddIPsReturnOps(podIPs) + if err != nil { + return err + } + allOps = append(allOps, ops...) + podMapOps = append(podMapOps, mapAndOp{r.pods, mapInsert}) + } else if !selector.Matches(podLabels) && loaded { + ops, err := r.addrSet.DeleteIPsReturnOps(podIPs) + if err != nil { + return err + } + allOps = append(allOps, ops...) + podMapOps = append(podMapOps, mapAndOp{r.pods, mapDelete}) + } + } + + _, err = libovsdbops.TransactAndCheck(oc.nbClient, allOps) + if err != nil { + return err + } + + for _, mapOp := range podMapOps { + switch mapOp.op { + case mapInsert: + mapOp.m.Store(pod.Name, podIPs) + case mapDelete: + mapOp.m.Delete(pod.Name) + } + } + + return nil +} + +// onEgressQoSPodAdd queues the pod for processing. +func (oc *Controller) onEgressQoSPodAdd(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) + return + } + klog.V(4).Infof("Adding EgressQoS pod %s", key) + oc.egressQoSPodQueue.Add(key) +} + +// onEgressQoSPodUpdate queues the pod for processing. 
+func (oc *Controller) onEgressQoSPodUpdate(oldObj, newObj interface{}) {
+	oldPod := oldObj.(*kapi.Pod)
+	newPod := newObj.(*kapi.Pod)
+
+	// Skip resyncs (no version change) and pods already being deleted -
+	// the delete handler queues the final removal.
+	if oldPod.ResourceVersion == newPod.ResourceVersion ||
+		!newPod.GetDeletionTimestamp().IsZero() {
+		return
+	}
+
+	key, err := cache.MetaNamespaceKeyFunc(newObj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", newObj, err))
+		return
+	}
+
+	oc.egressQoSPodQueue.Add(key)
+}
+
+// onEgressQoSPodDelete queues the pod for processing.
+func (oc *Controller) onEgressQoSPodDelete(obj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+	klog.V(4).Infof("Deleting EgressQoS Pod %s", key)
+	oc.egressQoSPodQueue.Add(key)
+}
+
+// runEgressQoSPodWorker processes items from the pod queue until the queue
+// is shut down.
+func (oc *Controller) runEgressQoSPodWorker(wg *sync.WaitGroup) {
+	for oc.processNextEgressQoSPodWorkItem(wg) {
+	}
+}
+
+// processNextEgressQoSPodWorkItem dequeues a single pod key, syncs it and
+// handles retries. It returns false only when the queue has been shut down.
+func (oc *Controller) processNextEgressQoSPodWorkItem(wg *sync.WaitGroup) bool {
+	wg.Add(1)
+	defer wg.Done()
+	key, quit := oc.egressQoSPodQueue.Get()
+	if quit {
+		return false
+	}
+	defer oc.egressQoSPodQueue.Done(key)
+
+	err := oc.syncEgressQoSPod(key.(string))
+	if err == nil {
+		oc.egressQoSPodQueue.Forget(key)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
+
+	// Retry with backoff up to maxEgressQoSRetries before dropping the key.
+	if oc.egressQoSPodQueue.NumRequeues(key) < maxEgressQoSRetries {
+		oc.egressQoSPodQueue.AddRateLimited(key)
+		return true
+	}
+
+	oc.egressQoSPodQueue.Forget(key)
+	return true
+}
+
+// onEgressQoSNodeAdd queues the node for processing.
+func (oc *Controller) onEgressQoSNodeAdd(obj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
+		return
+	}
+	klog.V(4).Infof("Adding EgressQoS node %s", key)
+	oc.egressQoSNodeQueue.Add(key)
+}
+
+// runEgressQoSNodeWorker processes items from the node queue until the
+// queue is shut down.
+func (oc *Controller) runEgressQoSNodeWorker(wg *sync.WaitGroup) {
+	for oc.processNextEgressQoSNodeWorkItem(wg) {
+	}
+}
+
+// processNextEgressQoSNodeWorkItem dequeues a single node key, syncs it and
+// handles retries. It returns false only when the queue has been shut down.
+func (oc *Controller) processNextEgressQoSNodeWorkItem(wg *sync.WaitGroup) bool {
+	wg.Add(1)
+	defer wg.Done()
+	key, quit := oc.egressQoSNodeQueue.Get()
+	if quit {
+		return false
+	}
+	defer oc.egressQoSNodeQueue.Done(key)
+
+	err := oc.syncEgressQoSNode(key.(string))
+	if err == nil {
+		oc.egressQoSNodeQueue.Forget(key)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
+
+	// Retry with backoff up to maxEgressQoSRetries before dropping the key.
+	if oc.egressQoSNodeQueue.NumRequeues(key) < maxEgressQoSRetries {
+		oc.egressQoSNodeQueue.AddRateLimited(key)
+		return true
+	}
+
+	oc.egressQoSNodeQueue.Forget(key)
+	return true
+}
+
+// syncEgressQoSNode attaches the existing EgressQoS-owned QoS rules to the
+// logical switch of a node that was just added.
+func (oc *Controller) syncEgressQoSNode(key string) error {
+	startTime := time.Now()
+	_, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+	klog.Infof("Processing sync for EgressQoS node %s", name)
+
+	defer func() {
+		klog.V(4).Infof("Finished syncing EgressQoS node %s : %v", name, time.Since(startTime))
+	}()
+
+	n, err := oc.egressQoSNodeLister.Get(name)
+	if err != nil && !apierrors.IsNotFound(err) {
+		return err
+	}
+
+	if n == nil { // we don't process node deletions, its logical switch will be deleted.
+ return nil + } + + klog.V(5).Infof("EgressQoS %s node retrieved from lister: %v", n.Name, n) + + nodeSw := &nbdb.LogicalSwitch{ + Name: n.Name, + } + nodeSw, err = libovsdbops.GetLogicalSwitch(oc.nbClient, nodeSw) + if err != nil { + return err + } + + p := func(q *nbdb.QoS) bool { + _, ok := q.ExternalIDs["EgressQoS"] + return ok + } + existingQoSes, err := libovsdbops.FindQoSesWithPredicate(oc.nbClient, p) + if err != nil { + return err + } + + if len(existingQoSes) == 0 { + return nil + } + + ops, err := libovsdbops.AddQoSesToLogicalSwitchOps(oc.nbClient, nil, nodeSw.Name, existingQoSes...) + if err != nil { + return err + } + + if _, err := libovsdbops.TransactAndCheck(oc.nbClient, ops); err != nil { + return fmt.Errorf("unable to add existing qoses to new node, err: %v", err) + } + + return nil +} diff --git a/go-controller/pkg/ovn/egressqos_test.go b/go-controller/pkg/ovn/egressqos_test.go new file mode 100644 index 00000000000..1fc19c13a0a --- /dev/null +++ b/go-controller/pkg/ovn/egressqos_test.go @@ -0,0 +1,575 @@ +package ovn + +import ( + "context" + "fmt" + "net" + + "github.com/onsi/ginkgo" + ginkgotable "github.com/onsi/ginkgo/extensions/table" + "github.com/onsi/gomega" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" + egressqosapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" + libovsdbtest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing/libovsdb" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" + + "github.com/urfave/cli/v2" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" +) + +func newEgressQoSObject(name, namespace string, egressRules []egressqosapi.EgressQoSRule) *egressqosapi.EgressQoS { + return &egressqosapi.EgressQoS{ + 
ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: egressqosapi.EgressQoSSpec{ + Egress: egressRules, + }, + } +} + +var _ = ginkgo.Describe("OVN EgressQoS Operations", func() { + var ( + app *cli.App + fakeOVN *FakeOVN + ) + const ( + node1Name string = "node1" + node2Name string = "node2" + ) + + ginkgo.BeforeEach(func() { + // Restore global default values before each testcase + config.PrepareTestConfig() + config.OVNKubernetesFeature.EnableEgressQoS = true + + app = cli.NewApp() + app.Name = "test" + app.Flags = config.Flags + + fakeOVN = NewFakeOVN() + }) + + ginkgo.AfterEach(func() { + fakeOVN.shutdown() + }) + + ginkgotable.DescribeTable("reconciles existing and non-existing egressqoses without PodSelectors", + func(ipv4Mode, ipv6Mode bool, dst1, dst2, match1, match2 string) { + app.Action = func(ctx *cli.Context) error { + config.IPv4Mode = ipv4Mode + config.IPv6Mode = ipv6Mode + namespaceT := *newNamespace("namespace1") + + staleQoS := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: "some-match", + Priority: EgressQoSFlowStartPriority, + Action: map[string]int{nbdb.QoSActionDSCP: 50}, + ExternalIDs: map[string]string{"EgressQoS": "staleNS"}, + UUID: "staleQoS-UUID", + } + + staleAddrSet := &nbdb.AddressSet{ + Name: "egress-qos-pods-staleNS", + ExternalIDs: map[string]string{"name": "egress-qos-pods-staleNS"}, + UUID: "staleAS-UUID", + Addresses: []string{"1.2.3.4"}, + } + + node1Switch := &nbdb.LogicalSwitch{ + UUID: "node1-UUID", + Name: node1Name, + QOSRules: []string{staleQoS.UUID}, + } + + node2Switch := &nbdb.LogicalSwitch{ + UUID: "node2-UUID", + Name: node2Name, + } + + joinSwitch := &nbdb.LogicalSwitch{ + UUID: "join-UUID", + Name: types.OVNJoinSwitch, + QOSRules: []string{staleQoS.UUID}, + } + + dbSetup := libovsdbtest.TestSetup{ + NBData: []libovsdbtest.TestData{ + staleQoS, + staleAddrSet, + node1Switch, + node2Switch, + joinSwitch, + }, + } + + fakeOVN.startWithDBSetup(dbSetup, + &v1.NamespaceList{ + 
Items: []v1.Namespace{ + namespaceT, + }, + }, + ) + + // Create one EgressQoS + eq := newEgressQoSObject("default", namespaceT.Name, []egressqosapi.EgressQoSRule{ + { + DstCIDR: &dst1, + DSCP: 50, + }, + { + DstCIDR: &dst2, + DSCP: 60, + }, + }) + eq.ResourceVersion = "1" + _, err := fakeOVN.fakeClient.EgressQoSClient.K8sV1().EgressQoSes(namespaceT.Name).Create(context.TODO(), eq, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + fakeOVN.InitAndRunEgressQoSController() + + qos1 := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: match1, + Priority: EgressQoSFlowStartPriority, + Action: map[string]int{nbdb.QoSActionDSCP: 50}, + ExternalIDs: map[string]string{"EgressQoS": namespaceT.Name}, + UUID: "qos1-UUID", + } + qos2 := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: match2, + Priority: EgressQoSFlowStartPriority - 1, + Action: map[string]int{nbdb.QoSActionDSCP: 60}, + ExternalIDs: map[string]string{"EgressQoS": namespaceT.Name}, + UUID: "qos2-UUID", + } + node1Switch.QOSRules = []string{qos1.UUID, qos2.UUID} + node2Switch.QOSRules = []string{qos1.UUID, qos2.UUID} + expectedDatabaseState := []libovsdbtest.TestData{ + qos1, + qos2, + node1Switch, + node2Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + // Update the EgressQoS + eq.Spec.Egress = []egressqosapi.EgressQoSRule{ + { + DstCIDR: &dst1, + DSCP: 40, + }, + } + eq.ResourceVersion = "2" + _, err = fakeOVN.fakeClient.EgressQoSClient.K8sV1().EgressQoSes(namespaceT.Name).Update(context.TODO(), eq, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + qos3 := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: match1, + Priority: EgressQoSFlowStartPriority, + Action: map[string]int{nbdb.QoSActionDSCP: 40}, + ExternalIDs: map[string]string{"EgressQoS": namespaceT.Name}, + UUID: "qos3-UUID", + } + node1Switch.QOSRules = []string{qos3.UUID} + 
node2Switch.QOSRules = []string{qos3.UUID} + expectedDatabaseState = []libovsdbtest.TestData{ + qos3, + node1Switch, + node2Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + // Delete the EgressQoS + err = fakeOVN.fakeClient.EgressQoSClient.K8sV1().EgressQoSes(namespaceT.Name).Delete(context.TODO(), eq.Name, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + node1Switch.QOSRules = []string{} + node2Switch.QOSRules = []string{} + expectedDatabaseState = []libovsdbtest.TestData{ + node1Switch, + node2Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }, + ginkgotable.Entry("ipv4", true, false, "1.2.3.4/32", "5.6.7.8/32", + "(ip4.dst == 1.2.3.4/32) && ip4.src == $a10481622940199974102", + "(ip4.dst == 5.6.7.8/32) && ip4.src == $a10481622940199974102"), + ginkgotable.Entry("ipv6", false, true, "2001:0db8:85a3:0000:0000:8a2e:0370:7334/128", "2001:0db8:85a3:0000:0000:8a2e:0370:7335/128", + "(ip6.dst == 2001:0db8:85a3:0000:0000:8a2e:0370:7334/128) && ip6.src == $a10481620741176717680", + "(ip6.dst == 2001:0db8:85a3:0000:0000:8a2e:0370:7335/128) && ip6.src == $a10481620741176717680"), + ginkgotable.Entry("dual", true, true, "1.2.3.4/32", "2001:0db8:85a3:0000:0000:8a2e:0370:7335/128", + "(ip4.dst == 1.2.3.4/32) && (ip4.src == $a10481622940199974102 || ip6.src == $a10481620741176717680)", + "(ip6.dst == 2001:0db8:85a3:0000:0000:8a2e:0370:7335/128) && (ip4.src == $a10481622940199974102 || ip6.src == $a10481620741176717680)"), + ) + + ginkgotable.DescribeTable("reconciles existing and non-existing egressqoses with PodSelectors", + func(ipv4Mode, ipv6Mode bool, dst1, dst2, match1, match2 string) { + app.Action = func(ctx *cli.Context) error { + config.IPv4Mode = ipv4Mode + 
config.IPv6Mode = ipv6Mode + namespaceT := *newNamespace("namespace1") + + staleQoS := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: "some-match", + Priority: EgressQoSFlowStartPriority, + Action: map[string]int{nbdb.QoSActionDSCP: 50}, + ExternalIDs: map[string]string{"EgressQoS": "staleNS"}, + UUID: "staleQoS-UUID", + } + + staleAddrSet := &nbdb.AddressSet{ + Name: "egress-qos-pods-staleNS", + ExternalIDs: map[string]string{"name": "egress-qos-pods-staleNS"}, + UUID: "staleAS-UUID", + Addresses: []string{"1.2.3.4"}, + } + + podT := newPodWithLabels( + namespaceT.Name, + "myPod", + node1Name, + "10.128.1.3", + map[string]string{"app": "nice"}, + ) + + node1Switch := &nbdb.LogicalSwitch{ + UUID: node1Name, + Name: node1Name, + QOSRules: []string{staleQoS.UUID}, + } + + node2Switch := &nbdb.LogicalSwitch{ + UUID: node2Name, + Name: node2Name, + QOSRules: []string{}, + } + + joinSwitch := &nbdb.LogicalSwitch{ + UUID: "join-UUID", + Name: types.OVNJoinSwitch, + QOSRules: []string{staleQoS.UUID}, + } + + dbSetup := libovsdbtest.TestSetup{ + NBData: []libovsdbtest.TestData{ + staleQoS, + staleAddrSet, + node1Switch, + node2Switch, + joinSwitch, + }, + } + + fakeOVN.startWithDBSetup(dbSetup, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespaceT, + }, + }, + &v1.PodList{ + Items: []v1.Pod{ + *podT, + }, + }, + ) + + i, n, _ := net.ParseCIDR("10.128.1.3" + "/23") + n.IP = i + fakeOVN.controller.logicalPortCache.add("", util.GetLogicalPortName(podT.Namespace, podT.Name), "", nil, []*net.IPNet{n}) + + // Create one EgressQoS + eq := newEgressQoSObject("default", namespaceT.Name, []egressqosapi.EgressQoSRule{ + { + DstCIDR: &dst1, + DSCP: 50, + PodSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nice", + }, + }, + }, + { + DstCIDR: &dst2, + DSCP: 60, + }, + }) + eq.ResourceVersion = "1" + _, err := fakeOVN.fakeClient.EgressQoSClient.K8sV1().EgressQoSes(namespaceT.Name).Create(context.TODO(), eq, metav1.CreateOptions{}) + 
gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + fakeOVN.InitAndRunEgressQoSController() + + qos1 := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: match1, + Priority: EgressQoSFlowStartPriority, + Action: map[string]int{nbdb.QoSActionDSCP: 50}, + ExternalIDs: map[string]string{"EgressQoS": namespaceT.Name}, + UUID: "qos1-UUID", + } + qos2 := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: match2, + Priority: EgressQoSFlowStartPriority - 1, + Action: map[string]int{nbdb.QoSActionDSCP: 60}, + ExternalIDs: map[string]string{"EgressQoS": namespaceT.Name}, + UUID: "qos2-UUID", + } + node1Switch.QOSRules = []string{qos1.UUID, qos2.UUID} + node2Switch.QOSRules = []string{qos1.UUID, qos2.UUID} + expectedDatabaseState := []libovsdbtest.TestData{ + qos1, + qos2, + node1Switch, + node2Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + // Update the EgressQoS + eq.Spec.Egress = []egressqosapi.EgressQoSRule{ + { + DstCIDR: &dst1, + DSCP: 40, + PodSelector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nice", + }, + }, + }, + } + eq.ResourceVersion = "2" + _, err = fakeOVN.fakeClient.EgressQoSClient.K8sV1().EgressQoSes(namespaceT.Name).Update(context.TODO(), eq, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + qos3 := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: match1, + Priority: EgressQoSFlowStartPriority, + Action: map[string]int{nbdb.QoSActionDSCP: 40}, + ExternalIDs: map[string]string{"EgressQoS": namespaceT.Name}, + UUID: "qos3-UUID", + } + node1Switch.QOSRules = []string{qos3.UUID} + node2Switch.QOSRules = []string{qos3.UUID} + expectedDatabaseState = []libovsdbtest.TestData{ + qos3, + node1Switch, + node2Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + // Delete the EgressQoS + err = 
fakeOVN.fakeClient.EgressQoSClient.K8sV1().EgressQoSes(namespaceT.Name).Delete(context.TODO(), eq.Name, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + node1Switch.QOSRules = []string{} + node2Switch.QOSRules = []string{} + expectedDatabaseState = []libovsdbtest.TestData{ + node1Switch, + node2Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }, + ginkgotable.Entry("ipv4", true, false, "1.2.3.4/32", "5.6.7.8/32", + "(ip4.dst == 1.2.3.4/32) && ip4.src == $a8797969223947225899", + "(ip4.dst == 5.6.7.8/32) && ip4.src == $a10481622940199974102"), + ginkgotable.Entry("ipv6", false, true, "2001:0db8:85a3:0000:0000:8a2e:0370:7334/128", "2001:0db8:85a3:0000:0000:8a2e:0370:7335/128", + "(ip6.dst == 2001:0db8:85a3:0000:0000:8a2e:0370:7334/128) && ip6.src == $a8797971422970482321", + "(ip6.dst == 2001:0db8:85a3:0000:0000:8a2e:0370:7335/128) && ip6.src == $a10481620741176717680"), + ginkgotable.Entry("dual", true, true, "1.2.3.4/32", "2001:0db8:85a3:0000:0000:8a2e:0370:7335/128", + "(ip4.dst == 1.2.3.4/32) && (ip4.src == $a8797969223947225899 || ip6.src == $a8797971422970482321)", + "(ip6.dst == 2001:0db8:85a3:0000:0000:8a2e:0370:7335/128) && (ip4.src == $a10481622940199974102 || ip6.src == $a10481620741176717680)"), + ) + + ginkgo.It("should respond to node events correctly", func() { + app.Action = func(ctx *cli.Context) error { + namespaceT := *newNamespace("namespace1") + + node1Switch := &nbdb.LogicalSwitch{ + UUID: "node1-UUID", + Name: node1Name, + } + + node2Switch := &nbdb.LogicalSwitch{ + UUID: "node2-UUID", + Name: node2Name, + } + + joinSwitch := &nbdb.LogicalSwitch{ + UUID: "join-UUID", + Name: types.OVNJoinSwitch, + } + + dbSetup := libovsdbtest.TestSetup{ + NBData: []libovsdbtest.TestData{ + node1Switch, + node2Switch, + joinSwitch, + }, + } + + 
fakeOVN.startWithDBSetup(dbSetup, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespaceT, + }, + }, + ) + + // Create one EgressQoS + eq := newEgressQoSObject("default", namespaceT.Name, []egressqosapi.EgressQoSRule{ + { + DstCIDR: pointer.String("1.2.3.4/32"), + DSCP: 50, + }, + { + DstCIDR: pointer.String("5.6.7.8/32"), + DSCP: 60, + }, + }) + _, err := fakeOVN.fakeClient.EgressQoSClient.K8sV1().EgressQoSes(namespaceT.Name).Create(context.TODO(), eq, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + fakeOVN.InitAndRunEgressQoSController() + + qos1 := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: "(ip4.dst == 1.2.3.4/32) && ip4.src == $a10481622940199974102", + Priority: EgressQoSFlowStartPriority, + Action: map[string]int{nbdb.QoSActionDSCP: 50}, + ExternalIDs: map[string]string{"EgressQoS": namespaceT.Name}, + UUID: "qos1-UUID", + } + qos2 := &nbdb.QoS{ + Direction: nbdb.QoSDirectionToLport, + Match: "(ip4.dst == 5.6.7.8/32) && ip4.src == $a10481622940199974102", + Priority: EgressQoSFlowStartPriority - 1, + Action: map[string]int{nbdb.QoSActionDSCP: 60}, + ExternalIDs: map[string]string{"EgressQoS": namespaceT.Name}, + UUID: "qos2-UUID", + } + node1Switch.QOSRules = append(node1Switch.QOSRules, qos1.UUID, qos2.UUID) + node2Switch.QOSRules = append(node2Switch.QOSRules, qos1.UUID, qos2.UUID) + expectedDatabaseState := []libovsdbtest.TestData{ + qos1, + qos2, + node1Switch, + node2Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + node3Switch, err := createNodeAndLS(fakeOVN, "node3") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + node3Switch.QOSRules = []string{qos1.UUID, qos2.UUID} + expectedDatabaseState = []libovsdbtest.TestData{ + qos1, + qos2, + node1Switch, + node2Switch, + node3Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient, 3).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + // 
Delete the EgressQoS + err = fakeOVN.fakeClient.EgressQoSClient.K8sV1().EgressQoSes(namespaceT.Name).Delete(context.TODO(), eq.Name, metav1.DeleteOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + node1Switch.QOSRules = []string{} + node2Switch.QOSRules = []string{} + node3Switch.QOSRules = []string{} + expectedDatabaseState = []libovsdbtest.TestData{ + node1Switch, + node2Switch, + node3Switch, + joinSwitch, + } + + gomega.Eventually(fakeOVN.nbClient).Should(libovsdbtest.HaveDataIgnoringUUIDs(expectedDatabaseState)) + + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) +}) + +func (o *FakeOVN) InitAndRunEgressQoSController() { + klog.Warningf("#### [%p] INIT EgressQoS", o) + o.controller.initEgressQoSController(o.watcher.EgressQoSInformer(), o.watcher.WIPPodInformer(), o.watcher.WIPNodeInformer()) + o.egressQoSWg.Add(1) + go func() { + defer o.egressQoSWg.Done() + o.controller.runEgressQoSController(1, o.stopChan) + }() +} + +func createNodeAndLS(fakeOVN *FakeOVN, name string) (*nbdb.LogicalSwitch, error) { + node := v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + } + _, err := fakeOVN.fakeClient.KubeClient.CoreV1().Nodes().Create(context.TODO(), &node, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + + logicalSwitch := &nbdb.LogicalSwitch{ + UUID: name + "-UUID", + Name: name, + } + + if err := libovsdbops.CreateOrUpdateLogicalSwitch(fakeOVN.nbClient, logicalSwitch); err != nil { + return nil, fmt.Errorf("failed to create logical switch %s, error: %v", name, err) + + } + + return logicalSwitch, nil +} diff --git a/go-controller/pkg/ovn/master_test.go b/go-controller/pkg/ovn/master_test.go index eb96385a1cd..db501c12f5e 100644 --- a/go-controller/pkg/ovn/master_test.go +++ b/go-controller/pkg/ovn/master_test.go @@ -15,6 +15,7 @@ import ( 
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake" egressipfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/clientset/versioned/fake" + egressqosfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned/fake" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb" @@ -884,10 +885,12 @@ var _ = ginkgo.Describe("Gateway Init Operations", func() { }) egressFirewallFakeClient := &egressfirewallfake.Clientset{} egressIPFakeClient := &egressipfake.Clientset{} + egressQoSFakeClient := &egressqosfake.Clientset{} fakeClient := &util.OVNClientset{ KubeClient: kubeFakeClient, EgressIPClient: egressIPFakeClient, EgressFirewallClient: egressFirewallFakeClient, + EgressQoSClient: egressQoSFakeClient, } _, err := config.InitConfig(ctx, nil, nil) @@ -1083,10 +1086,12 @@ var _ = ginkgo.Describe("Gateway Init Operations", func() { }) egressFirewallFakeClient := &egressfirewallfake.Clientset{} egressIPFakeClient := &egressipfake.Clientset{} + egressQoSFakeClient := &egressqosfake.Clientset{} fakeClient := &util.OVNClientset{ KubeClient: kubeFakeClient, EgressIPClient: egressIPFakeClient, EgressFirewallClient: egressFirewallFakeClient, + EgressQoSClient: egressQoSFakeClient, } _, err := config.InitConfig(ctx, nil, nil) @@ -1860,10 +1865,12 @@ func TestController_allocateNodeSubnets(t *testing.T) { kubeFakeClient := fake.NewSimpleClientset() egressFirewallFakeClient := &egressfirewallfake.Clientset{} egressIPFakeClient := &egressipfake.Clientset{} + egressQoSFakeClient := &egressqosfake.Clientset{} fakeClient := &util.OVNClientset{ KubeClient: kubeFakeClient, EgressIPClient: egressIPFakeClient, EgressFirewallClient: egressFirewallFakeClient, + EgressQoSClient: 
egressQoSFakeClient, } f, err := factory.NewMasterWatchFactory(fakeClient) if err != nil { diff --git a/go-controller/pkg/ovn/ovn.go b/go-controller/pkg/ovn/ovn.go index b62ccf2a6d1..24de30d50a6 100644 --- a/go-controller/pkg/ovn/ovn.go +++ b/go-controller/pkg/ovn/ovn.go @@ -22,6 +22,7 @@ import ( addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set" svccontroller "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/controller/services" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/controller/unidling" + corev1listers "k8s.io/client-go/listers/core/v1" lsm "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/logical_switch_manager" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/subnetallocator" @@ -32,6 +33,7 @@ import ( utilnet "k8s.io/utils/net" + egressqoslisters "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/listers/egressqos/v1" kapi "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -44,6 +46,8 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" ref "k8s.io/client-go/tools/reference" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" ) @@ -132,6 +136,20 @@ type Controller struct { // egressFirewalls is a map of namespaces and the egressFirewall attached to it egressFirewalls sync.Map + // EgressQoS + egressQoSLister egressqoslisters.EgressQoSLister + egressQoSSynced cache.InformerSynced + egressQoSQueue workqueue.RateLimitingInterface + egressQoSCache sync.Map + + egressQoSPodLister corev1listers.PodLister + egressQoSPodSynced cache.InformerSynced + egressQoSPodQueue workqueue.RateLimitingInterface + + egressQoSNodeLister corev1listers.NodeLister + egressQoSNodeSynced cache.InformerSynced + egressQoSNodeQueue workqueue.RateLimitingInterface + // An address set factory that creates address sets addressSetFactory addressset.AddressSetFactory @@ -357,6 +375,18 @@ func (oc *Controller) Run(ctx 
context.Context, wg *sync.WaitGroup) error { } + if config.OVNKubernetesFeature.EnableEgressQoS { + oc.initEgressQoSController( + oc.watchFactory.EgressQoSInformer(), + oc.watchFactory.WIPPodInformer(), + oc.watchFactory.WIPNodeInformer()) + wg.Add(1) + go func() { + defer wg.Done() + oc.runEgressQoSController(1, oc.stopChan) + }() + } + klog.Infof("Completing all the Watchers took %v", time.Since(start)) if config.Kubernetes.OVNEmptyLbEvents { diff --git a/go-controller/pkg/ovn/ovn_test.go b/go-controller/pkg/ovn/ovn_test.go index d0e916e4bf2..fe135e4daa4 100644 --- a/go-controller/pkg/ovn/ovn_test.go +++ b/go-controller/pkg/ovn/ovn_test.go @@ -2,12 +2,16 @@ package ovn import ( "context" + "sync" + "github.com/onsi/gomega" libovsdbclient "github.com/ovn-org/libovsdb/client" egressfirewall "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1" egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake" egressip "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1" egressipfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/clientset/versioned/fake" + egressqos "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1" + egressqosfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned/fake" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory" addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set" ovntest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing" @@ -43,12 +47,14 @@ type FakeOVN struct { sbClient libovsdbclient.Client dbSetup libovsdbtest.TestSetup nbsbCleanup *libovsdbtest.Cleanup + egressQoSWg *sync.WaitGroup } func NewFakeOVN() *FakeOVN { return &FakeOVN{ asf: addressset.NewFakeAddressSetFactory(), fakeRecorder: record.NewFakeRecorder(10), + egressQoSWg: &sync.WaitGroup{}, } } @@ -59,12 +65,15 @@ func (o *FakeOVN) start(objects 
...runtime.Object) { egressIPObjects := []runtime.Object{} egressFirewallObjects := []runtime.Object{} + egressQoSObjects := []runtime.Object{} v1Objects := []runtime.Object{} for _, object := range objects { if _, isEgressIPObject := object.(*egressip.EgressIPList); isEgressIPObject { egressIPObjects = append(egressIPObjects, object) } else if _, isEgressFirewallObject := object.(*egressfirewall.EgressFirewallList); isEgressFirewallObject { egressFirewallObjects = append(egressFirewallObjects, object) + } else if _, isEgressQoSObject := object.(*egressqos.EgressQoSList); isEgressQoSObject { + egressQoSObjects = append(egressQoSObjects, object) } else { v1Objects = append(v1Objects, object) } @@ -73,6 +82,7 @@ func (o *FakeOVN) start(objects ...runtime.Object) { KubeClient: fake.NewSimpleClientset(v1Objects...), EgressIPClient: egressipfake.NewSimpleClientset(egressIPObjects...), EgressFirewallClient: egressfirewallfake.NewSimpleClientset(egressFirewallObjects...), + EgressQoSClient: egressqosfake.NewSimpleClientset(egressQoSObjects...), } o.init() } @@ -85,6 +95,7 @@ func (o *FakeOVN) startWithDBSetup(dbSetup libovsdbtest.TestSetup, objects ...ru func (o *FakeOVN) shutdown() { o.watcher.Shutdown() close(o.stopChan) + o.egressQoSWg.Wait() o.nbsbCleanup.Cleanup() } diff --git a/go-controller/pkg/types/const.go b/go-controller/pkg/types/const.go index 8b78f4bfe28..b62d5d4f1c0 100644 --- a/go-controller/pkg/types/const.go +++ b/go-controller/pkg/types/const.go @@ -111,6 +111,7 @@ const ( // OVN-K8S Address Sets Names HybridRoutePolicyPrefix = "hybrid-route-pods-" + EgressQoSRulePrefix = "egress-qos-pods-" // OVN-K8S Topology Versions OvnSingleJoinSwitchTopoVersion = 1 diff --git a/go-controller/pkg/util/kube.go b/go-controller/pkg/util/kube.go index 9ad9cab2c45..ffee00bcf48 100644 --- a/go-controller/pkg/util/kube.go +++ b/go-controller/pkg/util/kube.go @@ -25,6 +25,7 @@ import ( egressfirewallclientset 
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned" egressipclientset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/clientset/versioned" + egressqosclientset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressqos/v1/apis/clientset/versioned" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" ocpcloudnetworkclientset "github.com/openshift/client-go/cloudnetwork/clientset/versioned" @@ -38,6 +39,7 @@ type OVNClientset struct { EgressIPClient egressipclientset.Interface EgressFirewallClient egressfirewallclientset.Interface CloudNetworkClient ocpcloudnetworkclientset.Interface + EgressQoSClient egressqosclientset.Interface } func adjustCommit() string { @@ -136,11 +138,17 @@ func NewOVNClientset(conf *config.KubernetesConfig) (*OVNClientset, error) { if err != nil { return nil, err } + egressqosClientset, err := egressqosclientset.NewForConfig(kconfig) + if err != nil { + return nil, err + } + return &OVNClientset{ KubeClient: kclientset, EgressIPClient: egressIPClientset, EgressFirewallClient: egressFirewallClientset, CloudNetworkClient: cloudNetworkClientset, + EgressQoSClient: egressqosClientset, }, nil }